benthosdev / benthos

Fancy stream processing made operationally mundane

Home Page: https://www.benthos.dev

License: MIT License

Go 98.09% Makefile 0.06% Dockerfile 0.02% HCL 0.01% Shell 0.12% JavaScript 0.40% CSS 0.31% HTML 0.12% Lua 0.03% CUE 0.01% TypeScript 0.77% Rust 0.06%
message-queue stream-processing streaming-data message-bus logs stream-processor cqrs event-sourcing go golang

benthos's People

Contributors

adamwasila, aidan-, albinou, alexthecarp, calmera, chrisriley, cludden, codegangsta, codingconcepts, crtomirmajer, dependabot[bot], disintegrator, eduardodbr, ekeric13, filippog, ghballiet, jeffail, lucasoares, mfamador, mihaitodor, nenad, nibbleshift, nmbryant, nwest1, ollystephens, oscil8, pcha, peczenyj, razor-x, stephanebruckert


benthos's Issues

Date Processing

Hi @Jeffail, what are your thoughts on having date processing support (format, parse, get year, get month, get timestamp, subtract, add, etc.) in Benthos, be it a function or a processor? I think it would be helpful for processing time-series data.
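
For illustration only, a hypothetical config for such a processor might look like the sketch below; the type name and every field are invented here purely to make the idea concrete.

- type: timestamp        # hypothetical processor, does not exist yet
  timestamp:
    parts: []
    operator: format
    path: created_at
    input_format: "02 Jan 06 15:04 MST"
    output_format: "2006-01-02T15:04:05Z"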

Custom Inputs/Outputs

First off, great work on this! I would like to utilize Benthos with unsupported inputs and outputs; is this possible today? If not, is this something you would be open to supporting? Ideally, as long as user-defined inputs and outputs satisfy the Benthos input/output interfaces, this seems like it could be integrated fairly easily, and perhaps allow the community to build a nice ecosystem of Benthos plugins.

Switch output type

The idea would be to create an output type that selects the first of a list of outputs where a condition passes. The config spec might look something like this:

type: switch
switch:
- output:
    type: foo
    foo: {}
  condition:
    type: foo
    foo: {}
- output:
    type: bar
    bar: {}
  condition:
    type: bar
    bar: {}

AMQP TLS support

Hello,

I'm considering using Benthos for sending messages to remote tenants of a platform I'm developing. I'd like to offer public access to a RabbitMQ server, which the tenants can then consume the messages from using Benthos. However, in this scenario using a secure connection is critical in order to prevent the users from sending their RabbitMQ credentials over the public internet in plain text.

RabbitMQ supports TLS, and so does the amqp library you're using, so I think it shouldn't be too hard to add a TLS feature to the amqp inputs / outputs. I might give it a shot myself once I start trying out Benthos, if you don't have time for it right now, but I think this is something other users would appreciate as well.
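
As a rough sketch, the amqp input could gain a tls block mirroring the one already used by other components such as the kafka output; the amqp field names shown here are illustrative, not confirmed.

input:
  type: amqp
  amqp:
    url: amqps://guest:guest@localhost:5671/
    tls:
      enabled: true
      cas_file: ./root_ca.pem
      skip_cert_verify: false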

Thanks!

Inproc example?

I'm sure it's just that I am misunderstanding the configuration, but is there a simple example of using inproc for input & output?

What I am trying to achieve is a Kafka input that is then sent to a few different processor paths, where I can filter based off of different things and send to different outputs or calculate metrics.
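
A minimal sketch of what this could look like, assuming both configs run in the same Benthos process (e.g. streams mode) and that inproc simply takes a shared ID string; the remaining field names are illustrative.

# stream one: read from Kafka and fan out to two in-process bridges
input:
  type: kafka
  kafka:
    addresses:
    - localhost:9092
    topic: foo
output:
  type: broker
  broker:
    pattern: fan_out
    outputs:
    - type: inproc
      inproc: path_a
    - type: inproc
      inproc: path_b

# stream two: consume one bridge, apply its own processors, write to its own output
input:
  type: inproc
  inproc: path_a
output:
  type: stdout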

Add sampling processor

It would be good to have a processor type for sampling data. It could allow both randomly and sequentially dropping a percentage of messages. This would allow us to sample a particular input or write a sample to an output.

A walk through document detailing the new processor would also be good.
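
A hypothetical config for the suggested processor; the type name and fields below are only a sketch of what it might look like.

- type: sample        # suggested processor, not yet implemented
  sample:
    retain: 10        # keep roughly 10% of messages
    seed: 0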

PSA: Broker types (fan_in, fan_out, round_robin) are changing.

Broker types are going to be merged into a single broker input and output type, where the pattern will be specified as a field. There are a number of reasons for this change, mostly related to making documentation and config discovery simpler.

Pull request: #22

Updating your configs

Wherever you were using the fan_in type simply swap it with broker:

type: fan_in
fan_in:
  inputs:

To:

type: broker
broker:
  inputs:

Wherever you were using fan_out or round_robin you do the same, but must also specify the pattern of the broker as there are multiple options:

type: fan_out
fan_out:
  outputs:

To:

type: broker
broker:
  pattern: fan_out
  outputs:

Combine JSON processors into `json`

I'm planning to combine the following JSON based processors:

  • set_json
  • select_json
  • delete_json

Into a single json processor in order to simplify the list. I'm conscious of the fact that the list is constantly growing and there's no reason why other data structure formats can't also be supported. I want to avoid a situation where we potentially have select_json, set_json, select_xml, set_xml etc.

The new format would look something like this:

type: json
json:
  parts: []
  operator: set
  path: foo.bar
  value: baz

The potential values for operator would be things like set, append, select, delete. In the case of select and delete the value field would be ignored.

There will be a transition period where I'll support both.

EDIT: The transition period was significantly shorter than originally implied. Please let me know if this impacts you and I'll avoid this in future.

Idea: Rate limit resource

It might be nice to have a rate limit field in the resources section of a Benthos config, where we might specify a maximum rate per interval for an arbitrary resource. This rate limit could then potentially be shared across inputs, processors or outputs that choose to use it.

For example, suppose we are using the http processor to hit external services, and in order to get a high throughput we have a large number of processing pipelines in a config something like this:

input:
  type: foo
buffer:
  type: memory
pipeline:
  threads: 40
  processors:
  - type: http
    http:
      max_parallel: 50
      url: http://fooresource.bar/process

This rate is fairly predictable: we can see that 40 threads, each with a max_parallel of 50, could create up to 2000 requests in parallel at any given time. We should be able to tune the parallelism, processor pipeline threads, and retries of the http processor in order to avoid stressing the resource.

However, imagine we had a more complex pipeline where hitting this http resource is conditional on other factors such as the content of a message passing through. In this case it might be difficult to predict how often messages of a batch would be sent to this resource per interval per processing thread, and it's also possible that certain events may result in an odd distribution of these messages, therefore spiking the number of requests.

If our service is brittle and could potentially die as a result of being hit too often (as opposed to simply returning a 429) we might need some mechanism for rate limiting our requests in a way that is shared across pipeline processing threads and conditional processors of those pipelines. If we were able to specify a shared rate limit we could do something like this:

input:
  type: foo
buffer:
  type: memory
pipeline:
  threads: 40
  processors:
  - type: http
    http:
      max_parallel: 50
      rate_limit: foo_limiter
      url: http://fooresource.bar/process
resources:
  rate_limiters:
    foo_limiter:
      interval: 1s
      capacity: 1000

This would create a shared rate limit across all the http processors that specify the rate limiter foo_limiter, where the requests are limited to 1000 per second. It would then be easy to cap the usage of a particular resource even when your processing pipelines are complex. These rate limits could potentially be used by any inputs, outputs, processors, caches, etc where it seems sensible.

Websockets

This is a pretty cool lib. Kind of surprised there are not more issues.

I want to use NATS as my central control plane to talk to microservices but also to send messages to clients over websockets.

I can see that this lib can help me with decoupled pipelines and orchestration, but I am wondering if any thought has been given to using this with websockets.

There is a similar but different project by IBM, written in Go with JS running inside a Go JS VM, that has some relationship to this project too, and it's worth adding here as maybe people are interested. It's more concerned with many database types and doing pipelines across them.
It's also decoupled at runtime because of the JS aspects.

https://github.com/compose/transporter

Anyway hope I can get some feedback on the websockets aspect :)
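
For what it's worth, here is a sketch of the kind of bridge being described, assuming a NATS input and a websocket output with roughly these fields; the field names are illustrative rather than confirmed.

input:
  type: nats
  nats:
    urls:
    - nats://localhost:4222
    subject: benthos_messages
output:
  type: websocket
  websocket:
    url: ws://localhost:4195/ws/output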

proposal: refactor conditions to support operator configuration

I am working on adding additional operators to both the metadata and jmespath (which currently does not support operators) conditions. One of my use cases is to match against a long list of enumerations (to go along with the switch output). It currently feels awkward to force config for an enum operator into the string arg field. I'm proposing changing the way condition configuration is currently implemented to mirror the configuration structure of other benthos components (namely inputs, outputs, and processors). Below is an example to illustrate the proposal:

change this:

condition:
  type: metadata
  metadata:
    operator: less_than
    key: foo
    part: 0
    arg: "3"

to this:

condition:
  type: metadata
  metadata:
    operator: less_than
    key: foo
    part: 0
    less_than: 3

The difference may seem subtle, but is better illustrated with more complex (hypothetical) operators:

condition:
  type: metadata
  metadata:
    operator: enum
    key: foo
    part: 0
    enum:
      - "a"
      - "very, long"
      - "list"
      - "of"
      - "values!"

or

condition:
  type: jmespath
  jmespath:
    operator: json_schema
    query: foo
    part: 0
    json_schema:
      type: object
      properties:
        bar:
          type: string
          pattern: date-time
        baz:
          type: integer
          minimum: 5
          exclusiveMinimum: true
        qux:
          type: array
          uniqueItems: true
          items:
            type: string
            enum: ["quux","quz","corge"]
      required:
        - bar
        - baz

We could maintain backwards compatibility by continuing to support the arg field for existing operators. @Jeffail what are your thoughts?

Fingerprint processor

Hi, thanks for such a great design btw.

I would love to use it in my scenario where I need to export and process logs to Elasticsearch. But since Benthos guarantees at-least-once delivery, I think I need a way to generate an id based on the event processed so that it doesn't get duplicated in Elasticsearch. Something like the Logstash fingerprint plugin.

Will this be supported?
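
One possible shape for this, purely as a hypothetical sketch: a processor that writes a content hash into metadata, which the elasticsearch output then interpolates as the document id. Neither the processor type nor its fields exist; only the ${!metadata:...} interpolation syntax is taken from the metadata announcement in this repo.

pipeline:
  processors:
  - type: fingerprint        # hypothetical processor
    fingerprint:
      algorithm: sha256
      meta_key: doc_id
output:
  type: elasticsearch
  elasticsearch:
    id: "${!metadata:doc_id}"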

http server timing out

First of all I would like to say thank you for making Benthos 👍

I have configured benthos.yaml with an http server as input and kafka as output, with a few processors in the pipeline. All is fine while the number of parallel HTTP client requests is small.

When I increased to 20 parallel HTTP client requests, I could see HTTP timeouts on the client side, even after increasing the HTTP server timeout to 20 seconds. However, I found that almost all requests reached Kafka.

But when I increased the parallel HTTP client requests to 50, I could see almost 50% data loss in Kafka along with HTTP timeouts on the client side.

In the logs I didn't see any warnings/errors. I am running Benthos on 8 CPUs and I configured 8 threads for the pipeline processors.

Interestingly, when I change the output to file, I don't see any data loss even for 50 parallel HTTP requests.

My question is: how can I isolate/debug the issue? The problem could be in either of two places:

  1. kafka client (i.e. benthos output)
  2. kafka server side

Appreciate your help! Thanks in advance 👍

Below is my benthos.yaml snippet.

type: kafka
kafka:
  ack_replicas: false
  addresses:
  - xxxxx:9092
  client_id: benthos_kafka_output
  compression: none
  key: ""
  max_msg_bytes: 1e+06
  round_robin_partitions: false
  target_version: 1.0.0
  timeout_ms: 20000
  tls:
    cas_file: ""
    enabled: false
    skip_cert_verify: false
  topic: xxxxxxx

logger:
  prefix: benthos
  level: DEBUG
  add_timestamp: true
  json_format: true

Support for distributed tracing

Hi @Jeffail

I feel it would be pretty nice (even nicer than it already is!) if Benthos supported various distributed tracing systems just like it does for metrics!

The various tracing systems could be abstracted by using OpenTracing's API; this includes Jaeger, which I'm specifically interested in.
It seems that even Zipkin could be used, according to this package.

Here's a quick list of what would be needed:

  • Start a new span pretty much everywhere a counter is incremented
  • Pass the current span around (from the input to the various processors all the way up to the output)
    • Part.Metadata() would make sense, but maybe we want to prefix it or store it somewhere else to avoid collisions when doing this
  • Extract the parent span from each input and inject the current span in each output

Let me know what you think.
Cheers 👍

Specify TargetVersion with KafkaBalanced input type

In the same way that we can already specify TargetVersion with the Kafka input type, I'd like to be able to select a TargetVersion with KafkaBalanced. This is already supported by the sarama-cluster config (because it is based on the sarama Config struct) so we would just need to add (and then pass) this configuration option.
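
A sketch of how that might look in config, with target_version mirroring the existing kafka input field; the kafka_balanced type name and the remaining fields are illustrative guesses.

input:
  type: kafka_balanced
  kafka_balanced:
    addresses:
    - localhost:9092
    consumer_group: benthos_group
    topics:
    - foo
    target_version: 1.0.0   # proposed field, mirroring the kafka input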

Kafka output topic interpolation

Hi @Jeffail, sorry for bothering you again with a small boring thing like this.

I'm just wondering whether you could change the kafka output topic config to support interpolation.

Currently, all of my Docker container logs, which are stored in syslog, are exported to one Kafka topic using Filebeat. I would like to use Benthos to parse the syslog program name to extract the container name and/or image, and export each parsed message to a different Kafka topic, so that future stream processing on a specific container's logs doesn't need to receive and filter messages from unrelated containers, thus saving bandwidth.
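
A sketch of what this could look like if the topic field supported function interpolation, assuming the container name had already been extracted into metadata by an earlier processor; this is hypothetical until the feature exists.

output:
  type: kafka
  kafka:
    addresses:
    - localhost:9092
    topic: "logs-${!metadata:container_name}"   # hypothetical: topic interpolation not yet supported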

Metadata fields

I've added support for metadata fields within Benthos, that means three things:

  • Inputs can add arbitrary metadata to messages
  • The metadata processor can be used to modify the metadata of a message
  • Function interpolation can be used to access metadata fields with ${!metadata:foo}

This means we can now start rolling out metadata support to each input and output. Inputs can be modified to add metadata. Outputs can be modified to both allow function interpolation in more fields and add all metadata fields to messages if there's an appropriate way to do so.
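
As a rough sketch of the flow described above; the metadata processor fields and the interpolated output field shown here are illustrative assumptions.

pipeline:
  processors:
  - type: metadata
    metadata:
      operator: set
      key: foo
      value: bar
output:
  type: http_client
  http_client:
    url: "http://example.com/post/${!metadata:foo}"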

I'll list all inputs and outputs here that are candidates for supporting metadata, and I'll remove the ones that have been done.

Outputs:

  • amazon_s3
  • amazon_sqs
  • elasticsearch
  • http_client
  • http_server
  • kafka
  • mqtt
  • nats
  • nats_stream
  • nsq
  • websocket

Support for AMQP 1.0

Hi,

Thoughts on support for AMQP 1.0, or would that conflict with the current implementation? As far as I can tell they are basically different protocols.

There is a go lib for it: https://github.com/vcabbage/amqp

I can have a go at implementing it, but it might be above my level of Go at the moment.

Paul

Add new metric processor

What are your thoughts on extending (or replicating) the jmespath condition to fetch fields and expose them as metrics? Adding counters/gauges would allow you to scrape the stream from Prometheus and generate alerts and dashboards. We'd be most interested in doing this off of a KSQL table, but I imagine it could be useful for all sorts of inputs.
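
A hypothetical config sketch of such a processor; neither the type nor its fields exist, and they are only meant to illustrate exposing a jmespath-extracted field as a gauge.

- type: metric        # hypothetical processor
  metric:
    type: gauge
    name: order_total
    query: "order.total"   # jmespath expression applied to each message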

Output metrics per message of a batch sent

Currently there is a standard metric for output.send.success; this is per message batch rather than per message, so it would be useful to complement this metric with something like output.parts.send.success.

Create plugin example

It would be good to have a standard implementation of a plugin to share; a good starting point is adapting #76 into one.

Edit: Here's an example of an isolated plugin project: https://github.com/Jeffail/benthos-plugin-example
I still need to create a build system that makes plugins built this way viable. However, you can copy/paste the implementation within main.go and use that to build a plugin directly into a fork of Benthos, or into a project that uses Benthos as a framework, and that should work right now.

PSA: Incoming changes to metrics.Type

The metrics.Type interface is going to change. This has no impact on users of Benthos, only those using it as a framework or building their own components.

The change will involve adding optional labels to the GetCounter, GetGauge and GetTimer calls, and will also likely include changing path ...string for path string. If you have any strong feelings about this change please post them here. I'll add more details as they are worked out.

AMQP as input never acks its messages

In testing, I found that if you use AMQP (RabbitMQ) as an input, the messages will never get ACK'd in rabbitmq, which causes a backlog to build.

In the AMQP input file, you've turned off AutoACK, but never ack any messages as being delivered.

You also never set a QoS, which then defaults to 0 in RabbitMQ. This could also cause problems if multiple Benthos instances are sharing the same queue, or performance problems under heavy queue traffic.

XMPP component input

Hey @Jeffail ,

I promise I won't just keep making new issues (and please let me know if it is a hassle) ... but it would be really useful to have an XMPP component input. XMPP components are pretty much like a simpler form of XMPP pubsub where merely connecting from the component is considered a subscribe. Anything sent to the component via XMPP, e.g. [email protected], will be forwarded to the component.

I recently used this library to make my own bridge between XMPP and Kafka. I would be happy to share my code, but there is not much to it beyond the sample usage:

https://github.com/sheenobu/go-xco

If it were part of Benthos I wouldn't need my own homebrew solution, and it seems like others may find it useful.

Add basic auth to HTTP inputs/outputs

There's a branch for introducing an HTTP stream as an input that includes HTTP auth; it would be good to pick out the auth parts for all HTTP inputs/outputs to use.
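
A sketch of the kind of block that could be shared across the HTTP components; the field names here are assumptions rather than confirmed config.

output:
  type: http_client
  http_client:
    url: http://localhost:8080/post
    basic_auth:
      enabled: true
      username: foo
      password: bar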

Support for HDFS

Are there any plans for supporting writing to HDFS? I'd imagine it would be similar to the S3 writer. I'd be interested in contributing, if it seems like a reasonable idea.

Add new encrypt/decrypt processors

Hi,

Thanks for open sourcing this, it's pretty awesome. I couldn't see any scope for custom processors; is that a case of forking and adding them? I'm planning an encrypt/decrypt processor; would they be of interest in the main project?

Paul

PSA: Go API Changes

I'm planning to eventually merge https://github.com/Jeffail/benthos/tree/feature/wip-refactor-pipelines which brings in a large overhaul of the underlying interfaces within Benthos pipeline components. If you are a user of these APIs and want to update to this latest version you will need to implement these new interfaces.

Justification

This change will bring in a major paradigm shift. Response channels are now propagated down pipelines along with the message payload, and are therefore owned by the source of the payload instead of the receiver.

This means downstream components can no longer indicate to upstream that they are closing (by closing their response channels), but this behaviour was brittle. The benefit is that upstream siblings can now make parallel claims to idle downstream siblings, i.e. M brokered inputs to N round_robin outputs can have up to N in-flight outputs at any time.

This allows users to compensate for situations where the output is a hard bottleneck of their stream.

Migration

The respective interfaces for consumers and producers are simpler now, and can be found here: https://github.com/Jeffail/benthos/blob/master/lib/types/interface.go#L83

The interface for processors has also changed, where messages are no longer passed as pointers.

redis list as input?

Hi,

I can see there is a redis pubsub input, but I'd really like to be able to use redis list semantics (rpush -> blpop etc.), which would offer some resilience if the Benthos process were not alive when a message was queued.

That way my application can just write to the Redis list (rpush) and Benthos would poll the list and then deliver messages to any of the output plugins (e.g. Kafka). It's just like pubsub, but with the greater resilience of Redis lists to protect from data loss.
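
A hypothetical config for such an input; the type name and every field below are guesses, purely to illustrate the request.

input:
  type: redis_list        # suggested input, not yet implemented
  redis_list:
    url: tcp://localhost:6379
    key: benthos_list
    timeout_ms: 5000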

What is your opinion?

Thanks

Elasticsearch interpolate index

I am wondering whether the index configuration supports interpolation or not.
In the docs, I only see the id field as being interpolated. Index interpolation is useful in scenarios such as time-based indexes.
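
A sketch of what this could look like if the index field supported interpolation, here assuming a date string has already been placed in metadata by an earlier processor; hypothetical until supported.

output:
  type: elasticsearch
  elasticsearch:
    urls:
    - http://localhost:9200
    index: "logs-${!metadata:date}"   # hypothetical: index interpolation not yet supported
    id: "${!metadata:doc_id}"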

Facing issue in json set processor

{
  "data": {
    "className": "foo",
    "packageName": "bar"
  }
}

In the above JSON I need to add one more attribute, like below:

{
  "data": {
    "className": "foo",
    "packageName": "bar"
  },
  "startedBy": "user1"
}

What should I provide in 'path'? I tried providing a dot (.) for 'path' as shown below, but it didn't work as expected.

processors:
- type: json
  json:
    operator: set
    parts: []
    path: "."
    value:
      startedBy: user1
threads: 1

Add integration unit tests for all inputs/outputs

We should have flag protected integration unit tests for all inputs/outputs that interact with third party services, which can be kicked off using docker-compose. Ideally they can then be run inside a CI suite.
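
For example, a third party service could be provided to such a test with a minimal docker-compose file along these lines; the service and image are chosen arbitrarily for illustration.

version: "2"
services:
  rabbitmq:
    image: rabbitmq:3
    ports:
    - "5672:5672"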

My current integration test set up is manual and involves the following steps:

  • Run benthos instances with both input and output to third party service like this: stdin -> benthos -> third party service -> benthos -> stdout
  • Watch as they fail to connect (as service is not yet running)
  • Run third party service as docker container
  • Observe benthos instances successfully connect
  • Send messages from one benthos to the other
  • Shut down third party service
  • Queue more messages in the input benthos
  • Start service
  • Ensure messages begin flowing again without loss

New `check_field` condition type

It would be useful to have a new check_field condition that would work similarly to the process_field processor: https://github.com/Jeffail/benthos/tree/master/docs/processors#process_field

The idea is that sometimes we want to resolve a condition on an extracted section of a message. For example, we might want to perform a text based contains operator on the JSON path foo.bar of a message. This would currently require us to do something like copy the value of foo.bar into a metadata field and then use a metadata condition instead.

What I'm proposing is we instead have a general check_field condition that allows you to reduce the contents of a message before performing a child condition, the boolean result is then returned and the original contents are left unchanged. The config might look like this:

condition:
  type: check_field
  check_field:
    parts: []
    path: foo.bar
    condition:
      type: text
      part: 0
      operator: contains
      arg: foobar

In this case parts would be used similar to processors, where if it's left empty then the reduction is applied to all message parts. Like all other components this would mean users may still distinguish message parts of a batch by their index if they need to, but others can simply ignore those fields and they behave as expected.

Support for TLS client auth for kafka

I think this would be fairly straightforward, and useful for those in the unfortunate situation of having to use mutual TLS authentication.

In addition to the configuration options and reading the client cert, adding something like the following would do:

// Build the client TLS config from the loaded cert and configured root CAs.
return &tls.Config{
    Certificates:       []tls.Certificate{cert},
    InsecureSkipVerify: c.InsecureSkipVerify,
    RootCAs:            rootCAs,
}, nil

SQL Change Data Capture

We have a project at the company I work for where we connect to PgSQL using a logical replication slot to receive all data changes (insert, update, delete) for a configurable set of tables, and stream data directly from the database to streams (in our case Kinesis). We do that using the PgSQL WAL.

That's used for multiple reasons: fanning out to an ES instance from multiple sources, metrics/statistics collection, as well as data warehousing, e.g. using Kinesis Firehose.

So, we have implemented things similar to Benthos in regards to acknowledgement and sinks. Yet, since our code is also written in Go, moving forward it would be easier for us to contribute to Benthos by adding support for PgSQL, rather than re-implementing lots of things you have already created.

It could be an interesting addition, since we could stream both events and data from the database.

I would like to hear the maintainers' opinion on this; if it's something you're interested in, I could work on a PR basis.

PSA: AMQP Declarations Changes

I'm changing the declaration behaviour of AMQP inputs and outputs. The old way was to force you to specify the declaration fields of the target queue and exchange regardless of whether they already exist or whether it's even possible to know. This is mostly redundant and can cause issues when Benthos doesn't expose all the fields necessary to match an existing exchange or queue. The only advantage was that Benthos created the exchange and queue and bound them for you.

I've changed the behaviour such that it is still possible to declare a queue, and bind it to an exchange, on the input type, but this is disabled by default and can be left blank. It is also possible to declare an exchange on the output type, but this is similarly only an optional step and can be left blank when the exchange exists.

This shouldn't break backwards compatibility since the only fields being moved would be a no-op in an existing pipeline.
