justintung / taskq

This project forked from vmihailenco/taskq

Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends

Home Page: https://godoc.org/github.com/vmihailenco/taskq

License: BSD 2-Clause "Simplified" License

taskq's Introduction

Installation

taskq requires a Go version with Modules support and uses import versioning, so make sure to initialize a Go module before installing taskq:

go get github.com/vmihailenco/taskq/v2

Features

  • Redis, SQS, IronMQ, and in-memory backends.
  • Automatically scaling number of goroutines used to fetch (fetcher) and process messages (worker).
  • Global rate limiting.
  • Global limit of workers.
  • Call once - deduplicating messages with same name.
  • Automatic retries with exponential backoffs.
  • Automatic pausing when all messages in queue fail.
  • Fallback handler for processing failed messages.
  • Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
  • Automatic message compression using zstd.

Quickstart

I recommend that you split your app into two parts:

  • An API that accepts requests from customers and adds tasks to the queues.
  • A Worker that fetches tasks from the queues and processes them.

This way you can:

  • Isolate API and worker from each other;
  • Scale API and worker separately;
  • Have different configs for API and worker (like timeouts).

There is an api_worker example that demonstrates this approach using Redis as backend:

cd examples/api_worker
go run worker/main.go
go run api/main.go

You start by choosing a backend to use, in our case Redis:

package api_worker

import "github.com/vmihailenco/taskq/v2/redisq"

var QueueFactory = redisq.NewFactory()

Using that factory you create a queue that contains task(s):

var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
	Name:  "api-worker",
	Redis: Redis, // go-redis client
})

Using the queue you create a task with a handler that does some useful work:

var CountTask = taskq.RegisterTask(&taskq.TaskOptions{
	Name: "counter",
	Handler: func() error {
		IncrLocalCounter()
		return nil
	},
})

Then in the API you use the task to add messages/jobs to the queue:

ctx := context.Background()
for {
	// call task handler without any args
	err := api_worker.MainQueue.Add(api_worker.CountTask.WithArgs(ctx))
	if err != nil {
		log.Fatal(err)
	}
}

And in the worker you start processing the queue:

err := api_worker.MainQueue.Start(context.Background())
if err != nil {
	log.Fatal(err)
}

API overview

t := myQueue.RegisterTask(&taskq.TaskOptions{
	Name:    "greeting",
	Handler: func(name string) error {
		fmt.Println("Hello", name)
		return nil
	},
})

// Say "Hello World".
err := myQueue.Add(t.WithArgs(context.Background(), "World"))
if err != nil {
    panic(err)
}

// Say "Hello World" with 1 hour delay.
msg := t.WithArgs(ctx, "World")
msg.Delay = time.Hour
_ = myQueue.Add(msg)

// Say "Hello World" once.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World")
    msg.Name = "hello-world" // unique
    _ = myQueue.Add(msg)
}

// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World")
    msg.Name = "hello-world"
    msg.Delay = time.Hour
    _ = myQueue.Add(msg)
}

// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour)
    _ = myQueue.Add(msg)
}

// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour, "World", "europe")
    _ = myQueue.Add(msg)
}

Message deduplication

If a Message has a Name, it is used as a unique identifier: messages with the same name are deduplicated (i.e. not processed again) within a 24 hour period (or possibly longer if not evicted from the local cache after that period). Where Name is omitted, no deduplication occurs and each message is processed. Task's WithMessage and WithArgs both produce messages with no Name, so those messages are not deduplicated. OnceWithArgs sets a name based on a consistent hash of the arguments and a quantised period of time (e.g. 'this hour', 'today'), determined by the period argument passed to OnceWithArgs. This guarantees that the same function will not be called with the same arguments more than once during that period.

Handlers

A Handler and FallbackHandler are supplied to RegisterTask in the TaskOptions.

There are three permitted types of signature:

  1. A zero-argument function
  2. A function whose arguments are assignable in type from those which are passed in the message
  3. A function which takes a single *Message argument

If a task is registered with a handler that takes a Go context.Context as its first argument then when that handler is invoked it will be passed the same Context that was passed to Consumer.Start(ctx). This can be used to transmit a signal to abort to all tasks being processed:

var AbortableTask = MainQueue.RegisterTask(&taskq.TaskOptions{
	Name: "SomethingLongwinded",
	Handler: func(ctx context.Context) error {
		for range time.Tick(time.Second) {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				fmt.Println("Wee!")
			}
		}
		return nil
	},
})

Custom message delay

If the error returned by a handler implements the Delay() time.Duration interface, that delay is used to postpone message processing.

type RateLimitError string

func (e RateLimitError) Error() string {
    return string(e)
}

func (RateLimitError) Delay() time.Duration {
    return time.Hour
}

func handler() error {
    return RateLimitError("calm down")
}
