Coder Social home page Coder Social logo

kafka-connect-hdfs's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-connect-hdfs's Issues

Data in unfinished files is lost during shutdown

I found the very strange behavior of kafka-connect-hdfs. I have kafka topic with low message rate and setup flush.size to a huge amount.

Here it what happens during work of kafka-connect:

  1. Some messages grabbed by WorkerSinkTask and put into HdfsSinkTask. After this, these messages treated as processed and offset commit issued.
  2. I send shutdown signal to kafka-connect
  3. The worker is going to stop.
  4. The writer is going to close and close temp files and delete them. At this point, we just lost all data in temp file
  5. No changes rollback after the writer has been closed.

Looks like there is the problem in HdfsSinkTask and put method semantic. Kafka-connect assumes, that messages that passed to put method will be persisted, but they are not.

Am I doing something wrong? Is it known issue?

Cannot project Schema when adding new column

We are using a Schema-Registry to manage our Avro schemas and the connect-hdfs to consume the messages (both in Backward compatibility mode). We added a new column that generated a new schema

  • field added in the code:
    newField: Option[String] = None
  • field in the new schema:
    {"name":"newField","type":["null","string"],"default":null}

The schema-registry forces us to define a default value for a new field, but the connect (the SchemaProjector) is crashing when newField is null because it throws an exception if the default value is null too.

But we do want the field to be set as null as we don't want to need to check for null AND empty String (for example) when querying the output.

Is there something that can be done to unblock our connect?

HDFS connector with cluster in HA mode

Hi all,

I've got a HDP cluster in HDFS HA mode. Specifically, this mode changes how HDFS Advanced core-site config fs.defaultFS is set from hdfs://<host>:<port> to (as shown in e.g. https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_hadoop-ha/content/ha-nn-config-cluster.html):

fs.defaultFS = hdfs://mycluster

Setting this directly in the connector's config, hdfs.url=hdfs://mycluster, it doesn't connect as it can't find the cluster:

Caused by: java.net.UnknownHostException: mycluster

What I must do instead, to make it work, is to set hdfs.url=hdfs://<active NameNode fqdn>:8020. However, this doesn't support HA and in case of taking that NameNode down the whole integration fails.

Is it possible to make HDFS Connect to work with NameNode in HA mode?

Thanks!
Wojciech

The last few records in tempFiles can't flush to HDFS when stream in kafka in paused

The key part(TopicPartitionerWriter.java):

//when buffer.poll get the last record,but 'shouldRotate(now)' is false_,no subsequent action will occur

`
while(!buffer.isEmpty()) {

...

SinkRecord projectedRecord = SchemaUtils.project(record, currentSchema, compatibility);
writeRecord(projectedRecord);
buffer.poll();
if (shouldRotate(now)) {
log.info("Starting commit and rotation for topic partition {} with start offsets {}"
+ " and end offsets {}", tp, startOffsets, offsets);
nextState();
// Fall through and try to rotate immediately
} else {`

Exception in log during shutdown

From time to time I got this in logs:

[2016-05-23 08:55:57,948] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@2245d31a Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2016-05-23 08:56:07,933] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@536bc87 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2016-05-23 08:56:07,946] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@1bec52b1 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2016-05-23 08:56:07,948] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@2245d31a Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
Exception in thread "Thread-0" java.lang.NullPointerException
    at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:307)
    at io.confluent.connect.hdfs.HdfsSinkTask.stop(HdfsSinkTask.java:83)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.awaitStop(WorkerSinkTask.java:119)
    at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:311)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.removeConnectorTasks(StandaloneHerder.java:238)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.stop(StandaloneHerder.java:68)
    at org.apache.kafka.connect.runtime.Connect.stop(Connect.java:71)
    at org.apache.kafka.connect.runtime.Connect$ShutdownHook.run(Connect.java:93)
[2016-05-23 08:56:16,977] INFO Stopping task test1-bss-1 (org.apache.kafka.connect.runtime.Worker:305)
[2016-05-23 08:56:16,977] INFO Starting graceful shutdown of thread WorkerSinkTask-test1-bss-1 (org.apache.kafka.connect.util.ShutdownableThread:119)

I did not investigate this in deep yet.

Additional info that may be useful: I ran 3 tasks that reads writes to the same hadoop. This exception occurs during shutdown first of them. Other tasks was not shutdown properly (according to logs)

Topics with periods should be supported.

Working with a customer who discovered.

org.group.data.xxxx should be supported.

