jcustenborder / kafka-connect-twitter
Kafka Connect connector to stream data in real time from Twitter.
License: Apache License 2.0
Hi,
I'm trying to use this connector in distributed mode. With org.apache.kafka.connect.json.JsonConverter it works just fine, but with avro.AvroConverter the task is not created and the following error is thrown.
The Schema Registry is up and running, though. I downloaded confluentinc-kafka-connect-avro-converter-7.1.1 and placed it in the plugin.path.
Installation:
I built the connector from the repo and copied target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter into my connectors folder.
Any help is appreciated!
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
(io.confluent.connect.avro.AvroConverterConfig:376)
[2022-06-09 21:47:19,579] ERROR [TwitterSourceConnector|task-0] Failed to start task TwitterSourceConnector-0 (org.apache.kafka.connect.runtime.Worker:549)
java.lang.NoClassDefFoundError: com/google/common/base/Ticker
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:171)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:154)
at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:73)
at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:287)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:519)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1421)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$16(DistributedHerder.java:1057)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:408)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:326)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-06-09 21:47:19,590] ERROR Uncaught exception in REST call to /connectors/TwitterSourceConnector/tasks/0/restart (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
org.apache.kafka.connect.errors.ConnectException: Failed to start task: TwitterSourceConnector-0
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$16(DistributedHerder.java:1060)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:408)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:326)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-06-09 21:50:34,499] INFO [AdminClient clientId=adminclient-8] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935)
How can I update the Twitter API to point to v2?
[2022-05-13 00:46:49,143] INFO 403:The request is understood, but it has been refused. An accompanying error message will explain why. This code is used when requests are being denied due to update limits (https://support.twitter.com/articles/15364-about-twitter-limits-update-api-dm-and-following).
<html>\n<head>\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>\n<title>Error 403
Please use V2 filtered and sample volume stream as alternatives
</title>
</head>
<body>
<h2>HTTP ERROR: 403</h2>
<p>Problem accessing '/1.1/statuses/filter.json'. Reason:
<pre>
Please use V2 filtered and sample volume stream as alternatives
</pre>
</body>
</html>
(twitter4j.TwitterStreamImpl:62)
The Twitter API includes entity arrays including hashtags, user mentions, etc. Could these be exposed in the message? Hashtags make for easy point of analysis when running demos :)
Ref: https://dev.twitter.com/overview/api/entities-in-twitter-objects
I am trying to connect Twitter with Confluent Cloud using the Twitter connector, but I'm getting the error below.
Validation error: {"error_code":404,"message":"unable to validate connector config: unable to validate plugin type: connect service: connector plugin com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector unavailable: resource not found"}
I am trying to receive all updates from a specific user using 'filter.userIds' but it's not working. How can I do that?
Twitter4j version 4.0.7 was released today. The connector automatically pulls in a new patch version due to version range rules in the pom. Using this version results in four compilation errors. The workaround for now is to explicitly use version 4.0.6.
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.5.1:compile (default-compile) on project kafka-connect-twitter: Compilation failure: Compilation failure:
[ERROR] kafka-connect-twitter/src/main/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverter.java:[22,17] cannot find symbol
[ERROR] symbol: class ExtendedMediaEntity
[ERROR] location: package twitter4j
[ERROR] kafka-connect-twitter/src/main/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverter.java:[414,44] cannot find symbol
[ERROR] symbol: class ExtendedMediaEntity
[ERROR] location: class com.github.jcustenborder.kafka.connect.twitter.StatusConverter
[ERROR] kafka-connect-twitter/src/main/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverter.java:[434,38] cannot find symbol
[ERROR] symbol: class ExtendedMediaEntity
[ERROR] location: class com.github.jcustenborder.kafka.connect.twitter.StatusConverter
[ERROR] kafka-connect-twitter/src/main/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverter.java:[439,10] cannot find symbol
[ERROR] symbol: class ExtendedMediaEntity
[ERROR] location: class com.github.jcustenborder.kafka.connect.twitter.StatusConverter
When trying to create a connector, I get the exception below:
(base) ubun@ubun-Inspiron-N5110:~/JAVA/Projects/Kafka-Beginners/Kafka_Connector$ sh run.sh
[2020-03-07 20:59:13,296] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:69)
[2020-03-07 20:59:13,345] INFO WorkerInfo values:
jvm.args = -Xms256M, -Xmx2G, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/home/ubun/kafka_2.13-2.4.0/bin/../logs, -Dlog4j.configuration=file:/home/ubun/kafka_2.13-2.4.0/bin/../config/connect-log4j.properties
jvm.spec = Private Build, OpenJDK 64-Bit Server VM, 1.8.0_242, 25.242-b08
jvm.classpath = /home/ubun/kafka_2.13-2.4.0/bin/../libs/activation-1.1.1.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/aopalliance-repackaged-2.5.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/argparse4j-0.7.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/audience-annotations-0.5.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/commons-cli-1.4.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/commons-lang3-3.8.1.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/connect-api-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/connect-basic-auth-extension-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/connect-file-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/connect-json-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/connect-mirror-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/connect-mirror-client-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/connect-runtime-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/connect-transforms-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/guava-20.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/hk2-api-2.5.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/hk2-locator-2.5.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/hk2-utils-2.5.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jackson-annotations-2.10.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jackson-core-2.10.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jackson-databind-2.10.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jackson-dataformat-csv-2.10.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jackson-datatype-jdk8-2.10.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jackson-jaxrs-base-2.10.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jackson-jaxrs-json-provider-2.10.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jackson-module-jaxb-annotations-2.10.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jackson-module-paranamer-2.10.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jackson-module-scala_2.13-2.10.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/ubun/k
afka_2.13-2.4.0/bin/../libs/jakarta.annotation-api-1.3.4.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jakarta.inject-2.5.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/javassist-3.22.0-CR2.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/javax.servlet-api-3.1.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jaxb-api-2.3.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jersey-client-2.28.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jersey-common-2.28.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jersey-container-servlet-2.28.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jersey-container-servlet-core-2.28.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jersey-hk2-2.28.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jersey-media-jaxb-2.28.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jersey-server-2.28.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jetty-client-9.4.20.v20190813.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jetty-continuation-9.4.20.v20190813.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jetty-http-9.4.20.v20190813.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jetty-io-9.4.20.v20190813.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jetty-security-9.4.20.v20190813.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jetty-server-9.4.20.v20190813.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jetty-servlet-9.4.20.v20190813.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jetty-servlets-9.4.20.v20190813.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jetty-util-9.4.20.v20190813.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/jopt-simple-5.0.4.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/kafka_2.13-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/kafka_2.13-2.4.0-sources.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/kafka-clients-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/kafka-log4j-appender-2.4.0.jar:/home/ubun/k
afka_2.13-2.4.0/bin/../libs/kafka-streams-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/kafka-streams-examples-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/kafka-streams-scala_2.13-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/kafka-streams-test-utils-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/kafka-tools-2.4.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/log4j-1.2.17.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/lz4-java-1.6.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/maven-artifact-3.6.1.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/metrics-core-2.2.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/netty-buffer-4.1.42.Final.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/netty-codec-4.1.42.Final.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/netty-common-4.1.42.Final.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/netty-handler-4.1.42.Final.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/netty-resolver-4.1.42.Final.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/netty-transport-4.1.42.Final.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/netty-transport-native-epoll-4.1.42.Final.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/netty-transport-native-unix-common-4.1.42.Final.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/paranamer-2.8.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/plexus-utils-3.2.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/reflections-0.9.11.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/rocksdbjni-5.18.3.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/scala-collection-compat_2.13-2.1.2.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/scala-java8-compat_2.13-0.9.0.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/scala-library-2.13.1.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/scala-logging_2.13-3.9.2.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/scala-reflect-2.13.1.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/slf4j-api-1.7.28.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/slf4j-log4j12-1.7.28.jar:/home/ubun/kafka_2.1
3-2.4.0/bin/../libs/snappy-java-1.1.7.3.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/validation-api-2.0.1.Final.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/zookeeper-3.5.6.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/zookeeper-jute-3.5.6.jar:/home/ubun/kafka_2.13-2.4.0/bin/../libs/zstd-jni-1.4.3-1.jar
os.spec = Linux, amd64, 5.3.0-40-generic
os.vcpus = 4
(org.apache.kafka.connect.runtime.WorkerInfo:71)
[2020-03-07 20:59:13,389] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:78)
[2020-03-07 20:59:13,572] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/checker-qual-2.5.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,155] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/checker-qual-2.5.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,160] INFO Added plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
[2020-03-07 20:59:14,160] INFO Added plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
[2020-03-07 20:59:14,160] INFO Added plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
[2020-03-07 20:59:14,184] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/javassist-3.21.0-GA.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,337] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/javassist-3.21.0-GA.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,337] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/failureaccess-1.0.1.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,352] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/failureaccess-1.0.1.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,353] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/twitter4j-stream-4.0.6.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,370] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/twitter4j-stream-4.0.6.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,371] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,384] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,386] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/jackson-annotations-2.10.0.pr1.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,416] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/jackson-annotations-2.10.0.pr1.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,417] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/jackson-databind-2.10.0.pr1.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,706] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/jackson-databind-2.10.0.pr1.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,706] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/reflections-0.9.11.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,756] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/reflections-0.9.11.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,757] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/error_prone_annotations-2.2.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,766] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/error_prone_annotations-2.2.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,768] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/connect-utils-0.4.156.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,810] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/connect-utils-0.4.156.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,813] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/j2objc-annotations-1.1.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,834] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/j2objc-annotations-1.1.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,835] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/twitter4j-core-4.0.6.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:14,874] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/twitter4j-core-4.0.6.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:14,874] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/guava-27.1-jre.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:15,477] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/guava-27.1-jre.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:15,478] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/jsr305-3.0.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:15,505] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/jsr305-3.0.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-03-07 20:59:15,505] INFO Loading plugin from: /home/ubun/JAVA/Projects/Kafka-Beginners/Kafka_Connector/connector/kafka-connect-twitter-0.3.33.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:241)
[2020-03-07 20:59:15,521] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:130)
java.lang.NoClassDefFoundError: com/github/jcustenborder/kafka/connect/utils/VersionUtil
at com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector.version(TwitterSourceConnector.java:46)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:375)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:380)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:350)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:330)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:263)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:255)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:224)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:201)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:60)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79)
Caused by: java.lang.ClassNotFoundException: com.github.jcustenborder.kafka.connect.utils.VersionUtil
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 11 more
maven-checkstyle-plugin:3.1.2 or one of its dependencies could not be resolved: Could not find artifact com.github.jcustenborder.kafka.connect:kafka-connect-style-checkstyle:jar:1.1.0.0.4
The partitioning logic in the TwitterSourceConnector#taskConfigs() function has the opposite effect of what you'd expect.
final int tasks = Math.min(maxTasks, this.config.filterKeywords.size());
...
for (List<String> k : Iterables.partition(this.config.filterKeywords, tasks)) {
With seven filterKeywords and maxTasks=1, the Iterables#partition() function will return seven distinct partitions, which results in seven tasks trying to run. The second argument to Iterables#partition() is the maximum number of elements per partition, not the number of partitions to break the data into.
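The difference can be reproduced without Guava. The sketch below is only an illustration: chunksOfSize is a hypothetical stand-in that mimics the "max elements per chunk" semantics described above, and splitIntoGroups is one possible fix (an assumption, not the connector's actual code) that derives the chunk size from the desired number of groups.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PartitionSketch {

    // Hypothetical stand-in for the semantics described above:
    // consecutive chunks of AT MOST `size` elements each.
    static <T> List<List<T>> chunksOfSize(List<T> items, int size) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            int end = Math.min(i + size, items.size());
            out.add(new ArrayList<>(items.subList(i, end)));
        }
        return out;
    }

    // One possible fix (an assumption): produce at most `groups` chunks
    // by computing the chunk size as ceil(n / groups).
    static <T> List<List<T>> splitIntoGroups(List<T> items, int groups) {
        if (items.isEmpty()) {
            return new ArrayList<>();
        }
        int size = (items.size() + groups - 1) / groups;
        return chunksOfSize(items, size);
    }

    public static void main(String[] args) {
        List<String> keywords = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        int maxTasks = 1;
        int tasks = Math.min(maxTasks, keywords.size());

        // Passing the task count as the chunk size yields one chunk per keyword.
        System.out.println(chunksOfSize(keywords, tasks).size());    // prints 7
        // Deriving the chunk size from the task count yields a single chunk.
        System.out.println(splitIntoGroups(keywords, tasks).size()); // prints 1
    }
}
```

With this approach the number of chunks never exceeds the requested number of groups, although it can be smaller (for example, five keywords and four groups give three chunks).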
I am trying to run this connector in standalone mode, but I am getting this error:
[2022-04-25 23:09:33,524] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:128)
org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.connect.avro.AvroConverter for configuration key.converter: Class io.confluent.connect.avro.AvroConverter could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:360)
at org.apache.kafka.connect.runtime.standalone.StandaloneConfig.<init>(StandaloneConfig.java:42)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:80)
Basically, what I did was clone the repo, create a TwitterSourceConnector.config and fill it in as instructed, then run mvn clean package as instructed, which ran successfully. Finally, I ran docker-compose up with the docker-compose.yml included in the repo; the only difference is that I added:
volumes:
- .:/root
I added this to be able to access the repository and the config files from within the container. With the containers running, I launched an interactive shell in the Kafka broker container with docker exec -it kafka-connect-twitter_kafka_1 bash and from there I ran connect-standalone config/connect-avro-docker.properties TwitterSourceConnector.properties, which yielded the aforementioned error.
Besides using the supplied .yml, I have also tried using the latest version of each image, to no avail. I have tried both exporting the classpath as instructed and not exporting it. The find command from the README does not return anything for me. Just running find target/ I get:
target/kafka-connect-twitter-0.3-SNAPSHOT-javadoc.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/error_prone_annotations-2.11.0.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/twitter4j-stream-4.0.6.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/freemarker-2.3.31.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/j2objc-annotations-1.3.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/connect-utils-0.7.173.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/guava-31.1-jre.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/value-2.8.2.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/jsr305-3.0.2.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/failureaccess-1.0.1.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/kafka-connect-twitter-0.3-SNAPSHOT.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/checker-qual-3.12.0.jar
target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-twitter/twitter4j-core-4.0.6.jar
target/kafka-connect-twitter-0.3-SNAPSHOT.jar
target/kafka-connect-twitter-0.3-SNAPSHOT-sources.jar
target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/twitter4j-stream-4.0.6.jar
target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/freemarker-2.3.31.jar
target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/connect-utils-0.7.173.jar
target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/value-2.8.2.jar
target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/kafka-connect-twitter-0.3-SNAPSHOT.jar
target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/twitter4j-core-4.0.6.jar
target/components/kafka-connect-twitter-0.3-SNAPSHOT.jar
So I tried replacing grep '\-package' with grep package, to get:
target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/twitter4j-stream-4.0.6.jar:target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/freemarker-2.3.31.jar:target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/connect-utils-0.7.173.jar:target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/value-2.8.2.jar:target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/kafka-connect-twitter-0.3-SNAPSHOT.jar:target/components/packages/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/jcustenborder-kafka-connect-twitter-0.3-SNAPSHOT/lib/twitter4j-core-4.0.6.jar
Alas, the same error persisted. Any ideas? Am I doing something really wrong in my setup?
Can someone add documentation for building the connector within the local development environment?
Hey,
I'm not sure if this applies to everyone. When running mvn clean package I get an error.
However, the JVM matches and the environment variables match, so it all looks good.
But I found this line in /target/javadoc-bundle-options/javadoc-options-javadoc-resources.xml:
<javadocResourcesDirectory>src/main/javadoc</javadocResourcesDirectory>
which must be (as written in the documentation) <javadocDirectory>src/main/javadoc</javadocDirectory>
Changing it solved the problem. However, running mvn clean package again re-inserts "javadocResourcesDirectory", and rerunning then causes the same problem again.
Cheers
Per https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object the text field is truncated for retweets, with the full value available in retweeted_status. Can retweeted_status be added to the schema for this connector?
At the moment any RT is only available in its first 140 characters (give or take for user name length etc):
ksql> select retweet, max(len(text)) as max_tweet_length from all_tweets group by retweet emit changes;
+--------------------------------------------------------------------+--------------------------------------------------------------------+
|RETWEET |MAX_TWEET_LENGTH |
+--------------------------------------------------------------------+--------------------------------------------------------------------+
|false |411 |
|true |152 |
I might be missing something obvious here. I see that you convert Twitter4j objects to Struct for topic publishing. I'm writing a simple streams app and want to use a JSON Serde class parametrized by Twitter4j's Status object so I can operate on that class's methods in the streams app's logic. Your README schemas imply that your library has data model objects Status, User, etc. at com.github.jcustenborder.kafka.connect.twitter.* but that's not the case. Similarly, directly using twitter4j.Status etc. does not work without re-creating the classes using annotations, since you converted the field names from camel case to title case.
Bottom-line question: are there POJOs anywhere that I can use out of the box to deserialize the topic's messages directly into twitter4j-like classes for processing?
When I am building a Kafka Streams application on top of the generated schema, I get compilation issues:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project twitter-streams-sample: Compilation failure: Compilation failure:
[ERROR] /home/gaspar_d/Projects/oom/twitter-streams-sample/target/generated-sources/com/github/jcustenborder/kafka/connect/twitter/MediaEntity.java:[16,8] class com.github.jcustenborder.kafka.connect.twitter.MediaEntity clashes with package of same name
This happens because both a package and a class are generated with the name MediaEntity (due to the MediaEntity record and the MediaEntity.Size record).
If 'filter.keywords' is empty (either in TwitterSourceConnector.properties in standalone mode or when provisioning via the REST API), the plugin does not retrieve any tweets at all. It appears to be running, but no tweets are received.
This seems to be an issue with how the plugin uses twitter4j.
I'll look into it and update this issue.
If process.deletes is false, then kafka.delete.topic shouldn't be necessary?