Coder Social home page Coder Social logo

go-carrot's Introduction

go-carrot

Declarative API for AMQP consumers in Go.


(It's supposed to be a cute logo...)

tl;dr features list

  • Topology declaration API
  • Consumer listener API
  • Message handler API
  • Consumers router
    • Middleware support
    • Common middlewares implementation
  • Graceful shutdown
  • Automatic reconnection

Description

Carrot exposes a nice API for dealing with AMQP connections, such as declaring topologies (exchanges, queues, ...) and declaring consumers on one or more queues.

Check out the examples for more information.

Architecture

Carrot uses three main components for its API:

  1. Topology declarator, to declare the AMQP topology from the application that uses it
  2. Message handlers, to define functions able to handle incoming messages from consumers
  3. Consumer listeners, to receive messages from one or more queues

Topology declarator

Carrot allows to define a topology by exposing an expressive API backed by topology.Declarer interface.

The current supported topologies are:

Topology declaration is optional, and can be controlled with carrot.WithTopology:

carrot.WithTopology(topology.All(
    exchange.Declare("messages"),
    queue.Declare("consumer.message.received",
        queue.BindTo("messages", "message.published"),
    ),
    queue.Declare("consumer.message.deleted",
        queue.BindTo("messages", "message.deleted"),
    ),
)),

When specified, Carrot will open a dedicated AMQP channel to declare the topology, before listening to messages.

Carrot can also be used exclusively for topology declaration:

conn, err := amqp.Dial("amqp://guest:guest@rabbit:5672")
if err != nil {
    panic(err)
}

// The carrot.Closer handle returned is useless if only topology declaration
// is used.
_, err := carrot.Run(conn,
    // Declare your topology here
    carrot.WithTopology(topology.All(
        // topology.All is used to declare more than one topology
        // in a single transaction.
        exchange.Declare("messages"),
        queue.Declare("consumer.message.received",
            queue.BindTo("messages", "message.published"),
        ),
        queue.Declare("consumer.message.deleted",
            queue.BindTo("messages", "message.deleted"),
        ),
    )),
)

Message handlers

Carrot defines an interface for handling incoming messages (amqp.Delivery) in handler.Handler interface:

type Handler interface {
    Handle(context.Context, amqp.Delivery) error
}

Message handlers are fallible, so they can return an error.

Error handling can be specified at Consumer Listeners level.

You can specify a message handler for all incoming messages by using carrot.WithHandler:

carrot.WithHandler(handler.Func(func(context.Context, amqp.Delivery) error {
    // Handle messages here!
    return nil
}))

Router

Carrot also exposes a Router interface and implementation to support:

  • Multiple listeners with their own message handlers
  • Middleware support

An example of how a Router setup might look like:

// Router implements the handler.Handler interface.
router.New().Group(func(r router.Router) {
    // This is how you set middlewares.
    r.Use(LogMessages(logger))
    r.Use(middleware.Timeout(50 * time.Millisecond))
    r.Use(SimulateWork(100*time.Millisecond, logger))

    // This is how you bind an handler function to a specific queue.
    // In order for it to work, you must register these queues
    // in the listener.
    r.Bind("consumer.message.received", handler.Func(Acknowledger))
    r.Bind("consumer.message.deleted", handler.Func(Acknowledger))

    // You can also specify additional middlewares only for one queue:
    r.With(AuthenticateUser).
        Bind("consumer.message.created", handler.Func(Acknowledger))
})

Consumer listeners

As the name says, Listeners listens for incoming messages on a specific queue.

Carrot defines a listener.Listener interface to represent these components:

type Listener interface {
    Listen(Connection, Channel, handler.Handler) (Closer, error)
}

so that the listener can:

  • Start listening to incoming amqp.Delivery from a Channel
  • Serving these messages using the provided handler.Handler
  • Hand out a Closer handler to close the listener/server goroutine and/or wait for its closing

An example of how to define Listeners:

// WithListener specifies the listener.Listener to start.
carrot.WithListener(listener.Sink(
    // listener.Sink allows to listen to messages coming from one or more consumers,
    // and pilots closing the child listeners.
    consumer.Listen("consumer.message.deleted"),
    listener.UseDedicatedChannel(
        // By default, carrot uses a single amqp.Channel to establish
        // consumer listeners. But we can tell carrot to use a dedicated
        // amqp.Channel for certain consumers.
        consumer.Listen("consumer.message.received"),
    ),
))

Full example

Let's put all the pieces together now!

conn, err := amqp.Dial("amqp://guest:guest@rabbit:5672")
if err != nil {
    panic(err)
}

closer, err := carrot.Run(conn,
    // First, declare your topology...
    carrot.WithTopology(topology.All(
        exchange.Declare("messages"),
        queue.Declare("consumer.message.received",
            queue.BindTo("messages", "message.published"),
        ),
        queue.Declare("consumer.message.deleted",
            queue.BindTo("messages", "message.deleted"),
        ),
    )),
    // Second, declare the consumers to receive messages from...
    carrot.WithListener(listener.Sink(
        consumer.Listen("consumer.message.deleted"),
        listener.UseDedicatedChannel(
            consumer.Listen("consumer.message.received"),
        ),
    ))
    // Lastly, specify an handler function that will receive the messages
    // coming from the specified consumers.
    carrot.WithHandler(router.New().Group(func(r router.Router) {
        r.Use(LogMessages(logger))
        r.Use(middleware.Timeout(50 * time.Millisecond))
        r.Use(SimulateWork(100*time.Millisecond, logger))

        r.Bind("consumer.message.received", handler.Func(Acknowledger))
        r.Bind("consumer.message.deleted", handler.Func(Acknowledger))
    })),
)

if err != nil {
    panic(err)
}

// Wait on the main goroutine until the consumer has exited:
err := <-closer.Closed()
log.Fatalf("Consumers closed (error %s)", err)

License

This project is licensed under the MIT license.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in go-carrot by you, shall be licensed as MIT, without any additional terms or conditions.

go-carrot's People

Contributors

ar3s3ru avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

go-carrot's Issues

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.