Kafka topic naming. If you name a topic with periods for namespacing (I.e. org.group.data), the HDFS connector doesnโ€™t like it. (https://github.com/confluentinc/kafka-connect-hdfs/blob/5b563417ce06c93c24c85b42d0cc94504aabe69e/src/main/java/io/confluent/connect/hdfs/filter/TopicPartitionCommittedFileFilter.java) Instead of assuming the second item in the array on line 37 is the partition, use the last item in the array; that might fix it.

ERROR Unable to shutdown local metastore client

[2016-06-28 15:27:28,638] ERROR Unable to shutdown local metastore client (hive.metastore:495)
org.apache.thrift.transport.TTransportException: java.net.SocketException: Socket closed
at org.apache.thrift.transport.TIOStreamTransport.flush(TIOStreamTransport.java:161)
at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:65)
at com.facebook.fb303.FacebookService$Client.send_shutdown(FacebookService.java:436)
at com.facebook.fb303.FacebookService$Client.shutdown(FacebookService.java:430)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.close(HiveMetaStoreClient.java:492)
at org.apache.hive.hcatalog.common.HiveClientCache$CacheableHiveMetaStoreClient.tearDown(HiveClientCache.java:403)
at org.apache.hive.hcatalog.common.HiveClientCache$CacheableHiveMetaStoreClient.finalize(HiveClientCache.java:418)
at java.lang.System$2.invokeFinalize(System.java:1213)
at java.lang.ref.Finalizer.runFinalizer(Finalizer.java:98)
at java.lang.ref.Finalizer.access$100(Finalizer.java:34)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:210)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:121)
at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.apache.thrift.transport.TIOStreamTransport.flush(TIOStreamTransport.java:159)
... 10 more

Hi, Why this happen, does it matter?

How to commit offset manually?

After kafka-connect-hdfs write data to hdfs, I load the data file into hive warehouse use 'LOAD DATA INPATH ...' which remove the original data file.
Howerver the latest offset is stored in data file's name, and Kafka-connect commit offset periodically(e.x. offset.flush.interval.ms=60000). If kafka-connect-hdfs restart, it may get the wrong offset from kafka, which causes data duplicate.
I want to commit offset manually after a data file is created, any suggestions?
See the code below, from TopicPartitionWriter.java

String directoryName = FileUtils.directoryName(url, topicsDir, directory);
if (!storage.exists(directoryName)) {
  storage.mkdirs(directoryName);
}
storage.commit(tempFile, committedFile);
startOffsets.remove(encodedPartiton);
offset = offset + recordCounter;
recordCounter = 0;
log.info("Committed {} for {}", committedFile, tp);

// commit offset && load data file into hive warehouse

Size based file commit

Currently, each chunk is committed based on the number of messages written. This is not ideal as the message size varies which creates an uneven distribution of file sizes in HDFS. Size based file commit is preferred if the user want to create evenly distributed files in HDFS.

while running kafka-avro-console-producer, "Error serializing Avro message" is coming. How to solve this?

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.http.HttpClient.(HttpClient.java:211)
at sun.net.www.http.HttpClient.New(HttpClient.java:308)
at sun.net.www.http.HttpClient.New(HttpClient.java:326)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1169)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1105)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:999)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:933)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1283)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1258)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:141)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:181)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:224)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:219)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:57)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:158)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:55)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

writeRecord wrongly considers a newer record as stale

Hi,

We discovered that the writeRecord method in TopicPartitionWriter:469 considers a more recent Kafka Record as stale and drops it. We observed this when we ran Kafka Connect after a week long gap (with Kafka retention = 5 days). This led to record.kafkaOffset() > expectedOffset. My understanding is that all the more recent records were dropped since the aforementioned condition would always be true until expectedOffset is updated to the retention cutoff, which does not seem to be happening anywhere in the offset management code.

Creation of log directory fails because of permission issues when using keytab

We are experiencing an issue where HDFS connector uses incorrect permissions when creating log directories given configurations with heterogeneous set of keytabs. This happens when we run a script to create several HDFS connectors, each having a different keytab, on an empty one node Kafka Connect cluster.

It seems the problem is within DataWriter which uses the keytab to call UserGroupInformation.loginUserFromKeytab which is a static instance. Since each DataWriter will be running in a separate task/thread, this static call may interfere with other running tasks. So it is possible for the log directory to get created under myuser2 and have the FS WAL file created under a different user, i.e. myuser1 in our example.

Here are the permissions of the log directories in HDFS (scrubbed some names):
$ hdfs dfs -ls /logs/mytopic/ Found 1 items drwxrwxrwx - myuser2 supergroup 0 2016-06-21 18:01 /logs/mytopic/0
$ hdfs dfs -ls /logs/mytopic/0 Found 1 items -rw-r--r-- 2 myuser1 supergroup 417 2016-06-21 18:25 /logs/mytopic/0/log

Here is the full stack trace (scrubbed some names):
[2016-06-21 18:51:21,479] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter) org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://mynamenode:8020/logs/mytopic/0/log at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91) at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105) at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:441) at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:197) at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:227) at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234) at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=myuser2, access=WRITE, inode="/logs/mytopic/0/log":myuser1:supergroup:-rw-r--r-- at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281) at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262) at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:175) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6590) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6572) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6497) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2887) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080) at sun.reflect.GeneratedConstructorAccessor92.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1769) at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1803) at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1796) at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:323) at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:319) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:319) at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1173) at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:221) at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67) at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73) ... 17 more Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=myuser2, access=WRITE, inode="/logs/mytopic/0/log":myuser1:supergroup:-rw-r--r-- at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281) at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262) at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:175) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6590) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6572) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6497) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2887) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3189) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3153) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:612) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.append(AuthorizationProviderProxyClientProtocol.java:125) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:414) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080) at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy52.append(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:313) at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy53.append(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1767) ... 27 more

