Examples: Basic Reverse Example

This is just about the most basic gearman example that can be produced and will server as a teach you the basics of how the server, client, and worker interact with one another.

Before Starting

Make sure you have gearmand installed and language bindings available.

The Involved Parties

Server
The server, gearmand, will coordinate clients and workers ensuring that calls from the clients are delivered to workers and that results from workers are sent back to the client.
Client
A process which has a blob of data to process, a known function which can process it, and a desire to get the processed results back. In our case, the client wants a string reversed.
Worker
A process which connected to the server and offers to process function calls. In this example, the client can reverse strings.

The Client

<?php

// Create our client object
$client = new GearmanClient();

// Add a server
$client->addServer(); // by default host/port will be "localhost" & 4730

echo "Sending job\n";

// Send reverse job
$result = $client->doNormal("reverse", "Hello!");
if ($result) {
  echo "Success: $result\n";
}
package main

import (
    "log"
    "sync"
    "time"
    "github.com/mikespook/gearman-go/client"
)

func main() {
    var wg sync.WaitGroup
    // Set the autoinc id generator
    // You can write your own id generator
    // by implementing IdGenerator interface.
    // client.IdGen = client.NewAutoIncId()

    c, err := client.New("127.0.0.1:4730")
    if err != nil {
        log.Fatalln(err)
    }
    defer c.Close()
    c.ErrHandler = func(e error) {
        log.Println(e)
    }
    echo := []byte("Hello\x00 world")
    wg.Add(1)
    echomsg, err := c.Echo(echo, time.Second)
    if err != nil {
        log.Fatalln(err)
    }
    log.Println(string(echomsg))
    wg.Done()
    jobHandler := func(job *client.Job) {
        log.Printf("%s", job.Data)
        wg.Done()
    }
    handle := c.Do("reverse", echo, client.JOB_NORMAL, jobHandler)
    wg.Add(1)
    status, err := c.Status(handle, time.Second)
    if err != nil {
        log.Fatalln(err)
    }
    log.Printf("%t", status)

    wg.Wait()
}
import gearman

def check_request_status(job_request):
    if job_request.complete:
        print "Job %s finished!  Result: %s - %s" % (job_request.job.unique, job_request.state, job_request.result)
    elif job_request.timed_out:
        print "Job %s timed out!" % job_request.unique
    elif job_request.state == JOB_UNKNOWN:
        print "Job %s connection failed!" % job_request.unique

gm_client = gearman.GearmanClient(['localhost:4730'])

completed_job_request = gm_client.submit_job("reverse", "Hello World!")
check_request_status(completed_job_request)
var Gearman, client;

Gearman = require('gearman').Gearman;

client = new Gearman("localhost", 4730);

client.on('WORK_COMPLETE', function (job) {
    console.log('job completed, result:', job.payload.toString());
    return client.close();
});

client.connect(function () {
    return client.submitJob('reverse', 'Hello, World!');
});

The Worker

<?php

// Create our worker object
$worker = new GearmanWorker();

// Add a server (again, same defaults apply as a worker)
$worker->addServer();

// Inform the server that this worker can process "reverse" function calls
$worker->addFunction("reverse", "reverse_fn");

while (1) {
  print "Waiting for job...\n";
  $ret = $worker->work(); // work() will block execution until a job is delivered
  if ($worker->returnCode() != GEARMAN_SUCCESS) {
    break;
  }
}

// A much simple reverse function
function reverse_fn(GearmanJob $job) {
  $workload = $job->workload();
  echo "Received job: " . $job->handle() . "\n";
  echo "Workload: $workload\n";
  $result = strrev($workload);
  echo "Result: $result\n";
  return $result;
}
package main

import (
    "os"
    "log"
    "github.com/mikespook/golib/signal"
    "github.com/mikespook/gearman-go/worker"
)

func Reverse(job *worker.Job) ([]byte, error) {
    log.Printf("Reverse: Handle=[%s]; UID=[%s], Data=[%s]\n",
        job.Handle, job.UniqueId, job.Data)
    runes := []rune(string(job.Data))
    for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
        runes[i], runes[j] = runes[j], runes[i]
    }
    data := []byte(string(runes))
    return data, nil
}

func main() {
    log.Println("Starting ...")
    defer log.Println("Shutdown complete!")
    w := worker.New(worker.Unlimited)
    defer w.Close()
    w.ErrHandler = func(e error) {
        log.Println(e)
        if e == worker.ErrConnection {
            proc, err := os.FindProcess(os.Getpid())
            if err != nil {
                log.Println(err)
            }
            if err := proc.Signal(os.Interrupt); err != nil {
                log.Println(err)
            }
        }
    }
    w.JobHandler = func(job *worker.Job) error {
        log.Printf("H=%s, UID=%s, Data=%s, DataType=%d\n", job.Handle,
            job.UniqueId, job.Data, job.DataType)
        return nil
    }
    w.AddServer("127.0.0.1:4730")
    w.AddFunc("reverse", Reverse, worker.Immediately)
    go w.Work()
    sh := signal.NewHandler()
    sh.Bind(os.Interrupt, func() bool {return true})
    sh.Loop()
}
import gearman

gm_worker = gearman.GearmanWorker(['localhost:4730'])

def task_listener_reverse(gearman_worker, gearman_job):
    print 'Reversing string: ' + gearman_job.data
    return gearman_job.data[::-1]

# gm_worker.set_client_id is optional
gm_worker.set_client_id('python-worker')
gm_worker.register_task('reverse', task_listener_reverse)

# Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
gm_worker.work()
var Gearman, worker;

Gearman = require('gearman').Gearman;

worker = new Gearman('127.0.0.1', 4730);

worker.on('JOB_ASSIGN', function (job) {
    var result;
    console.log('"' + job.func_name + '" job assigned to this worker with payload: "' + job.payload + '"');
    result = job.payload.toString().split('').reverse().join('');
    worker.sendWorkComplete(job.handle, result);
    return worker.preSleep();
});

worker.on('NOOP', function () {
    return worker.grabJob();
});

worker.connect(function () {
    worker.addFunction('reverse');
    return worker.preSleep();
});

Running The Client And Worker

Now that we have our client and our worker, startup the gearmand server, start the worker, then the client. You should see something like the following.

Startup Order

Be mindful of what happens if you start the client before the worker. You’ll find that the client blocks (pauses execution) at the doNormal() call. This is because you’re performaing a “foreground job” in which you wait for the job to be processed and the results returned. If there are no workers available the process will block until one shows up. You can check out this behaviour by starting the client, waiting a few moments, and then starting the worker.

Different Languages

Try turning up the Go worker and then run the Python client. Or the Nodejs worker and then the PHP client – you’ll find that every language, through gearmand, can easily talk to one another.