Coder Social home page Coder Social logo

kasper's Introduction

kasper

GoDoc Build Status Go Report Card Coverage Status

This project is currently in Beta. The API is ~95% stable so you can expect only minor breaking changes.

For an introduction to Kasper and the motivation behind it, you can read our introductory blog post.

Kasper is a lightweight library for processing Kafka topics. It is heavily inspired by Apache Samza (See http://samza.apache.org). Kasper processes Kafka messages in small batches and is designed to work with centralized key-value stores such as Redis, Cassandra or Elasticsearch for maintaining state during processing. Kasper is a good fit for high-throughput applications (> 10k messages per second) that can tolerate a moderate amount of processing latency (~1000ms). Please note that Kasper is designed for idempotent processing of at-least-once semantics streams. If you require exactly-once semantics or need to perform non-idempotent operations, Kasper is likely not a good choice.

Step 1 - Create a sarama Client

Kasper uses Shopify's excellent sarama library (see https://github.com/Shopify/sarama) for consuming and producing messages to Kafka. All Kasper application must begin with instantiating a sarama Client. Choose the parameters in sarama.Config carefully; the performance, reliability, and correctness of your application are all highly sensitive to these settings. We recommend setting sarama.Config.Producer.RequiredAcks to WaitForAll.

saramaConfig := sarama.NewConfig()
saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
client, err := sarama.NewClient([]string{"kafka-broker.local:9092"}, saramaConfig)

Step 2 - create a Config

TopicProcessorName is used for logging, labeling metrics, and is used as a suffix to the Kafka consumer group. InputTopics and InputPartitions are the lists of topics and partitions to consume. Please note that Kasper currently does not support consuming topics with differing numbers of partitions. This limitation can be alleviated by manually adding an extra fan-out step in your processing pipeline to a new topic with the desired number of partitions.

config := &kasper.Config{
	TopicProcessorName:    "twitter-reach",
	Client:                client,
	InputTopics:           []string{"tweets", "twitter-followers"},
	InputPartitions:       []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
	BatchSize:             10000,
	BatchWaitDuration:     5 * time.Second,
	Logger:                kasper.NewJSONLogger("twitter-reach-0", false),
	MetricsProvider:       kasper.NewPrometheus("twitter-reach-0"),
	MetricsUpdateInterval: 60 * time.Second,
}

Kasper is instrumented with a number of useful metrics so we recommend setting MetricsProvider for production applications. Kasper includes an implementation for collecting metrics in Prometheus and adapting the interface to other tools should be easy.

Step 3 - Create a MessageProcessor per input partition

You need to create a map[int]MessageProcessor. The MessageProcessor instances can safely be shared across partitions. Each MessageProcessor must implement a single function:

func (*TweetProcessor) Process(messages []*sarama.ConsumerMessage, sender Sender) error {
	// process messages here
}

All messages for the input topics on the specified partition will be passed to the appropriate MessageProcessor instance. This is useful for implementing partition-wise joins of different topics. The Sender instance must be used to produce messages to output topics. Messages passed to Sender are not sent directly but are collected in an array instead. When Process returns, the messages are sent to Kafka and Kasper waits for the configured number of acks. When all messages have been successfully produced, Kasper updates the consumer offsets of the input partitions and resumes processing. If Process returns a non-nil error value, Kasper stops all processing.

Step 4 - Create a TopicProcessor

To start processing messages, call TopicProcessor.RunLoop(). Kasper does not spawn any goroutines and runs a single-threaded event loop instead. RunLoop() will block the current goroutine and will run forever until an error occurs or until Close() is called. For parallel processing, run multiple TopicProcessor instances in different goroutines or processes (the input partitions cannot overlap). You should set Config.TopicProcessorName to the same value on all instances in order to easily scale the processing up or down.

kasper's People

Contributors

henzenvandijk avatar nmaquet avatar ouchxp avatar therealplato avatar vemel avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.