FEATURE REQUEST: batch mode

Hi all,
is there any way I could use the HDFS connector to dump recently received events in the topic and exit as soon as it reach the end of the log?
My use case requires to integrate a low throughput Kafka source in an already developed Oozie workflow and would be more convenient to trigger the download of data from Kafka to HDFS within the workflow instead of keeping an always on Kafka consumer that writes into HDFS folders.

Thanks,
Stefano

Connector can't renew properly Kerberos ticket

Hi,

I've this problem in a Kerberized environment: when I start the connector everything is working fine, I obtain my Kerberos credentials and the connector start writing without issues. The problem begin when a day after the ticket from Kerberos is renewed and the connector crash immediatly with this error:

ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter:221) org.apache.kafka.connect.errors.ConnectException: java.io.IOException: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details : local host is: "myhostname1/IP"; destination host is: "namenode_hostname":8020;
at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:131)
at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:519)
at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:204)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:234)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

it is strange because the connector is able to renew periodically the ticket (due to the async function in DataWriter class) but the issue still there and I've to restart manually the connector every time that this error happen.

Did you know this error or there is something that maybe I'm doing wrong?

Thanks.

File not written when partition changes

I have setup with flush.size=100000 and TimeBasedPartitioner with path.format=YYYY/MM/dd.

For topics with small amount of messages this cause strange behavior. When partitioner found that partition should be changed it opens new file, but did not close the old one. So, I have not data until flush.size will be reached and unable to run some data dependent tasks like ETL and so.

Expected behavior for me is to close file when new partition started. What do you think?

No dependency information available

When I try to build the source, I run into the following error:

[ERROR] Failed to execute goal on project kafka-connect-hdfs: Could not resolve dependencies for project io.confluent:kafka-connect-hdfs:jar:2.0.0-SNAPSHOT: The following artifacts could not be resolved: io.confluent:kafka-connect-avro-converter:jar:2.0.0-SNAPSHOT, io.confluent:common-config:jar:2.0.0-SNAPSHOT: Could not find artifact io.confluent:kafka-connect-avro-converter:jar:2.0.0-SNAPSHOT in confluent (http://packages.confluent.io/maven/)

issue with filename parsing regular expression

I have a issue with Kafka topic/partition/offset rule for filename in kafka-connect. kafka-connect-hdfs use filename as a "topic+partition+offset", and when I'm using a topic like "A.B.C" then because of regular expression for the file name(HdfsSinkConnecorConstants.COMMMITTED_FILENAME_SEPARATOR_REGEX), occurred exception while recovery process.

ex) file name : A.B.C+2+0014449683+0014449685.avro
topic name is 'A' after parsing, partition is 'B' instead of 2,

kafka-connect-hdfs upon thrift server,instead of hive metastore

In some spark cluster,there will no hive metastore deployed, but only a thrift server upon spark engine.
We should consider to support kafka-connect-hdfs in this scenario.

I try to modify locally,with not so much change,it works well.
(but so far,schema change is a litter difficult.)

Partition writer is used after being removed

Somehow consumer coordinator revoke an assignment, we remove all partition writers in close function[1]. After rejoining to consumer group, maybe we got no assigned partition. In this case, if write[2] is invoked weโ€™ll get a null pointer exception.

[1] https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/DataWriter.java#L285-L303

[2] https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/DataWriter.java#L234

Blocker: Kafka Connect HDFS compile issue: blocker to getting started

mvn compile
Failed to execute goal on project kafka-connect-hdfs: Could not resolve dependencies for project io.confluent:kafka-connect-hdfs:jar:3.1.0-SNAPSHOT: Failure to find io.confluent:kafka-connect-avro-converter:jar:3.1.0-SNAPSHOT in http://packages.confluent.io/maven/ was cached in the local repository, resolution will not be reattempted until the update interval of confluent has elapsed or updates are forced

I have apache kafka trunk in another folder and have done ./gradlew installAll and run with -U option on kafka-connect-hdfs. After the gradlew installAll though, one dependency(io.confluent:kafka-connect-hdfs:jar:3.1.0-SNAPSHOT) was resolved, but avro issue still persists.

Earlier error:
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka-connect-hdfs 3.1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[WARNING] The POM for org.apache.kafka:connect-api:jar:0.10.1.0-SNAPSHOT is missing, no dependency information available
[WARNING] The POM for io.confluent:kafka-connect-avro-converter:jar:3.1.0-SNAPSHOT is missing, no dependency information available
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
..
[ERROR] Failed to execute goal on project kafka-connect-hdfs: Could not resolve dependencies for project io.confluent:kafka-connect-hdfs:jar:3.1.0-SNAPSHOT: The following artifacts could not be resolved: org.apache.kafka:connect-api:jar:0.10.1.0-SNAPSHOT, io.confluent:kafka-connect-avro-converter:jar:3.1.0-SNAPSHOT: Failure to find org.apache.kafka:connect-api:jar:0.10.1.0-SNAPSHOT in http://packages.confluent.io/maven/ was cached in the local repository, resolution will not be reattempted until the update interval of confluent has elapsed or updates are forced

