
Skafka

License: MIT

Scala wrapper for kafka-clients v3.4.0

Motivation

Kafka provides an official Java client out of the box, which can be used from Scala code without any additional modifications.

The main disadvantage of using the official client directly is that it imposes a very specific threading model on the application: the consumer is not thread-safe, and it also expects the rebalance listener to perform its operations on the same thread.

This makes wrapping the client with Cats Effect classes a bit more complicated than just calling IO { consumer.poll() }, unless poll is the only call the application ever makes.

Skafka does exactly that: it is a very thin wrapper over the official Kafka client that provides a ready-made Cats Effect API and handles some corner cases concerning ConsumerRebalanceListener calls.

Compared to more full-featured libraries such as FS2 Kafka, it might be a little more reliable, because there is little code/logic in which accidental bugs could hide.

To summarize:

  1. If it suits your goals (i.e. you only ever need to do consumer.poll() without acting on rebalances etc.), then using the official Kafka client directly, optionally wrapping all the calls with cats.effect.IO, is a totally fine idea.
  2. If a more complicated integration with Cats Effect is required, i.e. ConsumerRebalanceListener is going to be used, then consider using Skafka.
  3. If streaming with FS2 is required, or any other features that library provides, then FS2 Kafka could be a good choice. Note that it is less trivial than Skafka and may contain more bugs on top of the official Kafka client.

Key features

  1. It provides null-less Scala APIs for Producer & Consumer

  2. Makes it easy to use your effect monad with the help of cats-effect

  3. Blocking calls are executed on a provided ExecutionContext

  4. Simple case-class-based configuration

  5. Support for Typesafe config

Producer usage example

import cats.effect.IO
import com.evolutiongaming.skafka.producer._

val producer = Producer.of[IO](config, ecBlocking)
val metadata: IO[RecordMetadata] = producer.use { producer =>
  val record = ProducerRecord(topic = "topic", key = "key", value = "value")
  // send returns IO[IO[RecordMetadata]]: the outer IO enqueues the record,
  // the inner IO completes once the broker acknowledges it, hence the flatten
  producer.send(record).flatten
}

Consumer usage example

import cats.effect.IO
import com.evolutiongaming.nel.Nel // non-empty list type used by skafka
import com.evolutiongaming.skafka.consumer._
import scala.concurrent.duration._

val consumer = Consumer.of[IO, String, String](config, ecBlocking)
val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
  for {
    _       <- consumer.subscribe(Nel("topic"), None) // None: no rebalance listener
    records <- consumer.poll(100.millis)
  } yield records
}

Setup

In project/plugins.sbt:

addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")

In build.sbt:

libraryDependencies += "com.evolutiongaming" %% "skafka" % "15.0.0"

Notes

While Skafka provides the ability to use the ConsumerRebalanceListener functionality, not all of the method calls are supported.

See the following PRs for more details: #150 #122

To the best of our knowledge, FS2 Kafka does not support all of these methods/functionality either.

skafka's People

Contributors

devilab, dfakhritdinov, grzegorz-bielski, johanmast, lavrov, mr-git, nikitapecasa, odbc, riblestrype, rtar, scala-steward, t3hnar, z1kkurat


skafka's Issues

add metrics for subscribe with RebalanceListener1

This is a follow-up task to #122.
Most probably we need to add metrics similar to those for subscribe with RebalanceListener:

def subscribe(topics: Nes[Topic], listener: Option[RebalanceListener[F]]) = {
  val listener1 = listener.map(rebalanceListener)
  for {
    _ <- count("subscribe", topics.toList)
    r <- self.subscribe(topics, listener1)
  } yield r
}

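A minimal sketch of what the corresponding RebalanceListener1 overload might look like, following the same pattern (the RebalanceListener1 signature is an assumption here, not taken from the codebase):

def subscribe(topics: Nes[Topic], listener: RebalanceListener1[F]) = {
  for {
    _ <- count("subscribe", topics.toList) // same "subscribe" counter as above
    r <- self.subscribe(topics, listener)  // metrics for the listener calls themselves are still to be designed
  } yield r
}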

Review producer API

At the moment we are only able to produce records one at a time, and we are also forced to call flatten in order to "block" while waiting for the broker ack if we do not want to take advantage of the asynchronous behaviour introduced by the exposed produce method.

It would be nice to change API to something like:

  • produce(...): F[Metadata]

  • produceAsync(...): F[F[Metadata]]

  • produceChunk(...): F[List[Metadata]]

  • produceChunkAsync(...): F[F[List[Metadata]]]

This is super pseudo-code, but I think the intent is there. WDYT?
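
A hedged sketch of what such a trait could look like, derived from the list above (the trait name is hypothetical and Metadata stands in for RecordMetadata; this is not actual skafka code):

trait ProducerApi[F[_], K, V] {
  // semantically blocks until the broker acknowledges the record
  def produce(record: ProducerRecord[K, V]): F[Metadata]
  // outer F completes once the record is enqueued, inner F once it is acked
  def produceAsync(record: ProducerRecord[K, V]): F[F[Metadata]]
  // batch variants of the above
  def produceChunk(records: List[ProducerRecord[K, V]]): F[List[Metadata]]
  def produceChunkAsync(records: List[ProducerRecord[K, V]]): F[F[List[Metadata]]]
}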

Remove Kafka SASL configuration warnings

SASL configuration options produce a lot of warnings, as by default they are not used by consumer or producer instances.
The config model should be updated to provide SASL settings only if SASL is enabled, and omit them otherwise.

ProducerConfig The configuration 'sasl.kerberos.kinit.cmd' was supplied but isn't a known config.
ProducerConfig The configuration 'sasl.kerberos.min.time.before.relogin' was supplied but isn't a known config.
ProducerConfig The configuration 'sasl.kerberos.ticket.renew.jitter' was supplied but isn't a known config.
ProducerConfig The configuration 'sasl.kerberos.ticket.renew.window.factor' was supplied but isn't a known config.
ProducerConfig The configuration 'sasl.login.refresh.buffer.seconds' was supplied but isn't a known config.
ProducerConfig The configuration 'sasl.login.refresh.min.period.seconds' was supplied but isn't a known config.
ProducerConfig The configuration 'sasl.login.refresh.window.factor' was supplied but isn't a known config.
ProducerConfig The configuration 'sasl.login.refresh.window.jitter' was supplied but isn't a known config.
ConsumerConfig The configuration 'sasl.kerberos.kinit.cmd' was supplied but isn't a known config.
ConsumerConfig The configuration 'sasl.kerberos.min.time.before.relogin' was supplied but isn't a known config.
ConsumerConfig The configuration 'sasl.kerberos.ticket.renew.jitter' was supplied but isn't a known config.
ConsumerConfig The configuration 'sasl.kerberos.ticket.renew.window.factor' was supplied but isn't a known config.
ConsumerConfig The configuration 'sasl.login.refresh.buffer.seconds' was supplied but isn't a known config.
ConsumerConfig The configuration 'sasl.login.refresh.min.period.seconds' was supplied but isn't a known config.
ConsumerConfig The configuration 'sasl.login.refresh.window.factor' was supplied but isn't a known config.
ConsumerConfig The configuration 'sasl.login.refresh.window.jitter' was supplied but isn't a known config.
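
A minimal sketch of the proposed fix, assuming a hypothetical SaslConfig case class with an enabled flag (not the actual skafka config model):

final case class SaslConfig(enabled: Boolean, kerberosKinitCmd: String /*, ... */)

// only emit SASL keys when SASL is enabled, so kafka-clients never sees unused ones
def saslBindings(config: SaslConfig): Map[String, AnyRef] =
  if (config.enabled) Map("sasl.kerberos.kinit.cmd" -> config.kerberosKinitCmd /*, ... */)
  else Map.empty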

change `PrometheusProducer` metric naming

Instead of providing the full name in the constructor, we could use a single default metric name and provide the producer's name as a label.

The main reason: when one has many producers, it gets quite time-consuming to add them all when building graphs. It is also impossible to use a single-stat panel to show max latency per quantile, as the Grafana interface doesn't allow selecting the max across several metrics.
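
A minimal sketch using the Prometheus simpleclient, with a hardcoded metric name and the producer name as a label (metric and label names are illustrative):

import io.prometheus.client.{CollectorRegistry, Summary}

val latency = Summary
  .build()
  .name("skafka_producer_latency") // single default name
  .labelNames("producer")          // producer name becomes a label
  .help("Latency of producer calls")
  .register(CollectorRegistry.defaultRegistry)

latency.labels("my-producer").observe(0.042)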

consumer.offsetsForTimes - consider using Instant as query types instead of Offset

Currently it's defined as

def offsetsForTimes(timestampsToSearch: Map[TopicPartition, Offset]): F[Map[TopicPartition, Option[OffsetAndTimestamp]]]

While working on #122, we agreed with @t3hnar on the following method definition for the new RebalanceCallback API:

def offsetsForTimes[F[_]](
    timestampsToSearch: Nem[TopicPartition, Instant]
  ): RebalanceCallback[F, Map[TopicPartition, Option[OffsetAndTimestamp]]]

The diff is:

  • the usage of Instant instead of Offset
  • the usage of NonEmptyMap instead of a regular Scala Map (which can be empty)

The same can be applied to the corresponding Consumer methods (see the sketch below):

  • offsetsForTimes
  • offsetsForTimes with timeout
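
A hedged sketch of how the Consumer signature could look after the change, following the RebalanceCallback definition above (shown for the variant without a timeout; not actual skafka code):

def offsetsForTimes(
  timestampsToSearch: Nem[TopicPartition, Instant]
): F[Map[TopicPartition, Option[OffsetAndTimestamp]]]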

in `1.1.0`, the metrics are not shared

When one creates more than one PrometheusProducer with the default (or one global) prefix, the second such producer fails with an exception like:

Exception in thread "main" java.lang.IllegalArgumentException: Collector already registered that provides name: skafka_producer_latency_count
	at io.prometheus.client.CollectorRegistry.register(CollectorRegistry.java:54)
	at io.prometheus.client.SimpleCollector$Builder.register(SimpleCollector.java:246)
	at com.evolutiongaming.skafka.producer.PrometheusProducer$.apply(PrometheusProducer.scala:27)

It would be better if the metrics were initialized once at the PrometheusProducer level, not repeatedly for each newly created Producer.
As a consequence, the prefix argument would have to be discarded; skafka_producer could be an internally hardcoded value.
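
A minimal sketch of the idea: register the collectors once in the companion object and reuse them for every producer (illustrative, not the actual PrometheusProducer implementation):

object PrometheusProducer {
  // registered exactly once per JVM, so creating a second producer does not re-register
  private lazy val latency = Summary
    .build()
    .name("skafka_producer_latency") // hardcoded instead of a user-supplied prefix
    .labelNames("producer")
    .help("Latency of producer calls")
    .register()
}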

[Discussion] Derive ToTry timeout based on ConsumerConfig.maxPollInterval

This is a follow-up task to #122.

  • The issue is not critical, as a workaround exists: the user can pass a custom ToTry instance with any timeout duration
  • Most probably we will just close it without any attempt to fix it, as it has too many corner cases, uncertainties, a lack of execution-cancellation guarantees, and different use cases for different users of the library.

So we are creating this task for the record, in case we need to refer to this idea later.

  • Such a special (derived) ToTry timeout should be used only in the rebalance listener
  • I don't expect the implementation to be trivial, as we would need to track the total time spent in all rebalance methods during a single execution of a consumer.poll method, as well as take into account all other aspects of the Kafka consumer rebalance protocol (as a starting point, see the documentation for the max.poll.interval.ms consumer config parameter)

Reasoning/idea

  • max.poll.interval.ms - roughly speaking, if we spend too much time blocking in the rebalance listener (recovering state for assigned partitions or saving current progress on revoked partitions), another rebalance is triggered, which can have various undesirable effects on the user's application (stale state can overwrite the latest one if the persistent storage does not support optimistic locking or single-writer guarantees)
  • ideally, we want to complete the rebalance within the allowed time, so the current consumer instance won't be kicked out of the consumer group
  • the assumption is that we can safely piggyback on max.poll.interval.ms and derive the ToTry timeout from it so that we can cancel the user's action (which might not be the desired/expected default behaviour for the user, and some actions cannot be cancelled with any guarantees) - see the sketch below
  • it can be used to give the user a clear hint that the rebalance is taking too long and that max.poll.interval.ms might need adjusting
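
A very rough sketch of the derivation, assuming ToTry has the shape trait ToTry[F[_]] { def apply[A](fa: F[A]): Try[A] } (as in catshelper) and that bounding execution with a timed unsafe run is acceptable - both are assumptions, subject to all the caveats listed above:

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import com.evolutiongaming.catshelper.ToTry
import scala.concurrent.duration._
import scala.util.Try

// hypothetical: bound rebalance-listener effects by a safety margin under max.poll.interval.ms
def toTryWithin(maxPollInterval: FiniteDuration): ToTry[IO] = new ToTry[IO] {
  def apply[A](fa: IO[A]): Try[A] =
    Try {
      fa.unsafeRunTimed(maxPollInterval - 1.second)
        .getOrElse(sys.error("rebalance callback exceeded the derived timeout"))
    }
}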

Definition of client.rack at CommonConfig level causes kafka ProducerConfig to complain

Since the introduction of the com.evolutiongaming.skafka.CommonConfig.clientRack property, which is supplied to both the Consumer and Producer configs of the Kafka client, the following warning is printed to the application log whenever a producer is used:

[org.apache.kafka.clients.producer.ProducerConfig] - The configuration 'client.rack' was supplied but isn't a known config.

As per the documentation, the client.rack property is applicable to the Consumer only.

Incompatible with kafka-clients 3.7.0 due to usage of NoOpConsumerRebalanceListener

Skafka is incompatible with kafka-clients 3.7.0, because it uses org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener, which has been removed.

This leads to runtime errors (NoClassDefFoundError) when older kafka-clients versions are evicted by version 3.7.0 or higher.

The usage of NoOpConsumerRebalanceListener should be replaced.
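
A minimal sketch of a drop-in replacement, assuming a plain no-op implementation of the public ConsumerRebalanceListener interface is sufficient:

import java.util.{Collection => JCollection}
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

// no-op listener defined locally, with no dependency on kafka-clients internals
object NoOpRebalanceListener extends ConsumerRebalanceListener {
  def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
  def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()
}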

Support error handling in RebalanceCallback

This is a follow-up task to #122.

Currently, errors from consumer-related methods are thrown in the poll thread, and there's no way to provide recovery code inside RebalanceCallback.

For example:

def onPartitionsAssigned(partitions: Nes[TopicPartition]) = {
  for {
    state <- lift(restoreStateFor(partitions))
    _     <- state.offsets.traverse_(o => seek(o.partition, o.offset))
  } yield ()
}

If the seek operation fails, the exception is thrown in the poll thread, and the user cannot provide recovery code while building a RebalanceCallback.

To fix this, we need to implement a MonadThrowable instance for RebalanceCallback.
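
A hedged sketch of the usage this would enable, assuming a MonadThrowable instance brings cats' handleErrorWith syntax into scope (hypothetical, not current skafka API; partition, offset, and log are placeholders):

import cats.syntax.applicativeError._

val safeSeek: RebalanceCallback[F, Unit] =
  seek(partition, offset).handleErrorWith { e =>
    lift(log.error(s"seek failed, skipping: $e")) // recover instead of failing the poll thread
  }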

Circular dependency between skafka and smetrics

There's a circular dependency between skafka and smetrics. It's caused by usages of com.evolutiongaming.smetrics.MeasureDuration in Consumer, ConsumerLogging, ConsumerMetrics, ConsumerOf, RebalanceListener and the corresponding Producer-related classes (also com.evolutiongaming.smetrics.{CollectorRegistry, LabelNames, Quantiles} in ConsumerMetrics and ProducerMetrics).

There can be multiple ways to mitigate that:

  1. Move metrics-related code from skafka to smetrics. This means introducing a version of MeasureDuration local to the skafka codebase and moving the specific implementations of ConsumerMetrics and ProducerMetrics (using CollectorRegistry) to smetrics. This might be reasonable in the sense that a user can first depend on skafka (a Kafka client with no additional features) and later bring in a dependency on smetrics, which contains additional metrics-related functionality not directly related to the interaction with Kafka.
  2. Move metrics-related code from smetrics to skafka. This means removing the kafka submodule from smetrics and moving KafkaMetricsCollector to skafka (it is the only class in that module). It would somewhat contradict the concept of smetrics being an additional library providing features on top of other libraries via plug-in modules (doobie, http4s, etc.); also, it would impose a dependency on smetrics in skafka, which is already there at the moment.

Potential StackOverflowError in mapK of RebalanceCallback

@t3hnar says

I believe this may cause a stack overflow for a large enough data structure; however, I don't think you should fix this.

def mapK[G[_]](fg: F ~> G): RebalanceCallback[G, A] = {
  self match {
    case Pure(a)         => Pure(a)
    // the continuation recurses via andThen, which is not stack-safe for deeply nested Binds
    case Bind(source, f) => Bind(source.mapK(fg), f andThen (_.mapK(fg)))
    // ...
  }
}

Document ToTry timeout importance in README

This is needed to give a clear warning about, and understanding of, the default timeout that is used in the case of RebalanceListener1 (implemented in #122).

Improve documentation

It is not very clear how to use each kind of Producer, or what the benefits of the skafka project are over the default kafka-clients.
