pingles / clj-kafka
Wrapper to the Java API for interacting with Kafka
License: Eclipse Public License 1.0
I'd like to use multi-message sending, following the example:
(send-messages p "test" (->> ["message payload 1" "message payload 2"]
(map #(.getBytes %))
(map message)))
but whenever I try that I get:
Error in main: java.lang.ClassCastException: clojure.lang.LazySeq cannot be cast to kafka.message.Message
I've looked at the source code, and it appears that batching is supposed to be handled by the Scala code in Kafka, which expects a java.util.List. But even when I wrap the seq of messages in a java.util.LinkedList constructor, it still complains that it's not a kafka.message.Message and fails.
I need the kafka 0.7.0 compatibility for now, but I'd really like to be able to ship a bunch of messages at once to save the overhead of creating and dropping the connection to the server.
Is there something I'm missing? Can this be added easily? Is there a work-around for this to get the batch mode working as advertised?
I tried to print the produced offset with a callback, as below, but it seems the callback is never invoked.
(send producer record
      (reify Callback
        (onCompletion [_ metadata exception]
          ;; metadata is a RecordMetadata object, so use interop rather
          ;; than keyword lookup (which would only ever print nil)
          (println "offset:" (.offset metadata)))))
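One possible explanation, offered as a guess rather than a confirmed diagnosis: the new producer sends asynchronously, so if the process exits before the batch is flushed, the callback never fires. Closing the producer blocks until outstanding sends complete. A sketch reusing the producer/record/send calls shown in these issues (config values are placeholders):

```clojure
(import '(org.apache.kafka.clients.producer Callback))

;; Closing the producer (via with-open) blocks until buffered records are
;; sent, giving callbacks a chance to run before the process exits.
(with-open [p (producer {"bootstrap.servers" "127.0.0.1:9092"
                         "key.serializer"   "org.apache.kafka.common.serialization.ByteArraySerializer"
                         "value.serializer" "org.apache.kafka.common.serialization.ByteArraySerializer"})]
  (send p (record "test-topic" (.getBytes "hello world!"))
        (reify Callback
          (onCompletion [_ metadata exception]
            (if exception
              (println "send failed:" (.getMessage exception))
              ;; RecordMetadata is a Java object: use .offset, not (:offset ...)
              (println "offset:" (.offset metadata)))))))
```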
Hey!
With 0.8.2, Kafka has introduced a new KafkaConsumer API. It handles both high level and low level aspects. For example, it is possible to use or not use offset management (both ZK and builtin). It is also possible to read from several partitions while not using offset management. It would be great to have access to this API.
Started zookeeper on port 2181
Started kafka broker on 9092
Inserted a few messages into the test topic using the console producer
bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
Consumed messages using console consumer in test topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
Running the Kafka offset checker script returns the following output:
bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic test --group console-consumer-54827
Group                   Topic  Pid  Offset  logSize  Lag  Owner
console-consumer-54827  test   0    17      17       0    console-consumer-54827_xxx
Executing the fetch-consumer-offsets function returns the following error:
(fetch-consumer-offsets "localhost:9092"
{"zookeeper.connect" "localhost:2181"}
"test"
"console-consumer-54827")
{"test:0" #object[kafka.common.OffsetMetadataAndError 0x4d7f1000 "OffsetMetadataAndError[-1,,0]"]}
I am not able to figure out what I am doing wrong. Can someone please help me?
Currently we're using a uSwitch build of the Apache git repo. This is built by our own internal Continuous Integration machine but before release it would be nice to move to an official release.
There was some discussion on the Kafka JIRA about getting a maven dep out with 0.8 so fingers crossed come final release we can just update.
Could you please cut a release of 0.3.2 to clojars?
Hello All,
I'm trying to upgrade clj-kafka at work from 0.2.8-0.8.1.1 to 0.3.4, and there is no way for me to quickly eyeball the changes. It is important for me to know if this upgrade is a breaking upgrade or not. Is there any documentation around this that I can refer to?
Thanks!
(producer config) error
([^java.util.Map config]
(KafkaProducer. config))
should be:
([^java.util.Map config]
(KafkaProducer. config (byte-array-serializer) (byte-array-serializer)))
As it stands, calling (producer config) throws:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "value.serializer" which has no default value.
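Until that change lands, a workaround (using the standard Kafka 0.8.2 producer config keys, not a clj-kafka-specific feature) is to name the serializer classes in the config map passed to producer:

```clojure
;; Supplying key.serializer and value.serializer in the config avoids the
;; ConfigException when KafkaProducer is constructed from the map alone.
(producer {"bootstrap.servers" "localhost:9092"
           "key.serializer"   "org.apache.kafka.common.serialization.ByteArraySerializer"
           "value.serializer" "org.apache.kafka.common.serialization.ByteArraySerializer"})
```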
Is there a specific reason this lib brings [org.clojure/clojure "1.5.1"] and not the latest release?
Any plans to upgrade from [org.scala-lang/scala-library "2.8.0"] to [org.scala-lang/scala-library "2.9.2"]?
I get the following error when I try to use clj-kafka.
user=> (use 'clj-kafka.producer)
CompilerException java.lang.NoClassDefFoundError: scala/reflect/ClassManifest, compiling:(clj_kafka/core.clj:1:1)
The project.clj looks like this:
:dependencies [[clj-kafka "0.2.6-0.8"]
[org.clojure/clojure "1.6.0"]
[com.google.guava/guava "18.0"]]
What will be the next steps to investigate?
Kafka 0.8 adds support for message keys and a few other new things. Update the clj-kafka API to allow for these.
The messages function in consumer/zk.clj seems to return a single stream of messages but also takes in an optional number of threads. Since the number of threads is passed to Kafka's .createMessageStreams, there are extra streams that are being dropped by messages (as the code always takes the first stream).
Would it be appropriate to add a function like message-streams to the high-level consumer that can return all the message streams? Additionally, I'm not sure why it's useful to pass in the number of threads to messages if there's not a way to get the other message streams.
I'd be happy to try to come up with a PR adding message-streams if I'm on the right track, but please do let me know if I'm missing something!
The 0.8 API changes things a bit (and the keys associated with the consumer and producer configs are also new).
Update the README and some examples to reflect new API.
Hello,
I'm curious what you think about using cheshire instead of data.json.
The tests have spurious sections of connection and topic details spread through the test utils and test code. Refactor to tidy this up.
It would be very useful for testing.
Example: https://gist.github.com/mardambey/2650743
PS: Maybe I'm wrong and it's impossible to use Kafka without ZooKeeper.
http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
It would be good to move clj-kafka onto the new producer. I'm mainly interested in the callback-on-ack feature, and it would also be nice to surface the future returned by send calls (we could Clojure-ify the payload).
If I get some time I may take a look at this myself if you are accepting pull requests.
Regards
Tests output a lot of info messages from the Kafka and ZooKeeper servers; minimise what goes to the console and move it to a log file in the dev/test profiles.
The library still uses the 2.10-0.8.2.1 Kafka library and client. Are there any plans to upgrade them to a newer version for the new 0.9 release of Kafka?
Hi All
We are using Kafka for our logging/metrics/events, and we found that the metrics (from collectd) arrive as byte arrays. There is no specific function to convert a byte array to a simple String/JSON; maybe someone here knows an easy way to do that? Our consumer is Riemann.
Thanks
D.
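Since the payload is just UTF-8 bytes, plain Java interop is enough to decode it; a minimal sketch (nothing clj-kafka-specific, and the JSON step is left to whichever parser you prefer, e.g. data.json or cheshire):

```clojure
(defn bytes->string
  "Decode a Kafka message payload (a byte array) as a UTF-8 string.
  The result can then be parsed as JSON with e.g. clojure.data.json."
  [^bytes payload]
  (String. payload "UTF-8"))

;; e.g. (bytes->string (.getBytes "cpu/load 1.5" "UTF-8"))
```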
The Kafka ZooKeeper consumer allows for clients to retrieve messages from multiple topics concurrently. It is convenient to be able to retrieve (and combine) messages into a single sequence that can be read:
(messages consumer "testing1" "testing2")
Given messages are likely to be encoded, it might also be necessary to pass functions that can be used to interpret the messages as they arrive so that the resulting sequence contains immediately usable types.
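For example, something along these lines (a sketch: it assumes the consumed messages are maps with :topic and :value keys, per the KafkaMessage record mentioned elsewhere in these issues, and UTF-8 decoding stands in for whatever codec the payloads actually use):

```clojure
;; Decode each message's byte-array payload as it arrives, so the merged
;; sequence from both topics yields immediately usable strings.
(map (fn [{:keys [topic value]}]
       {:topic topic
        :value (String. ^bytes value "UTF-8")})
     (messages consumer "testing1" "testing2"))
```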
When SimpleConsumer retrieves messages to be mapped to the KafkaMessage record, the topic attribute is not available; ensure this is set in the client code.
Hello. It would be nice to release a new version for Kafka 0.8.1.
It doesn't appear that the simple consumer keeps track of its own offset, but I could be wrong. It looks like anyone using the simple consumer would have to move the offset themselves. Correct?
If you run lein deps, depending only on clj-kafka, you cannot even run the examples in the README because exceptions pop up.
So I looked into your project.clj, and after adding these two dependencies to my own project.clj, it worked as expected.
[com.yammer.metrics/metrics-core "2.2.0"]
[com.101tec/zkclient "0.3"]
Maybe they need to be dependencies of clj-kafka? Or will it resolve with the final 0.8 release of kafka?
Hey folks,
What are your thoughts about the new method of managing offsets in kafka. There's some documentation (in the form of example code) here...
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
The TL;DR is that there's quite a bit of overhead to maintaining the offset in ZooKeeper, so there's another approach which involves writing to a topic and keeping an in-memory cache of the current offset. That way consumers with high throughput, or lots of consumer groups (or both), can still commit after processing each message rather than trying to limit the frequency of commits. Would you like clj-kafka to provide something like this?
Currently, it is not possible to set maxWait and minBytes for the request being built, so both default to 0. This means that if no message is available, messages returns immediately without blocking; if we want to wait for new messages we end up in a tight loop, unless we sleep between requests (which increases latency).
Being able to specify those values would be useful.
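For reference, the underlying Kafka 0.8 Java API already exposes both settings through kafka.api.FetchRequestBuilder, so clj-kafka could plausibly pass them through. A sketch (the topic, partition, offset, and fetch-size values are placeholders):

```clojure
(import '(kafka.api FetchRequestBuilder))

;; Build a fetch request that waits up to 1000 ms for at least 1 byte of
;; data, instead of returning immediately when no messages are available.
(def fetch-request
  (.. (FetchRequestBuilder.)
      (clientId "clj-kafka")
      (addFetch "test" 0 0 4096)   ; topic, partition, offset, fetch size
      (maxWait 1000)
      (minBytes 1)
      (build)))
```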
Hey All,
It seems to me like with-test-broker
would be generally useful to any apps that need to test integration with kafka. Would you consider exposing this functionality to client apps or should I make another project to do something like this?
lein with-profile production deps :tree
reports two dependencies on org.apache.zookeeper/zookeeper: one direct (3.3.4), the other a transitive dependency via zookeeper-clj (3.4.0). This problem percolates up to consuming projects like ours.
Here is a brief discussion of the general issue: https://nelsonmorris.net/2012/09/13/solving-version-conflicts.html
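Until the direct dependency is bumped, a consuming project can work around the conflict with a Leiningen :exclusions entry; a sketch (the version strings are illustrative):

```clojure
;; project.clj snippet: keep a single zookeeper version by excluding the
;; transitive one and pinning the version you want explicitly.
:dependencies [[clj-kafka "0.2.6-0.8"
                :exclusions [org.apache.zookeeper/zookeeper]]
               [org.apache.zookeeper/zookeeper "3.4.0"]]
```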
Hi,
First, I would like to say thanks for this project.
I need to create a custom encoder, according to http://incubator.apache.org/kafka/quickstart.html
How can I do this in clj-kafka?
Thanks.
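One possible sketch, offered as an assumption rather than a documented clj-kafka feature: in the 0.7-era API, kafka.serializer.Encoder is a single-method interface whose toMessage must return a kafka.message.Message, and since Kafka instantiates the encoder by the class name given in serializer.class, the Clojure implementation needs AOT compilation via gen-class. All names below are hypothetical:

```clojure
(ns my.kafka.encoder
  (:import (kafka.message Message))
  (:gen-class
    :name my.kafka.StringEncoder
    :implements [kafka.serializer.Encoder]))

;; Encode any event by taking its string form as UTF-8 bytes and wrapping
;; them in a kafka.message.Message.
(defn -toMessage [this event]
  (Message. (.getBytes (str event) "UTF-8")))

;; Then reference it in the producer config:
;; {"serializer.class" "my.kafka.StringEncoder"}
```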
The consumer's messages function reports "Failed to find leader for Set" when ZooKeeper is disconnected.
I can't find a way to reconnect to ZooKeeper, as I can't pass any watcher into it.
https://github.com/pingles/clj-kafka/blob/0.8/src/clj_kafka/consumer/zk.clj#L31
Previously, we would convert the incoming Kafka messages using the ToClojure protocol. Whilst writing the automated tests we found that using (map to-clojure ...) caused the consumer to block.
Not too sure what the cause is; will dig more into it tomorrow.
Can we upgrade clj-kafka to match?
Thanks for clj-kafka, it's been great to use!
When it comes to testing our microservices, we've been copying clj-kafka's test helpers to our projects. It seems generally useful for writing tests that integrate through kafka.
Would it be appropriate to make this available in clj-kafka or a separate test helper library?
What is needed? Can we help?
In the example code in the README, standard server port numbers for Kafka and ZooKeeper are intermixed, seemingly at random, with non-standard ports. It would be helpful if the examples started with defs for the port config rather than magic numbers, or at least stuck to the port numbers that work with a vanilla install for a new user.
(with-open [p (producer {"bootstrap.servers" "127.0.0.1:9092"})]
(send (record "test-topic" (.getBytes "hello world!"))))
should be:
(with-open [p (producer {"bootstrap.servers" "127.0.0.1:9092"})]
(send p (record "test-topic" (.getBytes "hello world!"))))
I tried to consume Kafka messages using clj-kafka.consumer.zk but I cannot get it to work yet.
The producer code is as follows:
(ns chapter04.kafka
  (:require [clj-kafka.producer :as p]
            [clj-kafka.core :as core]
            [clj-kafka.consumer.zk :as zk-consumer])
  (:import (kafka.consumer Consumer ConsumerConfig KafkaStream)
           (kafka.producer KeyedMessage ProducerConfig)
           (kafka.javaapi.producer Producer)
           (java.util Properties)
           (java.util.concurrent Executors)))
(def p (p/producer {"metadata.broker.list" "localhost:9092"
"serializer.class" "kafka.serializer.DefaultEncoder"
"partitioner.class" "kafka.producer.DefaultPartitioner"}))
(p/send-message p (p/message "test" (.getBytes "this is my message")))
(p/send-message p (p/message "test" (.getBytes "this is my message")))
Below is the consumer code:
(def consumer-config {"zookeeper.connect" "localhost:2181"
"group.id" "clj-kafka.consumer"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
(core/with-resource [c (zk-consumer/consumer consumer-config)]
  zk-consumer/shutdown
  (take 2 (zk-consumer/messages c "test")))
The last consumer expression returns (). I expected the result to be ("this is my message" "this is my message").
The bundled kafka-console-consumer.sh retrieves the messages correctly:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
this is my message
this is my message
I am using kafka_2.10-0.8.2.1. Is there something wrong in my environment, or is there any way to solve this problem?
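One likely cause, though this is a guess rather than a confirmed diagnosis: with-resource calls shutdown as soon as its body returns, while (take 2 ...) builds a lazy seq that is only realized later, after the consumer has been closed. Forcing realization inside the body with doall may help:

```clojure
;; Force the lazy message seq while the consumer is still open, so the
;; messages are fetched before shutdown runs.
(core/with-resource [c (zk-consumer/consumer consumer-config)]
  zk-consumer/shutdown
  (doall (take 2 (zk-consumer/messages c "test"))))
```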