I could not find jar at https://repo.maven.apache.org/maven2/io/confluent so I could try mvn install artifact directly for io.confluent:kafka-connect-avro-converter:jar:3.1.0-SNAPSHOT.Please let me know how to resolve this.Thanks

Missing data caused by failure to find latest offsets from files during recovery

We noticed that there is a gap in the Kafka offsets in the filenames of our Avro files when a new connector is added triggering a rebalance:

-rw-r--r--   3 user group       1654 2016-09-20 08:35 /data/kafka/date_id=20160920/key=value/topic1+0+0000000000+0000000000.avro
-rw-r--r--   3 user group   19546950 2016-09-20 08:35 /data/kafka/date_id=20160920/key=value/topic1+0+0000000001+0000100000.avro
-rw-r--r--   3 user group   19544711 2016-09-20 08:43 /data/kafka/date_id=20160920/key=value/topic1+0+0000100001+0000200000.avro
-rw-r--r--   3 user group       1659 2016-09-20 08:46 /data/kafka/date_id=20160920/key=value/topic1+0+0000215749+0000215749.avro
-rw-r--r--   3 user group   19547591 2016-09-20 09:00 /data/kafka/date_id=20160920/key=value/topic1+0+0000215750+0000315749.avro

Our setup is simple - 1 Kafka Connector node running in distributed mode. We created a worker for a topic via REST service then created another worker for a different topic to reproduce this behavior. Both workers have the HDFS connector enabled.

I verified that the data persisted in the Avro files does reflect the offsets in the filenames and data between 200,001 and 215,748 is missing from HDFS.

After doing a remote debug of the Kafka Connect process, I saw that Kafka Connect fails to find any files to recover the latest offset after rebalancing. It assumes we store all data in the standard directory format of /<topics directory>/<topic name>/<filename>. We are using a custom partitioner, similar to the time based partitioner, which sets the commit file destination directory to a value other than the standard directory. I assume this will happen to anyone using a partitioner other than the default partitioner.

Issue with WAL after enabling Kerberos

Hi,

So we had to enable Kerberos on the Cluster running CDH 5.5.
After adding the keytabs everything seems to work fine the files are being written. But if we say restart the connector agent we get this

Caused by: java.io.IOException: Cannot obtain block length for LocatedBlock{BP-1540877157-10.240.188.16-1448467302331:blk_1074450611_710153; getBlockSize()=55; corrupt=false; offset=0; locs=[10.240.188.18:1004, 10.240.188.19:1004, 10.240.188.16:1004]; storageIDs=[DS-35b9ed3b-1614-43fd-b76b-af867857fafb, DS-d1b3c258-ec66-4b18-bffd-f14162aef547, DS-f1c65e45-6a79-4ef8-89e5-5cbf566b5468]; storageTypes=[DISK, DISK, DISK]}
    at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:359)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:301)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:238)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:231)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1498)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.openFile(WALFile.java:579)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:528)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:107)

This seems to be relating to the connector WAL file in the logs directory. It like the file is not being closed and replicated properly, but I can access the file without problems. If i delete this and restart everything goes on.

I only found smth related to this with 0 length files, but I cannot seem to find a proper solution because the log file does not seem to be zero length.

Has anyone experienced smth similar?

It seems like this is related

https://issues.apache.org/jira/browse/FLUME-2055

https://issues.apache.org/jira/browse/FLUME-2007

Basically I think we have to retry cosing the file if it failed like Flume does.

Did not find matching union field for data

I am trying to write data in avro format from my Java code to Kafka to HDFS using kafka HDFS connector and I am getting some issues. When I use the simple schema and data provided on the confluent platform website, I am able to write data to HDFS, but when I try to use complex avro schema, I get this error in the HDFS connector logs:

ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142) org.apache.kafka.connect.errors.DataException: Did not find matching union field for data: PROD at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:973) at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:782) at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

I am using confluent platform 3.0.0

My Java code:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", <url>);

KafkaProducer producer = new KafkaProducer(props);

Schema schema = new Schema.Parser().parse(new FileInputStream("avsc/schema.avsc"));
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);

InputStream input = new FileInputStream("json/data.json");
DataInputStream din = new DataInputStream(input);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

Object datum = null;
while (true) {
try {
datum = reader.read(null, decoder);
} catch (EOFException e) {
break;
}
}

ProducerRecord<Object, Object> message = new ProducerRecord<Object, Object>(topic, datum);
producer.send(message);
producer.close();

The schema (this is created from avdl file):

