Coder Social home page Coder Social logo

raphaelmpinheiro / barterdude Goto Github PK

View Code? Open in Web Editor NEW

This project forked from olxbr/barterdude

0.0 0.0 0.0 231 KB

Message exchange engine to build pipelines in brokers like RabbitMQ

License: Apache License 2.0

Makefile 0.79% Python 99.03% Dockerfile 0.18%

barterdude's Introduction

BarterDude

Build Status Coverage Status

Message exchange engine to build pipelines using brokers like RabbitMQ. This project is build on top of the great async-worker.

barter

Install

Using Python 3.6+

pip install barterdude

Usage

Build your consumer with this complete example:

import logging
from barterdude import BarterDude
from barterdude.monitor import Monitor
from barterdude.hooks.healthcheck import Healthcheck
from barterdude.hooks.logging import Logging
from barterdude.hooks.metrics.prometheus import Prometheus
from barterdude.message import Message
from barterdude.conf import getLogger

# from my_project import MyHook # (you can build your own hooks)


# configure RabbitMQ
barterdude = BarterDude(
    hostname="localhost",
    username="guest",
    password="guest",
    prefetch=256,
)

# Prometheus labels for automatic metrics
labels = dict(
    app_name="my_app",
    team_name="my_team"
)
healthcheck = Healthcheck(barterdude) # automatic and customizable healthcheck
prometheus = Prometheus(labels) # automatic and customizable Prometheus integration

self.logger = getLogger("my_app", logging.DEBUG) # automatic json log with barterdude namespace
# BARTERDUDE_DEFAULT_APP_NAME is an env var to control your project namespace
# BARTERDUDE_DEFAULT_LOG_LEVEL is an env var to control loglevel by number 0, 10, 20, etc...

monitor = Monitor(
    healthcheck,
    prometheus,
    # MyHook(barterdude, "/path"), # (will make localhost:8080/path url)
    Logging() # automatic and customizable logging
)

my_metric = prometheus.metrics.counter(name="fail", description="fail again")  # It's the same as https://github.com/prometheus/client_python


@barterdude.consume_amqp(
    ["queue1", "queue2"],
    monitor,
    coroutines = 10,  # number of coroutines spawned to consume messages (1 per message)
    bulk_flush_interval = 60.0,  #  max waiting time for messages to start process n_coroutines
    requeue_on_fail = True  # should retry or not the message
)
async def your_consumer(msg: Message): # you receive only one message and we parallelize processing for you
    await barterdude.publish_amqp(
        exchange="my_exchange",
        data=msg.body
    )
    if msg.body == "fail":
    
        my_metric.inc() # you can use prometheus metrics
        healthcheck.force_fail() # you can use your hooks inside consumer too
        msg.reject(requeue=False) # You can force to reject a message, exactly equal https://b2wdigital.github.io/async-worker/src/asyncworker/asyncworker.rabbitmq.html#asyncworker.rabbitmq.message.RabbitMQMessage

    if msg.body == "exception":
        raise Exception() # this will reject the message and requeue

    # if everything is fine, than message automatically is accepted


barterdude.run() # you will start consume and start a server on http://localhost:8080
# Change host and port with ASYNCWORKER_HTTP_HOST and ASYNCWORKER_HTTP_PORT env vars

Build your own Hook

Base Hook (Simple One)

These hooks are called when message retreive, have success and fail.

from barterdude.hooks import BaseHook
from asyncworker.rabbitmq.message import RabbitMQMessage

class MyCounterHook(BaseHook):
    _consume = _fail = _success = 0

    async def on_success(self, message: RabbitMQMessage):
        self._success += 1

    async def on_fail(self, message: RabbitMQMessage, error: Exception):
        self._fail += 1

    async def before_consume(self, message: RabbitMQMessage):
        self._consume += 1

Http Hook (Open Route)

These hooks can do everything simple hook does, but responding to a route.

from aiohttp import web
from barterdude.hooks import HttpHook
from asyncworker.rabbitmq.message import RabbitMQMessage

class MyCounterHttpHook(HttpHook):
    _consume = _fail = _success = 0

    async def __call__(self, req: web.Request):
        return web.json_response(dict(
            consumed=self._consume,
            success=self._success,
            fail=self._fail
        ))

    async def on_success(self, message: RabbitMQMessage):
        self._success += 1

    async def on_fail(self, message: RabbitMQMessage, error: Exception):
        self._fail += 1

    async def before_consume(self, message: RabbitMQMessage):
        self._consume += 1

Data Sharing

Following the approach found in async-worker and in aiohttp, BarterDude discourages the use of global variables, aka singletons.

To share data states globally in an application, BarterDude behaves like a dict. As an example, one can save a global-like variable in a BarterDude instance:

from barterdude import BarterDude

barterdude = BarterDude()
baterdude["my_variable"] = data

and get it back in a consumer

async def consumer_access_storage(msg):
    data = baterdude["my_variable"]

Schema Validation

Consumed messages can be validated by json schema:

@barterdude.consume_amqp(
    ["queue1", "queue2"],
    monitor,
    validation_schema={
            "$schema": "http://json-schema.org/draft-04/schema#",
            "$id": "http://example.com/example.json",
            "type": "object",
            "title": "Message Schema",
            "description": "The root schema comprises the entire JSON document.",
            "additionalProperties": True,
            "required": [
                "key"
            ],
            "properties": {
                "key": {
                    "$id": "#/properties/key",
                    "type": "string",
                    "title": "The Key Schema",
                    "description": "An explanation about message.",
                    "default": ""
                }
            }
        },
    requeue_on_validation_fail=False # invalid messages are removed without requeue
)

You can still validate messages before produce them or when you want:

from barterdude.message import MessageValidation

validator = MessageValidation(json_schema)
validator.validate(message)

Data Protection

Barterdude takes in account GDPR data protection and by default doesn't log message body, but you can deactivate with environment variable BARTERDUDE_LOG_REDACTED=0

Now messages body will be logged by Logging hook.

This configuration just controls BarterDude's default Logging Hook and doesn't have effect on user custom user log. If you want to control your log with this configuration use:

from baterdude.conf import BARTERDUDE_LOG_REDACTED

Testing

To test async consumers we recommend asynctest lib

from asynctest import TestCase


class TestMain(TestCase):
    def test_should_pass(self):
        self.assertTrue(True)

We hope you enjoy! ๐Ÿ˜‰

Contribute

For development and contributing, please follow Contributing Guide and ALWAYS respect the Code of Conduct

barterdude's People

Contributors

dmvieira avatar gligneul avatar bluesdog164 avatar daltonmatos avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.