Coder Social home page Coder Social logo

Comments (8)

pingles avatar pingles commented on June 30, 2024

Hi,

Not something I'm very familiar with- I'll take a look and see what I can find.

from clj-kafka.

pingles avatar pingles commented on June 30, 2024

Which version of clj-kafka are you using? You mention 0.7 compatibility- are you still using the older release that uses Kafka 0.7?

from clj-kafka.

drbobbeaty avatar drbobbeaty commented on June 30, 2024

I'm using the Leiningen project line:

[clj-kafka "0.0.7-0.7"]

from clojars, I'm pretty sure.

And yes, I need to be able to talk to Kafka 0.7.1 servers as that's the install version we have.

from clj-kafka.

pingles avatar pingles commented on June 30, 2024

Hmmm, do you have more of the stack trace from the exception? I wonder whether adding a ^List type hint when instantiating ProducerData would help:

(defn send-messages
  [^Producer producer ^String topic ^List msgs]
  (.send producer (ProducerData. topic msgs)))

from clj-kafka.

drbobbeaty avatar drbobbeaty commented on June 30, 2024

Certainly... the code that I'm using is:

(doseq [chunk (partition-all blk-sz (take limit (line-seq rdr)))
        :let [lines (map #(line-to-json % tstamp deals) chunk)]]
  (prod/send-messages p topic (->> lines
                                    (map #(.getBytes %))
                                    (map prod/message))))

so that I'm reading lines from a file, converting them in chunks and then trying to send the messages in each of those chunks out using the multi-message send API.

What I'm getting is:

$ lein run -d resources -t test -l 50
Error in main: java.lang.ClassCastException: clojure.lang.LazySeq cannot be cast to kafka.message.Message
Exception in thread "main" java.lang.ClassCastException: clojure.lang.LazySeq cannot be cast to kafka.message.Message
    at kafka.serializer.DefaultEncoder.toMessage(Encoder.scala:26)
    at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3$$anonfun$apply$1.apply(ProducerPool.scala:109)
    at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3$$anonfun$apply$1.apply(ProducerPool.scala:109)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.JavaConversions$JListWrapper.map(JavaConversions.scala:521)
    at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3.apply(ProducerPool.scala:109)
    at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3.apply(ProducerPool.scala:107)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:107)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
    at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
    at kafka.producer.Producer.zkSend(Producer.scala:143)
    at kafka.producer.Producer.send(Producer.scala:105)
    at kafka.javaapi.producer.Producer.send(Producer.scala:104)
    at clj_kafka.producer$send_messages.invoke(producer.clj:25)
    at ddo_to_kafka.kafka$send_file$fn__2292.invoke(kafka.clj:55)
    at ddo_to_kafka.kafka$send_file.doInvoke(kafka.clj:51)
    at clojure.lang.RestFn.invoke(RestFn.java:500)
    at ddo_to_kafka.main$handle_args.invoke(main.clj:70)
    at ddo_to_kafka.main$_main$fn__2341.invoke(main.clj:75)
    at ddo_to_kafka.main$wrap_error_handling.invoke(main.clj:24)
    at ddo_to_kafka.main$_main.doInvoke(main.clj:75)
    at clojure.lang.RestFn.invoke(RestFn.java:512)
    at clojure.lang.Var.invoke(Var.java:435)
    at user$eval5$fn__7.invoke(form-init4837497553286694467.clj:1)
    at user$eval5.invoke(form-init4837497553286694467.clj:1)
    at clojure.lang.Compiler.eval(Compiler.java:6619)
    at clojure.lang.Compiler.eval(Compiler.java:6609)
    at clojure.lang.Compiler.load(Compiler.java:7064)
    at clojure.lang.Compiler.loadFile(Compiler.java:7020)
    at clojure.main$load_script.invoke(main.clj:294)
    at clojure.main$init_opt.invoke(main.clj:299)
    at clojure.main$initialize.invoke(main.clj:327)
    at clojure.main$null_opt.invoke(main.clj:362)
    at clojure.main$main.doInvoke(main.clj:440)
    at clojure.lang.RestFn.invoke(RestFn.java:421)
    at clojure.lang.Var.invoke(Var.java:419)
    at clojure.lang.AFn.applyToHelper(AFn.java:163)
    at clojure.lang.Var.applyTo(Var.java:532)
    at clojure.main.main(main.java:37)
$

from clj-kafka.

pingles avatar pingles commented on June 30, 2024

So I just built Kafka 0.7.2 and went back to clj-kafka e97faeb (which is around where 0.7 support finished). When trying the send-messages example in the project README I get the same error as you. However, if I use the following send-messages* it works

(defn send-messages*
  [^kafka.javaapi.producer.Producer producer ^String topic ^java.util.List msgs]
  (.send producer (kafka.javaapi.producer.ProducerData. topic msgs)))

It seems that without the explicit type hint ProducerData is being incorrectly initialized. Of course, this function will now only send a sequence of messages but it works for me- could you check that you get the same result?

from clj-kafka.

drbobbeaty avatar drbobbeaty commented on June 30, 2024

I'll check as soon as I can - just fighting a fire now. But I'll try this as another function for bulk messages and let you know as soon as possible.

from clj-kafka.

drbobbeaty avatar drbobbeaty commented on June 30, 2024

@pingles Perfect! I followed your lead of having two functions: send-message and send-messages and the latter being the one above and the former what was already in the code.

This works perfectly for me - I've forked the repo, gone to that commit, made a branch, added my stuff and deployed it to our local nexus library. So this is fixed.

You've already fixed this in the latest code, so I think this is closed.

Thanks so much.

from clj-kafka.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.