Coder Social home page Coder Social logo

samber / go-amqp-pubsub Goto Github PK

View Code? Open in Web Editor NEW
20.0 3.0 0.0 625 KB

Fault tolerant Pub/Sub library for RabbitMQ

Home Page: https://pkg.go.dev/github.com/samber/go-amqp-pubsub

License: MIT License

Makefile 3.88% Go 96.12%
go amqp rabbitmq pubsub queue broker retry consumer producer publish

go-amqp-pubsub's Introduction

Resilient Pub/Sub framework for RabbitMQ and Go

tag Go Version GoDoc Build Status Go report Coverage Contributors License

  • Based on github.com/rabbitmq/amqp091-go driver
  • Resilient to network failure
  • Auto reconnect: recreate channels, bindings, producers, consumers...
  • Hot update of queue bindings (thread-safe)
  • Optional retry queue on message rejection
  • Optional dead letter queue on message rejection
  • Optional deferred message consumption

How to

During your tests, feel free to restart Rabbitmq. This library will reconnect automatically.

Connection

import pubsub "github.com/samber/go-amqp-pubsub"

conn, err := pubsub.NewConnection("connection-1", pubsub.ConnectionOptions{
    URI: "amqp://dev:dev@localhost:5672",
    Config: amqp.Config{
        Dial:      amqp.DefaultDial(time.Second),
    },
})

// ...

conn.Close()

Producer

import (
    pubsub "github.com/samber/go-amqp-pubsub"
    "github.com/samber/lo"
    "github.com/samber/mo"
)

// `err` can be ignored since it will connect lazily to rabbitmq
conn, err := pubsub.NewConnection("connection-1", pubsub.ConnectionOptions{
    URI: "amqp://dev:dev@localhost:5672",
    LazyConnection: mo.Some(true),
})

producer := pubsub.NewProducer(conn, "producer-1", pubsub.ProducerOptions{
    Exchange: pubsub.ProducerOptionsExchange{
        Name: "product.event",
        Kind: pubsub.ExchangeKindTopic,
    },
})

err := producer.Publish(routingKey, false, false, amqp.Publishing{
    ContentType:  "application/json",
    DeliveryMode: amqp.Persistent,
    Body:         []byte(`{"hello": "world"}`),
})

producer.Close()
conn.Close()

Consumer

import (
    pubsub "github.com/samber/go-amqp-pubsub"
    "github.com/samber/lo"
    "github.com/samber/mo"
)

// `err` can be ignore since it will connect lazily to rabbitmq
conn, err := pubsub.NewConnection("connection-1", pubsub.ConnectionOptions{
    URI: "amqp://dev:dev@localhost:5672",
    LazyConnection: mo.Some(true),
})

consumer := pubsub.NewConsumer(conn, "consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    Bindings: []pubsub.ConsumerOptionsBinding{
        {ExchangeName: "product.event", RoutingKey: "product.created"},
        {ExchangeName: "product.event", RoutingKey: "product.updated"},
    },
    Message: pubsub.ConsumerOptionsMessage{
        PrefetchCount: mo.Some(100),
    },
    EnableDeadLetter: mo.Some(true),     // will create a "product.onEdit.deadLetter" DL queue
})

for msg := range consumer.Consume() {
    lo.Try0(func() { // handle exceptions
        // ...
        msg.Ack(false)
    })
}

consumer.Close()
conn.Close()

Consumer with pooling and batching

See examples/consumer-with-pool-and-batch.

Consumer with retry strategy

Retry architecture

See examples/consumer-with-retry.

3 retry strategies are available:

  • Exponential backoff
  • Constant interval
  • Lazy retry

Examples

Exponential backoff:

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    RetryStrategy:    mo.Some(pubsub.NewExponentialRetryStrategy(3, 3*time.Second, 2)), // will create a "product.onEdit.retry" queue
})

for msg := range consumer.Consume() {
    // ...
    msg.Reject(false)   // will retry 3 times with exponential backoff
}

Lazy retry:

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    RetryStrategy:    mo.Some(pubsub.NewLazyRetryStrategy(3)), // will create a "product.onEdit.retry" queue
})

for msg := range consumer.Consume() {
    // ...
    
	err := json.Unmarshal(body, &object)
    if err != nil {
        // retry is not necessary
        msg.Reject(false)
        continue
    }

    // ...

    err = sql.Exec(query)
    if err != nil {
        // retry on network error
        pubsub.RejectWithRetry(msg, 10*time.Second)
        continue
    }

    // ...
    msg.Ack(false)
}

Custom retry strategy

Custom strategies can be provided to the consumer.

type MyCustomRetryStrategy struct {}

func NewMyCustomRetryStrategy() RetryStrategy {
	return &MyCustomRetryStrategy{}
}

func (rs *MyCustomRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool) {
    // retries every 10 seconds, until message get older than 5 minutes
    if msg.Timestamp.Add(5*time.Minute).After(time.Now()) {
        return 10 * time.Second, true
    }

    return time.Duration{}, false
}

Consistency

On retry, the message is published into the retry queue then is acked from the initial queue. This 2 phases delivery is unsafe, since connection could drop during operation. With the ConsistentRetry policy, the steps will be embbeded into a transaction. Use it carefully because the delivery rate will be reduced by an order of magnitude.

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    RetryStrategy:    mo.Some(pubsub.NewExponentialRetryStrategy(3, 3*time.Second, 2)),
    RetryConsistency: mo.Some(pubsub.ConsistentRetry),
})

Defer message consumption

See examples/consumer-with-delay.

On publishing, the first consumption of the message can be delayed. The message will instead be sent to the .defer queue, expire, and then go to the initial queue.

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    Defer:            mo.Some(5 * time.Second),
})

Run examples

# run rabbitmq
docker-compose up rabbitmq
# run producer
cd examples/producer/
go mod download
go run main.go --rabbitmq-uri amqp://dev:dev@localhost:5672
# run consumer
cd examples/consumer/
go mod download
go run main.go --rabbitmq-uri amqp://dev:dev@localhost:5672

Then trigger network failure, by restarting rabbitmq:

docker-compose restart rabbitmq

๐Ÿค Contributing

Don't hesitate ;)

# Install some dev dependencies
make tools

# Run tests
make test
# or
make watch-test

Todo

  • Connection pooling (eg: 10 connections, 100 channels per connection)
  • Better documentation
  • Testing + CI
  • BatchPublish + PublishWithConfirmation + BatchPublishWithConfirmation

๐Ÿ‘ค Contributors

Contributors

๐Ÿ’ซ Show your support

Give a โญ๏ธ if this project helped you!

GitHub Sponsors

๐Ÿ“ License

Copyright ยฉ 2023 Samuel Berthe.

This project is MIT licensed.

go-amqp-pubsub's People

Contributors

dependabot[bot] avatar romainjolidon avatar samber avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

go-amqp-pubsub's Issues

Add delay before first message consumption attempt

4 queues:

  • on-post-updated
  • on-post-updated.delay (ttl 10s / on-post-updated as DLX)
  • on-post-updated.retry (ttl 60s / on-post-updated as DLX)
  • on-post-updated.deadLetter

The routing key is added to on-post-updated.delay instead of on-post-updated.

1- Push message to exchange
2- Message is routed to on-post-updated.delay
3- After 10s, the message is dropped from on-post-updated.delay and forwarded to on-post-updated
4- In case of error, message is routed to on-post-updated.retry or on-post-updated.deadLetter

Remove orphan bindings

A binding should be removed when absent from the consumer configuration.

Pretty useful when we deploy a new release of an app and need to remove a binding.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.