

clj-kafka's Issues

Multi-Message Sending Not Supported

I'd like to use multi-message sending, following the example:

(send-messages p "test" (->> ["message payload 1" "message payload 2"]
                             (map #(.getBytes %))
                             (map message)))

but whenever I try that I get:

Error in main: java.lang.ClassCastException: clojure.lang.LazySeq cannot be cast to kafka.message.Message

I've looked at the clj-kafka source, and it appears this is supposed to be handled by the Scala code in Kafka itself. That code says it wants a java.util.List, but even when I wrap the seq of messages in a java.util.LinkedList constructor it still complains that it isn't a kafka.message.Message and fails.

I need the Kafka 0.7.0 compatibility for now, but I'd really like to be able to ship a batch of messages at once to save the overhead of creating and dropping a connection to the server for each one.

Is there something I'm missing? Can this be added easily? Is there a work-around for this to get the batch mode working as advertised?

new producer send callback not invoked

I tried to print the produced offset with a callback like the one below, but it seems the callback is never invoked.

(send producer record
      (reify Callback
        (onCompletion [_ metadata exception]
          (println "offset: " (:offset metadata)))))
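One thing worth checking (an assumption on my part, not a confirmed diagnosis): metadata here is an org.apache.kafka.clients.producer.RecordMetadata, a plain Java object, so a keyword lookup like (:offset metadata) silently returns nil; the Java accessor is needed. A sketch:

```clojure
;; Sketch, assuming the new-producer API: RecordMetadata is a Java
;; object, so read the offset with the .offset accessor, not a keyword.
(import '(org.apache.kafka.clients.producer Callback))

(def print-offset-callback
  (reify Callback
    (onCompletion [_ metadata exception]
      (if exception
        (println "send failed:" (.getMessage exception))
        (println "offset:" (.offset metadata))))))

;; usage: (send producer record print-offset-callback)
```

Also note that the new producer buffers sends in the background, so if the process exits without closing (or flushing) the producer, the callback may genuinely never fire.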

Implement new KafkaConsumer

Hey!

With 0.8.2, Kafka introduced a new KafkaConsumer API. It handles both the high-level and low-level use cases: for example, offset management (both ZooKeeper-based and built-in) is optional, and it is possible to read from several partitions without using offset management at all. It would be great to have access to this API.
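For reference, a rough sketch of what direct interop with the new consumer looks like. This is based on the 0.9-era Java client; the class and config names are the upstream client's, not anything clj-kafka currently exposes:

```clojure
(import '(org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecord))

;; A Clojure map implements java.util.Map, so it can serve as the config.
(def consumer
  (KafkaConsumer.
    {"bootstrap.servers"  "localhost:9092"
     "group.id"           "example-group"
     "key.deserializer"   "org.apache.kafka.common.serialization.ByteArrayDeserializer"
     "value.deserializer" "org.apache.kafka.common.serialization.ByteArrayDeserializer"}))

;; A vector is a java.util.List, so it works for subscribe.
(.subscribe consumer ["test"])

;; poll returns a batch of ConsumerRecords; offsets are broker-managed.
(doseq [^ConsumerRecord r (.poll consumer 100)]
  (println (.offset r) (String. (.value r) "UTF-8")))
```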

fetch-consumer-offsets returning error

Started ZooKeeper on port 2181.
Started a Kafka broker on port 9092.

Inserted a few messages into the test topic using the console producer:

bin/kafka-console-producer.sh --topic test --broker-list localhost:9092

Consumed the messages using the console consumer:

 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

Running the Kafka offset-checker script returns the following output:

bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic test --group console-consumer-54827 
Group                   Topic        Pid Offset          logSize         Lag             Owner
console-consumer-54827 test          0   17              17              0               console-consumer-54827_xxx

Executing the fetch-consumer-offsets function, however, returns an error:

(fetch-consumer-offsets "localhost:9092"
                          {"zookeeper.connect" "localhost:2181"}
                          "test"
                          "console-consumer-54827")

{"test:0" #object[kafka.common.OffsetMetadataAndError 0x4d7f1000 "OffsetMetadataAndError[-1,,0]"]}
I am not able to figure out what I am doing wrong. Can someone please help me?

Officially published 0.8 Kafka Jar

Currently we're using a uSwitch build of the Apache Git repo. This is built by our own internal continuous-integration machine, but before release it would be nice to move to an official release.

There was some discussion on the Kafka JIRA about getting a Maven dependency out with 0.8, so fingers crossed that once the final release is out we can just update.

release 0.3.2

Could you please cut a release of 0.3.2 to clojars?

[doc] Request for a CHANGELOG

Hello All,

I'm trying to upgrade clj-kafka at work from 0.2.8-0.8.1.1 to 0.3.4, and there is no way for me to quickly eyeball the changes. It is important for me to know whether this is a breaking upgrade. Is there any documentation around this that I can refer to?

Thanks!

clj-kafka.new.producer/producer code error

Calling (producer config) errors. The current code,

([^java.util.Map config]
   (KafkaProducer. config))

should be:

([^java.util.Map config]
   (KafkaProducer. config (byte-array-serializer) (byte-array-serializer)))

As it stands, using (producer config) throws:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "value.serializer" which has no default value.

Using scala library 2.8.0

[org.scala-lang/scala-library "2.8.0"]

Are there any plans to upgrade to [org.scala-lang/scala-library "2.9.2"]?

CompilerException java.lang.NoClassDefFoundError

I get the following error when I try to use clj-kafka.

user=> (use 'clj-kafka.producer)

CompilerException java.lang.NoClassDefFoundError: scala/reflect/ClassManifest, compiling:(clj_kafka/core.clj:1:1)

The project.clj looks like this:

:dependencies [[clj-kafka "0.2.6-0.8"]
               [org.clojure/clojure "1.6.0"]
               [com.google.guava/guava "18.0"]]

What will be the next steps to investigate?
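In case it helps: NoClassDefFoundError: scala/reflect/ClassManifest usually means the Scala standard library is missing from (or mismatched on) the classpath. A guess, not a verified fix: explicitly declare the Scala version this clj-kafka build was compiled against (the exact version to pin is an assumption), e.g.:

```clojure
:dependencies [[clj-kafka "0.2.6-0.8"]
               ;; assumption: pin the Scala runtime this build expects
               [org.scala-lang/scala-library "2.9.2"]
               [org.clojure/clojure "1.6.0"]
               [com.google.guava/guava "18.0"]]
```

Running lein deps :tree to look for a missing or conflicting org.scala-lang/scala-library entry would confirm or refute this.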

Ensure 0.8 compliance

Kafka 0.8 adds support for message keys and a few other new things. Update the clj-kafka API to allow for these.

Method in high level consumer to get all message streams

The messages function in consumer/zk.clj seems to return a single stream of messages but also takes in an optional number of threads. Since the number of threads is passed to Kafka's .createMessageStreams, there are extra streams that are being dropped by messages (as the code always takes the first stream).

Would it be appropriate to add a method like message-streams to the high level consumer that can return all the message streams? Additionally, I'm not sure why it's useful to pass in the number of threads to messages if there's not a way to get the other message streams.

I'd be happy to try to come up with a PR adding message-streams if I'm on the right track, but please do let me know if I'm missing something!
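For what it's worth, a sketch of what such a function might look like. The name message-streams and the parameter names are hypothetical; this just mirrors how messages already calls .createMessageStreams but returns every stream instead of the first:

```clojure
;; Hypothetical sketch: .createMessageStreams takes a map of
;; topic -> thread count and returns a map of topic -> list of
;; KafkaStream; return them all rather than only (first ...).
(defn message-streams
  [^kafka.javaapi.consumer.ConsumerConnector consumer topic threads]
  (-> (.createMessageStreams consumer {topic (int threads)})
      (get topic)))
```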

Update README and examples

The 0.8 API changes things a bit (and the keys associated with the consumer and producer configs are also new).

Update the README and some examples to reflect new API.

Refactor tests

The tests have spurious sections of connection and topic details spread through the test utils and test code. Refactor to tidy this up.

Reduce logging level

The tests output a lot of info messages from the Kafka and ZooKeeper servers; minimise what reaches the console and move it to a log file in the dev/test profiles.

Upgrade for newer Kafka Versions

The library still uses the kafka_2.10-0.8.2.1 library and client.
Are there any plans to upgrade them to a newer version for the new 0.9 release of Kafka?

Convert Kafka byte-array to String/JSON

Hi All

We are using Kafka for our logging/metrics/events, and we found that the metrics (from CollectD) arrive as byte arrays. There is no specific function to convert a byte array to a plain String/JSON. Does anyone know an easy way to do that? Our consumer is Riemann.

Thanks
D.
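This part isn't clj-kafka-specific: a Kafka message payload is just a byte array, so plain Java interop decodes it (parsing the resulting string as JSON would then be a separate step with a library such as cheshire):

```clojure
(defn bytes->string
  "Decode a Kafka message payload (a byte array) as UTF-8 text."
  [^bytes payload]
  (String. payload "UTF-8"))

(bytes->string (.getBytes "{\"metric\":1.0}" "UTF-8"))
;; => "{\"metric\":1.0}"
```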

Multiple Topic Consumption

The Kafka ZooKeeper consumer allows for clients to retrieve messages from multiple topics concurrently. It is convenient to be able to retrieve (and combine) messages into a single sequence that can be read:

(messages consumer "testing1" "testing2")

Given messages are likely to be encoded, it might also be necessary to pass functions that can be used to interpret the messages as they arrive so that the resulting sequence contains immediately usable types.
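As a stopgap, one hedged sketch of combining per-topic seqs, with an optional decode fn per topic (multi-messages and decode-fns are hypothetical names, not part of clj-kafka). Note that interleave alternates between topics, so a quiet topic would block the combined seq; a real implementation would likely need per-stream threads feeding a shared queue:

```clojure
;; Hypothetical sketch built on the existing messages fn.
;; decode-fns maps topic name -> fn applied to each message from
;; that topic (defaulting to identity when absent).
(defn multi-messages
  [consumer decode-fns & topics]
  (apply interleave
         (for [t topics
               :let [decode (get decode-fns t identity)]]
           (map decode (messages consumer t)))))
```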

0.8.1 migration

Hello. It will be nice to release new version for 0.8.1 Kafka.

Runtime Dependencies

If you run lein deps, depending only on clj-kafka you cannot even run the examples in the readme because Exceptions would pop up.

So I looked into your project.clj and after adding these two dependencies to my own project.clj, it worked as expected.

[com.yammer.metrics/metrics-core "2.2.0"]
[com.101tec/zkclient "0.3"]

Maybe they need to be dependencies of clj-kafka? Or will it resolve with the final 0.8 release of kafka?

Offset management

Hey folks,

What are your thoughts about the new method of managing offsets in kafka. There's some documentation (in the form of example code) here...

https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

The TL;DR is that there's quite a bit of overhead to maintaining the offset in ZooKeeper, so there's another approach which involves writing to a topic and keeping an in-memory cache of the current offset. That way consumers with high throughput, or lots of consumer groups (or both), can still commit after processing each message rather than trying to limit the frequency of commits. Would you like clj-kafka to provide something like this?

Add more options to `messages` method for simple consumer

Currently, it is not possible to set maxWait and minBytes on the fetch request being built, so both default to 0.

This means that if no message is available, messages will just return without blocking. If we then try to wait for new messages, we enter a tight loop unless we sleep between requests (which increases latency).

Being able to specify those values would be useful.
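The underlying Scala API does expose these knobs via kafka.api.FetchRequestBuilder, so messages could presumably accept them as options. A rough interop sketch (the fetch-request fn and its parameter names are my own, not clj-kafka's):

```clojure
(import 'kafka.api.FetchRequestBuilder)

;; Sketch: build a fetch request with explicit maxWait/minBytes so the
;; broker blocks up to max-wait ms until at least min-bytes are ready.
(defn fetch-request
  [client-id topic partition offset {:keys [fetch-size max-wait min-bytes]
                                     :or   {fetch-size 1048576
                                            max-wait   100
                                            min-bytes  1}}]
  (-> (FetchRequestBuilder.)
      (.clientId client-id)
      (.addFetch topic (int partition) (long offset) (int fetch-size))
      (.maxWait (int max-wait))
      (.minBytes (int min-bytes))
      (.build)))
```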

Expose with-test-broker?

Hey All,

It seems to me like with-test-broker would be generally useful to any apps that need to test integration with kafka. Would you consider exposing this functionality to client apps or should I make another project to do something like this?

how to handle zk disconnect

The (messages (consumer ...)) call reports "Failed to find leader for Set" when ZooKeeper is disconnected.
I can't find a way to reconnect to ZooKeeper, as I can't pass any watcher into it.

Make test helpers available

Thanks for clj-kafka, it's been great to use. 👍

When it comes to testing our microservices, we've been copying clj-kafka's test helpers to our projects. It seems generally useful for writing tests that integrate through kafka.

Would it be appropriate to make this available in clj-kafka or a separate test helper library?

Readme port numbers.

In the example code in the README, standard server port numbers for Kafka and ZooKeeper are intermixed, seemingly at random, with non-standard ports. It would be helpful if the examples started with defs of the port config rather than magic numbers, or at least stuck to the port numbers that work with a vanilla install for a new user.

README.md New Producer example error

(with-open [p (producer {"bootstrap.servers" "127.0.0.1:9092"})]
  (send (record "test-topic" (.getBytes "hello world!"))))

should be:

(with-open [p (producer {"bootstrap.servers" "127.0.0.1:9092"})]
  (send p (record "test-topic" (.getBytes "hello world!"))))

I cannot get messages using clj-kafka.consumer.zk

I tried to consume Kafka messages using clj-kafka.consumer.zk, but I cannot get it to work.
The producer code is as follows:

(ns chapter04.kafka
  (:require [clj-kafka.producer :as p]
            [clj-kafka.core :as core]
            [clj-kafka.consumer.zk :as zk-consumer]))

(def p (p/producer {"metadata.broker.list" "localhost:9092"
                    "serializer.class" "kafka.serializer.DefaultEncoder"
                    "partitioner.class" "kafka.producer.DefaultPartitioner"}))

(p/send-message p (p/message "test" (.getBytes "this is my message")))
(p/send-message p (p/message "test" (.getBytes "this is my message")))

The below is the consumer code:

(def consumer-config {"zookeeper.connect" "localhost:2181"
                      "group.id" "clj-kafka.consumer"
                      "auto.offset.reset" "smallest"
                      "auto.commit.enable" "false"})

(core/with-resource [c (zk-consumer/consumer consumer-config)]
  zk-consumer/shutdown
  (take 2 (zk-consumer/messages c "test")))

The last consumer expression returns (). I am expecting the result to be ("this is my message" "this is my message").

The kafka-console-consumer.sh script bundled with Kafka retrieves the messages correctly:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
this is my message
this is my message

I am using kafka_2.10-0.8.2.1. Is there something wrong with my environment, or is there a way to solve this problem?
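One guess (a common Clojure laziness pitfall, not a confirmed diagnosis): (take 2 ...) returns an unrealized lazy seq, and with-resource shuts the consumer down before anything forces it, so by the time the seq is realized the consumer is closed. Forcing realization inside the scope may help:

```clojure
;; Force the lazy message seq with doall before with-resource
;; invokes the shutdown fn.
(core/with-resource [c (zk-consumer/consumer consumer-config)]
  zk-consumer/shutdown
  (doall (take 2 (zk-consumer/messages c "test"))))
```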
