Comments (13)

kkonstantine commented on July 30, 2024

I'll go ahead and close this issue, but feel free to re-open it if I missed a case where an NPE would be thrown, either due to misconfiguration or due to records without a timestamp when using timestamp extractors that depend on it.

from kafka-connect-storage-cloud.

zzbennett commented on July 30, 2024

It looks like rotate.interval.ms will only work with the TimeBasedPartitioner, given the way it is currently implemented. Is this intended, or a bug?

kkonstantine commented on July 30, 2024

By design. rotate.interval.ms relies on record timestamps, so it can only be used when the partitioner is not based on wall clock. We'll need to update the docs to reflect this requirement specifically.

The NPE there is also something we want to improve, by throwing an intuitive error message instead.

rotate.schedule.interval.ms, however, is independent of the partitioner (although you might see duplicates if you use it).
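To make the difference concrete, here is a simplified Java model of the two time-based triggers. It is a sketch, not the connector's TopicPartitionWriter code, and the class and method names are hypothetical. The record-time check needs a timestamp for every record and, instead of an NPE, throws a descriptive error when one is missing; the scheduled check only compares wall-clock times.

```java
// Hypothetical, simplified model of the two time-based rotation triggers.
final class RotationTriggers {

    private RotationTriggers() {}

    /**
     * rotate.interval.ms style: compares record timestamps, so every record needs
     * one. A missing timestamp produces a descriptive error instead of an NPE.
     */
    static boolean rotateOnRecordTime(Long baseRecordTimestamp, Long recordTimestamp,
                                      long rotateIntervalMs) {
        if (rotateIntervalMs <= 0) {
            return false; // trigger disabled
        }
        if (baseRecordTimestamp == null || recordTimestamp == null) {
            throw new IllegalStateException(
                "rotate.interval.ms needs record timestamps; use a partitioner whose "
                    + "timestamps are not wall clock, or use rotate.schedule.interval.ms");
        }
        return recordTimestamp - baseRecordTimestamp >= rotateIntervalMs;
    }

    /** rotate.schedule.interval.ms style: depends only on the wall clock. */
    static boolean rotateOnSchedule(long nowMs, long nextScheduledRotationMs,
                                    long rotateScheduleIntervalMs) {
        return rotateScheduleIntervalMs > 0 && nowMs >= nextScheduledRotationMs;
    }
}
```

Note that only the first method ever looks at record contents, which is why only rotate.interval.ms is tied to the partitioner's notion of time.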

zzbennett commented on July 30, 2024

Okay, thanks @kkonstantine. I'm still wondering why rotate.interval.ms cannot be used with any partitioner besides the TimeBasedPartitioner. The way it is described in the docs suggests that it will use wallclock time to flush at regular intervals. But actually, given the way it is implemented, it will get the time from the events themselves (depending on how the timestamp extractor is configured). It seems to me that it shouldn't matter what partitioner I'm using. If I configure rotate.interval.ms to be 30 seconds, it will flush every 30 seconds, so long as there is data in the buffer. Was there a specific reason it doesn't work like this? Or could it be modified to support this functionality?

zzbennett commented on July 30, 2024

A fix like this is basically all I'd need: zzbennett@64e3d1e. If there is no TimestampExtractor defined, just default to wall clock.
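The proposed fallback can be sketched as follows. The `RecordTimestampSource` interface and `TimestampFallback` class are hypothetical stand-ins (the connector's real TimestampExtractor works on Connect records, which are modeled here as a plain map). The sketch falls back to wall clock only when no extractor is configured, as the comment describes, and reports a missing timestamp explicitly rather than unboxing a null.

```java
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a timestamp extractor; may return null.
interface RecordTimestampSource {
    Long extract(Map<String, Object> record);
}

final class TimestampFallback {

    private TimestampFallback() {}

    /** Uses the configured extractor if present, otherwise the wall clock. */
    static long timestampFor(Map<String, Object> record,
                             RecordTimestampSource extractor,
                             LongSupplier wallClock) {
        if (extractor == null) {
            return wallClock.getAsLong(); // no extractor defined: default to wall clock
        }
        Long ts = extractor.extract(record);
        if (ts == null) {
            // Report the problem clearly instead of failing with an NPE on unboxing.
            throw new IllegalStateException("timestamp extractor returned no timestamp for record");
        }
        return ts;
    }
}
```

Keep in mind that a wall-clock timestamp differs between the original run and a replay, which is what makes output based on it non-deterministic.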

kkonstantine commented on July 30, 2024

Yes, this is central in the design of the S3 connector.

We strived to give the connector exactly-once semantics on top of an eventually consistent store such as S3, without relying on any external dependencies (consistent stores, etc.) besides Kafka and S3.

This cannot be guaranteed when wall-clock time is used for partitioning. Still, wall-clock is the default for time-based partitioning and can be used along with size-based partitioning.

There are probably enhancements to be made in terms of partitioning options, but the current state machine of the connector seems to cover a wide range of use cases, many of which might need a custom partitioner. Could you describe which of your partitioning requirements are not covered by the current configuration options?

zzbennett commented on July 30, 2024

Sure. We are basically using time-based partitioning, but our custom partitioner actually makes a call to a service that we built to get the partition info. This service handles creating the tables and partitions in Hive, among a few other things.

It's not the best architecture, more just tech debt from our legacy logging infrastructure that we've had to go with for the time being.

I think I understand what you're saying now about the exactly-once semantics and the time-based flushing. The timestamp is read from the event both to achieve exactly-once semantics and to be able to flush at regular intervals.

In this case, I could use rotate.schedule.interval.ms and set the interval to something like 30 seconds, as long as size-based flushing still takes precedence. From looking at the code, it looks like it will: if the buffer fills up in 5 seconds, it will still flush to S3, even though the interval is set to 30 seconds. I will still run into the same NPE for now, since it gets thrown before the rotate schedule interval logic is reached in the rotateOnTime method, but that's an easy fix.
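The precedence described here, where the size trigger fires as soon as the buffer fills regardless of the schedule, can be modeled as two independent checks joined with OR. This is a simplified, hypothetical sketch rather than the connector's code; `flushSize` and `scheduleIntervalMs` stand for flush.size and rotate.schedule.interval.ms.

```java
// Hypothetical model: flush when flush.size records are buffered or when the
// wall-clock schedule is due, whichever happens first.
final class SizeOrScheduleRotation {
    private final int flushSize;
    private final long scheduleIntervalMs;
    private long nextScheduledMs;
    private int bufferedRecords;

    SizeOrScheduleRotation(int flushSize, long scheduleIntervalMs, long startMs) {
        this.flushSize = flushSize;
        this.scheduleIntervalMs = scheduleIntervalMs;
        this.nextScheduledMs = startMs + scheduleIntervalMs;
    }

    /** Buffers one record at wall-clock time nowMs; returns true if a flush happens. */
    boolean onRecord(long nowMs) {
        bufferedRecords++;
        boolean sizeDue = bufferedRecords >= flushSize;
        boolean scheduleDue = scheduleIntervalMs > 0 && nowMs >= nextScheduledMs;
        if (sizeDue || scheduleDue) {
            bufferedRecords = 0;
            // If the scheduled time has passed, move it to the next boundary after nowMs.
            while (scheduleIntervalMs > 0 && nextScheduledMs <= nowMs) {
                nextScheduledMs += scheduleIntervalMs;
            }
            return true;
        }
        return false;
    }
}
```

With flush.size=1000, a 30 s schedule and one record every 5 ms, the size trigger fires at the 5 s mark, long before the schedule would.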

zzbennett commented on July 30, 2024

Actually, I just confirmed that the NPE does not happen when using rotate.schedule.interval.ms because of a short circuit in boolean logic, so that's good.
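The short circuit can be shown in isolation. In this hypothetical helper, a non-positive interval is assumed to mean "disabled" (-1 being the usual default). Because Java evaluates the right operand of `&&` only when the left operand is true, a null timestamp is never unboxed when the trigger is disabled.

```java
// Hypothetical illustration of the short circuit that avoids the NPE.
final class ShortCircuitDemo {
    private ShortCircuitDemo() {}

    static boolean periodicRotationDue(long rotateIntervalMs, Long baseTimestamp,
                                       Long recordTimestamp) {
        // If rotateIntervalMs <= 0, '&&' stops here and the subtraction never runs,
        // so null timestamps are never unboxed.
        return rotateIntervalMs > 0 && recordTimestamp - baseTimestamp >= rotateIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println(periodicRotationDue(-1L, null, null)); // prints false, no NPE
        try {
            periodicRotationDue(30_000L, null, null);
        } catch (NullPointerException e) {
            System.out.println("NPE when the trigger is enabled but timestamps are missing");
        }
    }
}
```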

I'd be happy to add some logic to prevent the NPE from happening at all. I'm wondering what the best solution is. I'm thinking of one of the following:

  1. Add some validation logic directly in the TopicPartitionWriter itself to ensure rotate.interval.ms is not used with anything other than the TimeBasedPartitioner.
  2. Add the validation logic in S3SinkConnectorConfig so that the issue is caught in the validate step.
  3. Add the logic to io.confluent.connect.storage.partitioner.PartitionerConfig. That's where the configs are actually defined so it seems sensible, but then any connector using the PartitionerConfig class gets the same validation, which is maybe not desirable.
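Options 2 and 3 move the check to configuration time, where a clear message can be reported before any record is processed. A minimal sketch of such a check, assuming the connector properties arrive as a plain `Map<String, String>`; `RotationConfigValidator` and `validateRotation` are hypothetical names, and a real implementation would report the problem through Kafka Connect's config validation rather than an IllegalArgumentException. The set of accepted class names is an assumption too: a custom subclass of TimeBasedPartitioner would need a `Class#isAssignableFrom` check instead.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical config-time check: reject rotate.interval.ms unless the
// partitioner is time-based and its timestamps are not wall clock.
final class RotationConfigValidator {
    private RotationConfigValidator() {}

    // Only these class names are recognized in this sketch.
    private static final Set<String> TIME_BASED_PARTITIONERS = Set.of(
        "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "io.confluent.connect.storage.partitioner.DailyPartitioner",
        "io.confluent.connect.storage.partitioner.HourlyPartitioner");

    static void validateRotation(Map<String, String> props) {
        long rotateIntervalMs = Long.parseLong(props.getOrDefault("rotate.interval.ms", "-1"));
        if (rotateIntervalMs <= 0) {
            return; // rotate.interval.ms disabled: nothing to check
        }
        String partitioner = props.getOrDefault("partitioner.class", "");
        if (!TIME_BASED_PARTITIONERS.contains(partitioner)) {
            throw new IllegalArgumentException(
                "rotate.interval.ms requires a time-based partitioner, but partitioner.class is '"
                    + partitioner + "'; consider rotate.schedule.interval.ms instead");
        }
        // Assumes the extractor defaults to Wallclock when unset.
        String extractor = props.getOrDefault("timestamp.extractor", "Wallclock");
        if ("Wallclock".equals(extractor)) {
            throw new IllegalArgumentException(
                "rotate.interval.ms requires record timestamps; timestamp.extractor must not be Wallclock");
        }
    }
}
```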

sanjaypsmc commented on July 30, 2024

Just ran into this myself. It would be very useful to have this dependency documented in https://docs.confluent.io/current/connect/connect-storage-cloud/kafka-connect-s3/docs/configuration_options.html#s3-configuration-options.

Also, could you please explain the difference between rotate.interval.ms and partition.duration.ms?

kkonstantine commented on July 30, 2024

The following PRs seem to have fixed the issues described here:

confluentinc/kafka-connect-storage-common#55
#136
#138

zzbennett commented on July 30, 2024

Thanks @kkonstantine I think that does fix it.

svkx commented on July 30, 2024

@kkonstantine I would like to use FieldPartitioner with the ability to flush by size or by time interval, whichever happens first. I got the same NullPointerException, but learned from the comments that I have to use flush.size and rotate.schedule.interval.ms. Could you please provide more details on why duplicates may occur on restart, so I can think of a workaround or a custom partitioner implementation that guarantees exactly-once semantics?
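For reference, the combination described here might look like the following when assembled as Java `Properties`. The keys are S3 sink / storage-common settings, but the topic, bucket and the `customer_id` field are placeholders, setting `timezone` explicitly is an assumption (scheduled rotation is wall-clock based), and the exact set of required settings depends on the connector version.

```java
import java.util.Properties;

// Hypothetical S3 sink configuration: FieldPartitioner, flushing on size or on a
// wall-clock schedule, whichever comes first. Values are placeholders.
final class FieldPartitionerConfigExample {
    private FieldPartitionerConfigExample() {}

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("connector.class", "io.confluent.connect.s3.S3SinkConnector");
        props.setProperty("topics", "events");                        // placeholder
        props.setProperty("s3.bucket.name", "my-bucket");             // placeholder
        props.setProperty("storage.class", "io.confluent.connect.s3.storage.S3Storage");
        props.setProperty("format.class", "io.confluent.connect.s3.format.json.JsonFormat");
        props.setProperty("partitioner.class",
            "io.confluent.connect.storage.partitioner.FieldPartitioner");
        props.setProperty("partition.field.name", "customer_id");     // placeholder field
        props.setProperty("flush.size", "1000");                      // size trigger
        props.setProperty("rotate.schedule.interval.ms", "30000");    // wall-clock trigger
        props.setProperty("timezone", "UTC");                         // assumption: set explicitly
        // rotate.interval.ms is deliberately left unset: it needs record timestamps.
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```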

kkonstantine commented on July 30, 2024

@ssbiox at a high level, if your partitioning is deterministic, e.g. it relies on the Kafka timestamps or on a record field that stores a timestamp for a record, you shouldn't see duplicates. If, however, your partitioning is not deterministic, for instance because it depends on wall-clock time, then you might see duplicates occasionally.
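The reasoning can be illustrated with a tiny replay experiment: if the output path is a pure function of the record's own timestamp, reprocessing the same offsets after a restart lands in the same path, whereas a path computed from processing time can differ between the two runs, so the replayed data ends up in a second location. This is a hypothetical sketch with an assumed hourly, Hive-style path format; it is not the connector's partitioner code.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical comparison of record-time vs wall-clock partitioning on replay.
final class ReplayDeterminism {
    private ReplayDeterminism() {}

    // Hourly path layout; the format is an illustrative assumption.
    private static final DateTimeFormatter HOURLY = DateTimeFormatter
        .ofPattern("'year='yyyy/'month='MM/'day='dd/'hour='HH")
        .withZone(ZoneOffset.UTC);

    static String pathFor(long timestampMs) {
        return HOURLY.format(Instant.ofEpochMilli(timestampMs));
    }

    public static void main(String[] args) {
        long recordTs = 1_700_000_000_000L;        // timestamp stored in the record
        long firstRun = recordTs + 1_000L;         // processed one second later
        long afterRestart = recordTs + 7_200_000L; // reprocessed two hours later

        // Record-time partitioning: both runs use recordTs, so the path is identical.
        System.out.println(pathFor(recordTs) + " vs " + pathFor(recordTs));
        // Wall-clock partitioning: the path depends on when processing happened.
        System.out.println(pathFor(firstRun) + " vs " + pathFor(afterRestart));
    }
}
```

Here the wall-clock run writes under `hour=22` of 2023-11-14 the first time and under `hour=00` of 2023-11-15 after the restart, while the record-time path is the same both times.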
