axonframework / extension-kafka

Axon Framework extension for Kafka integration to publish and handle Event messages.

Home Page: https://axoniq.io/

License: Apache License 2.0

Java 97.80% Kotlin 2.20%
axon-framework kafka messaging

extension-kafka's Introduction

Axon Framework - Kafka Extension

Maven Central Build Status SonarCloud Status

Axon Framework is a framework for building evolutionary, event-driven microservice systems, based on the principles of Domain-Driven Design, Command-Query Responsibility Segregation (CQRS), and Event Sourcing.

As such, it provides the necessary building blocks to follow these principles. Examples of these building blocks are Aggregate factories and Repositories, Command, Event and Query Buses, and an Event Store. The framework provides sensible defaults for all of these components out of the box.

This setup helps to create a well-structured application without having to bother with the infrastructure. The main focus can thus become the business functionality.

This repository provides an extension to the Axon Framework: Kafka. It provides functionality to leverage Kafka to send and receive Events from one (micro)service to another. Thus, it does not include command or query distribution, nor event store specifics required for event sourcing.

For more information on anything Axon, please visit our website, http://axoniq.io.

Getting started

The AxonIQ Library contains a section for the guides of all the Axon Framework extensions. The Kafka extension guide can be found here.

This extension should be regarded as a partial replacement of Axon Server, since it only covers the event routing part.

Receiving help

Are you having trouble using the extension? We'd like to help you out the best we can! There are a couple of things to consider when you're traversing anything Axon:

  • Checking the reference guide should be your first stop, as the majority of possible scenarios you might encounter when using Axon should be covered there.
  • If the Reference Guide does not cover a specific topic you would have expected, we'd appreciate it if you could post a new thread/topic on our library forums describing the problem.
  • There is a forum to support you in case the reference guide did not sufficiently answer your question. Axon Framework and Server developers will help out on a best-effort basis. Know that any support from contributors on posted questions is very much appreciated on the forum.
  • Next to the forum we also monitor Stack Overflow for any questions which are tagged with axon.

Feature requests and issue reporting

We use GitHub's issue tracking system for new feature requests, extension enhancements, and bugs. Prior to filing an issue, please verify that it hasn't already been reported by someone else.

When filing bugs:

  • A description of your setup and what's happening helps us figure out what the issue might be
  • Do not forget to provide the version you're using
  • If possible, share a stack trace, wrapped in a Markdown ``` code fence

When filing features:

  • A description of the envisioned addition or enhancement should be provided
  • (Pseudo-)Code snippets showing what it might look like help us understand your suggestion better
  • If you have any thoughts on where to plug this into the framework, that would be very helpful too
  • Lastly, we value contributions to the framework highly. So please provide a Pull Request as well!

extension-kafka's People

Contributors

abuijze, aupodogov, bkornmeierpax8, codedrivenmitch, dependabot[bot], dgomezg, github-actions[bot], gklijs, hsenasilva, idugalic, jentsch, leechedan, lfgcampos, martink76, sandjelkovic, smcvb, snyk-bot, zambrovski


extension-kafka's Issues

Current documentation is confusing / wrong

Hi guys,

I'm studying the changes you introduced in 4.5.4 and trying to understand how to deal with the fact that no group id is provided anymore. While doing so, I was looking through the docs and found the old description still in them: https://docs.axoniq.io/reference-guide/extensions/kafka#consuming-events-with-a-streamable-message-source

I think this is very confusing:

The distinct group id is derived by the StreamableKafkaMessageSource through a groupIdPrefix and a groupIdSuffixFactory, which are adjustable through the source's builder.

which contradicts the changes introduced in #273

Cheers,

Simon

Reduce unique consumer group creation when consuming events with Streamable Message Source

Enhancement Description

When consuming events from Kafka using StreamableKafkaMessageSource, a new consumer group id is generated each time the event stream is opened. This eventually creates too many consumer groups in Kafka that will never be used again.

Current Behaviour

Each time the Kafka event stream is opened via StreamableKafkaMessageSource, a new consumer group id suffix is generated.

Wanted Behaviour

KafkaTrackingToken stores the previously generated group id suffix and reuses it. When opening a Kafka event stream with an empty token (or a token without a group id suffix value), a new group id suffix is generated and stored in the token.
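A hedged sketch of this wanted behaviour (the class, field, and accessor names below are hypothetical, not the extension's actual API):

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

// Hypothetical token that carries the consumer group suffix it was created
// with, so re-opening the stream reuses the same consumer group instead of
// minting a new one each time.
public final class SuffixAwareKafkaTrackingToken {

    private final Map<TopicPartition, Long> positions;
    private final String groupIdSuffix;

    public SuffixAwareKafkaTrackingToken(Map<TopicPartition, Long> positions, String groupIdSuffix) {
        this.positions = Collections.unmodifiableMap(positions);
        this.groupIdSuffix = groupIdSuffix;
    }

    // Reused on every subsequent openStream() call; only an empty token would
    // trigger generation of a fresh suffix.
    public String groupIdSuffix() {
        return groupIdSuffix;
    }

    public Map<TopicPartition, Long> positions() {
        return positions;
    }
}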

Possible Workarounds

Consume events with SubscribableKafkaMessageSource instead of StreamableKafkaMessageSource.

README.md change

Enhancement Description

The current release candidate of extension-kafka is RC-3, but the README.md still says release candidate 2.

I'll send a pull request with this little enhancement.

Upgrade of a patch release introduced deprecated methods and breaks at runtime

Basic information

  • Axon Framework version: 4.5.12
  • JDK version: 11
  • Kafka Extension version: 4.5.4

Steps to reproduce

Switched from 4.5.3 to 4.5.4

Expected behaviour

No errors in log, since it is a patch release.

Actual behaviour

2022-07-05 15:18:10.006  INFO 517797 --- [nanceService]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-07-05 15:18:10.006  INFO 517797 --- [nanceService]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-07-05 15:18:10.006  INFO 517797 --- [nanceService]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1657027090004
2022-07-05 15:18:10.153  INFO 517797 --- [MongoService]-0] org.apache.kafka.clients.Metadata        : [Consumer clientId=tasklist-localhost-polyflow, groupId=null] Cluster ID: vo0yCvCLQAWtvtIhuY_mWQ
2022-07-05 15:18:10.153  INFO 517797 --- [nanceService]-0] org.apache.kafka.clients.Metadata        : [Consumer clientId=tasklist-localhost-filter-options, groupId=null] Cluster ID: vo0yCvCLQAWtvtIhuY_mWQ
2022-07-05 15:18:10.155  INFO 517797 --- [MongoService]-0] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=tasklist-localhost-polyflow, groupId=null] Subscribed to partition(s): tasklist-eventbus-0
2022-07-05 15:18:10.155  INFO 517797 --- [nanceService]-0] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=tasklist-localhost-filter-options, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2022-07-05 15:18:10.157  INFO 517797 --- [MongoService]-0] o.a.e.k.e.consumer.ConsumerSeekUtil      : Seeking topic-partition [tasklist-eventbus-0] with offset [9]
2022-07-05 15:18:10.157  INFO 517797 --- [MongoService]-0] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=tasklist-localhost-polyflow, groupId=null] Seeking to offset 9 for partition tasklist-eventbus-0
2022-07-05 15:18:10.159  WARN 517797 --- [ AsyncFetcher-0] o.a.e.k.e.consumer.FetchEventsTask       : Encountered an exception fetching ConsumerRecords

java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.1.jar:na]
	at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:90) ~[axon-kafka-4.5.4.jar:4.5.4]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

2022-07-05 15:18:10.159  WARN 517797 --- [nanceService]-0] o.a.e.TrackingEventProcessor             : Error occurred. Starting retry mode.

org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventException: Cannot proceed with fetching ConsumerRecords since we encountered an exception
	at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:96) ~[axon-kafka-4.5.4.jar:4.5.4]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223) ~[kafka-clients-3.1.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.1.jar:na]
	at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:90) ~[axon-kafka-4.5.4.jar:4.5.4]
	... 3 common frames omitted

2022-07-05 15:18:10.160  WARN 517797 --- [nanceService]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 1s

Multiple TEP not working

I have 2 aggregates, 1 saga, and 2 projections with tracking event processors (TEPs). However, at runtime events are published to only one of the TEPs (saga / projection 1 / projection 2).

I have the following setup:

  • Kafka extension with Postgres as the event store

  • Order aggregate and command service

  • Shipment aggregate and saga event handler

  • Query app as event consumer

  • Notification app as event consumer

Refer source code at - https://gitlab.com/bm-cqrs/axon-kafka-distrbuted.git

Potential conflict between Kafka partitioning and Axon segmentation

@altroy75 commented on Tue Sep 04 2018

After trying to analyze how Kafka topic partitioning will work with Axon event processing segments, I got the impression that having a Kafka topic with several partitions and, at the same time, more than one Axon segment can potentially lead to skipped, unprocessed events.

Consider a situation with a Kafka topic that has two partitions and two configured Axon processor segments. Also, let's assume two JVMs trying to process events concurrently. In such a scenario, the Kafka consumer in each JVM will be assigned one of the two partitions. Then the event processor in each JVM will try to lock a segment. Now it might happen that an event processor's segment doesn't match some or all events in its Kafka consumer's partition (i.e. those events are in the second partition), in which case we will have skipped event messages. (Note: the Kafka consumers above are part of the same consumer group.)

This potential problem stems from the fact that Axon's segment mechanism is completely unaware of, and doesn't take into account, Kafka's dynamic partition assignments inside a consumer group. These two work distribution mechanisms (Axon segments and Kafka partitions) operate independently, completely unaware of each other, and this can potentially create a problem.

Thanks,
Alex


@ghilainm commented on Wed Sep 05 2018

Hi! From what I understand, it does work. The event processor segment is, in the end, a group of Kafka partitions. Therefore, assigning a segment to an Axon processor results in assigning a group of Kafka partitions. When locking a segment, the Axon processor will assign the relevant partitions to the Kafka consumer. Therefore I don't see why we would lose any events.

However, as you said, these seem to be two distribution mechanisms, and the Axon one seems to be unnecessary overhead. It would be nice if it could directly rely on the Kafka distribution mechanism.


@smcvb commented on Wed Sep 05 2018

@nklmish, any insights you might be able to share on this topic?


@nklmish commented on Tue Sep 11 2018

@altroy75, Thank you for the issue!

This potential problem stems from the fact that Axon segments mechanism is completely unaware and doesn't take into account Kafka dynamic partition assignments inside a consumer group.

Agree. I had a discussion with @abuijze regarding this, and one way we can address this problem is by assigning all partitions to each consumer. In the future, we are thinking of switching from a tracking processor to a subscribing processor.


@altroy75 commented on Wed Sep 12 2018

@nklmish, Thanks for following up on this!
By assigning all partitions to each consumer I guess you mean manual assignment (i.e. assign()) and not via the consumer group protocol (i.e. subscribe()), correct?
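For readers less familiar with the Kafka client, the difference in question, given a KafkaConsumer<String, byte[]> consumer and an illustrative topic name (imports from java.util and org.apache.kafka.common omitted):

// subscribe(): join a consumer group; Kafka's coordinator dynamically
// balances the topic's partitions across all members of the group.
consumer.subscribe(Collections.singletonList("axon-events"));

// assign(): bypass the group protocol entirely; this consumer reads exactly
// these partitions, regardless of what other consumers do.
consumer.assign(Arrays.asList(
        new TopicPartition("axon-events", 0),
        new TopicPartition("axon-events", 1)));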


@abuijze commented on Fri Oct 19 2018

A somewhat late reply, but we had an Axon 4.0 release to finish. Fortunately, that's done, and we've had some time to find out what's going on here and discuss a solution.
There is in fact an issue with Kafka partitions and how processors track events. Both Axon and Kafka are trying to take care of which instance receives which events. In a Tracking Processor, Axon needs full control. Right now, if a KafkaTrackingToken doesn't know the position of a specific segment, it will let Kafka take care of it. Since all Processor Threads are running in the same Kafka Consumer Group, this may mean a thread will read from a specific position in the middle (or end) of a Stream. Axon requires that partition to be read from the beginning.

Two things need to be done (if I understood @nklmish recommendations correctly):

  • The tracking token should enforce offset 0 if it doesn't have an explicit offset for any partition (see the sketch below)
  • Each thread of a processor should be in a separate consumer group, to prevent Kafka from trying to balance consumers
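A minimal sketch of the first point, using the plain Kafka consumer API (the class name is illustrative, not the extension's actual listener):

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Map;

// Illustrative listener: any assigned partition without an explicit offset in
// the tracking token is rewound to offset 0, so no records are skipped.
class EnforceOffsetZeroRebalanceListener implements ConsumerRebalanceListener {

    private final Consumer<?, ?> consumer;
    private final Map<TopicPartition, Long> tokenOffsets; // taken from the KafkaTrackingToken

    EnforceOffsetZeroRebalanceListener(Consumer<?, ?> consumer, Map<TopicPartition, Long> tokenOffsets) {
        this.consumer = consumer;
        this.tokenOffsets = tokenOffsets;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
        assigned.forEach(tp -> consumer.seek(tp, tokenOffsets.getOrDefault(tp, 0L)));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
        // no-op: progress lives in the Axon tracking token, not in committed Kafka offsets
    }
}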

@abuijze commented on Fri Oct 19 2018

Lastly, since Axon 4, the Kafka codebase has moved to another repository: https://github.com/AxonFramework/extensions-kafka.

TrackingEventProcessor fails to do incremental retries when TrackingEventProcessor.eventAvailabilityTimeout is set to the default 1000 ms

I have the following Axon configuration:

[screenshot of the Axon retry configuration omitted]

Ideally, with these configurations, whenever any tracking processor enters retry mode it should attempt to retry in 1s, 2s, 4s, 8s, 16s, and so on until it reaches the max interval of 60s. But during my testing I've noticed abnormal behavior: the processor re-enters error mode intermittently, e.g. 1s, 2s, 1s, 1s, 2s, 4s, 1s, ...

After debugging, I could eliminate this behavior by adding a Java configuration for the tracking processor and setting eventAvailabilityTimeout to 5000 ms:

@Autowired
public void configureCardSummary2EventProcessors(EventProcessingConfigurer eventProcessingConfigurer,
        StreamableKafkaMessageSource streamableKafkaMessageSource) {
    eventProcessingConfigurer.registerTrackingEventProcessor("CardSummary2",
            messageSource -> streamableKafkaMessageSource,
            c -> TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                    .andEventAvailabilityTimeout(5000, TimeUnit.MILLISECONDS)
                    .andBatchSize(1));
}

Not sure if this is a bug or if I'm missing some other configuration!

Remove spring-kafka-test

Enhancement Description

Remove spring-kafka-test from the project since we migrated to test containers.
Some of its methods are still being used, but we would rather copy the used methods than import the whole dependency.

Current Behaviour

spring-kafka-test is still in our POM files.

Wanted Behaviour

Remove spring-kafka-test from our dependencies.

Possible Workarounds

N/A

KafkaAutoConfiguration should not be auto-configured after InfraConfiguration

Basic information

The KafkaEventPublisher bean, which is responsible for publishing events to Kafka, is configured by KafkaAutoConfiguration, and while it is being created, the factory method also registers a subscribing or tracking event processor with the event processing module. EventProcessingModule's initialize method handles these registrations and prepares the module to distribute events to the event handlers. For this purpose, the EventHandlerRegistrar bean, which is defined by InfraConfiguration, initializes the event processing module in its afterPropertiesSet method. In order to handle events properly, all event handlers must be registered before initialization of the event processing module. For KafkaEventPublisher to handle events, the publisher bean must be created before EventHandlerRegistrar, but there is no explicit dependency between them. This may work correctly by coincidence, but we can't be certain.

Steps to reproduce

To reproduce the problem, I implemented a test code: https://github.com/muammeryucel/axon-kafka-test.git.

You can also use the axon-kafka-example project. To test the situation mentioned above, I added the following AMQP dependencies at the end of the dependencies section of the pom.xml:

        <dependency>
            <groupId>org.axonframework.extensions.amqp</groupId>
            <artifactId>axon-amqp-spring-boot-starter</artifactId>
            <version>4.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>${spring.boot.version}</version>
        </dependency>

By doing this, I managed to give more priority to InfraConfiguration, which is marked to be auto-configured before AMQPAutoConfiguration, and this was enough to confuse the auto-configuration ordering. When I ran the application, I observed that KafkaEventPublisher became unable to handle event messages and publishing to Kafka stopped.

As a solution, I created a spring-autoconfigure-metadata.properties file inside the src/resources/META-INF directory containing the following lines, and publishing to Kafka started again after restarting the application:

org.axonframework.extensions.kafka.autoconfig.KafkaAutoConfiguration=
org.axonframework.extensions.kafka.autoconfig.KafkaAutoConfiguration.AutoConfigureBefore=org.axonframework.springboot.autoconfig.InfraConfiguration

Expected behaviour

It should publish the messages to Kafka.

Actual behaviour

It doesn't publish the messages to Kafka.

Proposed Solution

If there is a dependency between the auto-configuration order of these beans, it must be declared explicitly. To do this, the KafkaAutoConfiguration class must carry this annotation: @AutoConfigureBefore(InfraConfiguration.class)
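Applied to the auto-configuration class, the proposed fix would look roughly like this (a sketch showing only the relevant annotations):

import org.axonframework.springboot.autoconfig.InfraConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.context.annotation.Configuration;

@Configuration
// Guarantees the KafkaEventPublisher bean exists before EventHandlerRegistrar
// (defined by InfraConfiguration) initializes the event processing module.
@AutoConfigureBefore(InfraConfiguration.class)
public class KafkaAutoConfiguration {
    // existing bean definitions unchanged
}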

Looking at the problem from a more general point of view, we can see that EventProcessingModule is supposed to be initialized just before event processing is started. If we debug the initialize method, we can also see that it is called more than once during application bootstrap. EventHandlerRegistrar's afterPropertiesSet method makes this initialization process tightly coupled with its bean initialization process. I think EventProcessingModule's initialize method should be called only once, just before starting event processing, and therefore I propose that the initialize call be moved from afterPropertiesSet to the start method, if possible. I didn't give it a try, but it seems more reasonable to me.

Cleanup Maven POMs

I noticed that the POMs inside the project can be slightly improved by applying some general good practices:

  • all plugin versions should be defined in plugin management
  • plugin configuration should be defined in one place and not overridden in each POM

I would like to provide a PR fixing it.

Kafka consumer factory error

Running the example, I get the following error message on startup:

The bean 'kafkaConsumerFactory', defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.class] and overriding is disabled.

Action:

Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

Setting

spring.main.allow-bean-definition-overriding=true

results in

Parameter 0 of method kafkaFetcher in org.axonframework.extensions.kafka.autoconfig.KafkaAutoConfiguration required a bean of type 'org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory' that could not be found.

KafkaPublisher should track events instead of subscribing on them

The KafkaPublisher uses a SubscribableMessageSource to subscribe to events and send them to a configured Kafka topic. Since sending is not retried upon failure, events may get "lost".
A more reliable approach would be to use a StreamableMessageSource instead, to get event tracking (rather than event subscription) semantics. The tracking token would be incremented on successful sending only (in transactional mode: when the transaction is committed; in ack mode: when the publish is acknowledged).

This relates to this discussion.
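A wiring sketch of the idea (method and constant names are based on the extension's later API and may differ; treat it as illustrative, not the actual implementation):

// Run the Kafka publisher as an event handler on a tracking processor, so a
// failed send leaves the token in place and the event is retried.
public void registerPublisher(EventProcessingConfigurer configurer,
                              KafkaEventPublisher<String, byte[]> kafkaEventPublisher) {
    String group = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP; // assumed constant
    configurer.registerEventHandler(configuration -> kafkaEventPublisher)
              .assignHandlerInstancesMatching(group, KafkaEventPublisher.class::isInstance)
              .registerTrackingEventProcessor(group); // token advances only on successful sends
}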

Remove beta stage note in the readme.

Enhancement Description

As the extension is mature and used in several places in production, it's not beta anymore.

Current Behaviour

Currently, the readme states that it's still in the beta stage.

Wanted Behaviour

A readme that doesn't note it's in beta stage.

Axon-spring-boot-starter - kafka version problem

I have tried to run these two versions:

  • axon-spring-boot-starter version: '4.1.2'
  • axon-kafka version: '4.0-M2'

but I always get a binary incompatibility issue when the configuration org.axonframework.config.EventProcessingModule is loaded. The exception is IncompatibleClassChangeError: Implementing class. I also tried axon-spring-boot-starter version '4.0' with no success.

DefaultProducerFactory closes non-transactional Kafka Producer after `producerCacheSize` messages were sent

We are using axon-kafka with Spring Boot, mostly keeping the default settings. We noticed that after sending a couple of events to Kafka, the producer is closed and we get an INFO message saying Closing the Kafka producer with timeoutMillis = 30000 ms.

After that, sending any more events fails with java.lang.IllegalStateException: Cannot perform operation after producer has been closed.

Looking at the code, it seems there is a bug in DefaultProducerFactory. When close is called on the CloseLazyProducer, it offers itself to the cache queue, but producers are only removed from that queue in transactional mode. Once the queue is full, the single producer is closed but is still returned by subsequent calls to createProducer.
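Our reading suggests a fix along these lines, sketched against the inner CloseLazyProducer (simplified and speculative, not an actual patch):

// Sketch: only transactional producers participate in the cache queue; closing
// the shared non-transactional producer becomes a no-op so it stays usable for
// subsequent createProducer() calls.
@Override
public void close() {
    if (!transactional) {
        return; // the factory's shutDown() closes the shared producer for real
    }
    if (!cache.offer(this)) {
        delegate.close(Duration.ofMillis(closeTimeout)); // cache full: close for real
    }
}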

KafkaPublisher should not be needed when you only want a fetcher.

Enhancement Description

With additional configuration it should be possible to have a fetcher, consuming events from Kafka, without needing to configure a KafkaPublisher instance.

Current Behaviour

As KafkaPublisher is conditional, the KafkaAutoConfiguration won't do anything when it's not there.

Wanted Behaviour

Being able to have auto configuration when only using the fetcher.

Possible Workarounds

Either don't use auto-configuration, or add a KafkaPublisher (which will not be used).

Possible Solutions

It seems that just removing @ConditionalOnClass(KafkaPublisher.class) will fix the issue. As there is already a kafkaPublisher defined inside, I don't understand why it's there; it seems there is no good reason anymore.

StreamableMessageSource fine tuning to overcome Partitioning / Segmentation discrepancy

As described in #1, Kafka's approach of Partitions in a Topic and Axon's notion of Segments of an Event Stream used by a Tracking Event Processor can definitely bite one another, resulting in unhandled events.

In essence, it would be most straightforward to have a SubscribableMessageSource implementation in the Kafka Extensions (as described in issue #17). However, it is definitely beneficial to support async processing from a Kafka Message Source within Axon.
To that end, a couple of consolidations have to be performed to make Kafka's and Axon's approaches compliant with one another, which come down to the following two:

  1. Make sure that if a Consumer is started from a TrackingEventProcessor/StreamableMessageSource where any existing partitions for a topic do not have an explicit offset in the KafkaTrackingToken that an offset of 0 is enforced to ensure complete processing of all records.
  2. Each thread of a TrackingEventProcessor that calls the KafkaMessageSource should initiate a separate Consumer Group to prevent Kafka from trying to (re)balance Consumers.

Support Event Upcasters during reading of events

Enhancement Description

If changes to the event classes are introduced, the Kafka extension will try to deserialize the events using the current class structure, without the ability to match the binary representation of the event to the new structure. This leads to a permanent failure of the Kafka consumer, since the structural change cannot be accommodated.

In the case of aggregates, Axon Framework provides a way of dealing with event evolution by using upcasters.

Current Behaviour

Currently, the events are deserialized using the provided serializer, without the ability to apply the upcaster (chain) to them.

Wanted Behaviour

The builder should accept the upcaster chain to pass the domain events through it.

Possible Workarounds

Replace the DefaultKafkaMessageConverter with a custom implementation that supports upcasters.
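The wanted behaviour might look like this on the converter's builder (a sketch: upcasterChain is the proposed addition, and MyEventV1ToV2Upcaster is a hypothetical upcaster):

// Sketch of the desired API: the converter applies the upcaster chain to the
// serialized payload before handing it to the event serializer.
DefaultKafkaMessageConverter converter = DefaultKafkaMessageConverter
        .builder()
        .serializer(eventSerializer)
        .upcasterChain(new EventUpcasterChain(new MyEventV1ToV2Upcaster())) // proposed addition
        .build();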

Provide `defaultTopic` property by default

Enhancement Description

Provide defaultTopic property (KafkaProperties) by default.

Current Behaviour

Currently, the extension does not provide a default value for defaultTopic property.

Wanted Behaviour

To have a default value for this property, so that it does not need to be configured explicitly in the application.

Possible Workarounds

You have to specify this property in your application in order to run it.

StreamableKafkaMessageSource create token implementations

I'm using axon 3.4.1 with kafka as StreamableMessageSource

There are scenarios where I would like to stop, reset the token (or createTokenAt), and start a particular tracking processor. Stopping and starting is pretty much straightforward, but I'm having difficulty resetting the token to a particular timestamp, as KafkaMessageSource does not provide an implementation of createTokenAt(Instant dateTime). It would be good to have this feature implemented.
I've checked this in axonframework.extensions.kafka 4.0-RC2 and it's not implemented there either.

The code snippet below throws an UnsupportedOperationException.

@Autowired
private EventProcessingConfiguration configuration;

@PostMapping("/createTokenAt")
public ResponseEntity<?> createTokenAt(@RequestParam(value = "processorName") String processorName,
                                       @RequestParam(value = "tokenTime") String tokenTime) {
    // date format: 2007-12-03T10:15:30.00Z
    configuration.eventProcessorByProcessingGroup(processorName, TrackingEventProcessor.class)
            .ifPresent(trackingEventProcessor -> trackingEventProcessor.resetTokens(
                    streamableMessageSource -> streamableMessageSource.createTokenAt(Instant.parse(tokenTime))));
    return new ResponseEntity<>(HttpStatus.OK);
}
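For what it's worth, the plain Kafka client already exposes the primitive such an implementation could build on, offsetsForTimes. A hedged sketch, given a configured KafkaConsumer consumer, a target Instant instant, and an illustrative topic name (imports from java.util and org.apache.kafka.common omitted):

// Resolve, per partition, the earliest offset whose timestamp is at or after
// the given instant; partitions with no such record fall back to offset 0.
Map<TopicPartition, Long> query = new HashMap<>();
for (PartitionInfo p : consumer.partitionsFor("my-topic")) {
    query.put(new TopicPartition("my-topic", p.partition()), instant.toEpochMilli());
}
Map<TopicPartition, Long> positions = new HashMap<>();
consumer.offsetsForTimes(query).forEach((tp, offsetAndTimestamp) ->
        positions.put(tp, offsetAndTimestamp == null ? 0L : offsetAndTimestamp.offset()));
// 'positions' could then seed a KafkaTrackingToken for createTokenAt(instant)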

KafkaTrackingToken has no partitions assigned

I am using Spring Boot and Axon Kafka 4-RC.2

I have the following configuration

axon:
  eventhandling:
    processors:
      # This name is referenced in the @ProcessingGroup annotation
      kafka-event-processor:
        # Mandatory if you want to not miss any message, otherwise it is a subscribing event processor and you get only notified about newly emitted event
        # The other mode can be used if you don't want to use a database
        mode: "TRACKING"
        source: "b2gMessageSource"
        # Number of threads dedicated to event processing. Keep it to one if the scaling model is the process.
        # If the threadCount * number of processes is smaller than the number of segment some Kafka partitions will not be consumed!
        threadCount: 1
        # Segments define the parallelism in Axon. Kafka partitions are mapped to segments.
        # Should be aligned with the number of partitions in the topic to ensure maximum parallelism.
        initialSegmentCount: 1
  kafka:
    # Must be transit to consume from transit topic
    default-topic: "transit"
    consumer:
      # Choose a group id for your consumer group, this property triggers the configuration of the KafkaMessageSource referenced in the event process defined above
      group-id: "axon-gateway-b2g-consumer"
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

I have also disabled Axon auto-configuration for Kafka because it seems to be buggy.

I have added the following configuration class:

package b2ggateway.out;

import java.util.concurrent.TimeUnit;

import org.axonframework.extensions.kafka.KafkaProperties;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.*;
import org.axonframework.serialization.Serializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Customization of the {@link org.axonframework.extensions.kafka.autoconfig.KafkaAutoConfiguration} which seems to be buggy.
 * The names of the methods have been changed to avoid the bean-overriding issue that leads to the ConsumerFactory not being created.
 */
@SuppressWarnings("unchecked")
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
public class AxonConfig {
    private final KafkaProperties properties;

    public AxonConfig(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    public ConsumerFactory<String, byte[]> b2gConsumerFactory() {
        return new DefaultConsumerFactory(this.properties.buildConsumerProperties());
    }

    @Bean
    public KafkaMessageConverter<String, byte[]> b2gMessageConverter(@Qualifier("eventSerializer") Serializer eventSerializer) {
        return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).build();
    }

    @Bean(destroyMethod = "shutdown")
    public Fetcher b2gFetcher(ConsumerFactory<String, byte[]> kafkaConsumerFactory, KafkaMessageConverter<String, byte[]> kafkaMessageConverter) {
        return AsyncFetcher.<String, byte[]>builder().consumerFactory(kafkaConsumerFactory)
                .bufferFactory(() -> new SortedKafkaMessageBuffer(this.properties.getFetcher().getBufferSize()))
                .messageConverter(kafkaMessageConverter)
                .topic(this.properties.getDefaultTopic())
                .pollTimeout(this.properties.getFetcher().getPollTimeout(), TimeUnit.MILLISECONDS).build();
    }


    @Bean
    public KafkaMessageSource b2gMessageSource(Fetcher kafkaFetcher) {
        return new KafkaMessageSource(kafkaFetcher);
    }
} 

And the following EventProcessor:

package b2ggateway.out.transit;

import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.messaging.Message;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import api.events.IESent;


@Transactional
@ProcessingGroup("kafka-event-processor")
@Service
public class EventHandler {

    // Must do this because it seems Axon still delivers the message to this method
    @SuppressWarnings("ConstantConditions")
    @EventHandler
    public void handle(Message<IESent> ieSentMessage) {
        // Must check the type because Axon delivers all events to this method, even those not of type IESent
        if (ieSentMessage.getPayload() instanceof IESent) {
            System.out.println(ieSentMessage.getPayload());
        }
    }
}

I have observed that the KafkaTrackingToken was not assigned to all partitions of my topic, and I don't understand why. What should I do to make it work? I tried manually deleting the token, and it seems even worse, because now the token does not contain any token/tokenType in the TokenEntry database table. It is just an empty segment.

Outdated tracking token used upon Kafka partition assignment

We are using the Kafka extension in our application to deliver Axon events over Kafka. Recently, we encountered event loss after the Kafka broker was temporarily unavailable for the consumer. Since we found the event in the Kafka topic, we assumed that the event got lost on the consumer side.
After diving somewhat into the code of the Kafka extension, I guess that I have spotted the problem:

In the class org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerUtil, the seek method subscribes to records of the given topic. Also, a ConsumerRebalanceListener is registered which, on partition assignment, seeks each partition to the proper offset, using a given Kafka tracking token. The issue is that this particular token is also used for every future partition assignment. This has one of the following two consequences:

  • If the offset does not exist any more, because the Kafka cluster is set up to maintain the records only for a limited amount of time, then the offset is reset. The default reset strategy 'LATEST' may cause event loss because records may be skipped (this is what happened in our case).
  • If the offset does exist, all events since the initial partition assignment are replayed. This is not necessarily an error, but likely not what applications would expect.

To my mind, the above-mentioned seek method should not be given a token, but rather a token supplier/provider, so that upon partition assignment the most recent token can be used. What do you think? If you agree, I'd be up for providing a PR. If not, do you have any suggestions for how to get around the issue we faced?
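To make the proposal concrete, a hedged sketch of what a supplier-based seek could look like (accessor names such as positions() are assumptions, not the extension's actual API):

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

// Sketch: resolve the token at assignment time instead of capturing it once.
public static void seek(String topic, Consumer<?, ?> consumer, Supplier<KafkaTrackingToken> tokenSupplier) {
    consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
            // a fresh token on every (re)assignment, so stale offsets are never replayed or skipped
            Map<TopicPartition, Long> positions = tokenSupplier.get().positions();
            assigned.forEach(tp -> consumer.seek(tp, positions.getOrDefault(tp, 0L)));
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
            // no-op: progress is tracked in the Axon token
        }
    });
}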

Thanks and regards,
Andre

Create a TokenStore based on a kafka topic.

Feature Description

Currently, when you use a Subscribing Event Processor and don't configure a token store, it will use the InMemoryTokenStore, which means it will start reading from the beginning on restart. Of course it's possible to use a token store based on a database, but since we are storing data in Kafka, it would be nice to have a token store implementation based on a Kafka topic.

Current Behaviour

Currently, there is no easy way to replicate all the events to Kafka without starting from the beginning again after the application is restarted. It's possible to use the JdbcTokenStore, JpaTokenStore, or MongoTokenStore, but that seems wrong and is not trivial to set up when the application is not connected to a database yet.

Wanted Behaviour

A token store based on Kafka. I don't know if we should set this as the default when using a Subscribing Event Processor. It should reuse the Kafka configuration provided, which also means the consumer properties need to be there when using a Subscribing Event Processor. We probably want to create the topic for the store using the admin API, and set the topic to be compacted.
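A conceptual sketch of how such a store could persist tokens (the topic name, key scheme, and serialization are assumptions, not a worked-out design):

// One live value per (processor, segment) on a compacted topic: the broker
// retains only the latest token written for each key.
ProducerRecord<String, byte[]> record = new ProducerRecord<>(
        "axon-token-store",                                   // compacted topic (assumed name)
        processorName + "#" + segment,                        // key: processor + segment
        serializer.serialize(token, byte[].class).getData()); // Axon Serializer output
producer.send(record);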

Possible Workarounds

Fix contribution guide

The contribution guide references the IDEA IntelliJ formatter settings. The provided URL is invalid.

Fix CI build and optimise test process

Enhancement Description

Fix the CI build by either making test more resilient to random failures, or using another approach for integration testing.
One different approach to integration testing might be to replace EmbeddedKafka with TestContainers Kafka module.

Current Behaviour

Since moving to GitHub Actions, the CI build and tests are not very stable. While locally all tests are green, on GHA various random tests fail on each build. Occasionally, java.lang.OutOfMemoryError: Java heap space or similar errors occur, which may indicate the root cause of the other failing test cases and explain why there are no failures locally. It seems that the test-executing process can take more than 4 GB of RAM at one point, which can put a lot of memory pressure on the CI JVM.

Wanted Behaviour

Consistent CI builds and passing tests

Is this being actively maintained?

Hi, we are planning to use this in production, but it's concerning that there has been no activity on this for some time now. When is this scheduled to go into production, and is this project being actively maintained?

axon-kafka-spring-boot-starter fails when spring-boot-devtools enabled

Basic information

  • Axon Framework version: 4.5.8
  • JDK version: 17.0.2
  • Kafka Extension version: 4.5.2
  • Complete executable reproducer if available (e.g. GitHub Repo):

Stack Trace:
Exception in thread "main" java.lang.IllegalStateException: Unable to load devtools settings from location [META-INF/spring-devtools.properties]
at org.springframework.boot.devtools.settings.DevToolsSettings.load(DevToolsSettings.java:118)
at org.springframework.boot.devtools.settings.DevToolsSettings.load(DevToolsSettings.java:101)
at org.springframework.boot.devtools.settings.DevToolsSettings.get(DevToolsSettings.java:95)
at org.springframework.boot.devtools.restart.ChangeableUrls.<init>(ChangeableUrls.java:56)
at org.springframework.boot.devtools.restart.ChangeableUrls.fromUrls(ChangeableUrls.java:187)
at org.springframework.boot.devtools.restart.ChangeableUrls.fromUrls(ChangeableUrls.java:183)
at org.springframework.boot.devtools.restart.ChangeableUrls.fromClassLoader(ChangeableUrls.java:101)
at org.springframework.boot.devtools.restart.DefaultRestartInitializer.getUrls(DefaultRestartInitializer.java:86)
at org.springframework.boot.devtools.restart.DefaultRestartInitializer.getInitialUrls(DefaultRestartInitializer.java:42)
at org.springframework.boot.devtools.restart.Restarter.<init>(Restarter.java:141)
at org.springframework.boot.devtools.restart.Restarter.initialize(Restarter.java:549)
at org.springframework.boot.devtools.restart.RestartApplicationListener.onApplicationStartingEvent(RestartApplicationListener.java:90)
at org.springframework.boot.devtools.restart.RestartApplicationListener.onApplicationEvent(RestartApplicationListener.java:50)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:131)
at org.springframework.boot.context.event.EventPublishingRunListener.starting(EventPublishingRunListener.java:79)
at org.springframework.boot.SpringApplicationRunListeners.lambda$starting$0(SpringApplicationRunListeners.java:56)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:120)
at org.springframework.boot.SpringApplicationRunListeners.starting(SpringApplicationRunListeners.java:56)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:293)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292)
at com.example.CoreApplication.main(CoreApplication.java:10)
Caused by: java.util.regex.PatternSyntaxException: Illegal repetition near index 13
axon-kafka-${project.version}.jar
^
at java.base/java.util.regex.Pattern.error(Pattern.java:2028)
at java.base/java.util.regex.Pattern.closure(Pattern.java:3309)
at java.base/java.util.regex.Pattern.sequence(Pattern.java:2214)
at java.base/java.util.regex.Pattern.expr(Pattern.java:2069)
at java.base/java.util.regex.Pattern.compile(Pattern.java:1783)
at java.base/java.util.regex.Pattern.<init>(Pattern.java:1430)
at java.base/java.util.regex.Pattern.compile(Pattern.java:1069)
at org.springframework.boot.devtools.settings.DevToolsSettings.lambda$getPatterns$0(DevToolsSettings.java:69)
at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
at java.base/java.util.Properties.forEach(Properties.java:1422)
at org.springframework.boot.devtools.settings.DevToolsSettings.getPatterns(DevToolsSettings.java:66)
at org.springframework.boot.devtools.settings.DevToolsSettings.add(DevToolsSettings.java:58)
at org.springframework.boot.devtools.settings.DevToolsSettings.load(DevToolsSettings.java:109)
... 25 more

Steps to reproduce

run application with Axon Framework and Kafka extension

Expected behaviour

application run

Actual behaviour

application not run , because of exception

To me, this problem is similar to AxonFramework/extension-reactor#28 (comment) from issue #28, but here, from what I have found while debugging, the jar build plugin is configured but the version tag is missing in pom.xml, so the project.version property is also missing. So I think this problem can be fixed by adding a version tag to pom.xml.

Introduce SubscribableMessageSource implementation

As is referred upon in issue #1, there is a certain discrepancy between Kafka's notion of Partitions per Topic and Axon's approach of Segmenting the Event Stream for a Tracking Event Processor.

Having both approaches fully compliant with one another takes some configuration effort, thus making it more difficult for end users to easily employ and set up.

As a follow-up to #1, discussions have been ongoing about introducing a SubscribableMessageSource implementation in the Kafka Extension.

This would (1) allow Kafka to decide which segments are delivered to which nodes and (2) still support replays, by resetting Kafka partition offsets to a point in time on the stream.

Support Streaming Processor Reset Operation

Basic information

  • Axon Framework version: 4.2.2
  • JDK version: 1.8.601
  • Kafka Extension version:4.0-RC3
  • Complete executable reproducer if available (e.g. GitHub Repo):

Steps to reproduce

@Component
@Slf4j
@ProcessingGroup("user-processor")
public class UserListener implements IUserEvent {

    @Autowired
    private UserViewService service;


    @Autowired
    private EventProcessingConfiguration epc;
    public void on() {
        Optional<TrackingEventProcessor> ret =
                epc.eventProcessor("user-processor", TrackingEventProcessor.class);

        if (ret.isPresent()) {
            TrackingEventProcessor proc = ret.get();
            proc.shutDown();
            proc.resetTokens();
            proc.start();

        } else {
            throw new ValidationException("Process not found.");
        }
    }
}

call the userListener.on()

Expected behaviour

Events are fetched correctly and the aggregate status is rebuilt.

Actual behaviour

`2021-07-17 06:12:24.241 INFO 1372 --- [nio-8090-exec-1] o.a.e.TrackingEventProcessor : Shutdown state set for Processor 'user-processor'.
2021-07-17 06:12:24.241 INFO 1372 --- [nio-8090-exec-1] o.a.e.TrackingEventProcessor : Processor 'user-processor' awaiting termination...
2021-07-17 06:12:24.521 INFO 1372 --- [er-processor]-0] o.a.e.k.e.consumer.FetchEventsTask : Closing down FetchEventsTask using Consumer [org.apache.kafka.clients.consumer.KafkaConsumer@1050b4f7]
2021-07-17 06:12:24.524 INFO 1372 --- [er-processor]-0] o.a.e.TrackingEventProcessor : Released claim
2021-07-17 06:12:24.524 INFO 1372 --- [er-processor]-0] o.a.e.TrackingEventProcessor : Worker for segment Segment[0/0] stopped.
2021-07-17 06:12:24.529 INFO 1372 --- [nio-8090-exec-1] o.a.e.tokenstore.AbstractTokenEntry : token:null ser:org.axonframework.serialization.json.JacksonSerializer@55b7f0d type:class [B
2021-07-17 06:12:24.539 INFO 1372 --- [er-processor]-1] o.a.e.TrackingEventProcessor : Worker assigned to segment Segment[0/0] for processing
2021-07-17 06:12:24.540 INFO 1372 --- [er-processor]-1] o.a.e.TrackingEventProcessor : Using current Thread for last segment worker: TrackingSegmentWorker{processor=user-processor, segment=Segment[0/0]}
2021-07-17 06:12:24.542 INFO 1372 --- [er-processor]-1] o.a.e.TrackingEventProcessor : Fetched token: null for segment: Segment[0/0]
2021-07-17 06:12:24.542 INFO 1372 --- [er-processor]-1] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 3000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = kafka-axon-user
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2021-07-17 06:12:24.549 WARN 1372 --- [er-processor]-1] o.a.k.clients.consumer.ConsumerConfig : The configuration 'some-key.0' was supplied but isn't a known config.
2021-07-17 06:12:24.549 INFO 1372 --- [er-processor]-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
2021-07-17 06:12:24.549 INFO 1372 --- [er-processor]-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2021-07-17 06:12:24.552 WARN 1372 --- [er-processor]-1] o.a.kafka.common.utils.AppInfoParser : Error registering AppInfo mbean

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka-axon-user
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.8.0_241]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.8.0_241]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.8.0_241]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.8.0_241]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.8.0_241]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.8.0_241]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791) [kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615) [kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596) [kafka-clients-2.0.1.jar:na]
at org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory.createConsumer(DefaultConsumerFactory.java:67) [axon-kafka-4.0-RC3.jar:4.0-RC3]
at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource.openStream(StreamableKafkaMessageSource.java:122) [axon-kafka-4.0-RC3.jar:4.0-RC3]
at org.axonframework.eventhandling.TrackingEventProcessor.doOpenStream(TrackingEventProcessor.java:508) [axon-messaging-4.2.2.jar:4.2.2]
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$ensureEventStreamOpened$14(TrackingEventProcessor.java:496) [axon-messaging-4.2.2.jar:4.2.2]
at org.axonframework.common.transaction.TransactionManager.fetchInTransaction(TransactionManager.java:70) ~[axon-messaging-4.2.2.jar:4.2.2]
at org.axonframework.eventhandling.TrackingEventProcessor.ensureEventStreamOpened(TrackingEventProcessor.java:495) [axon-messaging-4.2.2.jar:4.2.2]
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:290) [axon-messaging-4.2.2.jar:4.2.2]
at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:1092) ~[axon-messaging-4.2.2.jar:4.2.2]
at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1206) ~[axon-messaging-4.2.2.jar:4.2.2]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_241]

2021-07-17 06:12:24.583 INFO 1372 --- [ AsyncFetcher-1] org.apache.kafka.clients.Metadata : Cluster ID: DQELf6cCSyiz1x8IhUAixg
2021-07-17 06:12:24.584 INFO 1372 --- [ AsyncFetcher-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-axon-user, groupId=Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841] Discovered group coordinator kafka:9092 (id: 2147483646 rack: null)
2021-07-17 06:12:24.585 INFO 1372 --- [ AsyncFetcher-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=kafka-axon-user, groupId=Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841] Revoking previously assigned partitions []
2021-07-17 06:12:24.585 INFO 1372 --- [ AsyncFetcher-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-axon-user, groupId=Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841] (Re-)joining group
2021-07-17 06:12:24.634 INFO 1372 --- [ AsyncFetcher-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-axon-user, groupId=Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841] Successfully joined group with generation 1
2021-07-17 06:12:24.634 INFO 1372 --- [ AsyncFetcher-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=kafka-axon-user, groupId=Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841] Setting newly assigned partitions [localevent-2, localevent-1, localevent-0]
2021-07-17 06:12:24.634 INFO 1372 --- [ AsyncFetcher-1] s.TrackingTokenConsumerRebalanceListener : Seeking topic-partition [localevent-2] with offset [0]
2021-07-17 06:12:24.635 INFO 1372 --- [ AsyncFetcher-1] s.TrackingTokenConsumerRebalanceListener : Seeking topic-partition [localevent-1] with offset [0]
2021-07-17 06:12:24.635 INFO 1372 --- [ AsyncFetcher-1] s.TrackingTokenConsumerRebalanceListener : Seeking topic-partition [localevent-0] with offset [0]
2021-07-17 06:12:25.495 INFO 1372 --- [ AsyncFetcher-0] o.a.e.k.e.consumer.FetchEventsTask : Fetch events task and used Consumer instance [org.apache.kafka.clients.consumer.KafkaConsumer@1050b4f7] have been closed
`

KafkaPublisherTest.testPublishMessagesWithAckModeNoUnitOfWorkShouldBePublishedAndReadSuccessfully sometimes fails

Basic information

  • Axon Framework version: 4.0.4
  • JDK version: 11
  • Kafka Extension version: 4.0-SNAPSHOT

KafkaPublisherTest.testPublishMessagesWithAckModeNoUnitOfWorkShouldBePublishedAndReadSuccessfully publishes 10 messages to the event bus and polls Spring's embedded Kafka broker to consume the generated Kafka messages. Sometimes these attempts retrieve all the messages, but sometimes only 7 or 8 of them are retrieved and the test fails. It doesn't work consistently. This nondeterministic false-negative behaviour should be fixed.

Steps to reproduce

I can't reproduce this all the time. To reproduce this, run maven test over and over again. Sometimes the test passes, sometimes it fails.

Expected behaviour

All messages must be consumed.

Actual behaviour

Sometimes, some of the messages can not be received.

Proposed solution

KafkaTestUtils' "getRecords" method should be called with the "minRecords" parameter. That way, it waits until the expected number of records is consumed; if it can't get them all, it waits until the timeout and returns the received messages. PR is on the way :)
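The proposed call, sketched with spring-kafka-test's KafkaTestUtils (a minRecords overload exists in recent versions; the exact signature may vary):

// Blocks until at least 10 records have been consumed, or until the timeout
// elapses, instead of returning after a single poll window.
ConsumerRecords<String, byte[]> records =
        KafkaTestUtils.getRecords(consumer, 10_000, 10); // timeout in ms, minRecords
assertEquals(10, records.count());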

Processor is not going to error mode if the FetchEventTask dies.

Basic information

  • Axon Framework version: 4.5.9
  • JDK version: 11
  • Kafka Extension version: 4.5.3

Steps to reproduce

We are using the Kafka Extension with Azure Event Hubs and ran into the following error. For some Azure-related reason, we received a TopicAuthorizationException, which caused the Fetcher to quit the FetchEventsTask. After that, the processor does NOT go into the error state, and the StreamableKafkaMessageSource is not re-created. As a result, the message processor idles around thinking that there are no messages (since there is no one delivering messages into the message buffer), and no error processing is started.

We analyzed the code a little, and it seems that if any exception is thrown by consumer.poll() inside the FetchEventsTask, the fetch task is simply closed. We could not find any logic that restarts fetching.

Expected behaviour

Processor goes into an error mode and tries to recover (in our case we would detect the error and it would even be able to reconnect at some point).

Actual behaviour

On error, the processor does not detect that the fetch task has died.

File Leak

There are two or three issues at play at the moment.

  1. After org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files, the KafkaConsumer thread dies instead of crashing the entire service. This creates a zombie process, which requires external health checks to monitor.

  2. After running multiple different tests and a file leak detector we are 100% certain the root cause is the following:

#706 selector by thread:AsyncFetcher-0 on Tue Jan 07 15:27:59 EST 2020
	at java.nio.channels.spi.AbstractSelector.<init>(AbstractSelector.java:86)
	at sun.nio.ch.SelectorImpl.<init>(SelectorImpl.java:54)
	at sun.nio.ch.KQueueSelectorImpl.<init>(KQueueSelectorImpl.java:83)
	at sun.nio.ch.KQueueSelectorProvider.openSelector(KQueueSelectorProvider.java:42)
	at java.nio.channels.Selector.open(Selector.java:227)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:214)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:227)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:231)
	at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:445)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:422)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:270)
	at org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory.createKafkaProducer(DefaultProducerFactory.java:184)
	at org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory.createTransactionalProducer(DefaultProducerFactory.java:178)
	at org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory.createProducer(DefaultProducerFactory.java:114)
	at org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher.send(KafkaPublisher.java:128)
	at org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher.handle(KafkaEventPublisher.java:77)
	at org.axonframework.eventhandling.SimpleEventHandlerInvoker.handle(SimpleEventHandlerInvoker.java:108)
	at org.axonframework.eventhandling.MultiEventHandlerInvoker.handle(MultiEventHandlerInvoker.java:89)
	at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$1(AbstractEventProcessor.java:165)
	at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57)
	at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:65)
	at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55)
	at org.axonframework.eventhandling.AbstractEventProcessor.lambda$processInUnitOfWork$2(AbstractEventProcessor.java:173)
	at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:86)
	at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:159)
	at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:143)
	at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:111)
	at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:35)
	at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:95)
	at org.axonframework.eventhandling.AbstractEventBus.lambda$prepareCommit$15(AbstractEventBus.java:229)
	at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891)
	at java.util.concurrent.CopyOnWriteArraySet.forEach(CopyOnWriteArraySet.java:404)
	at org.axonframework.eventhandling.AbstractEventBus.prepareCommit(AbstractEventBus.java:229)
	at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:64)
	at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:218)
	at org.axonframework.eventhandling.AbstractEventBus.lambda$null$8(AbstractEventBus.java:152)
	at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:71)
	at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:155)
	at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:899)
	at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:155)
	at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:222)
	at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
	at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
	at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:111)
	at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:159)
	at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:143)
	at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:111)
	at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:35)
	at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:95)
	at org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource.lambda$null$2(SubscribableKafkaMessageSource.java:177)
	at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891)
	at java.util.concurrent.CopyOnWriteArraySet.forEach(CopyOnWriteArraySet.java:404)
	at org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource.lambda$start$3(SubscribableKafkaMessageSource.java:177)
	at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:89)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Creating a new KafkaProducer for every event seems incredibly wasteful. Either a cached pool of 10 or simply a single reused instance would be desirable (see the sketch below).
Finally, if the decision is made to use many (not advised), then the handles need to be closed.

In our case we had set the max open files (lsof) to 16k and repeatedly watched it exceed the threshold over a period of several minutes.
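As an illustration of the single-producer option, a minimal sketch assuming a non-transactional setup (the extension's DefaultProducerFactory may already cache producers depending on the confirmation mode, so treat this as a shape, not a patch):

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

// Share one Producer for the lifetime of the application instead of
// constructing a new KafkaProducer per event, and close it on shutdown so the
// underlying Selector (and its file handles) are released.
final class SharedProducerHolder implements AutoCloseable {

    private final Producer<String, byte[]> producer;

    SharedProducerHolder(Map<String, Object> config) {
        this.producer = new KafkaProducer<>(config);
    }

    Producer<String, byte[]> producer() {
        return producer;
    }

    @Override
    public void close() {
        producer.close(); // releases sockets/selectors, preventing the leak
    }
}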

KafkaTrackingToken should specify the topic per partition

The current KafkaTrackingToken does not allow for storing several topic-partition pairs.
This is, however, a very common use case when it comes to consuming Kafka records.

We should thus expand the KafkaTrackingToken to contain topic knowledge alongside the partitionPositions map. Additionally, the StreamableKafkaMessageSource should be adjusted to allow for several topics instead of a single one.
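A hypothetical shape for the expanded token (illustrative only, not the actual implementation):

import java.util.Map;

// Positions keyed per topic, then per partition, instead of a plain
// partition -> offset map.
final class MultiTopicTrackingTokenSketch {

    // topic -> (partition -> last confirmed offset)
    private final Map<String, Map<Integer, Long>> positions;

    MultiTopicTrackingTokenSketch(Map<String, Map<Integer, Long>> positions) {
        this.positions = positions;
    }

    long positionOf(String topic, int partition) {
        return positions.getOrDefault(topic, Map.of())
                        .getOrDefault(partition, -1L);
    }
}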

Axon4 Kafka: configuration

Team

The Axon 4 Kafka integration documentation is inadequate. Could you please share a sample/example with the configuration and changes required compared to the default Axon Server connector code?
Axon 4 + Spring Boot (Java) + Kafka
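In the meantime, a minimal programmatic sketch of the publisher-side wiring, assuming the 4.x builder APIs of this extension (method names should be double-checked against the version in use):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;

// Minimal publisher wiring; serializer and message-converter settings are
// left at their defaults for brevity.
class KafkaPublisherConfigSketch {

    KafkaPublisher<String, byte[]> kafkaPublisher() {
        Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        ProducerFactory<String, byte[]> producerFactory =
                DefaultProducerFactory.<String, byte[]>builder()
                                      .configuration(producerConfig)
                                      .build();

        return KafkaPublisher.<String, byte[]>builder()
                             .producerFactory(producerFactory)
                             .topic("Axon.Events")
                             .build();
    }
}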

Fix errors in CI/CD pipeline

Expected behaviour

The CI/CD pipeline deploys snapshots / releases.

Actual behaviour

The pipeline seems to have multiple errors:

There are some issues with calling Sonar (no authorization).
Error: Failed to execute goal org.sonarsource.scanner.maven:sonar-maven-plugin:3.9.0.2155:sonar (default-cli) on project axon-kafka-parent: You're not authorized to run analysis. No sonar.login or SONAR_TOKEN env variable was set -> [Help 1]
See: https://github.com/AxonFramework/extension-kafka/runs/3869078032?check_suite_focus=true

Due to an incompatibility in the module naming scheme, the JavaDoc Maven plugin fails to detect the name of the Kafka module. This leads to a build error and prevents the pipeline from completing.
See: https://github.com/AxonFramework/extension-kafka/runs/3836134591?check_suite_focus=true

Errors in log from docker containers (kafka-axon-example)

Basic information

  • Kafka Extension version: commit 7b59029

docker-compose -f kafka-axon-example/docker-compose.yaml up

Expected behaviour

A-OK

Actual behaviour

Loads of:

kafka-rest_1     | 2021-08-09T08:39:52.639Z sinek:nproducer:debug [thrd:localhost:9092/1]: localhost:9092/1: broker in state DOWN connecting
kafka-rest_1     | 2021-08-09T08:39:52.639Z sinek:nproducer:debug [thrd:localhost:9092/1]: localhost:9092/1: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 19
kafka-rest_1     | 2021-08-09T08:39:52.639Z sinek:nproducer:debug [thrd:localhost:9092/1]: localhost:9092/1: Broker changed state DOWN -> CONNECT
kafka-rest_1     | 2021-08-09T08:39:52.639Z sinek:nproducer:debug [thrd:localhost:9092/1]: Broadcasting state change
kafka-rest_1     | 2021-08-09T08:39:52.639Z sinek:nproducer:debug [thrd:localhost:9092/1]: localhost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
kafka-rest_1     | 2021-08-09T08:39:52.639Z sinek:nproducer:debug [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
kafka-rest_1     | 2021-08-09T08:39:52.639Z sinek:nproducer:debug [thrd:localhost:9092/1]: localhost:9092/1: Broker changed state CONNECT -> DOWN
kafka-rest_1     | 2021-08-09T08:39:52.639Z sinek:nproducer:debug [thrd:localhost:9092/1]: Broadcasting state change
kafka-rest_1     | 2021-08-09T08:39:52.639Z sinek:nproducer:debug [thrd:localhost:9092/1]: localhost:9092/1: Purging bufq with 0 buffers
kafka-rest_1     | 2021-08-09T08:39:52.639Z sinek:nproducer:debug [thrd:localhost:9092/1]: localhost:9092/1: Updating 0 buffers on connection reset

(the same connect/fail/reset cycle repeats every second)

Separate execution of unit tests from integration tests

As a developer, I want a separation between quick unit tests and long-running integration tests that rely on external resources (Kafka in a Testcontainer), a Spring Boot application start, or other long-running activities.
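One common approach, sketched here under the assumption of JUnit 5: tag the slow tests and let the build filter on the tag, for example Surefire with excludedGroups "integration" for the fast phase and Failsafe with groups "integration" for the slow phase.

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

// Tagged so the build can run it only in the integration phase.
@Tag("integration")
class KafkaMessageSourceIntegrationTest {

    @Test
    void consumesEventsFromTestcontainerBroker() {
        // long-running test against a Kafka Testcontainer goes here
    }
}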

Allow filtering or sending different event to different topics in the publisher.

Feature Description

In some cases we want to send only certain events via Kafka. Also, a typical Kafka setup uses a different topic for each message type. It would be nice if this was configurable.

Current Behaviour

Currently there is just one topic to which all events are sent.

Wanted Behaviour

Add an optional mapping lambda to the KafkaPublisher<String, byte[]>, from EventMessage to an Optional topic name. By default this will send all events to the default "Axon.Events" topic; if a topic is set in the builder, it will use a lambda that always returns that topic.
If the lambda returns an empty Optional, the event will be skipped. Note that by applying the lambda to the EventMessage, it's both versatile, and we can skip the conversion step if the event doesn't need to be sent via Kafka.
Something like:

m -> {
    if (WantedEvent.class.isAssignableFrom(m.getPayloadType())) {
        return Optional.of("wanted-events-topic");
    } else {
        return Optional.empty();
    }
}

Possible Workarounds

Skipping the extension and doing it yourself, for example with Spring Cloud Stream, which requires a lot more code to set up.

SerializationException while fetching KafkaTrackingToken from JpaTokenStore after processor enters retry mode

I'm using Axon 4.2 with extension-kafka 4.0-RC3.
Below are the Axon configurations used:
[screenshot: Axon configuration properties, not reproduced]

The application works fine and is able to consume events from the Kafka topic until any kind of RuntimeException occurs while handling an event. If a RuntimeException occurs during event handling, the processor enters ERROR MODE, as I've configured the PropagatingErrorHandler. From the second retry onwards, Axon fails to fetch the token because of a Jackson SerializationException.

token_entry before entering error mode:
[screenshot: token_entry table row, not reproduced]

Error Trace:

org.axonframework.serialization.SerializationException: Error while deserializing object
at org.axonframework.serialization.json.JacksonSerializer.deserialize(JacksonSerializer.java:202)
at org.axonframework.eventhandling.tokenstore.AbstractTokenEntry.getToken(AbstractTokenEntry.java:117)
at org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.fetchToken(JpaTokenStore.java:167)
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$ensureEventStreamOpened$13(TrackingEventProcessor.java:474)
at org.axonframework.common.transaction.TransactionManager.fetchInTransaction(TransactionManager.java:70)
at org.axonframework.eventhandling.TrackingEventProcessor.ensureEventStreamOpened(TrackingEventProcessor.java:473)
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:274)
at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:1071)
at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1183)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot find a (Map) Key deserializer for type [simple type, class org.apache.kafka.common.TopicPartition]
at [Source: (byte[])"{"positions":{"giftcard-topic-r4-4":0,"giftcard-topic-r4-3":6,"giftcard-topic-r4-0":0,"giftcard-topic-r4-2":0,"giftcard-topic-r4-1":0}}"; line: 1, column: 1]
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67)
at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1452)
at com.fasterxml.jackson.databind.deser.DeserializerCache._handleUnknownKeyDeserializer(DeserializerCache.java:599)
at com.fasterxml.jackson.databind.deser.DeserializerCache.findKeyDeserializer(DeserializerCache.java:168)
at com.fasterxml.jackson.databind.DeserializationContext.findKeyDeserializer(DeserializationContext.java:500)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer.createContextual(MapDeserializer.java:248)
at com.fasterxml.jackson.databind.DeserializationContext.handlePrimaryContextualization(DeserializationContext.java:651)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.resolve(BeanDeserializerBase.java:484)
at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:293)
at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244)
at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142)
at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:477)
at com.fasterxml.jackson.databind.ObjectReader._findRootDeserializer(ObjectReader.java:1895)
at com.fasterxml.jackson.databind.ObjectReader.bindAndClose(ObjectReader.java:1606)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234)
at org.axonframework.serialization.json.JacksonSerializer.deserialize(JacksonSerializer.java:200)
... 9 more
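A possible workaround, sketched below and not an official fix: register a Jackson KeyDeserializer for TopicPartition on the ObjectMapper handed to Axon's JacksonSerializer, parsing the "topic-partition" string the token keys were written with (via TopicPartition.toString()).

import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.KeyDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.kafka.common.TopicPartition;

class TopicPartitionKeyDeserializer extends KeyDeserializer {

    @Override
    public Object deserializeKey(String key, DeserializationContext ctxt) {
        // Map keys look like "<topic>-<partition>", e.g. "giftcard-topic-r4-4";
        // the partition number follows the last dash.
        int idx = key.lastIndexOf('-');
        return new TopicPartition(key.substring(0, idx),
                                  Integer.parseInt(key.substring(idx + 1)));
    }

    // Hand this mapper to JacksonSerializer.builder().objectMapper(...).
    static ObjectMapper tokenAwareObjectMapper() {
        return new ObjectMapper().registerModule(
                new SimpleModule().addKeyDeserializer(
                        TopicPartition.class, new TopicPartitionKeyDeserializer()));
    }
}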

Reading from two Kafka topics in an ACL

Hi everyone,

So my team is creating a proof-of-concept suite of microservices using the Axon Framework. When we started, v3.3.5 was the latest version, so that's what we're using.

First, a little description. We have two bounded contexts - let's call them "reporting" and "verification." Each context contains one microservice at the moment. Both microservices are running on top of Spring Boot. Since these two microservices need to talk to each other we created an anti-corruption layer (ACL) between them. Each microservice receives commands via Spring Cloud, and each microservice dispatches events via Kafka. The ACL subscribes to the Kafka topic and translates events from one microservice into commands for the other. The ACL uses a Saga class to handle the events. So far this is working just great.

However, we don't really want both microservices putting their events in the same Kafka topic. We'd like each of them to have its own topic. After searching through the Google Group and reading through the source code a bit, it seems we can accomplish this using two Tracking Event Processors, one for each topic. So, my first question is: is that correct?

Assuming it is, I imagine you'd somehow associate the existing Saga with both Tracking Event Processors? How would you go about configuring these Tracking Event Processors with Spring Boot?

Appreciate the help!

Fine tune Kafka Configurer Module

At the moment, the only easy approach to configuring the Kafka Extension in an Axon project is to use the provided kafka-spring-boot-starter dependency.
It would be beneficial if the Kafka module contained a ConfigurerModule implementation, which could for example be tied in through the Service Loader mechanism, to configure Kafka outside the scope of Spring (Boot) too.
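A hypothetical sketch of what such a module could look like (the registration body is illustrative, not the extension's actual wiring):

import org.axonframework.config.Configurer;
import org.axonframework.config.ConfigurerModule;

public class KafkaConfigurerModuleSketch implements ConfigurerModule {

    @Override
    public void configureModule(Configurer configurer) {
        // The Kafka producer factory, publisher, and message source would be
        // registered as framework components here, so plain (non-Spring) Axon
        // applications get them without manual wiring.
    }
}

// Discovered through the Service Loader mechanism by listing this class in
// META-INF/services/org.axonframework.config.ConfigurerModule.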

Subscribing Producer Mode not working as expected

Basic information

  • Axon Framework version: 4.0.4
  • JDK version: 11
  • Kafka Extension version: 4.0-RC3

Kafka event publishing is supposed to work in two modes: subscribing or tracking. In tracking mode, event publishing is handled asynchronously by a separate tracking event processor thread. In subscribing mode, which is the default, it should be handled synchronously within the same thread, but it doesn't work as expected.

Steps to reproduce

  1. Open the axon-kafka-example project in your IDE.
  2. Add a breakpoint to KafkaEventPublisher's handle method (line 77).
  3. Run the application in debug mode.
  4. When the execution hits the breakpoint, look at the stack trace.
  5. You will see that it is invoked asynchronously by a tracking event processor.

Expected behaviour

Kafka event publishing should be invoked synchronously.

Actual behaviour

It is invoked by a tracking event processor thread asynchronously. Subscribing mode doesn't work.
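For comparison, explicitly forcing subscribing mode should look roughly like the sketch below, assuming the extension's KafkaEventPublisher and its DEFAULT_PROCESSING_GROUP constant; if publishing still happens on a tracking thread with this wiring, that underlines the bug.

import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher;

class SubscribingModeSketch {

    void configure(EventProcessingConfigurer configurer,
                   KafkaEventPublisher<String, byte[]> kafkaEventPublisher) {
        // Assign the KafkaEventPublisher to its own processing group and back
        // that group with a SubscribingEventProcessor, so publishing happens
        // in the publishing thread instead of on a tracking processor thread.
        configurer.registerEventHandler(config -> kafkaEventPublisher)
                  .assignHandlerInstancesMatching(
                          KafkaEventPublisher.DEFAULT_PROCESSING_GROUP,
                          handler -> handler.equals(kafkaEventPublisher))
                  .registerSubscribingEventProcessor(KafkaEventPublisher.DEFAULT_PROCESSING_GROUP);
    }
}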

Error running example: "The XStream instance is a hard requirement and should be provided"

Basic information

  • Axon Framework version: 4.6.0-SNAPSHOT
  • JDK version: 11
  • Kafka Extension version: 4.6.0-SNAPSHOT
  • Complete executable reproducer if available (e.g. GitHub Repo):

Steps to reproduce

Simply follow the steps in the README of the kafka-extension example.

Expected behaviour

It runs at least!

Actual behaviour

Fails at startup with:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaPublisher' defined in class path resource [org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher]: Factory method 'kafkaPublisher' threw exception; nested exception is org.axonframework.common.AxonConfigurationException: The XStream instance is a hard requirement and should be provided

Seems related to this discussion:
https://discuss.axoniq.io/t/kafka-extension-kafka-publisher-does-not-support-axon-framework-4-5-5/3773

Without this, my first attempt at the Axon Framework with Kafka is dead in the water.
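A commonly suggested workaround for this error on Axon 4.5+ is to provide an XStream bean yourself; a sketch follows, where the package wildcard is an assumption to adjust to the example's actual base package:

import com.thoughtworks.xstream.XStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class XStreamConfig {

    @Bean
    XStream xStream() {
        XStream xStream = new XStream();
        // XStream's security framework: allow the application's own classes.
        // The wildcard below is a placeholder; use your own base package.
        xStream.allowTypesByWildcard(new String[]{"org.axonframework.extension.kafka.example.**"});
        return xStream;
    }
}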

Check for missing Producer/Consumer properties

The KafkaProperties class is likely missing some configurable properties. This means users do not have "full Kafka control" over the Producer and Consumer this extension uses.
Firstly, we should verify which ones are missing, if any, after which we should introduce them.
