goworker's Introduction

goworker

goworker is a Resque-compatible, Go-based background worker. It allows you to push jobs into a queue using an expressive language like Ruby while harnessing the efficiency and concurrency of Go to minimize job latency and cost.

goworker workers can run alongside Ruby Resque clients so that you can keep all but your most resource-intensive jobs in Ruby.

Installation

To install goworker, run

go get github.com/benmanns/goworker

and then import the package from your worker:

import "github.com/benmanns/goworker"

Getting Started

To create a worker, write a function matching the signature

func(string, ...interface{}) error

and register it using

goworker.Register("MyClass", myFunc)

Here is a simple worker that prints its arguments:

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
	fmt.Printf("From %s, %v\n", queue, args)
	return nil
}

func init() {
	goworker.Register("MyClass", myFunc)
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

To create workers that share a database pool or other resources, use a closure to share variables.

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

func newMyFunc(uri string) (func(queue string, args ...interface{}) error) {
	foo := NewFoo(uri)
	return func(queue string, args ...interface{}) error {
		foo.Bar(args)
		return nil
	}
}

func init() {
	goworker.Register("MyClass", newMyFunc("http://www.example.com/"))
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

Here is a simple worker with settings:

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
	fmt.Printf("From %s, %v\n", queue, args)
	return nil
}

func init() {
	settings := goworker.WorkerSettings{
		URI:            "redis://localhost:6379/",
		Connections:    100,
		Queues:         []string{"myqueue", "delimited", "queues"},
		UseNumber:      true,
		ExitOnComplete: false,
		Concurrency:    2,
		Namespace:      "resque:",
		Interval:       5.0,
	}
	goworker.SetSettings(settings)
	goworker.Register("MyClass", myFunc)
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

goworker worker functions receive the queue they are serving and a slice of interfaces. To use them as parameters to other functions, use Go type assertions to convert them into usable types.

// Expecting (int, string, float64)
func myFunc(queue string, args ...interface{}) error {
	idNum, ok := args[0].(json.Number)
	if !ok {
		return errorInvalidParam
	}
	id, err := idNum.Int64()
	if err != nil {
		return errorInvalidParam
	}
	name, ok := args[1].(string)
	if !ok {
		return errorInvalidParam
	}
	weightNum, ok := args[2].(json.Number)
	if !ok {
		return errorInvalidParam
	}
	weight, err := weightNum.Float64()
	if err != nil {
		return errorInvalidParam
	}
	doSomething(id, name, weight)
	return nil
}

For testing, it is helpful to use the redis-cli program to insert jobs onto the Redis queue:

redis-cli -r 100 RPUSH resque:queue:myqueue '{"class":"MyClass","args":["hi","there"]}'

will insert 100 jobs for the MyClass worker onto the myqueue queue. It is equivalent to:

class MyClass
  @queue = :myqueue
end

100.times do
  Resque.enqueue MyClass, 'hi', 'there'
end

or

goworker.Enqueue(&goworker.Job{
    Queue: "myqueue",
    Payload: goworker.Payload{
        Class: "MyClass",
        Args: []interface{}{"hi", "there"},
    },
})
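The redis-cli command above pushes a small JSON document onto a Redis list. As a rough sketch of that wire format, the key and value can be built with the standard library alone. The payload type and the queueKey and encodeJob helpers are illustrative names chosen for this example, not part of goworker's API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload mirrors the JSON object used in the redis-cli example.
// The type name is illustrative; it is not goworker's own type.
type payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

// queueKey builds the Redis list key from a namespace and a queue name,
// following the "resque:queue:myqueue" pattern.
func queueKey(namespace, queue string) string {
	return namespace + "queue:" + queue
}

// encodeJob returns the JSON text that would be pushed onto the list.
func encodeJob(class string, args ...interface{}) (string, error) {
	b, err := json.Marshal(payload{Class: class, Args: args})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	body, err := encodeJob("MyClass", "hi", "there")
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("RPUSH", queueKey("resque:", "myqueue"), body)
}
```

Building the document this way makes it easy to check that args is always a JSON array, which is what the worker signature expects.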

Flags

There are several flags which control the operation of the goworker client.

  • -queues="comma,delimited,queues" — This is the only required flag. The recommended practice is to separate your Resque workers from your goworkers with different queues. Otherwise, Resque worker classes that have no goworker analog will cause the goworker process to fail the jobs. Because of this, there is no default queue, nor is there a way to select all queues (à la Resque's * queue). If you have multiple queues you can assign them weights. A queue with a weight of 2 will be checked twice as often as a queue with a weight of 1: -queues='high=2,low=1'.
  • -interval=5.0 — Specifies the wait period between polling if no job was in the queue the last time one was requested.
  • -concurrency=25 — Specifies the number of concurrently executing workers. This number can be as low as 1 or rather comfortably as high as 100,000, and should be tuned to your workflow and the availability of outside resources.
  • -connections=2 — Specifies the maximum number of Redis connections that goworker will consume between the poller and all workers. There is not much performance gain over two and a slight penalty when using only one. This is configurable in case you need to keep connection counts low for cloud Redis providers who limit plans on maxclients.
  • -uri=redis://localhost:6379/ — Specifies the URI of the Redis database from which goworker polls for jobs. Accepts URIs of the format redis://user:pass@host:port/db or unix:///path/to/redis.sock. The flag may also be set by the environment variable $($REDIS_PROVIDER) or $REDIS_URL. E.g. set $REDIS_PROVIDER to REDISTOGO_URL on Heroku to let the Redis To Go add-on configure the Redis database.
  • -namespace=resque: — Specifies the namespace from which goworker retrieves jobs and stores stats on workers.
  • -exit-on-complete=false — Exits goworker when there are no jobs left in the queue. This is helpful in conjunction with the time command to benchmark different configurations.
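To illustrate the weighted form of -queues, here is a small sketch of how a string such as high=2,low=1 could be expanded into a polling order. It shows one plausible reading of the weights and is not goworker's actual parser; expandQueues is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// expandQueues turns "high=2,low=1" into ["high", "high", "low"], so that a
// poller cycling through the slice checks "high" twice as often as "low".
// A queue without "=N" gets a weight of 1.
// Hypothetical helper for illustration; not goworker's parser.
func expandQueues(spec string) ([]string, error) {
	var out []string
	for _, part := range strings.Split(spec, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		name, weight := part, 1
		if i := strings.Index(part, "="); i >= 0 {
			name = part[:i]
			w, err := strconv.Atoi(part[i+1:])
			if err != nil || w < 1 {
				return nil, fmt.Errorf("invalid weight in %q", part)
			}
			weight = w
		}
		if name == "" {
			return nil, fmt.Errorf("empty queue name in %q", part)
		}
		for j := 0; j < weight; j++ {
			out = append(out, name)
		}
	}
	if len(out) == 0 {
		return nil, fmt.Errorf("you must specify at least one queue")
	}
	return out, nil
}

func main() {
	queues, err := expandQueues("high=2,low=1")
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println(queues)
}
```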

You can also configure your own flags for use within your workers. Be sure to set them before calling goworker.Work(). It is okay to call flag.Parse() before calling goworker.Work() if you need to do additional processing on your flags.

Signal Handling in goworker

To stop goworker, send a QUIT, TERM, or INT signal to the process. This will immediately stop job polling. There can be up to $CONCURRENCY jobs currently running, which will continue to run until they are finished.
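The behavior described above (stop picking up new jobs, let running jobs finish) can be sketched with a stop channel and a sync.WaitGroup. This is a minimal illustration of the pattern, not goworker's implementation; the run function and the timer in main are assumptions added so the example terminates on its own.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// run starts `concurrency` workers. Each worker keeps calling work until stop
// is closed. A call to work that has already started always runs to completion.
func run(stop <-chan struct{}, concurrency int, work func(id int)) {
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-stop:
					return // no new jobs are picked up after the signal
				default:
					work(id)
				}
			}
		}(i)
	}
	wg.Wait() // block until every in-flight job has finished
}

func main() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)

	stop := make(chan struct{})
	go func() {
		select {
		case s := <-sigs:
			fmt.Println("received", s)
		case <-time.After(2 * time.Second):
			// Demo only: stop on a timer so the example terminates by itself.
		}
		close(stop)
	}()

	run(stop, 2, func(id int) {
		time.Sleep(300 * time.Millisecond) // stand-in for a real job
		fmt.Println("worker", id, "finished a job")
	})
	fmt.Println("all running jobs finished, exiting")
}
```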

Failure Modes

Like Resque, goworker makes no guarantees about the safety of jobs in the event of process shutdown. Workers must be both idempotent and tolerant to loss of the job in the event of failure.

If the process is killed with a KILL or by a system failure, there may be one job that is currently in the poller's buffer that will be lost without any representation in either the queue or the worker variable.

If you are running goworker on a system like Heroku, which sends a TERM to signal that a process needs to stop and then sends a KILL ten seconds later to force it to stop, your jobs must finish within 10 seconds or they may be lost. Jobs will be recoverable from the Redis database under

resque:worker:<hostname>:<process-id>-<worker-id>:<queues>

as a JSON object with keys queue, run_at, and payload, but the process is manual. Additionally, there is no guarantee that the job in Redis under the worker key has not finished, if the process is killed before goworker can flush the update to Redis.
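As a starting point for that manual recovery, the value stored under a worker key could be decoded as sketched below. The struct assumes that payload has the same class and args shape as an enqueued job, and the sample value in main is made up for illustration; real data would come from reading the worker key in Redis.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// inFlight models the JSON object with keys queue, run_at, and payload.
// The nested payload shape (class, args) is an assumption based on the
// job format used elsewhere in this document.
type inFlight struct {
	Queue   string `json:"queue"`
	RunAt   string `json:"run_at"`
	Payload struct {
		Class string        `json:"class"`
		Args  []interface{} `json:"args"`
	} `json:"payload"`
}

// decodeInFlight parses the raw value read from a worker key.
func decodeInFlight(raw string) (inFlight, error) {
	var j inFlight
	err := json.Unmarshal([]byte(raw), &j)
	return j, err
}

func main() {
	// Made-up sample value for illustration only.
	raw := `{"queue":"myqueue","run_at":"2014-03-10T19:07:23Z","payload":{"class":"MyClass","args":["hi","there"]}}`
	job, err := decodeInFlight(raw)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Printf("candidate for re-enqueue: %s%v on %s (started %s)\n",
		job.Payload.Class, job.Payload.Args, job.Queue, job.RunAt)
}
```

Because the job may already have finished, anything re-enqueued this way still relies on the worker being idempotent.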

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

goworker's People

Contributors

aaron1011, benmanns, brockwood, ghais, jmonteiro, jpatters, koron, pda, robscc, sergeylanzman, theorioli, tinygrasshopper, wuman, xaka, xboston, xescugc

goworker's Issues

Access to actual JSON payload.

I'm new to Go and am still figuring things out. One thing that made goworker a bit hard to use is that the worker gets args ...interface{} as the payload. This means that the args need to be manually type asserted to build the actual object structures, which gets cumbersome when it's a complex JSON payload.

I had asked if there was another way to do this on SO, but it seems like there isn't any.

If access to the JSON is provided, then a struct can be passed to unmarshal to build that type.

Let me know, and thanks for this brilliant package.
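One workaround that needs no change to the library is to marshal an argument back to JSON and unmarshal it into a struct. The sketch below assumes the job carries a single JSON object as its first argument; the email type and the decodeArg helper are hypothetical names for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// email is a hypothetical job argument used only for this example.
type email struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// decodeArg converts one already-decoded job argument into a typed value by
// marshalling it back to JSON and unmarshalling it into out.
func decodeArg(arg interface{}, out interface{}) error {
	b, err := json.Marshal(arg)
	if err != nil {
		return err
	}
	return json.Unmarshal(b, out)
}

func myFunc(queue string, args ...interface{}) error {
	if len(args) != 1 {
		return fmt.Errorf("expected 1 argument, got %d", len(args))
	}
	var e email
	if err := decodeArg(args[0], &e); err != nil {
		return err
	}
	fmt.Printf("From %s: mail to %s about %q\n", queue, e.To, e.Subject)
	return nil
}

func main() {
	// Simulate the arguments a worker would receive for
	// {"class":"MyClass","args":[{"to":"a@example.com","subject":"hello"}]}
	var args []interface{}
	if err := json.Unmarshal([]byte(`[{"to":"a@example.com","subject":"hello"}]`), &args); err != nil {
		fmt.Println("Error:", err)
		return
	}
	if err := myFunc("myqueue", args...); err != nil {
		fmt.Println("Error:", err)
	}
}
```

The extra encode and decode step costs some CPU per job, but it keeps the worker body free of chained type assertions.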

Expose Redis connection pool

Is there any way that the Redis connection pool could be exposed to worker functions somehow? It's a slight inconvenience to have to set up a separate connection pool, reimplement configuration logic, etc. It'd be great to be able to easily ".Get" a connection from the pool to use.

Not sure if this is something wanted for this project though.

Goworker does not recover from loss of connection to redis

If the connection to redis goes down (due to redis restart, netsplit, ...) the library does not recover, but only logs the error:

2014-03-10 19:07:23,682 DEBG 'my-worker' stdout output:
1394474843680784649 [Error] Error on my-worker-1:14479-poller:my_queue getting job from [my_queue]: use of closed network connection

Goworker should recognize the redis client instance is dead and try reconnecting.
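A reconnect loop with exponential backoff is one common way to handle this kind of failure. The following is a generic sketch rather than a patch for goworker: dial is a hypothetical function that opens a new Redis connection, and errClosed only stands in for the error shown in the log.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errClosed stands in for an error such as "use of closed network connection".
var errClosed = errors.New("use of closed network connection")

// reconnect calls dial until it succeeds or maxAttempts is reached, doubling
// the wait between attempts. dial is a hypothetical function that opens a new
// Redis connection; it is not part of goworker.
func reconnect(dial func() error, maxAttempts int, wait time.Duration) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = dial(); err == nil {
			return nil
		}
		if attempt < maxAttempts {
			time.Sleep(wait)
			wait *= 2
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	dial := func() error {
		calls++
		if calls < 3 {
			return errClosed // pretend Redis is still down
		}
		return nil
	}
	if err := reconnect(dial, 5, 10*time.Millisecond); err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("connected after", calls, "attempts")
}
```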

package broken

It seems like vitess.io/vitess/go/pools has changed the function signature of NewResourcePool, which now requires more arguments.


func newRedisPool(uri string, capacity int, maxCapacity int, idleTimout time.Duration) *pools.ResourcePool {
	return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout)
}

The lines above are the source of this problem and must be changed to pass the 5 arguments required by
NewResourcePool(factory Factory, capacity int, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool

Retry failed jobs

It would be nice to have features like the ones Sidekiq provides (https://github.com/mperham/sidekiq/wiki/Error-Handling), especially retrying failed jobs.
Something like:

"If you don't fix the bug within 25 retries (about 21 days), Sidekiq will stop retrying and move your job to the Dead Job Queue. You can fix the bug and retry the job manually anytime within the next 6 months using the Web UI."

Timeout for stuck workers

Workers can get stuck silently. We were using a node-resque worker before, which handled this scenario very well. With goworker, jobs just keep showing as running in resque-web.
Also, the worker count in resque-web keeps increasing (it should be 4).

[Screenshot, 2018-08-29 13:26:38]
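Until something like this exists in the library, a worker function can be wrapped so that it reports an error once a deadline passes. This is a sketch under the assumption that returning an error is enough to mark the job as failed; withTimeout and the workerFunc alias are names invented for the example. Go cannot forcibly stop a goroutine, so the stuck job keeps running in the background.

```go
package main

import (
	"fmt"
	"time"
)

// workerFunc is a type alias (not a new type), so values of it can be used
// wherever a plain func(string, ...interface{}) error is expected.
type workerFunc = func(queue string, args ...interface{}) error

// withTimeout wraps f so that the caller gets an error after limit.
// The wrapped job is not killed; it is only abandoned.
func withTimeout(limit time.Duration, f workerFunc) workerFunc {
	return func(queue string, args ...interface{}) error {
		done := make(chan error, 1) // buffered so the job goroutine can always exit
		go func() { done <- f(queue, args...) }()
		select {
		case err := <-done:
			return err
		case <-time.After(limit):
			return fmt.Errorf("job on %s timed out after %s", queue, limit)
		}
	}
}

func main() {
	slow := func(queue string, args ...interface{}) error {
		time.Sleep(time.Second) // stand-in for a stuck job
		return nil
	}
	err := withTimeout(100*time.Millisecond, slow)("myqueue", "hi", "there")
	fmt.Println("Error:", err)
}
```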

How can I redirect all logs to stderr?

Hi, I'm writing a worker that prints job info to stdout, but the output gets mixed up with log messages when the worker shuts down or cannot connect to redis.
I tried the -logtostderr flag but it didn't help.
How can I redirect all logs to stderr so that I don't have to filter log lines out of the output?

100% cpu usage

When running goworker it takes up 100% cpu, meaning one cpu and pushes the server load to one, and stays there. Is this normal and expected? Can anything be done about this? This is using the example provided which is basically doing nothing, it also seems to push up the load of redis to 50% CPU and it's not doing anything. Wondering if this is normal behavior? Messing with the settings does not change anything, i.e. number of workers or polling time.

pools.NewResourcePool request param has changed

github.com/benmanns/goworker

/data/gopath/src/github.com/benmanns/goworker/redis.go:29:9: cannot use func literal (type func() (pools.Resource, error)) as type pools.Factory in return argument
/data/gopath/src/github.com/benmanns/goworker/redis.go:35:30: not enough arguments in call to pools.NewResourcePool
have (pools.Factory, int, int, time.Duration)
want (pools.Factory, int, int, time.Duration, int, func(time.Time))

How to panic handling on Job?

Hi! If a panic occurs in a job, nothing is logged.

package main

import (
        "errors"
        "fmt"
        "github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
        panic(errors.New("panic"))
        fmt.Printf("From %s, %v\n", queue, args)
        return nil
}

func init() {
        goworker.Register("MyClass", myFunc)
}

func main() {
        if err := goworker.Work(); err != nil {
                fmt.Println("Error:", err)
        }
}

I would like goworker to handle this error. Or is the only option to recover inside each worker function ourselves?
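If recovering in the worker turns out to be the way to go, a small wrapper keeps that logic in one place by converting a panic into a returned error. This is a sketch of the general Go pattern, not a description of what goworker does internally; withRecover and the workerFunc alias are names invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// workerFunc is a type alias (not a new type), so values of it can be used
// wherever a plain func(string, ...interface{}) error is expected.
type workerFunc = func(queue string, args ...interface{}) error

// withRecover wraps f so that a panic inside the job is turned into an
// ordinary error instead of unwinding further up the stack.
func withRecover(f workerFunc) workerFunc {
	return func(queue string, args ...interface{}) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("panic in job from %s: %v", queue, r)
			}
		}()
		return f(queue, args...)
	}
}

// myFunc always panics, like the worker in the report above.
func myFunc(queue string, args ...interface{}) error {
	panic(errors.New("panic"))
}

func main() {
	err := withRecover(myFunc)("myqueue", "hi", "there")
	fmt.Println("Error:", err)
}
```

Because the wrapper returns the same function signature, it could be applied at registration time without touching the worker body.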

Non-command-line configuration

Currently goworker can only be configured with command-line flags. It would be convenient if goworker could be configured programmatically. This would allow the configuration to be stored elsewhere (e.g. in an application-defined configuration file).

Would you accept a pull request that adds this functionality? Any preference as to how you would like it implemented? I was thinking of something along the lines of:

package main

// ...

func main() {
    var options = &goworker.Options{
        Queues:      []string{"high", "medium", "low"},
        Connections: 5,
    }
    if err := goworker.Configure(options); err != nil {
        panic(err)
    }
    // ...
}

show better error message in case Redis server is not available

Currently, if the Redis server is not available, running the example code from the homepage results in,

$ ./worker -queues=hello
1391970160808977666 [Critical] Error on getting connection in poller hostname:20800-poller:hello
1391970160809764448 [Critical] Error on getting connection in worker hostname:20800-0:hello
1391970160809953790 [Critical] Error on getting connection in worker hostname:20800-1:hello
1391970160810271765 [Critical] Error on getting connection in worker hostname:20800-2:hello
1391970160810447307 [Critical] Error on getting connection in worker hostname:20800-3:hello
1391970160810619467 [Critical] Error on getting connection in poller hostname:20800-poller:hello
1391970160810783042 [Critical] Error on getting connection in poller hostname:20800-poller:hello
1391970160810944713 [Critical] Error on getting connection in worker hostname:20800-4:hello
1391970160811127112 [Critical] Error on getting connection in worker hostname:20800-5:hello
1391970160811294090 [Critical] Error on getting connection in worker hostname:20800-0:hello
1391970160811451285 [Critical] Error on getting connection in worker hostname:20800-3:hello
1391970160811605366 [Critical] Error on getting connection in worker hostname:20800-6:hello
1391970160811769875 [Critical] Error on getting connection in worker hostname:20800-7:hello
1391970160811935685 [Critical] Error on getting connection in worker hostname:20800-1:hello
1391970160812122838 [Critical] Error on getting connection in worker hostname:20800-5:hello
1391970160812328184 [Critical] Error on getting connection in worker hostname:20800-8:hello
1391970160812552045 [Critical] Error on getting connection in worker hostname:20800-9:hello
1391970160812796857 [Critical] Error on getting connection in worker hostname:20800-4:hello
1391970160813031975 [Critical] Error on getting connection in worker hostname:20800-7:hello
1391970160813273275 [Critical] Error on getting connection in worker hostname:20800-10:hello
1391970160813515649 [Critical] Error on getting connection in worker hostname:20800-11:hello
1391970160813765257 [Critical] Error on getting connection in worker hostname:20800-6:hello
1391970160813966490 [Critical] Error on getting connection in worker hostname:20800-9:hello
1391970160814185941 [Critical] Error on getting connection in worker hostname:20800-12:hello
1391970160814416075 [Critical] Error on getting connection in worker hostname:20800-13:hello
1391970160814635214 [Critical] Error on getting connection in worker hostname:20800-8:hello
1391970160814820036 [Critical] Error on getting connection in worker hostname:20800-11:hello
1391970160815009785 [Critical] Error on getting connection in worker hostname:20800-14:hello
1391970160815232195 [Critical] Error on getting connection in worker hostname:20800-15:hello
1391970160815428000 [Critical] Error on getting connection in worker hostname:20800-10:hello
1391970160815615223 [Critical] Error on getting connection in worker hostname:20800-13:hello
1391970160815834367 [Critical] Error on getting connection in worker hostname:20800-16:hello
1391970160816038209 [Critical] Error on getting connection in worker hostname:20800-17:hello
1391970160816242444 [Critical] Error on getting connection in worker hostname:20800-12:hello
1391970160816458387 [Critical] Error on getting connection in worker hostname:20800-15:hello
1391970160816644701 [Critical] Error on getting connection in worker hostname:20800-18:hello
1391970160816847689 [Critical] Error on getting connection in worker hostname:20800-19:hello
...
...

Can we improve on this?

can it run on multiple servers?

I ran the same hello example in two processes to test multi-server deployment

# terminal 
go run main.go -queues="hello" -concurrency=25
# another terminal
go run main.go -queues="hello" -concurrency=25
# queued up 100 messages
redis-cli -r 100 RPUSH resque:queue:hello '{"class":"MyClass","args":["hi","there"]}'

I expected the 100 messages to be split evenly between the two processes, but the first process processed all 100 messages. Does goworker support running on multiple servers?

cannot unmarshal object into Go value of type []interface {}

I set up a worker for a job that contains 1 argument, a json structure. I get the error below when goworker attempts to process the job. Note that this error occurs before my worker function which does not even get called.

getting job from [default]: json: cannot unmarshal object into Go value of type []interface {}
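The error text suggests that the job's args field was a JSON object rather than a JSON array. Assuming the payload is decoded into a struct whose args field is a []interface{} (an assumption based on the error message and the worker signature), the difference can be reproduced with encoding/json alone. The payload type and decode helper are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload is an illustrative stand-in for a job with a slice of arguments.
type payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

// decode parses a raw job document into a payload.
func decode(raw string) (payload, error) {
	var p payload
	err := json.Unmarshal([]byte(raw), &p)
	return p, err
}

func main() {
	// args is a bare object: decoding into []interface{} fails.
	object := `{"class":"MyClass","args":{"file":"whatever.csv"}}`
	// args is an array containing one object: decoding succeeds.
	array := `{"class":"MyClass","args":[{"file":"whatever.csv"}]}`

	if _, err := decode(object); err != nil {
		fmt.Println("object args rejected:", err)
	}
	if p, err := decode(array); err == nil {
		fmt.Println("array args accepted:", p.Args)
	}
}
```

Under that assumption, wrapping the single JSON structure in an array when enqueuing would let the job reach the worker function.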

Allow for maps instead of interfaces for worker functions

The getting started page says:

To create a worker, write a function matching the signature

func(string, ...interface{}) error

It'd be great if you could instead write functions like

func(string, map[string]string) error

For instance, if I wanted to pass a group of files to a worker for processing, I'd have to queue up the job with the right files in the right order in my args parameter, and I'd also have to reference them by args index. So basically instead of doing something like:

redis-cli -r 100 RPUSH resque:queue:myqueue '{"class":"MyClass","product_file":"whatever.csv","category_file":"something_else.csv"}'

and

func(queue string, args map[string]string) error {
    productFile, err := os.Open(args["product_file"])
    if err != nil {
        return err
    }
    defer productFile.Close()
    // more things done here
    return nil
}

my actual workflow looks more like

redis-cli -r 100 RPUSH resque:queue:myqueue '{"class":"MyClass","args":["whatever.csv", "something_else.csv"]}'

and

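func(queue string, args ...interface{}) error {
    productFileIndex := 0
    name, _ := args[productFileIndex].(string) // empty if the argument is not a string
    productFile, err := os.Open(name)
    if err != nil {
        return err
    }
    defer productFile.Close()
    // more things done here
    return nil
}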

Obviously this isn't the worst problem to have, but I think it's a less error-prone approach.
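With the current signature, a similar effect is possible by passing one JSON object as the only element of args and converting it inside the worker. The sketch assumes that JSON objects arrive as map[string]interface{}, which is how encoding/json decodes them into interface{} values; stringMap is a hypothetical helper.

```go
package main

import "fmt"

// stringMap converts one decoded job argument into a map[string]string.
// It fails if the argument is not an object or if any value is not a string.
func stringMap(arg interface{}) (map[string]string, error) {
	m, ok := arg.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("expected a JSON object, got %T", arg)
	}
	out := make(map[string]string, len(m))
	for k, v := range m {
		s, ok := v.(string)
		if !ok {
			return nil, fmt.Errorf("value for %q is not a string", k)
		}
		out[k] = s
	}
	return out, nil
}

func myFunc(queue string, args ...interface{}) error {
	if len(args) != 1 {
		return fmt.Errorf("expected 1 argument, got %d", len(args))
	}
	files, err := stringMap(args[0])
	if err != nil {
		return err
	}
	fmt.Println("product file:", files["product_file"])
	fmt.Println("category file:", files["category_file"])
	return nil
}

func main() {
	// What the worker would receive for a job enqueued as
	// {"class":"MyClass","args":[{"product_file":"whatever.csv","category_file":"something_else.csv"}]}
	arg := map[string]interface{}{
		"product_file":  "whatever.csv",
		"category_file": "something_else.csv",
	}
	if err := myFunc("myqueue", arg); err != nil {
		fmt.Println("Error:", err)
	}
}
```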

Refactor the Enqueue API

This issue is more of a question on which way is the better to approach the following issue.

I've started using this lib to implement a web service that uses Resque purely in Go. The Enqueue API works fine, but there is a problem with it: this library was originally designed to implement workers that consume from Resque, so it assumes that the code running it is a worker, not a web service.

This presents the following issue, if you have a web service that enqueues jobs (ex: Send Emails) when you run this:

From the documentation

package main

import (
        "fmt"

        "github.com/benmanns/goworker"
)

func main() {
        err := goworker.Enqueue(&goworker.Job{
                Queue: "myqueue",
                Payload: goworker.Payload{
                        Class: "MyClass",
                        Args:  []interface{}{"hi", "there"},
                },
        })
        fmt.Println(err)
}

$> go run enqueue.go

This returns an error saying that you must specify at least one queue. Of course this should not happen, because this code is not registering a worker, just pushing data to Redis.

$> go run enqueue.go -queues somequeue

This is how it works now: the job is pushed to the myqueue queue specified in the code, not the somequeue queue from the flags.

For this problem I have two possible solutions:

  • Separate the API for Pushing data to Resque (Enqueue) and the API for consuming it (Workers) so when you import it, you specify which one you want.

  • Delay the validations until they are used (lazy), when a Worker is created, then validate the queues, not before.

Which of them (or another option) do you prefer? I'm asking because I think a big change like this should be discussed before sending a PR.

Edited: Of course, of these 2 solutions the first one changes the API and so breaks backward compatibility, which is important 😢.

New version

Hello, we are using go dep to manage all the dependencies of our project, and it seems that this tool downloads the latest tag of the goworker project, which is "v0.1.2". The problem is that this version doesn't include WorkerSettings.

What is the plan for releasing the next tag?

Thanks

How does goworker work?

I came across this project and I am wondering how it manages to execute arbitrary ruby code.

From the README I assume that I can have goworker run on my servers, and that from my Ruby application I create some Resque-style background jobs which get processed by goworker.

I dug through the code but I cannot find the spot where the Ruby code gets executed or how you manage to get the correct Ruby environment ...

Maybe I got something wrong though. Would be great if someone could shed some light on this?

Maybe @benmanns, @johnTurknett or @rjrobinson?

Thank you very much!

Panic on enqueue

When enqueuing and working on jobs simultaneously a panic occurs.
"attempt to Put into a full ResourcePool"

This is caused by calling Init() every time Enqueue is called.

Example from Getting Started section doesn't run

The following example from Getting Started section doesn't run as is.

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
	fmt.Printf("From %s, %v\n", queue, args)
	return nil
}

func init() {
	goworker.Register("MyClass", myFunc)
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

It prints "Error: you must specify at least one queue". Perhaps updating this code so newcomers can execute it without further knowledge would be a good idea.

Another go get failure

../github.com/benmanns/goworker/goworker.go:47: cannot use ctx (type context.Context) as type time.Duration in argument to pool.Get

fail to install

[vagrant@localhost gadmin]$ go get github.com/benmanns/goworker
go get: github.com/youtube/vitess@none updating to
        github.com/youtube/[email protected]: parsing go.mod:
        module declares its path as: vitess.io/vitess
                but was required as: github.com/youtube/vitess

missing the prefillParallelism argument in redis.go

go/src/github.com/benmanns/goworker/redis.go:35:30: not enough arguments in call to pools.NewResourcePool

In vitess.io/vitess/go/pools the NewResourcePool is defined as

NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool

Is the prefillParallelism argument missing in redis.go?

return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout, 0)
or
return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout, 1)

etcd support

Hi,

Is there a plan to support etcd? I guess a common interface with Redis could be used, and this would be useful for cloud-native applications that use etcd for both cache and configuration. What do you think?

Thanks

go get failure

go get github.com/benmanns/goworker
package code.google.com/p/vitess/go/pools: unable to detect version control system for code.google.com/ path

RabbitMQ Support

Is there any plan for a RabbitMQ implementation? I think it would be very useful.