{ "type" : "record", "name" : "RiskMeasureEvent", "namespace" : "risk", "fields" : [ { "name" : "info", "type" : { "type" : "record", "name" : "RiskMeasureInfo", "fields" : [ { "name" : "source", "type" : { "type" : "record", "name" : "Source", "fields" : [ { "name" : "app", "type" : { "type" : "record", "name" : "Application", "fields" : [ { "name" : "csi_id", "type" : "string" }, { "name" : "name", "type" : "string" } ] } }, { "name" : "env", "type" : { "type" : "record", "name" : "Environment", "fields" : [ { "name" : "value", "type" : [ { "type" : "enum", "name" : "EnvironmentConstants", "symbols" : [ "DEV", "UAT", "PROD" ] }, "string" ] } ] } }, ...

The json file:

{ "info": { "source": { "app": { "csi_id": "123", "name": "ABC" }, "env": { "value": { "risk.EnvironmentConstants": "PROD" } }, ...

Any suggestions?

Lack of messages and behavior when using converters other than AvroConverter

This happens when using JsonConverter or StingConverter.

[2016-09-28 16:47:21,325] INFO Reflections took 4681 ms to scan 253 urls, producing 12411 keys and 81532 values  (org.reflections.Reflections)
[2016-09-28 16:47:28,311] DEBUG IPC Client (502605107) connection to hdfs-master0-dev.home/192.168.1.115:8020 from brandon: closed (org.apache.hadoop.ipc.Client)
[2016-09-28 16:47:28,311] DEBUG IPC Client (502605107) connection to hdfs-master0-dev.home/192.168.1.115:8020 from brandon: stopped, remaining connections 0 (org.apache.hadoop.ipc.Client)
[2016-09-28 16:47:46,690] DEBUG Scavenging sessions at 1475099266690 (org.eclipse.jetty.server.session)
[2016-09-28 16:47:48,158] DEBUG Lease renewer daemon for [] with renew id 1 executed (org.apache.hadoop.hdfs.LeaseRenewer)
[2016-09-28 16:48:16,691] DEBUG Scavenging sessions at 1475099296691 (org.eclipse.jetty.server.session)
[2016-09-28 16:48:18,161] DEBUG Lease renewer daemon for [] with renew id 1 executed (org.apache.hadoop.hdfs.LeaseRenewer)
[2016-09-28 16:48:19,161] DEBUG Lease renewer daemon for [] with renew id 1 expired (org.apache.hadoop.hdfs.LeaseRenewer)
[2016-09-28 16:48:19,161] DEBUG Lease renewer daemon for [] with renew id 1 exited (org.apache.hadoop.hdfs.LeaseRenewer)
[2016-09-28 16:48:46,691] DEBUG Scavenging sessions at 1475099326691 (org.eclipse.jetty.server.session)

Parallelize file commit in HdfsWriter close

When closing the HdfsWriter, it loops through the topic partitions and commit the data file associated with each topic partition. This process is blocking and can take a long time if the number of topic partitions is large. Parallelize this process can reduce latency. One way to parallelize file commit is to use a thread pool.

ERROR Uncaught exception in herder work thread

Version: 0.9.0

Steps:

  1. Stop kafka connect.
  2. Deleted connect-configs & connector-offsets topics in kafka.;
  3. Start kafka connect;
  4. create connectors with some configs changed.

2016-06-28 11:39:40,487] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
java.lang.NullPointerException
at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:307)
at io.confluent.connect.hdfs.HdfsSinkTask.stop(HdfsSinkTask.java:83)
at org.apache.kafka.connect.runtime.WorkerSinkTask.awaitStop(WorkerSinkTask.java:119)
at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:311)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:236)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:207)
at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:130)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
at java.lang.Thread.run(Thread.java:745)
[2016-06-28 11:39:40,489] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)

I got something in log, connectors are stopped because of config update:
[2016-06-28 11:39:40,257] INFO Connector beaver_http_request-c3 config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:827)
[2016-06-28 11:39:40,260] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:879)
[2016-06-28 11:39:40,260] INFO Stopping connector beaver_http_response-c3 (org.apache.kafka.connect.runtime.Worker:226)
[2016-06-28 11:39:40,260] INFO Stopped connector beaver_http_response-c3 (org.apache.kafka.connect.runtime.Worker:240)
[2016-06-28 11:39:40,260] INFO Stopping connector beaver_tcp_session-c3 (org.apache.kafka.connect.runtime.Worker:226)

NullPointerException when using Avro converter

Hi,

We are getting this error

java.lang.NullPointerException at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:863) at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:946) at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:751) at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175) at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90) at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58) at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82) Exception in thread "WorkerSinkTask-elasticsearch-sink-0" java.lang.NullPointerException at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:863) at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:946) at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:751) at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175) at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90) at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58) at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

