firebolt's Introduction

A golang framework for streaming event processing & data pipeline apps

Introduction

Firebolt has a simple model intended to make it easier to write reliable pipeline applications that process a stream of data.

It can be used to build systems such as:

  • logging/observability pipelines
  • streaming ETL
  • event processing pipelines

Every application's pipeline starts with a single source, the component that receives events from some external system. Sources must implement the node.Source interface.

We provide one built-in source:

  • kafkaconsumer - Events come from a Kafka topic, and are passed to the root nodes as []byte

The processing in your application is performed by its nodes, which form a processing tree. Data (events) flows down this tree: a parent node passes its results down to its child nodes. Nodes may process events synchronously or asynchronously. A synchronous node of type node.FanoutNode returns a slice of results for fanout or 'demultiplexing' use cases. Each node must implement the node.SyncNode, node.FanoutNode, or node.AsyncNode interface accordingly.
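To make the tree model concrete, here is a minimal sketch of events flowing from a parent node to its children, with a nil result filtering the event out. It does not use Firebolt's real interfaces: Event, SyncProcessor, TreeNode, and buildTree are hypothetical names invented for this illustration, and the actual node.SyncNode interface may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a hypothetical stand-in for the value passed between nodes.
type Event struct {
	Payload string
}

// SyncProcessor is a hypothetical, simplified synchronous node: it returns the
// event to pass to its children, or nil to filter the event out.
type SyncProcessor func(in *Event) (*Event, error)

// TreeNode pairs a processor with its child nodes.
type TreeNode struct {
	ID       string
	Process  SyncProcessor
	Children []*TreeNode
}

// Run pushes one event through n and, if it survives, through every child.
// It returns the IDs of the nodes that emitted a result, in visit order.
func (n *TreeNode) Run(in *Event) ([]string, error) {
	out, err := n.Process(in)
	if err != nil {
		return nil, fmt.Errorf("node %s: %w", n.ID, err)
	}
	if out == nil {
		return nil, nil // filtered: children never see this event
	}
	visited := []string{n.ID}
	for _, child := range n.Children {
		ids, err := child.Run(out)
		if err != nil {
			return visited, err
		}
		visited = append(visited, ids...)
	}
	return visited, nil
}

// buildTree wires a two-node tree: a filter that drops debug lines, followed
// by a transform that upper-cases whatever survives.
func buildTree() *TreeNode {
	upper := &TreeNode{ID: "upper", Process: func(e *Event) (*Event, error) {
		return &Event{Payload: strings.ToUpper(e.Payload)}, nil
	}}
	return &TreeNode{
		ID: "drop-debug",
		Process: func(e *Event) (*Event, error) {
			if strings.HasPrefix(e.Payload, "debug") {
				return nil, nil
			}
			return e, nil
		},
		Children: []*TreeNode{upper},
	}
}

func main() {
	root := buildTree()
	for _, p := range []string{"info: started", "debug: noisy"} {
		ids, err := root.Run(&Event{Payload: p})
		fmt.Println(p, ids, err)
	}
}
```

In this sketch the debug line never reaches the "upper" node, which mirrors the idea of filtering a stream by returning nil.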

We provide two built-in node types:

  • kafkaproducer - Events are produced onto a kafka topic by an asynchronous producer.
  • elasticsearch - Events are bulk indexed into Elasticsearch.

Firebolt has both run-time and compile-time dependencies on librdkafka; see Developing.

Example: Logging Pipeline

At DigitalOcean, our first use of Firebolt was in our logging pipeline. This pipeline consumes logs from just about every system we run. The diagram below depicts the source and nodes in this application.

This system uses the built-in kafkaconsumer source (in yellow) and kafkaproducer and elasticsearch nodes (in green). The blue nodes are custom to this application.

Logging Pipeline Node Diagram

What does Firebolt do for me?

Firebolt is intended to address a number of concerns that are common to near-realtime data pipeline applications, making it easy to run a clustered application that scales predictably to handle large data volume.

It is not an analytics tool - it does not provide an easy way to support 'wide operations' like record grouping, windowing, or sorting that require shuffling data within the cluster. Firebolt is for 'straight through' processing pipelines that are not sensitive to the order in which events are processed.

Some of the concerns Firebolt addresses include:

  • kafka sources - Minimal configuration and no code required to consume from a Kafka topic, consumer lag metrics included
  • kafka sinks - Same for producing to a Kafka topic
  • loose coupling - Nodes in the pipeline are loosely coupled, making them easily testable and highly reusable
  • simple stream filtering - Filter the stream by returning nil in your nodes
  • convenient error handling - Send events that fail processing to a kafka topic for recovery or analysis with a few lines of config
  • outage recovery: offset management - Configurable Kafka offset management during recovery lets you determine the maximum "catch up" to attempt after an outage, so you can quickly get back to realtime processing.
  • outage recovery: parallel recovery - After an outage, process realtime data and "fill-in" the outage time window in parallel, with a rate limit on the recovery window.
  • monitorability - Firebolt exposes Prometheus metrics to track the performance of your Source and all Nodes without writing code. Your nodes can expose their own custom internal metrics as needed.
  • leader election - Firebolt uses Zookeeper to conduct leader elections, facilitating any processing that may need to be conducted on one-and-only-one instance.

Documentation

  1. Configuration - The configuration file format

  2. Execution - How Firebolt processes your data

  3. Registry - Adding node types to the registry

  4. Sample Application Code - Example code for running the Firebolt executor

  5. Sources - Implementing and using sources

  6. Sync Nodes - Implementing and using synchronous nodes

  7. Fanout Nodes - Implementing and using fanout nodes

  8. Async Nodes - Implementing and using asynchronous nodes

  9. Leader Election - Starting leader election and accessing election results

  10. Messaging - How to send and receive messages between the components of your system

  11. Metrics - What metrics are exposed by default, and how to add custom metrics to your nodes

Built-In Types

  1. Kafka Producer - Node for producing events onto a Kafka topic

  2. Elasticsearch - Node for indexing documents to an Elasticsearch cluster

Developing

Firebolt depends on librdkafka v1.3.0 or later. To get started building a firebolt app (or working on firebolt itself), install it by following the librdkafka installation instructions.

An example for Debian-based distros:

sudo wget -qO - https://packages.confluent.io/deb/5.4/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.4 stable main"
sudo apt-get update
sudo apt-get install -y librdkafka1 librdkafka-dev

firebolt's People

Contributors

ahelium, cristovarghese, dependabot[bot], ful09003, fylie, gliptak, jnadler, kimmachinegun, taotetek, xiaoyier

firebolt's Issues

DATA RACE in node/elasticsearch/connectionfactory.go

Build an application binary with the race detector enabled, using the elasticsearch bulk indexer async node in the firebolt YAML config.
Run it and you should see the following race condition. The fix looks simple: in node/elasticsearch/connectionfactory.go, use the value returned by atomic.AddInt64() instead of reading e.batchCount directly.

If this fix seems OK, I can submit a PR.


==================
WARNING: DATA RACE
Write at 0x00c000110070 by goroutine 93:
  sync/atomic.AddInt64()
      src/runtime/race_amd64.s:276 +0xb
  github.com/digitalocean/firebolt/node/elasticsearch.(*esBulkServiceFactory).BulkService()
      external/com_github_digitalocean_firebolt/node/elasticsearch/connectionfactory.go:58 +0x72
  github.com/digitalocean/firebolt/node/elasticsearch.(*ElasticIndexClient).doBulkIndex()
      external/com_github_digitalocean_firebolt/node/elasticsearch/elastic_index_client.go:164 +0x8f
  github.com/digitalocean/firebolt/node/elasticsearch.(*ElasticIndexClient).retryBulkIndex.func1()
      external/com_github_digitalocean_firebolt/node/elasticsearch/elastic_index_client.go:139 +0x141

Previous read at 0x00c000110070 by goroutine 92:
  [failed to restore the stack]

Goroutine 93 (running) created at:
  github.com/digitalocean/firebolt/node/elasticsearch.(*ElasticIndexClient).retryBulkIndex()
      external/com_github_digitalocean_firebolt/node/elasticsearch/elastic_index_client.go:133 +0x146
  github.com/digitalocean/firebolt/node/elasticsearch.(*ElasticIndexClient).batch()
      external/com_github_digitalocean_firebolt/node/elasticsearch/elastic_index_client.go:114 +0x310

Goroutine 92 (running) created at:
  github.com/digitalocean/firebolt/node/elasticsearch.(*ElasticIndexClient).retryBulkIndex()
      external/com_github_digitalocean_firebolt/node/elasticsearch/elastic_index_client.go:133 +0x146
  github.com/digitalocean/firebolt/node/elasticsearch.(*ElasticIndexClient).batch()
      external/com_github_digitalocean_firebolt/node/elasticsearch/elastic_index_client.go:114 +0x310
==================
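The fix proposed in this issue can be sketched as follows. This is not the code from connectionfactory.go: batchCounter, reconnectEvery, and countReconnects are hypothetical names, and the "reconnect every N batches" decision is only an assumed use of the counter. The point is that every decision is made from the value returned by atomic.AddInt64, so no goroutine ever performs a plain read of the shared field.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// batchCounter is a hypothetical stand-in for the factory that owns the
// shared counter. reconnectEvery must be greater than zero.
type batchCounter struct {
	batchCount     int64
	reconnectEvery int64
}

// next increments the counter and decides what to do using only the value
// returned by atomic.AddInt64. Reading c.batchCount directly afterwards would
// be an unsynchronized read racing with increments in other goroutines.
func (c *batchCounter) next() (n int64, reconnect bool) {
	n = atomic.AddInt64(&c.batchCount, 1)
	return n, n%c.reconnectEvery == 0
}

// countReconnects calls next from many goroutines at once and counts how many
// calls were told to reconnect.
func countReconnects(calls int, every int64) int64 {
	c := &batchCounter{reconnectEvery: every}
	var reconnects int64
	var wg sync.WaitGroup
	for i := 0; i < calls; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, r := c.next(); r {
				atomic.AddInt64(&reconnects, 1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&reconnects)
}

func main() {
	fmt.Println(countReconnects(100, 10))
}
```

Because each call receives a distinct counter value, exactly one goroutine sees each multiple of reconnectEvery, regardless of scheduling. Running the sketch with `go run -race` is a reasonable way to confirm that the detector stays quiet.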

User Override of Elasticsearch Bulk Index Time Histo Buckets

The histogram for bulk_process_time has buckets set to ExponentialBuckets(0.01, 3, 8). This isn't working that well for us, and is likely a problem for other users as well. Users can set their own bulk index timeout already, so it would be normal to want buckets that align with the selected timeout.

Increase the default resolution and offer the user a way to specify their own buckets in firebolt.yml config.
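To see why the default buckets may be too coarse, it helps to list the boundaries that ExponentialBuckets(0.01, 3, 8) produces. The sketch below reimplements the calculation locally so it runs with the standard library only; exponentialBuckets is a local helper written for this illustration, intended to mirror the behavior of the Prometheus client function of the same name.

```go
package main

import "fmt"

// exponentialBuckets returns count upper bounds, starting at start and
// multiplying by factor at each step.
func exponentialBuckets(start, factor float64, count int) []float64 {
	buckets := make([]float64, 0, count)
	for i := 0; i < count; i++ {
		buckets = append(buckets, start)
		start *= factor
	}
	return buckets
}

func main() {
	for _, b := range exponentialBuckets(0.01, 3, 8) {
		fmt.Printf("%.2f ", b)
	}
	fmt.Println()
}
```

The eight boundaries are roughly 0.01, 0.03, 0.09, 0.27, 0.81, 2.43, 7.29, and 21.87 seconds. With a factor of 3, only a few boundaries fall in any given range, so a bulk index timeout of, say, 30 seconds would leave most slow requests in the last bucket or in +Inf.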

Add examples, such as consuming from a REST API and ingesting into Elasticsearch

This repo has a serious documentation problem: nobody knows how the system is meant to be integrated or how to set up ETL with it.
Please provide examples, such as how to consume a REST API and push the same data to Elasticsearch,
or how to consume MySQL data and ingest it into another database or Elasticsearch. Common end-to-end examples like these should be covered.

Multiple Pipelines in a Single Executor

Firebolt does not support multiple executors in a single Go executable. Either fix this limitation, or directly support the configuration of multiple sources within a single executor. Today this can force some use cases to have multiple deployed applications with some duplicated functionality, which is very inconvenient.

Support For Node Returning Multiple Events

Today, a node may pass to its children a single event (or none). Add support for returning multiple events. This will support use cases like routing a log to multiple elasticsearch indexes, or generating multiple kafka messages based on a condition.
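The log-routing use case from this request could look roughly like the sketch below: a function that takes one event and returns a slice of results, one per target index. LogEvent, IndexRequest, routeToIndexes, and the index names are all hypothetical; this does not show Firebolt's node.FanoutNode interface, only the shape of the idea.

```go
package main

import "fmt"

// LogEvent is a hypothetical input event.
type LogEvent struct {
	Service string
	Message string
}

// IndexRequest is a hypothetical output event: a document plus the
// Elasticsearch index it should be written to.
type IndexRequest struct {
	Index string
	Doc   LogEvent
}

// routeToIndexes fans one log event out to zero or more index requests.
// An empty result means the event is filtered out entirely.
func routeToIndexes(ev LogEvent) []IndexRequest {
	if ev.Message == "" {
		return nil
	}
	out := []IndexRequest{{Index: "logs-all", Doc: ev}}
	if ev.Service != "" {
		out = append(out, IndexRequest{Index: "logs-" + ev.Service, Doc: ev})
	}
	return out
}

func main() {
	for _, req := range routeToIndexes(LogEvent{Service: "api", Message: "started"}) {
		fmt.Println(req.Index, req.Doc.Message)
	}
}
```

Returning a slice covers all three cases with one signature: no results (filter), one result (the existing behavior), and many results (fanout).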

Problem: some misspellings in code

From go report card:

Misspell Finds commonly misspelled English words

firebolt/message/kafkamessagewire.go
Line 11: warning: "Acknowleged" is a misspelling of "Acknowledge" (misspell)
firebolt/node/kafkaconsumer/consumermetrics_test.go
Line 14: warning: "commited" is a misspelling of "committed" (misspell)
Line 14: warning: "commited" is a misspelling of "committed" (misspell)
Line 58: warning: "commited" is a misspelling of "committed" (misspell)
Line 58: warning: "commited" is a misspelling of "committed" (misspell)
Line 64: warning: "commited" is a misspelling of "committed" (misspell)
Line 64: warning: "commited" is a misspelling of "committed" (misspell)
Line 70: warning: "commited" is a misspelling of "committed" (misspell)
Line 70: warning: "commited" is a misspelling of "committed" (misspell)
Line 76: warning: "commited" is a misspelling of "committed" (misspell)
Line 76: warning: "commited" is a misspelling of "committed" (misspell)
Line 87: warning: "commited" is a misspelling of "committed" (misspell)
Line 87: warning: "commited" is a misspelling of "committed" (misspell)
Line 106: warning: "commited" is a misspelling of "committed" (misspell)
Line 106: warning: "commited" is a misspelling of "committed" (misspell)
Line 106: warning: "commited" is a misspelling of "committed" (misspell)
firebolt/leader/leader.go
Line 120: warning: "conection" is a misspelling of "connection" (misspell)
firebolt/message/kafkamessagesender.go
Line 58: warning: "Acknowleged" is a misspelling of "Acknowledge" (misspell)

Certain built-in metrics could conform better to Prometheus metric naming best practices

While working on #45, I am noticing that Elasticsearch metrics (presumably others, but I have not checked) could improve their self-descriptiveness by following https://prometheus.io/docs/practices/naming/#metric-names, in particular:

...should have a suffix describing the unit, in plural form. Note that an accumulating count has total as a suffix, in addition to the unit if applicable.

As a concrete example, BulkProcessTime is currently observed in seconds, which suggests that a metric suffix like _seconds could be added.

Support for reading / writing kafka message headers

Hello,

Nice work on this project - it looks really cool!

My shop is looking at using this as the base for a golang stream processor. We have existing SPs which implement our message format / conventions, one of which is that we use kafka headers for message metadata (correlation_id, timestamp, ser/des details, etc).

I see that the provided KafkaConsumer passes on the received message's body to the next node, but does not pass headers.

Similarly, the provided KafkaProducer does not provide a way to assign/write headers (that I can see).

Do you have any guidance on the best way to accomplish this?

One option seems to be to copy KafkaConsumer and KafkaProducer and modify them to do what we want. This seems inelegant, since there would be a ton of functionality in those that we would not be touching, but would be accepting a maintenance burden for.

Or maybe (hopefully?) I've missed something and there is some header functionality already available?

Thanks,
Dan

node.Shutdown() doesn't seem to be called when the pipeline exits

According to the documentation, node.Shutdown() provides an opportunity for the Node to clean up resources. However, the firebolt executor doesn't seem to call this function when it exits after receiving an OS signal.

stopWorkers seems to be called only on a timeout:

	if waitTimeout(&e.wg, time.Duration(e.config.ShutdownTimeOut)*time.Second) {
		// there was a timeout, resort to forcible shutdown
		for _, rootNode := range e.rootNodes {
			e.stopWorkers(rootNode)
		}
	}

stopWorkers calls Shutdown explicitly. Shouldn't node.Shutdown() be called every time, and not just on timeout?

func (e *Executor) stopWorkers(node *node.Context) {
	for i := 0; i < node.Config.Workers; i++ {
		go func() {
			node.StopCh <- true
		}()
	}

	log.WithField("node_id", node.Config.ID).Info("executor: shutting down node")
	err := node.NodeProcessor.Shutdown()
	if err != nil {
		log.WithError(err).WithField("node_id", node.Config.ID).Error("executor: error during node shutdown")
	}

	for _, child := range node.Children {
		e.stopWorkers(child)
	}
}
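One way to get the behavior asked for here is to separate "did the wait time out?" from "call Shutdown on every node", so that cleanup runs on both paths. The sketch below is an assumption-laden illustration, not the executor's code: shutdowner, shutdownAll, countingNode, and runDemo are hypothetical, and this waitTimeout is a local reimplementation that only assumes the original returns true on timeout.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// shutdowner is a hypothetical one-method view of a node.
type shutdowner interface {
	Shutdown() error
}

// waitTimeout waits for wg for at most d and reports whether the wait timed
// out. On timeout the helper goroutine stays blocked until wg completes.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-done:
		return false
	case <-timer.C:
		return true
	}
}

// shutdownAll calls Shutdown on every node whether or not the wait timed out,
// and collects any errors instead of stopping at the first one.
func shutdownAll(wg *sync.WaitGroup, d time.Duration, nodes []shutdowner) (bool, []error) {
	timedOut := waitTimeout(wg, d)
	var errs []error
	for _, n := range nodes {
		if err := n.Shutdown(); err != nil {
			errs = append(errs, err)
		}
	}
	return timedOut, errs
}

// countingNode records how many times Shutdown was called.
type countingNode struct{ calls int }

func (c *countingNode) Shutdown() error { c.calls++; return nil }

// runDemo runs one shutdown. With pending set, a worker never finishes, which
// forces the timeout path.
func runDemo(pending bool) (timedOut bool, calls int) {
	var wg sync.WaitGroup
	if pending {
		wg.Add(1)
	}
	n := &countingNode{}
	timedOut, _ = shutdownAll(&wg, 20*time.Millisecond, []shutdowner{n})
	return timedOut, n.calls
}

func main() {
	fmt.Println(runDemo(false))
	fmt.Println(runDemo(true))
}
```

In both the clean and the timed-out case, Shutdown is called exactly once per node. A real fix would also need to decide whether workers should be signaled to stop before or after Shutdown; this sketch leaves that question open.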

Panic When Returning nil From An AsyncEvent

An internal team at DO found a panic in a custom async node. When the node called event.ReturnEvent(nil), the following panic results:

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xfd5c6a]
goroutine 264 [running]:
do/vendor/github.com/digitalocean/firebolt/node.(*Context).invokeProcessorAsync.func2(0x0)
/tmp/build/44cb8d9a/cthulhu/docode/src/do/vendor/github.com/digitalocean/firebolt/node/node.go:199 +0x1aa
do/teams/accts_productivity/ecmp-ingestor/internal/node.(*Deployment).ProcessAsync.func1(0xc00054c480, 0xc000b49600, 0xc000b6a4e0, 0xc000b6a4e0)
/tmp/build/44cb8d9a/cthulhu/docode/src/do/teams/accts_productivity/ecmp-ingestor/internal/node/deployment.go:66 +0x2bd
created by do/teams/accts_productivity/ecmp-ingestor/internal/node.(*Deployment).ProcessAsync
/tmp/build/44cb8d9a/cthulhu/docode/src/do/teams/accts_productivity/ecmp-ingestor/internal/node/deployment.go:46 +0xab
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xfd5c6a]
goroutine 265 [running]:
do/vendor/github.com/digitalocean/firebolt/node.(*Context).invokeProcessorAsync.func2(0x0)
/tmp/build/44cb8d9a/cthulhu/docode/src/do/vendor/github.com/digitalocean/firebolt/node/node.go:199 +0x1aa
do/teams/accts_productivity/ecmp-ingestor/internal/node.(*Deployment).ProcessAsync.func1(0xc0006fefc0, 0xc000b49600, 0xc000b6b8e0, 0xc000b6b8e0)
/tmp/build/44cb8d9a/cthulhu/docode/src/do/teams/accts_productivity/ecmp-ingestor/internal/node/deployment.go:66 +0x2bd
created by do/teams/accts_productivity/ecmp-ingestor/internal/node.(*Deployment).ProcessAsync
/tmp/build/44cb8d9a/cthulhu/docode/src/do/teams/accts_productivity/ecmp-ingestor/internal/node/deployment.go:46 +0xab

This appears to be caused by a missing nil check here. This code should treat a nil event value as a filtered event, for consistency with how sync nodes work:

nc.handleResult(nil, event, result.Event)
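A nil guard of the kind described above might look like the following sketch. It is not the code from node.go: Event, outcome, and handleAsyncReturn are hypothetical, and the deliver callback stands in for whatever passes the event on to child nodes.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for the event an async node returns.
type Event struct {
	Payload interface{}
}

// outcome describes what happened to a returned event.
type outcome int

const (
	delivered outcome = iota
	filtered
)

// handleAsyncReturn treats a nil event as "filtered" instead of
// dereferencing it, matching how a nil result from a sync node is handled.
func handleAsyncReturn(ev *Event, deliver func(*Event)) outcome {
	if ev == nil {
		return filtered
	}
	deliver(ev)
	return delivered
}

func main() {
	deliver := func(e *Event) { fmt.Println("delivering", e.Payload) }
	fmt.Println(handleAsyncReturn(&Event{Payload: "ok"}, deliver) == delivered)
	fmt.Println(handleAsyncReturn(nil, deliver) == filtered)
}
```

The guard sits before any field access on the event, so a node that calls its return callback with nil results in a filtered event rather than a panic.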

App can't be stopped normally while the source is waiting to reload

  1. The source can't connect to Kafka or another resource, so the executor waits 10 seconds (
    time.Sleep(10 * time.Second)
    )
  2. During this period, the app receives a shutdown message (
    func (e *Executor) Shutdown() (done chan struct{}) {
    )
  3. The executor tries to shut down the source, but it shuts down the old instance (
    err := e.source.Shutdown()
    )
  4. The executor continues to execute its infinite loop
  5. The new source instance doesn't know anything about the shutdown signal
  6. Go to step 4
    =>
    The app does not respond to system shutdown signals

Direct Injection of Events

Some types of applications need to directly / programmatically inject events (all events? some additional events?) into their Source for processing. This is possible today, but it bypasses some of the metrics in executor.go and could really use a simple, clean API.

Examples Make Getting Started Easier

An /examples directory with 2 simple, working examples would be the fastest way to get started for many users.

One example could be kafka topic -> simple transform -> different topic

Another could be a simplified logging pipeline, kafka topic -> validation -> elasticsearch
