Home Page: http://mkuthan.github.io/blog/2016/01/29/spark-kafka-integration2/

example-spark-kafka's Introduction

Apache Spark and Apache Kafka integration example


This example shows how to send processing results from Spark Streaming to Apache Kafka in a reliable way. The example follows the Spark convention for integration with external data sinks:

// import implicit conversions
import org.mkuthan.spark.KafkaDStreamSink._

// send dstream to Kafka
dstream.sendToKafka(kafkaProducerConfig, topic)

Features

  • KafkaDStreamSink for sending streaming results to Apache Kafka in a reliable way (see the sketch after this list).
  • Stream processing fails fast if the results cannot be sent to Apache Kafka.
  • Stream processing is blocked (back pressure) if the Kafka producer is too slow.
  • Stream processing results are flushed explicitly from the Kafka producer's internal buffer.
  • The Kafka producer is shared by all tasks on a single JVM (see KafkaProducerFactory).
  • The Kafka producer is properly closed when the Spark executor shuts down (see KafkaProducerFactory).
  • Twitter Bijection is used for encoding/decoding KafkaPayload from/into String or Avro.
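
Conceptually, the sink combines foreachRDD and foreachPartition with a producer shared per JVM, a callback that records asynchronous send errors, and an explicit flush before a partition closure returns. The code below is only a hedged sketch of that pattern; ProducerFactory and the element type DStream[(String, String)] are simplified stand-ins, and the real KafkaDStreamSink and KafkaProducerFactory in this repository may differ in detail.

import java.util.Properties
import java.util.concurrent.atomic.AtomicReference

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.spark.streaming.dstream.DStream

// Sketch only: simplified stand-in for KafkaDStreamSink / KafkaProducerFactory.
object KafkaSinkSketch {

  // One producer per JVM, reused by all tasks and closed on executor shutdown.
  object ProducerFactory {
    private var producer: KafkaProducer[String, String] = _

    def getOrCreate(config: Map[String, String]): KafkaProducer[String, String] = synchronized {
      if (producer == null) {
        val props = new Properties()
        config.foreach { case (k, v) => props.put(k, v) }
        producer = new KafkaProducer[String, String](props)
        sys.addShutdownHook(producer.close())
      }
      producer
    }
  }

  implicit class KafkaSink(dstream: DStream[(String, String)]) {
    def sendToKafkaSketch(config: Map[String, String], topic: String): Unit =
      dstream.foreachRDD { rdd =>
        rdd.foreachPartition { records =>
          val producer = ProducerFactory.getOrCreate(config)

          // Fail fast: remember the first asynchronous send error and rethrow it.
          val error = new AtomicReference[Exception]()
          val callback = new Callback {
            override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
              if (exception != null) error.compareAndSet(null, exception)
          }

          records.foreach { case (key, value) =>
            if (error.get() != null) throw error.get()
            // send() blocks when the internal buffer is full, which gives back pressure.
            producer.send(new ProducerRecord[String, String](topic, key, value), callback)
          }

          // Flush explicitly so nothing stays in the producer's internal buffer.
          producer.flush()
          if (error.get() != null) throw error.get()
        }
      }
  }
}

The explicit flush blocks the task until every record handed to send() has completed, which is what makes per-batch delivery reliable rather than best effort.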

Quickstart guide

Download the latest Apache Kafka distribution and un-tar it.

Start ZooKeeper server:

./bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka server:

./bin/kafka-server-start.sh config/server.properties

Create input topic:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic input

Create output topic:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic output

Start Kafka producer:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input

Start Kafka consumer:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output

Run example application:

sbt "runMain example.WordCountJob"

Publish a few words on the input topic using the Kafka console producer and check the processing results on the output topic using the Kafka console consumer.
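
For example, typing a line such as:

hello world hello

into the console producer should make word counts for that batch appear on the output topic, e.g. something like (hello,2) and (world,1); the exact output format depends on how WordCountJob encodes its results.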



example-spark-kafka's Issues

Losing Messages

I have used the sink + callback pattern outside of streaming in a Spark SQL job that publishes dataframe rows to Kafka. Now that I've moved the solution to our production environment, it appears that I am losing about 10 messages per 100000.

My guess is that a message at the tail end of each partition is not getting flushed to the topic when a given batch job terminates. Does that make sense? Am I naive to use this pattern in a non-streaming scenario?

This is the primary method in our sink:

  def sendDeltasToKafka(dataframe: DataFrame, changeFlag: String, counter: LongAccumulator): Unit = {
    dataframe.foreachPartition { partitionRecords =>
      val producer = ProducerFactory.getOrCreateDeltaProducer(deltas)
      val context = TaskContext.get
      val schema: Schema = new Schema.Parser().parse(registeredSchema.schema)
      val keys = registeredDataset.keys.map(k => k.columnName)
      val writer = new AvroRowWriter(schema, keys)

      val callback = new KafkaSinkExceptionHandler

      logInfo(s"Sending Spark partition: ${context.partitionId} to Kafka topic: ${registeredDataset.kafkaTopic}")
      val metadata = partitionRecords.map { row =>
        callback.throwExceptionIfAny()

        val key = writer.GetKey(row)
        val record = writer.GetValue(row, changeFlag, auditTimestamp)
        val message = new ProducerRecord[String, Array[Byte]](registeredDataset.kafkaTopic, key, serialize(record, schema))

        counter.add(1)
        producer.send(message, callback)
      }.toList

      metadata.foreach { metadata => metadata.get() }
      logInfo(s"Flushed Spark partition: ${context.partitionId} to Kafka topic: ${registeredDataset.kafkaTopic}")

      callback.throwExceptionIfAny()
    }
  }

The callback class is exactly as you wrote it in your example.
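
For comparison, one of the project's stated features is that results are flushed explicitly from the Kafka producer's internal buffer before a partition closure returns. The snippet below is only a self-contained sketch of that flush pattern for a batch DataFrame; the broker address and topic are assumptions, and it creates a producer per partition for brevity instead of sharing one per JVM as the project does. Whether a missing flush actually explains the reported tail losses is not confirmed here.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{DataFrame, Row}

// Illustrative only: broker address and topic name are assumptions, and a new
// producer is created per partition here for brevity instead of being shared.
object FlushSketch {
  def sendPartitionsWithFlush(dataframe: DataFrame): Unit = {
    dataframe.foreachPartition { rows: Iterator[Row] =>
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      try {
        rows.foreach { row =>
          producer.send(new ProducerRecord[String, String]("output", row.mkString(",")))
        }
        // flush() blocks until every record handed to send() has been delivered
        // or has failed, so nothing is left buffered at the tail of the partition.
        producer.flush()
      } finally {
        producer.close()
      }
    }
  }
}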

java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.ShuffledDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.

Hi, I got this problem when trying to send messages to Kafka. I am using Spark 2.0.0 and Kafka 1.0. It seems that the Kafka producer cannot be serialized correctly.
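
The message means that an object created on the driver, the DStream itself or a KafkaProducer referenced from inside an RDD operation, is being pulled into a task closure; Spark rejects serializing a DStream inside a closure, and a KafkaProducer is not serializable at all. The usual remedy, and the role KafkaProducerFactory plays in this project, is to obtain the producer inside foreachPartition so it is created on the executors rather than shipped from the driver. A hedged sketch under those assumptions (the config map must carry bootstrap.servers and serializer settings; the names here are illustrative):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

object SerializationFixSketch {

  // Wrong: a producer built here, on the driver, would be captured by the
  // closures below and Spark would fail to serialize the task.
  // val producer = new KafkaProducer[String, String](...)

  def publish(dstream: DStream[String], config: Map[String, String], topic: String): Unit =
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Right: build (or look up via a shared factory, as KafkaProducerFactory
        // does) the producer inside the partition closure, on the executor.
        val props = new Properties()
        config.foreach { case (k, v) => props.put(k, v) }
        val producer = new KafkaProducer[String, String](props)
        try {
          records.foreach { value =>
            producer.send(new ProducerRecord[String, String](topic, value))
          }
          producer.flush()
        } finally {
          producer.close()
        }
      }
    }
}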