if we use the Schema Registry with this schema and value
{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\"}]}

"expiration_date":null

in a row coming from the JDBC connector.

Too many connections to the hive metastore

We observed that kafka-connect-hdfs creates too many connections to the hive metastore, essentially DoS-ing our servers. Ideally, there should never be more than tasks.max connections to the metastore since each HdsSinkTask creates one DataWriter, which creates one HiveMetaStoreClient. However, we observed thousands of connections when tasks.max was only 200.

We believe the issue is caused by the retrying logic in HiveMetaStore. Specifally, HiveMetaStore blindly calls reconnect on all TException, without ever calling client.close():

 74   private <R> R doWithRetry(ClientAction<R> action) throws TException {
 75     try {
 76       return action.call();
 77     } catch (TException e) {
 78       try {
 79           client.reconnect();
 80       } catch (MetaException swallowedException) {
 81         // reconnect failed, throw the original exception
 82         log.warn("Cannot connect to Hive meta store: {}", swallowedException.getMessage());
 83         throw e;
 84       }
 85       return action.call();
 86     }
 87   }

Moreover, adding a client.close() before client.reconnect() won't solve the problem as the underlying layers use a cacheable client wrapper that won't tear down the connection as long as there are users: https://github.com/apache/hive/blob/master/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HiveClientCache.java#L394

Resolution: From what I understand, the retry logic is unnecessary since the underlying IMetaStoreClient object is actually the RetryingHiveMetaStoreClient (as opposed to the vanilla HiveMetaStoreClient) which already implements a sane retry logic (calls reconnect only after socket timeout). Therefore, we have solved the problem by simply removing the retry logic in HiveMetaStore.java

Customizable Hive Database and Table names

Looks like kafka-connect-hdfs currently assumes certain values for Hive database (from config) and table names (=topic). Our use case involves reading the database and table names from the Avro Schema (custom properties). To achieve this, we have to fork TopicPartitionWriter and DataWriter since they directly call hiveMetaStore.addPartition(DB, TABLE,...) and overwrite the DB and TABLE arguments.

Recovery persistently fails due to empty WAL file

We noticed that two partitions of one of our topics had an ever increasing offset lag. This was happening because Kafka Connect persistently failed to recover these two partitions. We get the following error every time in the log4j files:

[2016-05-04 17:30:02,865] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter:221)
org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file hdfs://bikeshed//kafka_connect/logs/goscribe.mp-hash_events/48/log
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
    at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
    at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:448)
    at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:204)
    at io.confluent.connect.hdfs.DataWriter.recover(DataWriter.java:236)
    at io.confluent.connect.hdfs.DataWriter.onPartitionsAssigned(DataWriter.java:299)
    at io.confluent.connect.hdfs.HdfsSinkTask.onPartitionsAssigned(HdfsSinkTask.java:103)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:362)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:194)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:225)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.init(WALFile.java:590)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.initialize(WALFile.java:558)
    at io.confluent.connect.hdfs.wal.WALFile$Reader.<init>(WALFile.java:535)
    at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:214)
    at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67)
    at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
    ... 16 more

The log directory on HDFS looks like this:

vikesh@sjc8c-rm9-15d ~ $ hadoop fs -ls hdfs://bikeshed//kafka_connect/logs/goscribe.mp-hash_events/48/
Found 2 items
-rw-r--r--   3 hadoop hadoop          0 2016-05-02 08:25 hdfs://bikeshed/kafka_connect/logs/goscribe.mp-hash_events/48/log
-rw-r--r--   3 hadoop hadoop        358 2016-05-02 07:03 hdfs://bikeshed/kafka_connect/logs/goscribe.mp-hash_events/48/log.1

The WALFile Reader fails to read the empty WAL file (/log). My understanding of what happened is the following:

Connect calls truncate during recovery, which deletes log.1 file and then renames the log file to log.1. At this point, there is no log file. Once the recovery is finished and one partition has been written, it creates the log file and writes it:

  1. Create log file (
    out = fs.create(p, true, bufferSize, replication, blockSize);
    )
  2. Write version header etc.:

If the process gets killed between 1 and 2, this would happen.

Resolution: We manually deleted the log file and observed that the recovery proceeded normally. I believe that's the case because FSWAL checks for a non-existent log file (but not an empty log file, which is clearly possible):

Connector Doesn't Polling Automatically

Hi there,

I meet an issue that:

hdfs connector start reading from topic and write to hive table for example first 20 rows (assume we have 50 rows in total) then it will stop. When I insert a new row, it may read another 5 rows, and stop again. It won't read 50 rows if there wasn't any new rows add to the topic. It looks hdfs connector stops auto polling.

Is there anyone could help?

Thanks

Add schema check for writer cache

As schemas may evolve for topics, we need to make sure that the cached writer is associated with the valid schema. Compatible schemas may go to the same file but we may need to rewrite uncommitted records with the new scheme, or we just close the writer associated with the old schema and write new schema records to files.

Why the "topics" of SinkTask cannot be overridden in SinkConnector.taskConfigs(maxTasks)?

I found the topics cannot be overridden by Connector.taskConfigs(int maxTasks) (i.e. even if I return a config which I have already overridden the topics, the kafka connect will still use the original configuration in Worker.connectorTaskConfigs). Is this a bug? I think it should check the config from Connector.taskConfigs first, if it does not contain the configuration of topics, it tries to use the original configure.

Here is the detail implementation,
1. I wrote a new connector of mongodb, the main config options of it is as below:

