benthosdev / benthos
Fancy stream processing made operationally mundane
Home Page: https://www.benthos.dev
License: MIT License
hi @Jeffail, what are your thoughts on having date processing support (format, parse, get year, get month, get timestamp, subtract, add, etc.) in Benthos, be it as a function or a processor? I think it would be helpful when processing time-series data.
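For illustration only, a hypothetical timestamp processor (the processor name and every field below are invented to show the intent, they are not an existing Benthos component) might be configured like this:

processors:
- type: timestamp
  timestamp:
    operator: format
    path: created_at
    input_format: "2006-01-02T15:04:05Z07:00"
    output_format: "2006-01-02"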
First off, great work on this! I would like to utilize benthos with non-supported inputs and outputs, is this possible today? If not, is this something you would be open to supporting? Ideally, as long as user defined inputs and outputs satisfy the benthos input/output interfaces, this seems like it could be integrated fairly easily, and perhaps allow the community to build a nice ecosystem of benthos plugins.
The idea would be to create an output type that selects the first of a list of outputs where a condition passes. The config spec might look something like this:
type: switch
switch:
- output:
    type: foo
    foo: {}
  condition:
    type: foo
    foo: {}
- output:
    type: bar
    bar: {}
  condition:
    type: bar
    bar: {}
Add support for NATS as an input and as an output.
Hello,
I'm considering using Benthos for sending messages to remote tenants of a platform I'm developing. I'd like to offer public access to a RabbitMQ server, which the tenants can then consume the messages from using Benthos. However, in this scenario using a secure connection is critical in order to prevent the users from sending their RabbitMQ credentials over the public internet in plain text.
RabbitMQ supports TLS, and so does the amqp library you're using, so I think it shouldn't be too hard to add a TLS option to the amqp inputs / outputs. I might give it a shot myself once I start trying out Benthos if you don't have time for it right now, but I think this is something other users would appreciate as well.
Thanks!
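For illustration, an opt-in TLS block on the amqp input could look something like the sketch below (the field names under tls are assumptions, not a final design):

input:
  type: amqp
  amqp:
    url: amqps://guest:guest@rabbitmq.example.com:5671/
    queue: benthos-queue
    tls:
      enabled: true
      root_cas_file: ./ca.pem
      skip_cert_verify: false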
I'm sure it's just that I am misunderstanding the configuration, but is there a simple example of using inproc for input & output?
What I am trying to achieve is a Kafka input that is then sent to a few different processor paths, where I can filter based off of different things and send to different outputs or calculate metrics.
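As a rough sketch of how inproc could wire two streams together (assuming streams mode, with the stream names, topic and inproc ID below made up for the example):

# stream "ingest": reads from Kafka and hands messages off in-process
input:
  type: kafka
  kafka:
    addresses:
    - localhost:9092
    topic: benthos_stream
output:
  type: inproc
  inproc: raw_events

# stream "metrics": consumes the in-process feed and writes to stdout
input:
  type: inproc
  inproc: raw_events
output:
  type: stdout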
It would be good to have a processor type for sampling data. It could allow both randomly and sequentially dropping a percentage of messages. This would allow us to sample a particular input or write a sample to an output.
A walk-through document detailing the new processor would also be good.
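A hypothetical config for such a processor (the type name and fields are invented here purely to illustrate the idea) might be:

processors:
- type: sample
  sample:
    mode: random
    retain_percentage: 10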
Broker types are going to be merged into a single broker input and output type, where the pattern will be specified as a field. There are a number of reasons for this change, mostly related to making documentation and config discovery simpler.
Pull request: #22
Wherever you were using the fan_in type, simply swap it with broker:

type: fan_in
fan_in:
  inputs:

To:

type: broker
broker:
  inputs:
Wherever you were using fan_out or round_robin you do the same, but must also specify the pattern of the broker as there are multiple options:

type: fan_out
fan_out:
  outputs:

To:

type: broker
broker:
  pattern: fan_out
  outputs:
I'm planning to combine the following JSON based processors:

- set_json
- select_json
- delete_json

Into a single json processor in order to simplify the list. I'm conscious of the fact that the list is constantly growing and there's no reason why other data structure formats can't also be supported. I want to avoid a situation where we potentially have select_json, set_json, select_xml, set_xml, etc.
The new format would look something like this:
type: json
json:
  parts: []
  operator: set
  path: foo.bar
  value: baz
The potential values for operator would be things like set, append, select, delete. In the case of select and delete the value field would be ignored.
There will be a transition period where I'll support both.
EDIT: The transition period was significantly shorter than originally implied. Please let me know if this impacts you and I'll avoid this in future.
It might be nice to have a rate limit field in the resources section of a Benthos config, where we might specify a maximum rate per interval for an arbitrary resource. This rate limit could then potentially be shared across inputs, processors or outputs that choose to use it.
For example, suppose we are using the http processor to hit external services, and in order to get a high throughput we have a large number of processing pipelines in a config something like this:
input:
  type: foo
buffer:
  type: memory
pipeline:
  threads: 40
  processors:
  - type: http
    http:
      max_parallel: 50
      url: http://fooresource.bar/process
This rate is fairly predictable: we can see that 40 threads each with a max_parallel of 50 would potentially create 2000 requests in parallel at any given time. We should be able to tune the parallelism, processor pipeline threads, and retries of the http processor in order to prevent stressing out the resource.
However, imagine we had a more complex pipeline where hitting this http resource is conditional on other factors such as the content of a message passing through. In this case it might be difficult to predict how often messages of a batch would be sent to this resource per interval per processing thread, and it's also possible that certain events may result in an odd distribution of these messages, therefore spiking the number of requests.
If our service is brittle and could potentially die as a result of being hit too often (as opposed to simply returning a 429) we might need some mechanism for rate limiting our requests in a way that is shared across pipeline processing threads and conditional processors of those pipelines. If we were able to specify a shared rate limit we could do something like this:
input:
  type: foo
buffer:
  type: memory
pipeline:
  threads: 40
  processors:
  - type: http
    http:
      max_parallel: 50
      rate_limit: foo_limiter
      url: http://fooresource.bar/process
resources:
  rate_limiters:
    foo_limiter:
      interval: 1s
      capacity: 1000
This would create a shared rate limit across all the http processors that specify the rate limiter foo_limiter, where the requests are limited to 1000 per second. It would then be easy to cap the usage of a particular resource even when your processing pipelines are complex. These rate limits could potentially be used by any inputs, outputs, processors, caches, etc. where it seems sensible.
This is a pretty cool lib. Kind of surprised there are not more issues.
I want to use NATS as my central control plane to talk to microservices, but also to send messages to clients over websockets.
I can see that this lib can help me with decoupled pipelines and orchestration, but am wondering whether using this with websockets has been attempted or considered.
There is a similar but different project by IBM, written in golang with JS running inside a golang JS VM, that has some relationship to this project too, so it's worth adding here in case people are interested. It's more concerned with many database types and doing pipelines across them.
It's also decoupled at runtime because of the JS aspects.
https://github.com/compose/transporter
Anyway hope I can get some feedback on the websockets aspect :)
I am working on adding additional operators to both the metadata and jmespath (which currently does not support operators) conditions. One of my use cases is to match against a long list of enumerations (to go along with the switch output). It currently feels awkward to force config for an enum operator into the string arg field. I'm proposing changing the way condition configuration is currently implemented to mirror the configuration structure of other benthos components (namely inputs, outputs, and processors). Below is an example to illustrate the proposal:
change this:
condition:
  type: metadata
  metadata:
    operator: less_than
    key: foo
    part: 0
    arg: "3"
to this:
condition:
  type: metadata
  metadata:
    operator: less_than
    key: foo
    part: 0
    less_than: 3
The difference may seem subtle, but is better illustrated with more complex (hypothetical) operators:
condition:
  type: metadata
  metadata:
    operator: enum
    key: foo
    part: 0
    enum:
    - "a"
    - "very, long"
    - "list"
    - "of"
    - "values!"
or
condition:
  type: jmespath
  jmespath:
    operator: json_schema
    query: foo
    part: 0
    json_schema:
      type: object
      properties:
        bar:
          type: string
          pattern: date-time
        baz:
          type: integer
          minimum: 5
          exclusiveMinimum: true
        qux:
          type: array
          uniqueItems: true
          items:
            type: string
            enum: ["quux","quz","corge"]
      required:
      - bar
      - baz
We could maintain backwards compatibility by continuing to support the arg field for existing operators. @Jeffail what are your thoughts?
Hi, thanks for such a great design btw.
I would love to use it in my scenario where I need to export and process logs to elasticsearch. But since benthos guarantees at-least-once delivery, I think I need a way to generate an id based on the event processed so that it doesn't get duplicated in elasticsearch. Something like the logstash fingerprint plugin.
Will this be supported?
First of all I would like to say thank you for making benthos.
I have configured benthos.yaml with an http server as input, kafka as output, and a few processors in the pipeline. All is fine while the http client request rate is minimal.
When I increased to 20 parallel http client requests, I could see http timeouts on the client side, even after increasing the http server timeout to 20 seconds. But I found almost all requests reached kafka.
But when I increased to 50 parallel http client requests, I could see almost 50% data loss in kafka along with http timeouts on the client side.
In the logs I didn't see any warnings/errors. I am running benthos on 8 CPUs and I configured 8 threads for the pipeline processors.
Interestingly, when I change the output to file, I don't see any data loss even for 50 parallel http requests.
My question is how can I isolate/debug the issue? The problem could be in either place.
Appreciate your help! Thanks in advance.
Below is my benthos.yaml snippet.
type: kafka
kafka:
  ack_replicas: false
  addresses:
logger:
  prefix: benthos
  level: DEBUG
  add_timestamp: true
  json_format: true
Hi @Jeffail
I feel it would be pretty nice (even nicer than it already is!) if Benthos would support various distributed tracing systems just like it does for metrics!
The various tracing systems could be abstracted by using OpenTracing's API; this includes Jaeger, which I'm specifically interested in.
It seems that even Zipkin could be used according to this package.
Here's a quick list of what would be needed:
- Part.Metadata() would make sense, but maybe we want to prefix it or store it somewhere else to avoid collisions when doing this
Let me know what you think.
Cheers!
In the same way that we can already specify TargetVersion with the Kafka input type, I'd like to be able to select a TargetVersion with KafkaBalanced. This is already supported by the sarama-cluster config (because it is based on the sarama Config struct) so we would just need to add (and then pass) this configuration option.
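Assuming the new field mirrors the existing kafka input (the field name target_version and the surrounding values below are assumptions for illustration), the config might end up looking like:

input:
  type: kafka_balanced
  kafka_balanced:
    addresses:
    - localhost:9092
    consumer_group: benthos_group
    topics:
    - benthos_stream
    target_version: 1.1.0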
Hi @Jeffail, sorry for bothering you again with small boring thing like this.
I'm just wondering whether you could make the kafka output's topic config support interpolation.
Currently, all of my docker container logs--which are stored in syslog--are exported to one kafka topic using filebeat. I would like to use benthos to parse the syslog program name to extract the container name and/or image and export each parsed message to a different kafka topic, so that future stream processing of a specific container's logs doesn't need to receive and filter messages from unrelated containers, thus saving bandwidth.
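As a rough sketch of the desired end state (assuming an upstream processor has already stored the parsed container name in a metadata field called container_name, which is a made-up key for this example):

output:
  type: kafka
  kafka:
    addresses:
    - localhost:9092
    topic: container-logs-${!metadata:container_name}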
We are trying to connect to our RabbitMQ server exchange but the durable param is not compatible. It would be great if we could support this configuration.
Thanks
I've added support for metadata fields within Benthos, which means, among other things:
- the metadata processor can be used to modify the metadata of a message
- metadata values can be referenced with the ${!metadata:foo} interpolation function
This means we can now start rolling out metadata support to each input and output. Inputs can be modified to add metadata. Outputs can be modified to both allow function interpolation in more fields and add all metadata fields to messages if there's an appropriate way to do so.
I'll list all inputs and outputs here that are candidates for supporting metadata, and I'll remove the ones that have been done.
Outputs:
Hi,
Thoughts on support for AMQP 1.0, or would that conflict with the current implementation? As far as I can tell they are basically different protocols.
There is a go lib for it: https://github.com/vcabbage/amqp
I can have a go at implementing it, but it might be above my level of go at the moment.
Paul
Hello Jeff,
At first I want to say that Benthos is a great tool and many thanks for that! :]
I would like to ask whether it is possible to add support for protobuf in the http_server input plugin?
What are your thoughts on extending (or replicating) the jmespath condition to fetch fields and expose them as metrics? Adding counters/gauges would allow you to scrape the stream from Prometheus and generate alerts and dashboards. We'd be most interested in doing this off of a KSQL table, but I imagine it could be useful for all sorts of inputs.
Currently there is a standard metric for output.send.success; this is per message batch rather than per message, so it would be useful to complement this metric with something like output.parts.send.success.
Is it possible to have some retry rules / logic for failed HTTP/HTTPS push attempts?
It would be good to have a standard implementation of a plugin to share; a good starting point is adapting #76 into one.
Edit: Here's an example of an isolated plugin project: https://github.com/Jeffail/benthos-plugin-example
I still need to create a build system that makes plugins built this way viable. However, you can copy/paste the implementation within main.go and use that to build a plugin directly into a fork of Benthos, or a project that uses Benthos as a framework, and that should work right now.
The metrics.Type interface is going to change. This has no impact on users of Benthos, only those using it as a framework or building their own components.
The change will involve adding optional labels to the GetCounter, GetGauge and GetTimer calls, and will also likely include swapping path ...string for path string. If you have any strong feelings about this change please post them here. I'll add more details as they are worked out.
In testing, I found that if you use AMQP (RabbitMQ) as an input, the messages will never get ACK'd in rabbitmq, which causes a backlog to build.
In the AMQP input file, you've turned off AutoACK, but never ack any messages as being delivered.
You also never set a QOS, which then defaults to 0 in RabbitMQ. This could also cause problems if multiple benthos instances are sharing the same queue, or performance problems under heavy queue traffic.
Add support for RabbitMQ as an input and as an output.
Hey @Jeffail ,
I promise I won't just keep making new issues (and please let me know if it is a hassle)... but it would be really useful to have an XMPP component input. XMPP components are pretty much a simpler form of XMPP pubsub where merely connecting from the component is considered a subscribe. Anything sent to the component via XMPP, e.g. [email protected], will be forwarded to the component.
I recently used this library to make my own bridge between XMPP and kafka. I would be happy to share my code but there is nothing much to it beyond the sample usage:
https://github.com/sheenobu/go-xco
If it was part of benthos I wouldn't need my own homebrew solution, and it seems like others may find it useful.
There's a branch for introducing HTTP stream as an input that includes HTTP auth; it would be good to pick out the auth parts for all HTTP inputs/outputs to use.
Are there any plans for supporting writing to HDFS? I'd imagine it would be similar to the S3 writer. I'd be interested in contributing, if it seems like a reasonable idea.
Hi,
Thanks for open sourcing this, it's pretty awesome. I couldn't see any scope for custom processors, is that a case of fork and add them? I'm planning an encrypt/decrypt processor, would they be of interest in the main project?
Paul
It would be nice to have a docker image created with ZMQ4 support.
I'm planning to eventually merge https://github.com/Jeffail/benthos/tree/feature/wip-refactor-pipelines which brings in a large overhaul of the underlying interfaces within Benthos pipeline components. If you are a user of these APIs and want to update to this latest version you will need to implement these new interfaces.
This change will bring in a major paradigm shift. Response channels are now propagated down pipelines along with the message payload, and are therefore owned by the source of the payload instead of the receiver.
This means downstream components can no longer indicate to upstream that they are closing (by closing their response channels), but this behaviour was brittle. The benefit is that upstream siblings can now make parallel claims to idle downstream siblings, i.e. M brokered inputs to N round_robin outputs can have up to N in-flight outputs at any time.
This allows users to compensate for situations where the output is a hard bottleneck of their stream.
The respective interfaces for consumers and producers are simpler now, and can be found here: https://github.com/Jeffail/benthos/blob/master/lib/types/interface.go#L83
The interface for processors has also changed, where messages are no longer passed as pointers.
Currently the type of the index is hardcoded as "doc"; could you provide a config option to allow customizing this? Thanks!
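For example, a hypothetical config option on the elasticsearch output (the field name type below is an assumption, not an existing option) could look like:

output:
  type: elasticsearch
  elasticsearch:
    urls:
    - http://localhost:9200
    index: benthos_index
    type: doc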
I have noticed some slow Kafka inputs and it looks like offsetLastCommitted isn't set in the kafka_balanced plugin.
It would be nice to have block diagrams to complement this document.
Hi,
I can see there is a redis pubsub input but I'd really like to be able to use redis list semantics (rpush -> blpop etc.) which would offer some resilience if the benthos process were not alive when a message was queued.
That way my application can just write to the redis list (rpush) and benthos would poll the list and then deliver it to any of the output plugins (e.g. kafka), just like pubsub but with the greater resilience of redis lists to protect from data loss.
What is your opinion?
Thanks
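For illustration, a hypothetical redis_list input (the type name and fields are invented here to show the intent) could be paired with an existing output like this:

input:
  type: redis_list
  redis_list:
    url: tcp://localhost:6379
    key: benthos_list
output:
  type: kafka
  kafka:
    addresses:
    - localhost:9092
    topic: benthos_stream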
See #18
I am wondering whether the index configuration supports interpolation or not. In the docs, I only see the id field as being interpolated. Index interpolation is useful in scenarios such as time based indexes.
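As a sketch of the desired behaviour (assuming an upstream processor has stored a date string in a metadata field called date, which is a made-up key for this example), the output could look like:

output:
  type: elasticsearch
  elasticsearch:
    urls:
    - http://localhost:9200
    index: logs-${!metadata:date}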
{
  "data": {
    "className": "foo",
    "packageName": "bar"
  }
}
In the above JSON I need to add one more attribute like below
{
  "data": {
    "className": "foo",
    "packageName": "bar"
  },
  "startedBy": "user1"
}
What should I provide in 'path'? I tried providing a dot (.) for 'path' as mentioned below, but it didn't work as expected.
processors:
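If the goal is to set a new root-level field, a guess at the intended config, based on the json processor format shown earlier in this document, would be:

processors:
- type: json
  json:
    parts: []
    operator: set
    path: startedBy
    value: user1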
We should have flag protected integration unit tests for all inputs/outputs that interact with third party services, which can be kicked off using docker-compose. Ideally they can then be run inside a CI suite.
My current integration test set up is manual and involves the following steps:
stdin -> benthos -> third party service -> benthos -> stdout
It would be useful to have a new check_field condition that would work similarly to the process_field processor: https://github.com/Jeffail/benthos/tree/master/docs/processors#process_field
The idea is that sometimes we want to resolve a condition on an extracted section of a message. For example, we might want to perform a text based contains operator on the JSON path foo.bar of a message. This would currently require us to do something like copy the value of foo.bar into a metadata field and then use a metadata condition instead.
What I'm proposing is that we instead have a general check_field condition that allows you to reduce the contents of a message before performing a child condition; the boolean result is then returned and the original contents are left unchanged. The config might look like this:
condition:
  type: check_field
  check_field:
    parts: []
    path: foo.bar
    condition:
      type: text
      part: 0
      operator: contains
      arg: foobar
In this case parts would be used similarly to processors, where if it's left empty then the reduction is applied to all message parts. Like all other components this would mean users may still distinguish message parts of a batch by their index if they need to, but others can simply ignore those fields and they behave as expected.
I think this would be fairly straightforward, and useful for those in the unfortunate situation of having to use mutual tls authentication.
In addition to the configuration options and reading the client cert, adding something like the following would do:
return &tls.Config{
    Certificates:       []tls.Certificate{cert},
    InsecureSkipVerify: c.InsecureSkipVerify,
    RootCAs:            rootCAs,
}, nil
We have a project at the company I work for where we connect to PgSQL using a logical replication slot to receive all data changes (insert, update, delete) for a configurable set of tables, and stream data directly from the database to streams (in our case kinesis). We do that using the PgSQL WAL.
That's used for multiple reasons: fanning out to an ES instance from multiple sources, metrics/statistics collection, as well as data-warehousing, e.g. using kinesis firehose...
So, we have implemented similar things to benthos with regards to acknowledgement and sinks. Yet, since our code is also written in go, moving forward it would be easier for us to contribute to benthos, by adding support for PgSQL, rather than re-implementing lots of things you have already created.
It could be an interesting addition, since we could stream both events and data from the database.
I would like to hear the maintainers' opinion on this; if it's something you're interested in I could work on a PR.
It would be nice to have MQTT inputs and outputs. There's a golang library already: https://github.com/eclipse/paho.mqtt.golang
Hey Ash,
First off, this is lovely and so frickin' useful. Thanks for open-sourcing it.
I was curious - are there (immediate) plans for supporting Redis streams?
This face is very sad....
I'm changing the declaration behaviour of AMQP inputs and outputs. The old way was to force you to specify the declaration fields of the target queue and exchange regardless of whether they already exist or whether it's even possible to know. This is mostly redundant and can cause issues when Benthos doesn't expose all the fields necessary to match an existing exchange or queue. The only advantage was that Benthos created the exchange and queue and bound them for you.
I've changed the behaviour such that it is still possible to declare a queue, and bind it to an exchange, on the input type, but this is disabled by default and can be left blank. It is also possible to declare an exchange on the output type, but this is similarly only an optional step and can be left blank when the exchange exists.
This shouldn't break backwards compatibility since the only fields being moved would be a no-op in an existing pipeline.