trendyol / kafka-cronsumer

Cron based Kafka exception consumer with the power of auto retry & concurrency

Home Page: https://medium.com/trendyol-tech/kafka-konsumer-two-years-journey-3e00b46c9ea3

License: MIT License

Go 97.98% Makefile 2.02%
Topics: cron, exception-handling, go, golang, kafka

kafka-cronsumer's Introduction

Kafka C[r]onsumer

Description

Kafka Cronsumer is mainly used for retry/exception strategy management. It works based on a cron expression and consumes messages on schedule, with the power of auto pause and concurrency.

For details, check our blog post.

If you need a complete consumer lifecycle with exception management, check Kafka Konsumer.

How Kafka Cronsumer Works

[Diagram: How Kafka Cronsumer Works]

When to use it?

  • Iteration-based back-off strategies are applicable
  • Messages can be processed in an eventually consistent state
  • Messages that exceed the max retry count can be ignored and sent to a dead-letter topic
  • To increase consumer resiliency
  • To increase consumer performance with concurrency

When to avoid?

  • Messages should be processed in order
  • Messages must never be dropped (we discard messages once max retry is exceeded)
  • Every message must be explicitly committed (we rely on an auto-commit interval to increase performance)
  • Messages with a TTL (Time to Live)

Guide

Installation

go get github.com/Trendyol/kafka-cronsumer@latest

Examples

You can find a number of ready-to-run examples in the examples directory of the repository.

After running the docker-compose up command, you can run any example you want. Don't forget, it's cron based :)

Single Consumer

func main() {
  // ...
  var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
    fmt.Printf("consumer > Message received: %s\n", string(message.Value))
    return nil
  }

  c := cronsumer.New(kafkaConfig, consumeFn)
  c.Run()
}

Single Consumer With Dead Letter

func main() {
  // ...
  var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
    fmt.Printf("consumer > Message received: %s\n", string(message.Value))
    return errors.New("error occurred")
  }

  c := cronsumer.New(kafkaConfig, consumeFn)
  c.Run()
}

Multiple Consumers

func main() {
  // ...
  var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
    fmt.Printf("First consumer > Message received: %s\n", string(message.Value))
    return nil
  }
  first := cronsumer.New(firstCfg, firstConsumerFn)
  first.Start()

  var secondConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
    fmt.Printf("Second consumer > Message received: %s\n", string(message.Value))
    return nil
  }
  second := cronsumer.New(secondCfg, secondConsumerFn)
  second.Start()
  // ...    
}

Single Consumer With Metric Collector

func main() {
  // ...
  var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
    return errors.New("err occurred")
  }
  
  c := cronsumer.New(config, consumeFn)
  StartAPI(*config, c.GetMetricCollectors()...)
  c.Start()
  // ...    
}

func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) {
  // ...
  f := fiber.New(
    fiber.Config{},
  )
  
  metricMiddleware, err := NewMetricMiddleware(cfg, f, metricCollectors...)
  if err != nil {
    // handle middleware initialization error
    panic(err)
  }

  f.Use(metricMiddleware)
  // ...
}

Configurations

| config | description | default | example |
| --- | --- | --- | --- |
| logLevel | Describes the log level; valid options are debug, info, warn, and error | info | |
| metricPrefix | Prefix for Prometheus fully-qualified metric names. Two metrics are currently exposed, retried_messages_total_current and discarded_messages_total_current, so with the default prefix the metric names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current | kafka_cronsumer | |
| consumer.clientId | see doc | | |
| consumer.cron | Cron expression that determines when the exception consumer starts to work | | */1 * * * * |
| consumer.backOffStrategy | Back-off strategy used for retry topics | fixed | exponential, linear |
| consumer.duration | Work duration during which the exception consumer actively consumes messages | | 20s, 15m, 1h |
| consumer.topic | Exception topic name | | exception-topic |
| consumer.groupId | Exception consumer group id | | exception-consumer-group |
| consumer.maxRetry | Maximum number of attempts to retry a message | 3 | |
| consumer.concurrency | Number of goroutines used by listeners | 1 | |
| consumer.minBytes | see doc | 1 | |
| consumer.maxBytes | see doc | 1 MB | |
| consumer.maxWait | see doc | 10s | |
| consumer.commitInterval | see doc | 1s | |
| consumer.heartbeatInterval | see doc | 3s | |
| consumer.sessionTimeout | see doc | 30s | |
| consumer.rebalanceTimeout | see doc | 30s | |
| consumer.startOffset | see doc | earliest | |
| consumer.retentionTime | see doc | 24h | |
| consumer.skipMessageByHeaderFn | Function to filter messages based on headers; return true to skip the message | nil | |
| producer.clientId | see doc | | |
| producer.batchSize | see doc | 100 | |
| producer.batchTimeout | see doc | 1s | |
| producer.balancer | see doc | leastBytes | |
| sasl.enabled | Enables the SASL authentication mechanism | false | |
| sasl.authType | Currently only SCRAM is supported | "" | |
| sasl.username | SCRAM username | "" | |
| sasl.password | SCRAM password | "" | |
| sasl.rootCAPath | see doc | "" | |
| sasl.intermediateCAPath | | "" | |
| sasl.rack | see doc | "" | |
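
Tying the options above together, a configuration could look roughly like the sketch below. This is a hedged illustration: the field names (Brokers, Consumer, Cron, Duration, MaxRetry, etc.) are inferred from the option names and examples above and may not match the library exactly; consult the Go Reference for the authoritative struct.

```go
// Hypothetical configuration sketch; verify field names against the
// package documentation for github.com/Trendyol/kafka-cronsumer.
kafkaConfig := &kafka.Config{
	Brokers: []string{"localhost:29092"},
	Consumer: kafka.ConsumerConfig{
		GroupID:     "exception-consumer-group",
		Topic:       "exception-topic",
		Cron:        "*/1 * * * *",     // start a run every minute
		Duration:    20 * time.Second,  // consume actively for 20s per run
		MaxRetry:    3,
		Concurrency: 1,
	},
	LogLevel: "info",
}
```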

Exposed Metrics

| Metric Name | Description | Value Type |
| --- | --- | --- |
| kafka_cronsumer_retried_messages_total | Total number of retried messages | Counter |
| kafka_cronsumer_discarded_messages_total | Total number of discarded messages | Counter |

Contribute

Use issues for everything

  • For a small change, just send a PR.
  • For bigger changes, open an issue for discussion before sending a PR.
  • A PR should include:
    • Test cases
    • Documentation
    • An example (if it makes sense)
  • You can also contribute by:
    • Reporting issues
    • Suggesting new features or enhancements
    • Improving or fixing documentation

Please adhere to this project's code of conduct.

Maintainers

Code of Conduct

Contributor Code of Conduct. By participating in this project, you agree to abide by its terms.

Libraries Used For This Project

Additional References

kafka-cronsumer's People

Contributors

abdulsametileri, ademekici, anilmisirlioglu, dependabot[bot], dilaragorum, emrekosen, emreodabas, gunerhuseyin, henesgokdag, mehmetcank48, nihatalim, oguzyildirim, ugurcanerdogan, yunusuzug


kafka-cronsumer's Issues

Avoid calling NewAPI multiple times

Right now, if we have more than one consumer and apiEnabled is true, we create a new API for every consumer. Instead, we can create the API once and use it for all consumers.

Expose kafkaProducer

Users want to use kafkaProducer to publish their messages to an exception topic for the first time.

Instead of having them write their own Kafka producer implementation, we can expose our Kafka producer for them to use.

Client Id

We could add a client id configuration to the producer.

producerConf := kafka.WriterConfig{
  ClientID: "your-client-id",
  ...
}

Goroutine leak problem

We realized that at every iteration we created new goroutines without releasing the previous ones. We can either re-use the same goroutines or close them and create new ones.
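
The reuse idea can be sketched with a standard worker-pool pattern: a fixed set of goroutines is started once and fed through a channel, instead of spawning (and leaking) new goroutines on every cron iteration. This is a generic illustration, not the project's actual code:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processAll feeds msgs to a fixed pool of worker goroutines and returns
// how many messages were processed. A generic worker-pool sketch, not
// kafka-cronsumer's implementation.
func processAll(msgs []string, workers int) int64 {
	ch := make(chan string)
	var wg sync.WaitGroup
	var processed int64

	// Start the pool once; the same goroutines are reused for every
	// message instead of being re-created (and leaked) each iteration.
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range ch {
				atomic.AddInt64(&processed, 1) // process the message here
			}
		}()
	}

	for _, m := range msgs {
		ch <- m
	}
	close(ch) // closing the channel lets the workers exit cleanly
	wg.Wait()
	return processed
}

func main() {
	msgs := []string{"a", "b", "c", "d", "e"}
	fmt.Println(processAll(msgs, 3)) // prints 5
}
```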

Consume message when header has expected value

On the consumer side of Kafka topics, we want to ignore some messages, or process only some messages, depending on the message type. Most message producers send their events with details in the message headers. So, could the cronsumer project give us a feature for ignoring/accepting messages based on a given header key/value?
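
Such a filter could plausibly be wired up through the skipMessageByHeaderFn configuration option listed above. Below is a self-contained sketch of the predicate, using an illustrative Header type and a hypothetical "message-type" header key (neither is necessarily the library's exact shape):

```go
package main

import "fmt"

// Header mirrors a Kafka record header for illustration; the real
// kafka-cronsumer header type may differ.
type Header struct {
	Key   string
	Value []byte
}

// skipMessageByHeader returns true when the message should be skipped,
// e.g. when its "message-type" header is not a type we accept.
func skipMessageByHeader(headers []Header) bool {
	for _, h := range headers {
		if h.Key == "message-type" && string(h.Value) == "order-created" {
			return false // accepted type: do not skip
		}
	}
	return true // unknown or missing type: skip
}

func main() {
	accepted := []Header{{Key: "message-type", Value: []byte("order-created")}}
	other := []Header{{Key: "message-type", Value: []byte("order-cancelled")}}
	fmt.Println(skipMessageByHeader(accepted), skipMessageByHeader(other)) // false true
}
```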