# I want to create 3 tasks and each task only subscribe one topic
tasks.max=3
topics=topic1,topic2,topic3

2. I want to let each task only subscribe one topic, so I generate the config count equals to the topic size, and override the option topics.

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
   List<Map<String, String>> configs = new ArrayList<>();
   for (String topic : originalConfig.split(",")) {
        Map<String, String> config = new HashMap<String, String>();
        config.putAll(originalConfig);
        config.put("topics", topic);
        configs.add(config);
   }
   return configs
}

Trigger closing time partition after time some event time have been observed.

Hi

It would be nice to be able to trigger a close of a file when a certain event time has been observed. This so that a certain hour can be considered finished instead of waiting for flush.size or rotate.interval.ms to trigger the close. This would make parquet files larger and finish earlier. Win-win.

Cheers,
-Kristoffer

HDFS connector issue

HDFS connector throws this exception and halts execution even if one message in a kafka topic is erroneous. I found that kafka-avro-console-consumer continues with processing of subsequent messages when --skip-message-on-error option is used. Can the same behaviour be provided in HDFS connector?

Thanks.

Exception in thread "WorkerSinkTask-hdfs-sink-0" org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: java.io.EOFException
at org.apache.avro.io.BinaryDecoder$ByteArrayByteSource.readRaw(BinaryDecoder.java:944)
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:349)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:130)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:191)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)

Partition by multiple columns.

Hi
My hive table have to be partitioned by multiple columns. How about change the type of "partition.field.name" to LIST?

What is the difference between schema-registry-start of kafka_connect 1.0.1 and kafka_connect 3.0.1?

While running schema-registry-start of kafka_connect 3.0.1, the following exceptions are coming. May I know how to resolve this?

[2016-09-22 15:55:12,108] ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'brokers': Error reading field 'host': Error reading string of length 24947, only 114 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:178)
at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:205)
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1272)
at io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread.(KafkaStoreReaderThread.java:107)
at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:143)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:164)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:55)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:37)
at io.confluent.rest.Application.createServer(Application.java:117)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
at io.confluent.kafka.schemaregistry.rest.Main.main(Main.java:34)

Thanks,
M, Janaarthan.

ERROR Commit of WorkerSinkTask id + offsets threw an unexpected exception:

Hello,

Can anyone please help me with this error, which is showing up very frequently and I feel this ends up in a situation that kafka connect is just running but not doing its purpose ( from kafka to HDFS ).

[2016-07-11 09:25:10,319] ERROR Error closing writer for topicname. Error: {] (io.confluent.connect.hdfs.DataWriter:321)
[2016-07-11 09:25:11,320] INFO WorkerSinkTask{id=data_platform_high_traffic-8} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2016-07-11 09:25:11,327] ERROR Commit of WorkerSinkTask{id=data_platform_high_traffic-8} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Before this error I had my session time out at default value which is 30000ms and later I changed it to 60000ms. Along with increasing session timeout I bumped up request.timeout as well.

Thank you!

java.lang.NoSuchMethodError: while I run using the following command connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-hdfs/quickstart-hdfs.properties

