Comments (8)
Hi,
Not something I'm very familiar with, but I'll take a look and see what I can find.
from clj-kafka.
Which version of clj-kafka are you using? You mention 0.7 compatibility. Are you still using the older release that uses Kafka 0.7?
I'm using the Leiningen project line:
[clj-kafka "0.0.7-0.7"]
from clojars, I'm pretty sure.
And yes, I need to be able to talk to Kafka 0.7.1 servers as that's the install version we have.
Hmmm, do you have more of the stack trace from the exception? I wonder whether adding a ^List type hint when instantiating ProducerData would help:
(defn send-messages
  [^Producer producer ^String topic ^List msgs]
  (.send producer (ProducerData. topic msgs)))
Certainly... the code that I'm using is:
(doseq [chunk (partition-all blk-sz (take limit (line-seq rdr)))
        :let [lines (map #(line-to-json % tstamp deals) chunk)]]
  (prod/send-messages p topic (->> lines
                                   (map #(.getBytes %))
                                   (map prod/message))))
so that I'm reading lines from a file, converting them in chunks and then trying to send the messages in each of those chunks out using the multi-message send API.
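The shape of the data handed to send-messages can be checked without a Kafka broker. This is only a sketch: the `lines` vector and the `chunk->payloads` helper are hypothetical stand-ins for the file reader and for prod/message, and no clj-kafka call is involved. It shows that each chunk ends up as a clojure.lang.LazySeq, which is also a java.util.List.

```clojure
;; Stand-in data; the real code reads lines from a file.
(def lines ["a" "b" "c" "d" "e"])

;; partition-all groups the input into chunks of at most n items.
(def chunks (partition-all 2 lines))

;; Hypothetical helper mirroring the pipeline above, minus prod/message:
;; each chunk becomes a lazy seq of byte arrays.
(defn chunk->payloads [chunk]
  (map #(.getBytes ^String %) chunk))

(def payloads (chunk->payloads (first chunks)))

(println (count chunks))                       ; 3
(println (class payloads))                     ; clojure.lang.LazySeq
(println (instance? java.util.List payloads))  ; true
```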
What I'm getting is:
$ lein run -d resources -t test -l 50
Error in main: java.lang.ClassCastException: clojure.lang.LazySeq cannot be cast to kafka.message.Message
Exception in thread "main" java.lang.ClassCastException: clojure.lang.LazySeq cannot be cast to kafka.message.Message
at kafka.serializer.DefaultEncoder.toMessage(Encoder.scala:26)
at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3$$anonfun$apply$1.apply(ProducerPool.scala:109)
at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3$$anonfun$apply$1.apply(ProducerPool.scala:109)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.JavaConversions$JListWrapper.map(JavaConversions.scala:521)
at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3.apply(ProducerPool.scala:109)
at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3.apply(ProducerPool.scala:107)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:107)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
at kafka.producer.Producer.zkSend(Producer.scala:143)
at kafka.producer.Producer.send(Producer.scala:105)
at kafka.javaapi.producer.Producer.send(Producer.scala:104)
at clj_kafka.producer$send_messages.invoke(producer.clj:25)
at ddo_to_kafka.kafka$send_file$fn__2292.invoke(kafka.clj:55)
at ddo_to_kafka.kafka$send_file.doInvoke(kafka.clj:51)
at clojure.lang.RestFn.invoke(RestFn.java:500)
at ddo_to_kafka.main$handle_args.invoke(main.clj:70)
at ddo_to_kafka.main$_main$fn__2341.invoke(main.clj:75)
at ddo_to_kafka.main$wrap_error_handling.invoke(main.clj:24)
at ddo_to_kafka.main$_main.doInvoke(main.clj:75)
at clojure.lang.RestFn.invoke(RestFn.java:512)
at clojure.lang.Var.invoke(Var.java:435)
at user$eval5$fn__7.invoke(form-init4837497553286694467.clj:1)
at user$eval5.invoke(form-init4837497553286694467.clj:1)
at clojure.lang.Compiler.eval(Compiler.java:6619)
at clojure.lang.Compiler.eval(Compiler.java:6609)
at clojure.lang.Compiler.load(Compiler.java:7064)
at clojure.lang.Compiler.loadFile(Compiler.java:7020)
at clojure.main$load_script.invoke(main.clj:294)
at clojure.main$init_opt.invoke(main.clj:299)
at clojure.main$initialize.invoke(main.clj:327)
at clojure.main$null_opt.invoke(main.clj:362)
at clojure.main$main.doInvoke(main.clj:440)
at clojure.lang.RestFn.invoke(RestFn.java:421)
at clojure.lang.Var.invoke(Var.java:419)
at clojure.lang.AFn.applyToHelper(AFn.java:163)
at clojure.lang.Var.applyTo(Var.java:532)
at clojure.main.main(main.java:37)
$
So I just built Kafka 0.7.2 and went back to clj-kafka e97faeb (which is around where 0.7 support finished). When trying the send-messages example in the project README I get the same error as you. However, if I use the following send-messages* it works:
(defn send-messages*
  [^kafka.javaapi.producer.Producer producer ^String topic ^java.util.List msgs]
  (.send producer (kafka.javaapi.producer.ProducerData. topic msgs)))
It seems that without the explicit type hint ProducerData is being incorrectly initialized. Of course, this function will now only send a sequence of messages, but it works for me. Could you check that you get the same result?
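The general mechanism can be reproduced without Kafka. The sketch below is an illustration under assumptions, not a statement about the ProducerData constructors, which I have not checked here: it uses java.util.ArrayList as a stand-in because it has two one-argument constructors, (int) and (Collection). The function names are made up for the example. With *warn-on-reflection* enabled, the unhinted version produces a reflection warning at compile time, meaning the constructor is only chosen at runtime; the hinted version lets the compiler pick the constructor up front.

```clojure
(set! *warn-on-reflection* true)

;; Unhinted argument: the compiler cannot choose between the (int) and
;; (Collection) constructors, so it emits a reflection warning and defers
;; the choice to runtime.
(defn ->array-list-reflective [coll]
  (java.util.ArrayList. coll))

;; Hinted argument: the (Collection) constructor is selected at compile time.
(defn ->array-list-hinted [^java.util.Collection coll]
  (java.util.ArrayList. coll))

;; Both accept a lazy seq here, because only one constructor fits it.
(println (vec (->array-list-hinted (map inc [0 1 2]))))
```

When a class has overloads that can both accept the runtime argument, the reflective path has to guess, so a hint like the one in send-messages* removes that guesswork.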
I'll check as soon as I can - just fighting a fire now. But I'll try this as another function for bulk messages and let you know as soon as possible.
@pingles Perfect! I followed your lead of having two functions, send-message and send-messages, with the latter being the one above and the former what was already in the code.
This works perfectly for me - I've forked the repo, gone to that commit, made a branch, added my stuff and deployed it to our local nexus library. So this is fixed.
You've already fixed this in the latest code, so I think this is closed.
Thanks so much.