Comments (12)

mfontanini commented on June 15, 2024

Interesting idea. I do agree that there's a mix of callbacks in Consumer and Configuration and they should be in the same spot. When I wrote that I probably wasn't really thinking about which callback belonged where.

Now, why do you think it's better to move the callbacks into the configuration object rather than move the producer/consumer specific ones away from the configuration objects and into the specific producer/consumer classes? That way you simply wouldn't be able to set an offset commit callback on a producer, which is less error prone. If we move the assignment/revocation callbacks into the configuration, you could end up setting them on a config used for a producer, which doesn't make much sense. I have written several applications using cppkafka and I've never really had to use the same callback (e.g. assignment) for different consumers, so being able to set it on the configuration wouldn't really help much, at least in the cases I've encountered.

Is there any reason why you want the producer/consumer specific callbacks to be in the config objects rather than the other way around?

from cppkafka.

accelerated commented on June 15, 2024

@mfontanini By the same token, you can currently set the DeliveryReportCallback in the Configuration object of a Consumer, which doesn't make much sense either. So yes, I think it's a great (3rd) alternative solution to move the specific callbacks away from the Configuration object into the respective Consumer/Producer/Topic. On the downside, the Configuration object will still have the generic set() method, which can easily be misused by providing the wrong configuration parameters between a Consumer and a Producer. This can be remedied by keeping a static map of allowed options, checking each option as it is set, and throwing when it does not apply. For example, producer.set("auto.commit.interval.ms", "1000") has no effect and could mislead.
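
A minimal sketch of the allow-list check described above. Nothing here is cppkafka API: Scope, allowed_options() and check_option() are hypothetical names, and the table is deliberately tiny (the four property names are real librdkafka properties, but a real table would need to cover all of them).

```cpp
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical scope flags; not part of cppkafka or librdkafka.
enum class Scope : unsigned { Producer = 1, Consumer = 2, Both = 3 };

// Hypothetical, deliberately incomplete allow-list of property names.
inline const std::map<std::string, Scope>& allowed_options() {
    static const std::map<std::string, Scope> options = {
        {"bootstrap.servers", Scope::Both},
        {"group.id", Scope::Consumer},
        {"auto.commit.interval.ms", Scope::Consumer},
        {"queue.buffering.max.ms", Scope::Producer},
    };
    return options;
}

// Throws std::invalid_argument if `name` is unknown or does not apply
// to the kind of handle given by `handle_scope`.
inline void check_option(const std::string& name, Scope handle_scope) {
    const auto iter = allowed_options().find(name);
    if (iter == allowed_options().end()) {
        throw std::invalid_argument("unknown option: " + name);
    }
    const auto allowed = static_cast<unsigned>(iter->second);
    const auto wanted = static_cast<unsigned>(handle_scope);
    if ((allowed & wanted) == 0) {
        throw std::invalid_argument("option not valid for this handle: " + name);
    }
}
```

With this table, check_option("auto.commit.interval.ms", Scope::Producer) throws, while the same name with Scope::Consumer passes. The cost of the approach is that the table has to be kept in sync with librdkafka by hand.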

Currently based on what I have seen in the code we have:

    /**
     * Producer callbacks
     */
    using DeliveryReportCallback = std::function<void(Producer& producer, const Message&)>;
    
    /**
     * Consumer callbacks
     */
    using OffsetCommitCallback = std::function<void(Consumer& consumer, Error,
                                                    const TopicPartitionList& topic_partitions)>;
    using AssignmentCallback = std::function<void(TopicPartitionList&)>;
    using RevocationCallback = std::function<void(const TopicPartitionList&)>;
    using RebalanceErrorCallback = std::function<void(Error)>;
    
    /**
     * Producer & Consumer callbacks
     */
    using ErrorCallback = std::function<void(KafkaHandleBase& handle, int error,
                                             const std::string& reason)>;
    using ThrottleCallback = std::function<void(KafkaHandleBase& handle,
                                                const std::string& broker_name,
                                                int32_t broker_id,
                                                std::chrono::milliseconds throttle_time)>;
    using LogCallback = std::function<void(KafkaHandleBase& handle, int level,
                                           const std::string& facility,
                                           const std::string& message)>;
    using StatsCallback = std::function<void(KafkaHandleBase& handle, const std::string& json)>;
    using SocketCallback = std::function<int(int domain, int type, int protocol)>;

    /**
     * Topic callbacks
     */
    using PartitionerCallback = std::function<int32_t(const Topic&, const Buffer& key,
                                                      int32_t partition_count)>;

So based on the above, we would move the common callbacks to both Producer and Consumer. I have no problems with this approach and in that case we deprecate all the callback setters in the Config and the TopicConfig.

One negative point for breaking apart the Configuration object is that if I want to configure, say, 100 consumers with the same config, I can create a single config object and reuse it, whereas if the callbacks are in the Consumer object, I have to call those setter methods 100 times. It can be done programmatically in a loop of course, but it's not as nice.

Overall I think your suggestion would be my favorite approach and also the cleanest.

accelerated commented on June 15, 2024

PS: Regarding Configuration::set(), another way to enforce proper settings is to split the options into enums and then have Consumer::set(ConsumerConfigEnum e, const std::string& value), so that you get rid of the Configuration class altogether. This approach will make things very simple for anyone trying to provision a Producer/Consumer/Topic object.
Advantages are bomb-proofing the code and also full backwards compatibility as the original Constructor with the config param can be kept until retired.
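
A rough sketch of what an enum-keyed setter could look like. ConsumerOption, to_key() and ConsumerSettings are hypothetical names used only for illustration; none of them exist in cppkafka, and only two options are listed.

```cpp
#include <map>
#include <string>

// Hypothetical enum listing consumer-only options (illustrative subset).
enum class ConsumerOption { GroupId, AutoCommitIntervalMs };

// Maps each enum value to the property name that would be passed on.
inline std::string to_key(ConsumerOption option) {
    switch (option) {
        case ConsumerOption::GroupId:
            return "group.id";
        case ConsumerOption::AutoCommitIntervalMs:
            return "auto.commit.interval.ms";
    }
    return {};  // unreachable for valid enum values
}

// Hypothetical holder for consumer settings; not cppkafka's Consumer.
class ConsumerSettings {
public:
    // Only ConsumerOption values compile here, so a producer-only key
    // cannot be passed by mistake.
    void set(ConsumerOption option, const std::string& value) {
        values_[to_key(option)] = value;
    }

    const std::map<std::string, std::string>& values() const {
        return values_;
    }

private:
    std::map<std::string, std::string> values_;
};
```

The compile-time check comes at a price: every new librdkafka key needs a new enumerator before it can be set.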

mfontanini commented on June 15, 2024

I think moving callbacks from the configuration into the consumer/producer class would work well. However, I don't agree with moving all callbacks there, even the common ones. I think common ones may regularly be shared among multiple consumers/producers. I actually have done this: we're capturing all logging errors from kafka and routing them to the logging library we use. By configuring this on the configuration itself, it's easy to just create a base config with these callbacks, then copy it and add whatever is specific to the consumers/producers you use.
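
The base-config pattern described above could be sketched roughly as follows. Config here is a stand-in struct, not cppkafka's Configuration class, and make_base_config() and make_consumer_config() are hypothetical helpers; the broker address is a placeholder.

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>

// Stand-in for a configuration object; NOT cppkafka's Configuration.
struct Config {
    using LogCallback = std::function<void(int level,
                                           const std::string& facility,
                                           const std::string& message)>;
    std::map<std::string, std::string> options;
    LogCallback log_callback;
};

// Builds the shared base: common options plus the common log callback.
inline Config make_base_config(Config::LogCallback logger) {
    Config base;
    base.options["bootstrap.servers"] = "localhost:9092";  // placeholder
    base.log_callback = std::move(logger);
    return base;
}

// Copies the base and adds what is specific to one consumer.
inline Config make_consumer_config(const Config& base,
                                   const std::string& group_id) {
    Config config = base;  // the copy carries the shared callback along
    config.options["group.id"] = group_id;
    return config;
}
```

Each copy keeps the shared log callback, while the base itself never gains the consumer-specific keys.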

You could argue that this would be a similar case for consumer/producer specific callbacks but I really don't imagine you'd want to share the same e.g. offset commit callback between unrelated consumers. These are much more specific to each consumer/producer object you create and, I think, much less likely to be shared. I may be missing some use case though so feel free to correct me if you think there's higher odds you'd want to share these specific callbacks among different objects.

Finally, regarding Configuration::set, I don't think restricting the keys is a good idea. If you do this and librdkafka adds a new config key, you'll have to fix cppkafka to accept it. That means cppkafka will always have to be up to date with librdkafka and I'd really rather not do that. New keys show up in new releases pretty often, so I think this is a common enough scenario that you'd want to avoid it. By keeping this generic, you let librdkafka filter the keys at runtime instead of cppkafka trying to stay up to date with it.

accelerated commented on June 15, 2024

Point taken. Well, which callbacks would you move then? Offset commit, ok... what about the partitioner on the Topic? That one clearly can be very different from one topic to another, as each partitioning scheme is different. And maybe the delivery report? So it seems like the common callbacks would remain in the Configuration object and the specific ones would move to the respective classes? Well... personally I prefer to see everything in one place, but you have the final say.
Also note that currently the rebalance callbacks (all 3 of them) are set automatically in the Consumer object which should be optional as some users prefer to use the auto partitioning scheme provided by librdkafka. By setting it you actually turn off that feature.
As for checking the Configuration::set(), one way of always being in sync is to validate against all settings at runtime and check for the global/consumer/producer flags. Right now it's not exported but this can be easily fixed with an extern declaration in the header file.

mfontanini commented on June 15, 2024

I agree this would look a bit inconsistent. I'll think about it but I feel like common ones should go into the config objects for some reason. By the way, in the list you posted above, the topic partitioner callback belongs to just the producer as it's used to map a key you produced into a specific partition within a topic if you don't set a specific partition when producing it.

"Also note that currently the rebalance callbacks (all 3 of them) are set automatically in the Consumer object which should be optional as some users prefer to use the auto partitioning scheme provided by librdkafka. By setting it you actually turn off that feature."

Can you be more specific on what you mean regarding "the auto partitioning scheme provided by librdkafka"? cppkafka is just overriding the callback and then doing the default operations which are to call assign and unassign depending on which rebalance event happened (assignment, revocation or error). I doubt librdkafka adds any behavior on top of this by default.

"As for checking the Configuration::set(), one way of always being in sync is to validate against all settings at runtime and check for the global/consumer/producer flags. Right now it's not exported but this can be easily fixed with an extern declaration in the header file."

That is nice but if it's not currently being exported then we can't really use it. I do think this should be exported as it's useful in cases like this one.

accelerated commented on June 15, 2024

Well, the partitioner callback is actually set in the TopicConfiguration, so it's on a per-topic basis, hence my remark about moving it into the Topic object itself. Not sure why you say it belongs to the Producer object?
Ok so what are the action items based on this conversation?

  1. move producer/consumer/topic callbacks to their classes and leave the common callbacks in the Configuration object?
  2. do you want to keep backward compatibility or not? if yes, do you prefer #ifdef new code vs old code, or just seamless integration with the old functions marked as deprecated but still left active?
  3. I can look into the librdkafka settings and see how we can incorporate that into your code. Would you be open to that?
    Please advise what you prefer. I think 3 should be a separate pull request imho.

mfontanini commented on June 15, 2024

You shouldn't move the partitioner callback into the topic itself. You don't normally need to create Topic objects as they're used for very few operations (e.g. fetch metadata). Plus, in librdkafka this callback is a property of the topic configuration, and the rdkafka topic handle needs to be created using a configuration that holds this callback, hence there's no "give me a topic object and now let me set a callback on it". It has to be done from the start. There's a per-topic partitioner callback and there's also a default partitioner you can set (by setting a default topic configuration on the config you use). So whatever solution we pick should take both of those into account.

Similar to the above, there's something I didn't think about in my previous responses. All of these callbacks are actually part of the configuration from rdkafka's point of view. This means that you would need to configure a proxy callback for each of them in whatever consumer/producer class is involved, and that proxy would forward to the user-provided callback, if one is set. This is pretty bad, as now every event, e.g. every offset commit, will always have to execute that proxy callback on the consumer, regardless of whether you configured a callback or not. This kind of defeats the idea of "you don't pay for what you don't use". Same for the rest of the callbacks. You don't want rdkafka to even generate stats if you don't want them, and there's no easy way around this if you move the callbacks to the handle level rather than the configuration level.

For rebalance callbacks this is negligible, as these don't happen often enough to have any impact. Plus, I think it's really common to want to handle assignments/revocations so this makes more sense.

So now, going back, I'm not sure how this could be fixed. This was something pretty annoying I found in rdkafka, as you need all your callbacks to be set before you even create your producer/consumer handles.

accelerated commented on June 15, 2024

So are you saying that, because the configuration object that's copied when you instantiate a Consumer/Producer is essentially the same from the librdkafka perspective, if I were to set callback1 on consumer1 and then callback2 on consumer2, and both consumers were created from the same config object, callback2 would overwrite callback1 because the handle is clonable? In that case the solution would be to always have a separate config per consumer/producer/topic, which brings us back to deprecating the config object altogether.

Regarding the overriding of the "auto-partitioning" scheme, which I forgot to address earlier: even if your callback does the same basic thing today, if in the future librdkafka decides to do more (perhaps because of a new feature in kafka itself), then your library users will not benefit from it. I think this override should be left to the discretion of the user.

mfontanini commented on June 15, 2024

No, that's not at all what I was referring to. The config handles are cloned when instantiating consumers/producers, so future modifications of the configuration object won't change anything. I'm referring to the fact that since rdkafka uses the config handles to configure callbacks, you have to set callbacks on the config handles before instantiating the consumers/producers. So if you want to move setting callbacks to the consumer/producer rather than the configs, then you'd have to override every single callback on the consumer/producer with a proxy that forwards to the user-provided callback, if one is set.
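
To make the proxy cost concrete, here is a small self-contained sketch. It does not use librdkafka or cppkafka: RawConfig stands in for a C-style config handle that stores a function pointer plus an opaque pointer, and ConsumerSketch, simulate_commit() and proxy_invocations() are hypothetical names for illustration.

```cpp
#include <functional>
#include <utility>

// Stand-in for a C-style config handle; NOT an rdkafka type.
struct RawConfig {
    void (*commit_cb)(int error, void* opaque) = nullptr;
    void* opaque = nullptr;
};

// Stand-in for a consumer that lets users set the callback after creation.
class ConsumerSketch {
public:
    using CommitCallback = std::function<void(int error)>;

    explicit ConsumerSketch(RawConfig config) : config_(config) {
        // The proxy must be installed up front, because the config is
        // consumed when the handle is created.
        config_.commit_cb = &ConsumerSketch::commit_proxy;
        config_.opaque = this;
    }

    // `this` is stored as the opaque pointer, so copying is disabled.
    ConsumerSketch(const ConsumerSketch&) = delete;
    ConsumerSketch& operator=(const ConsumerSketch&) = delete;

    void set_commit_callback(CommitCallback callback) {
        user_callback_ = std::move(callback);
    }

    // Simulates the underlying library reporting a commit result.
    void simulate_commit(int error) {
        if (config_.commit_cb != nullptr) {
            config_.commit_cb(error, config_.opaque);
        }
    }

    int proxy_invocations() const { return proxy_invocations_; }

private:
    static void commit_proxy(int error, void* opaque) {
        auto* self = static_cast<ConsumerSketch*>(opaque);
        ++self->proxy_invocations_;  // runs even with no user callback set
        if (self->user_callback_) {
            self->user_callback_(error);
        }
    }

    RawConfig config_;
    CommitCallback user_callback_;
    int proxy_invocations_ = 0;
};
```

In this sketch the proxy counter increases on every simulated commit, whether or not a user callback has been set, which is the "paying for what you don't use" cost described above.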

When you say "auto-partitioning", are you sure you're referring to the rebalance callback and not the topic partitioning one? The latter is not modified by cppkafka by default. The rebalance callback doesn't modify any behavior and you will almost always override it anyway. Sure, rdkafka could add some additional behavior, but why would it do that? The rebalance callback should be overridden by all applications, so by adding additional behavior rdkafka would basically be implementing a piece of code nobody would use because they all override the callback.

If you still have doubts about this, check the official Confluent python librdkafka wrapper that does the same thing.

accelerated commented on June 15, 2024

Ok, yes, it's messy and unfortunately there aren't many good alternatives available. Regarding the rebalance callback I was referring to this.

As of now you are setting it in the Consumer constructor without giving the user any option. I can understand that your code does the same thing as librdkafka's default, however if the librdkafka author decides one day to add or change this functionality, your handler won't match anymore. I think this is something that should be left as an optional argument in the constructor.

BTW I have made some enhancements to expose the config options at runtime and get metadata from them, so I will PR those soon. There are changes for cppkafka as well to make use of the new API functionality.

accelerated commented on June 15, 2024

FYI exposing config options via public API. I am currently testing the CppKafka changes consuming this API and will submit PR soon.
Also this polling interface was submitted as well.