[2016-09-21 10:28:33,536] INFO Kafka version : 0.10.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2016-09-21 10:28:33,536] INFO Kafka commitId : bbf33fc824e68098 (org.apache.kafka.common.utils.AppInfoParser:84)
[2016-09-21 10:28:33,537] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets1 (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2016-09-21 10:28:33,597] INFO Worker started (org.apache.kafka.connect.runtime.Worker:124)
[2016-09-21 10:28:33,598] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:73)
[2016-09-21 10:28:33,598] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2016-09-21 10:28:33,830] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
[2016-09-21 10:28:34,019] WARN FAILED o.e.j.s.ServletContextHandler@1f9e9475{/,null,STARTING}: java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map; (org.eclipse.jetty.util.component.AbstractLifeCycle:212)
java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map;
at org.glassfish.jersey.server.ApplicationHandler.(ApplicationHandler.java:331)
at org.glassfish.jersey.servlet.WebComponent.(WebComponent.java:392)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
at javax.servlet.GenericServlet.init(GenericServlet.java:241)
at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:616)
at org.eclipse.jetty.servlet.ServletHolder.initialize(ServletHolder.java:396)
at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:871)
at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:298)
at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:741)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.handler.StatisticsHandler.doStart(StatisticsHandler.java:232)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.server.Server.start(Server.java:387)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.Server.doStart(Server.java:354)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:145)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:82)
[2016-09-21 10:28:34,039] WARN FAILED org.eclipse.jetty.server.handler.HandlerCollection@5dafbe45[o.e.j.s.ServletContextHandler@1f9e9475{/,null,STARTING}, org.eclipse.jetty.server.handler.DefaultHandler@2254127a, org.eclipse.jetty.server.handler.RequestLogHandler@51891008]: java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map; (org.eclipse.jetty.util.component.AbstractLifeCycle:212)
java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map;
at org.glassfish.jersey.server.ApplicationHandler.(ApplicationHandler.java:331)
at org.glassfish.jersey.servlet.WebComponent.(WebComponent.java:392)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
at javax.servlet.GenericServlet.init(GenericServlet.java:241)
at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:616)
at org.eclipse.jetty.servlet.ServletHolder.initialize(ServletHolder.java:396)
at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:871)
at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:298)
at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:741)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.handler.StatisticsHandler.doStart(StatisticsHandler.java:232)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.server.Server.start(Server.java:387)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.Server.doStart(Server.java:354)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:145)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:82)
[2016-09-21 10:28:34,046] WARN FAILED org.eclipse.jetty.server.handler.StatisticsHandler@12591ac8: java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map; (org.eclipse.jetty.util.component.AbstractLifeCycle:212)
java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map;
at org.glassfish.jersey.server.ApplicationHandler.(ApplicationHandler.java:331)
at org.glassfish.jersey.servlet.WebComponent.(WebComponent.java:392)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
at javax.servlet.GenericServlet.init(GenericServlet.java:241)
at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:616)
at org.eclipse.jetty.servlet.ServletHolder.initialize(ServletHolder.java:396)
at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:871)
at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:298)
at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:741)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.handler.StatisticsHandler.doStart(StatisticsHandler.java:232)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.server.Server.start(Server.java:387)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.Server.doStart(Server.java:354)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:145)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:82)
[2016-09-21 10:28:34,062] INFO Started ServerConnector@29f7cefd{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2016-09-21 10:28:34,063] WARN FAILED org.eclipse.jetty.server.Server@700fb871: java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map; (org.eclipse.jetty.util.component.AbstractLifeCycle:212)
java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map;
at org.glassfish.jersey.server.ApplicationHandler.(ApplicationHandler.java:331)
at org.glassfish.jersey.servlet.WebComponent.(WebComponent.java:392)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
at javax.servlet.GenericServlet.init(GenericServlet.java:241)
at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:616)
at org.eclipse.jetty.servlet.ServletHolder.initialize(ServletHolder.java:396)
at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:871)
at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:298)
at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:741)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.handler.StatisticsHandler.doStart(StatisticsHandler.java:232)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.server.Server.start(Server.java:387)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.Server.doStart(Server.java:354)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:145)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:82)
[2016-09-21 10:28:34,079] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map;
at org.glassfish.jersey.server.ApplicationHandler.(ApplicationHandler.java:331)
at org.glassfish.jersey.servlet.WebComponent.(WebComponent.java:392)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
at javax.servlet.GenericServlet.init(GenericServlet.java:241)
at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:616)
at org.eclipse.jetty.servlet.ServletHolder.initialize(ServletHolder.java:396)
at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:871)
at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:298)
at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:741)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.handler.StatisticsHandler.doStart(StatisticsHandler.java:232)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132)
at org.eclipse.jetty.server.Server.start(Server.java:387)
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114)
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61)
at org.eclipse.jetty.server.Server.doStart(Server.java:354)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:145)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:82)
[2016-09-21 10:28:34,105] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
[2016-09-21 10:28:34,106] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2016-09-21 10:28:34,119] INFO Stopped ServerConnector@29f7cefd{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2016-09-21 10:28:34,124] INFO Stopped o.e.j.s.ServletContextHandler@1f9e9475{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2016-09-21 10:28:34,124] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:165)
[2016-09-21 10:28:34,124] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:77)
[2016-09-21 10:28:34,124] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:128)
[2016-09-21 10:28:34,124] WARN Shutting down tasks [] uncleanly; herder should have shut down tasks before the Worker is stopped. (org.apache.kafka.connect.runtime.Worker:141)
[2016-09-21 10:28:34,125] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:68)
[2016-09-21 10:28:34,125] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:151)
[2016-09-21 10:28:48,152] INFO Reflections took 14487 ms to scan 282 urls, producing 17851 keys and 134059 values (org.reflections.Reflections:229)
[2016-09-21 10:28:48,168] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:91)
[2016-09-21 10:28:48,168] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:73)

readOffset is slow/expensive for large, highly partitioned tables

Our current setup has a Partitioner that routes data into {topic}/day={...}/hour={...}/{actual data files}. In some load testing with a 500 partition topic, we managed to DDoS HDFS's NameNode when our consumers were all recovering and calling readOffset. This function recursively scans the contents of the topic for each partition, performing a total of O(time * partitions^2) work.

What do you feel would be a good solution to this?

  1. Include offsets in WAL recovery: Have wal.apply() return the max offset, and then get rid of resetOffsets. The WAL interface would have to be less generic and would have to know about the file name format. If the WAL is empty then we need something else. Maybe part of WAL recovery can be a special "current offset" file.
  2. Work around: Make Kafka partition the first directory in the Partitioner and do some magic special casing in readOffset and/or fileStatusWithMaxOffset. We could probably also assume that the latest offset isn't in an old time partition. If we do both of these tricks then total recovery cost becomes O(partitions * constant for how far back we scan).

I'm happy to contribute a PR for this (though I imagine the work around in option 2 wouldn't be easily publishable).

Thanks!

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.