Comments (13)
I'll go ahead and close this issue, but feel free to re-open if I missed a case where an NPE would be thrown, either due to misconfiguration or due to records without timestamps being used with timestamp extractors that depend on them.
from kafka-connect-storage-cloud.
It looks like `rotate.interval.ms` will only work with the `TimeBasedPartitioner`, given the way it is currently implemented. Is this intended, or a bug?
By design. `rotate.interval.ms` can be based on the record timestamp if the partitioner is not based on wall-clock time. We'll need to update the docs to state this requirement explicitly. The NPE is also something we want to improve on by throwing an intuitive error message instead.
`rotate.schedule.interval.ms`, however, is independent of the partitioner. (You might see duplicates if you use it, though.)
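For reference, the two options side by side as connector config. The keys are the real config names; the values and the choice of extractor are only illustrative:

```properties
# Record timestamps drive rotate.interval.ms, so it needs a
# time-based partitioner with a non-wallclock extractor.
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record
rotate.interval.ms=600000           # rotate when record time advances 10 min
rotate.schedule.interval.ms=600000  # wall-clock rotation; partitioner-independent
flush.size=10000                    # size-based rotation, always available
```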
Okay, thanks @kkonstantine. I'm still wondering why `rotate.interval.ms` cannot be used with any partitioner besides the `TimeBasedPartitioner`. The way it is described in the docs suggests that it uses wall-clock time to flush at regular intervals, but as actually implemented it gets the time from the events themselves (depending on how the timestamp extractor is configured). It seems to me that it shouldn't matter which partitioner I'm using: if I configure `rotate.interval.ms` to 30 seconds, it should flush every 30 seconds as long as there is data in the buffer. Was there a specific reason it doesn't work like this, or could it be modified to support this behavior?
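To make the behavior concrete, here is a minimal sketch of event-time rotation as I understand it (my own simplification with illustrative names, not the connector's actual code):

```java
// Simplified sketch of how rotate.interval.ms is driven by *record* time
// rather than wall-clock time. The class and field names are made up.
public class EventTimeRotation {
    private final long rotateIntervalMs;
    private Long baseRecordTimestamp = null; // timestamp that opened the current file

    public EventTimeRotation(long rotateIntervalMs) {
        this.rotateIntervalMs = rotateIntervalMs;
    }

    // recordTimestamp comes from the partitioner's TimestampExtractor; with a
    // wall-clock partitioner there is no extracted timestamp (null), which is
    // where the NPE discussed in this issue comes from.
    public boolean shouldRotate(Long recordTimestamp) {
        if (recordTimestamp == null) {
            return false; // guard; the real code dereferenced this and threw an NPE
        }
        if (baseRecordTimestamp == null) {
            baseRecordTimestamp = recordTimestamp; // first record opens the file
            return false;
        }
        // Rotate once record time has advanced past the interval.
        return recordTimestamp - baseRecordTimestamp >= rotateIntervalMs;
    }
}
```

Note that if no records arrive, record time never advances, so nothing rotates; that is the key difference from a wall-clock timer.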
A fix like this is basically all I'd need: zzbennett@64e3d1e. If there is no `TimestampExtractor` defined, just default to wall-clock time.
Yes, this is central to the design of the S3 connector.
We strove to give the connector exactly-once semantics on top of an eventually consistent store such as S3, without depending on anything external (consistent stores, etc.) besides Kafka and S3 themselves.
That cannot be achieved when wall-clock time is used for partitioning. Still, wall clock is the default for time-based partitioning and can be used along with size-based partitioning.
There are probably enhancements to be made in terms of partitioning options, but the current state machine of the connector seems to cover a wide range of use cases, many of which might need a custom partitioner. Could you describe the partitioning requirements that aren't covered by the current configuration options?
Sure. We are basically using time-based partitioning, but our custom partitioner actually makes a call to a service we built to get the partition info. This service handles creating the tables and partitions in Hive, among a few other things.
It's not the best architecture; it's mostly tech debt from our legacy logging infrastructure that we've had to go with for the time being.
I think I understand now what you're saying about exactly-once semantics and time-based flushing: the timestamp is read from the event not just to achieve exactly-once semantics, but also to be able to flush at regular intervals.
In that case, I could use `rotate.schedule.interval.ms` and set the interval to something like 30 seconds, as long as size-based flushing takes precedence, which, from looking at the code, it does: if the buffer fills up in 5 seconds, it will still flush to S3 even though the interval is set to 30 seconds. I will still run into the same NPE for now, since it gets thrown before the rotate-schedule-interval logic is reached in the `rotateOnTime` method, but that's an easy fix.
Actually, I just confirmed that the NPE does not happen when using `rotate.schedule.interval.ms`, thanks to short-circuit evaluation of the boolean logic, so that's good.
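For anyone following along, the short circuit works roughly like this (my own simplification with illustrative names, not the actual `rotateOnTime` code):

```java
// Sketch of the short circuit that avoids the NPE: when rotate.interval.ms is
// disabled (<= 0), the left operand of && is false and the extracted record
// timestamp is never dereferenced, so a null timestamp is harmless.
public class RotateOnTime {
    public static boolean rotateOnTime(long rotateIntervalMs,
                                       Long recordTimestamp,   // null for wall-clock partitioners
                                       long baseRecordTimestamp,
                                       long nowMs,
                                       long scheduledRotationMs) {
        boolean periodicRotation = rotateIntervalMs > 0
                // only evaluated when rotate.interval.ms is enabled; unboxing a
                // null recordTimestamp here is what threw the NPE
                && recordTimestamp - baseRecordTimestamp >= rotateIntervalMs;
        boolean scheduledRotation = scheduledRotationMs > 0
                && nowMs >= scheduledRotationMs;
        return periodicRotation || scheduledRotation;
    }
}
```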
I'd be happy to add some logic to prevent the NPE from happening at all. I'm wondering what the best solution is. I'm thinking of one of the following:
- Add some validation logic directly in `TopicPartitionWriter` itself to ensure `rotate.interval.ms` is not used with anything other than the `TimeBasedPartitioner`.
- Add the validation logic in `S3SinkConnectorConfig` so that the issue is caught in the validate step.
- Add the logic to `io.confluent.connect.storage.partitioner.PartitionerConfig`. That's where the configs are actually defined, so it seems sensible, but then any connector using the `PartitionerConfig` class gets the same validation, which may not be desirable.
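The second option could look something like the following. This is a hypothetical sketch with made-up names; a real check in `S3SinkConnectorConfig` would read the actual config keys and account for subclasses of `TimeBasedPartitioner` (e.g. the hourly/daily variants) rather than matching on the class name:

```java
// Hypothetical sketch of catching the misconfiguration at validation time,
// instead of letting the writer hit an NPE at runtime. Names are illustrative.
public class RotationConfigValidator {
    public static void validate(String partitionerClass, long rotateIntervalMs) {
        // Crude check for illustration only; the real implementation should
        // test class assignability, not the class-name suffix.
        boolean timeBased = partitionerClass != null
                && partitionerClass.endsWith("TimeBasedPartitioner");
        if (rotateIntervalMs > 0 && !timeBased) {
            throw new IllegalArgumentException(
                    "rotate.interval.ms requires a time-based partitioner; got "
                    + partitionerClass);
        }
    }
}
```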
Just ran into this myself. It would be very useful to have this dependency documented in https://docs.confluent.io/current/connect/connect-storage-cloud/kafka-connect-s3/docs/configuration_options.html#s3-configuration-options.
Also, could you please explain the difference between `rotate.interval.ms` and `partition.duration.ms`?
The following PRs seem to have fixed the issues described here:
confluentinc/kafka-connect-storage-common#55
#136
#138
Thanks @kkonstantine I think that does fix it.
@kkonstantine I would like to use the `FieldPartitioner` with the ability to flush by size or by time interval, whichever happens first. I got the same NullPointerException, but learned from the comments here that I have to use `flush.size` and `rotate.schedule.interval.ms` instead. Could you please provide more details on why duplicates may occur on restart, so I can think of a workaround or a custom partitioner implementation that guarantees exactly-once semantics?
@ssbiox at a high level, if your partitioning is deterministic, e.g. it relies on the Kafka timestamps or on a record field that stores a timestamp, you shouldn't see duplicates. If, however, your partitioning is not deterministic, for instance because it depends on wall-clock time, then you might see duplicates occasionally.
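To illustrate the distinction with a toy sketch (not the connector's actual path logic; names are made up): on restart the connector re-reads records from the last committed offset, so a deterministic partitioner rewrites to the same S3 key (an overwrite, no duplicate), while a wall-clock partitioner can produce a different key on the second attempt and leave two copies.

```java
// Toy contrast between deterministic and wall-clock partition paths.
public class PartitionPaths {
    // Deterministic: derived only from the record itself, so reprocessing
    // the same record always yields the same S3 key.
    public static String recordTimePath(String topic, long recordTimestampMs) {
        long hour = recordTimestampMs / 3_600_000L;
        return topic + "/hour=" + hour;
    }

    // Non-deterministic: depends on when processing happens, so a retry
    // after restart may write the same record under a different key.
    public static String wallClockPath(String topic, long nowMs) {
        long hour = nowMs / 3_600_000L;
        return topic + "/hour=" + hour;
    }
}
```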