spring-projects / spring-kafka
Provides Familiar Spring Abstractions for Apache Kafka
Home Page: https://projects.spring.io/spring-kafka
License: Apache License 2.0
The error handler is invoked in the context of ConsumerRecords (batch) processing instead of each individual ConsumerRecord - this means messages within the batch after an error are skipped/ignored.
Also we need to consider whether we need autoCommitOnError - see spring-cloud/spring-cloud-stream#542
We are trying to implement manual commit with the Spring DSL for Kafka with the below code, but we couldn't find any reference for it.
We can set the consumer property "auto.commit" to "false", but we want to commit the message only once we have processed it successfully. Can someone help with this?
@Bean
IntegrationFlow consumer() {
    KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka
            .inboundChannelAdapter(new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
            .consumerProperties(props ->
                    props.put("zookeeper.session.timeout.ms", "500")
                            .put("zookeeper.sync.time.ms", "250")
                            .put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
                            .put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
                            .put("auto.offset.reset", "smallest")
                            .put("auto.commit.interval.ms", "100"))
            .addConsumer(this.kafkaConfig.getConsumerGroup(),
                    metadata -> metadata.consumerTimeout(100)
                            .topicStreamMap(m -> m.put(this.kafkaConfig.getTopicRead(), 1))
                            .maxMessages(1));
    Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));
    return IntegrationFlows.from(messageSourceSpec, endpointConfigurer)
            .<Map<String, List<byte[]>>>handle((payload, headers) -> {
                payload.entrySet().forEach(e ->
                        processMessage((ConcurrentHashMap<Integer, List<byte[]>>) e.getValue()));
                return null;
            })
            .get();
}
Version:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>
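For what it's worth, a minimal sketch of the pattern using the spring-kafka listener container API that appears later in this thread (rather than the high-level-consumer DSL above); the topic name, consumerFactory, and process() call are placeholders:
KafkaMessageListenerContainer<String, String> container =
        new KafkaMessageListenerContainer<>(consumerFactory, new TopicPartition("myTopic", 0));
container.setAckMode(AckMode.MANUAL_IMMEDIATE);
container.setMessageListener(new AcknowledgingMessageListener<String, String>() {

    @Override
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        process(record); // placeholder for the real business logic
        ack.acknowledge(); // commit this record's offset only after successful processing
    }

});
container.start();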
We have these logs:
18:49:27.318 DEBUG [testMANUAL_IMMEDIATE-1-kafka-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] Received: 2 records
18:49:27.318 DEBUG [testMANUAL_IMMEDIATE-0-kafka-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] Received: 2 records
18:49:27.319 INFO [testMANUAL_IMMEDIATE-1-kafka-1][org.springframework.kafka.listener.ConcurrentMessageListenerContainerTests] manual: ConsumerRecord(topic = testTopic5, partition = 1, offset = 0, key = 2, value = bar)
18:49:27.319 INFO [testMANUAL_IMMEDIATE-0-kafka-1][org.springframework.kafka.listener.ConcurrentMessageListenerContainerTests] manual: ConsumerRecord(topic = testTopic5, partition = 0, offset = 0, key = 0, value = foo)
18:49:27.319 INFO [testMANUAL_IMMEDIATE-1-kafka-1][org.springframework.kafka.listener.ConcurrentMessageListenerContainerTests] manual: ConsumerRecord(topic = testTopic5, partition = 1, offset = 1, key = 2, value = qux)
18:49:27.319 TRACE [testMANUAL_IMMEDIATE-1-kafka-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] Polling...
18:49:27.320 TRACE [testMANUAL_IMMEDIATE-1-kafka-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] Polling...
18:49:27.320 INFO [testMANUAL_IMMEDIATE-0-kafka-1][org.springframework.kafka.listener.ConcurrentMessageListenerContainerTests] manual: ConsumerRecord(topic = testTopic5, partition = 0, offset = 1, key = 0, value = baz)
18:49:27.320 TRACE [testMANUAL_IMMEDIATE-0-kafka-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] Polling...
18:49:27.320 TRACE [testMANUAL_IMMEDIATE-0-kafka-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] Polling...
18:49:27.322 INFO [main][org.springframework.kafka.listener.ConcurrentMessageListenerContainerTests] Stop MANUAL_IMMEDIATE
18:49:27.323 DEBUG [testMANUAL_IMMEDIATE-0-kafka-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] Committing: {testTopic5-0=OffsetAndMetadata{offset=1, metadata=''}}
18:49:27.323 DEBUG [testMANUAL_IMMEDIATE-1-kafka-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] Committing: {testTopic5-1=OffsetAndMetadata{offset=1, metadata=''}}
However, with MANUAL_IMMEDIATE I expect to see the commit immediately after each:
public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ack) {
logger.info("manual: " + message);
ack.acknowledge();
latch.countDown();
}
Moreover, it looks like these offsets are placed into the regular auto-commit offsets:
if (!ackMode.equals(AckMode.MANUAL)) {
updatePendingOffsets(records);
}
On the other hand, they are in the manualOffsets, too:
else if (getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {
updateManualOffset(record);
consumer.wakeup();
}
Also, I'm not sure about that last wakeup() call.
Its JavaDoc says:
/**
* Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
* The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}.
*/
Yes, we break the current poll(), but we commit offsets only after the stop().
I'd say that we really should call commitAsync() on the consumer, but only for that record, directly in the Acknowledgment implementation, without any update...Offset.
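A minimal sketch of that idea - a hypothetical Acknowledgment implementation that commits just the acknowledged record's offset, bypassing the update...Offset bookkeeping:
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;

public class ImmediateAcknowledgment implements Acknowledgment {

    private final Consumer<?, ?> consumer;

    private final ConsumerRecord<?, ?> record;

    public ImmediateAcknowledgment(Consumer<?, ?> consumer, ConsumerRecord<?, ?> record) {
        this.consumer = consumer;
        this.record = record;
    }

    @Override
    public void acknowledge() {
        TopicPartition partition = new TopicPartition(this.record.topic(), this.record.partition());
        // Commit the offset of the next record to consume (Kafka convention).
        this.consumer.commitAsync(
                Collections.singletonMap(partition, new OffsetAndMetadata(this.record.offset() + 1)),
                null);
    }

}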
In order for applications to react to a kafka consumer rebalance, it would be very helpful to expose the ConsumerRebalanceListener in the native KafkaConsumer. For example to ensure offset and/or state is committed before another instance takes over the partition. This is currently not possible with the current implementation of KafkaMessageListenerContainer.
Do you plan to add support for this?
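A sketch of what exposing the listener would enable, using the plain Kafka 0.9 consumer API (currentOffsets is a hypothetical map maintained by the application):
final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap<>();
consumer.subscribe(Arrays.asList("myTopic"), new ConsumerRebalanceListener() {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Flush offsets/state before another instance takes over the partitions.
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // e.g. restore per-partition state here.
    }

});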
When <K, V> are <String, ?>, send(K key, V data) and send(String topic, V data) are ambiguous.
See the comment on the matter: spring-attic/spring-integration-kafka#106 (comment)
The symptom is:
* @param instance the containe instance to configure.
There are other places in this class to polish, like the redundant counter and logger properties.
Consider polishing other copy/paste artifacts, like @since 1.4 or similar.
The previous version of Spring Integration Kafka had offset management support.
Has that gone away since Kafka 0.9?
Thanks
In our use case we want to synchronously commit as soon as the message is processed by the consumer. From looking at http://docs.spring.io/spring-kafka/docs/1.0.0.M2/reference/htmlsingle/ I don't see a way to do this with the current version of the API.
I do see there is MANUAL_IMMEDIATE, but the underlying call is still commitAsync(). Is there a way of synchronously committing the offset?
Using ConcurrentMessageListenerContainer, and setting concurrency to 1 for now (in the future it may be greater than 1). Also set container.setAckMode(AckMode.MANUAL_IMMEDIATE).
I am manually triggering the acknowledgment in onMessage():
container.setMessageListener(new AcknowledgingMessageListener<K, V>() {
@Override
public void onMessage(ConsumerRecord<K, V> message, Acknowledgment ack) {
logger.warn("**************************");
logger.warn("event id - " + event.getEventId());
logger.warn("**************************");
ack.acknowledge();
}
});
When there are messages already on the topic, the acknowledge succeeds for the first few messages, but it fails for the remaining ones. Once the consumer catches up, the acknowledges for subsequent messages (event id 7, i.e. offset 8 onwards, in the log below) don't fail.
Why does this happen? Am I doing anything wrong?
2016-05-06 00:10:39.946 INFO 275200 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.9.0.1
2016-05-06 00:10:39.946 INFO 275200 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 23c69d62a0cabf06
2016-05-06 00:10:40.214 INFO 275200 --- [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2016-05-06 00:10:40.323 INFO 275200 --- [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[my-topic-0]
2016-05-06 00:10:40.402 INFO 275200 --- [ kafka-1] o.a.k.c.consumer.internals.Fetcher : Fetch offset 87 is out of range, resetting offset
2016-05-06 00:10:40.511 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.511 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 0
2016-05-06 00:10:40.511 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 1
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 2
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 3
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 4
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 5
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 6
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : event id - 7
2016-05-06 00:10:40.558 WARN 275200 --- [ kafka-1] c.p.subscriber.SampleEventSubscriber : **************************
2016-05-06 00:10:40.558 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=3, metadata=''}}
org.apache.kafka.clients.consumer.internals.SendFailedException: null
2016-05-06 00:10:40.574 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=4, metadata=''}}
org.apache.kafka.clients.consumer.internals.SendFailedException: null
2016-05-06 00:10:40.574 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=5, metadata=''}}
org.apache.kafka.clients.consumer.internals.SendFailedException: null
2016-05-06 00:10:40.574 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=6, metadata=''}}
org.apache.kafka.clients.consumer.internals.SendFailedException: null
2016-05-06 00:10:40.574 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=7, metadata=''}}
org.apache.kafka.clients.consumer.internals.SendFailedException: null
2016-05-06 00:10:40.574 ERROR 275200 --- [ kafka-1] o.a.k.c.consumer.OffsetCommitCallback : Commit failed for {my-topic-0=OffsetAndMetadata{offset=8, metadata=''}}
org.apache.kafka.clients.consumer.internals.SendFailedException: null
The 'sonar-runner' plugin has been deprecated and is scheduled to be removed in Gradle 3.0. please use the official plugin from SonarQube (http://docs.sonarqube.org/display/SONAR/Analyzing+with+Gradle).
I've implemented (de)serialization for POJO messages as a separate project:
https://github.com/stepio/kafka-json
In my opinion it's too tiny to be supported as separate functionality, but it would be nice as a part of Spring Kafka.
What do you think? If you agree, what package would you suggest?
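For reference, a JSON value serializer along those lines can be quite small; a sketch assuming Jackson is on the classpath (names are illustrative):
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for the default ObjectMapper.
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return data == null ? null : this.mapper.writeValueAsBytes(data);
        }
        catch (Exception e) {
            throw new SerializationException("Failed to serialize value for topic " + topic, e);
        }
    }

    @Override
    public void close() {
        // No resources to release.
    }

}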
Allow a single @TopicPartition annotation to support more than one partition on the same topic.
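For illustration, a sketch of the desired usage (the topicPartitions attribute and its shape are assumptions about the eventual API):
@KafkaListener(id = "multi", topicPartitions =
        @TopicPartition(topic = "myTopic", partitions = { "0", "1" }))
public void listen(String data) {
    // ...
}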
Alongside the CODE_OF_CONDUCT.adoc, a CONTRIBUTING.adoc must be present.
The best candidate can be copied and adapted from Spring Integration.
Also mention the CODE_OF_CONDUCT in the README.
See SPR-14105 and AMQP-595
The template's not doing any conversion; it's just a lightweight wrapper around the Kafka producer; the conversion is done by the Kafka Serializer, when using o.s.messaging.Message.
Right now we can define @TopicPartition everywhere: at the class and method level, or even on a parameter.
Mark the annotation with:
@Target({})
@Retention(RUNTIME)
to restrict its usage.
See @Poller in Spring Integration.
Is there any reason to use commons-logging instead of SLF4J?
I can update this, if it's not a feature.
P.S.: Also, could you please explain the reason for not making the Logger static?
There are no send methods for a partition with no key.
Just a proposal, IMO.
Any ...Endpoint classes and KafkaListenerContainerFactory look like purely configuration artifacts, so they should be in the appropriate place.
On the other hand, their implementations are already there.
That looks weird to me and should be fixed or explained.
Thanks
https://build.spring.io/browse/SK-SON-7
org.gradle.execution.TaskSelectionException: Task 'sonarRunner' not found in root project 'spring-kafka-dist'.
I use this demo:
https://github.com/spring-projects/spring-integration-samples/blob/master/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/Application.java
I can send messages but can't get messages.
@Service
public class KafkaService {
private static final Logger logger = LoggerFactory
.getLogger(KafkaService.class);
@Value("${kafka.topic}")
private String topic;
@Value("${kafka.messageKey}")
private String messageKey;
@Value("${kafka.broker.address}")
private String brokerAddress;
@Value("${kafka.zookeeper.connect}")
private String zookeeperConnect;
@Value("${kafka.consumer.group}")
private String group;
@ServiceActivator(inputChannel = "toKafka")
@Bean
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression(this.topic));
handler.setMessageKeyExpression(new LiteralExpression(this.messageKey));
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.group);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
return new KafkaMessageListenerContainer<>(consumerFactory(), new TopicPartition(this.topic, 0));
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
kafkaAdapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public PollableChannel received() {
return new QueueChannel();
}
}
@SpringBootApplication
public class SpringApp {

    public static void main(String[] args) {
        ApplicationContext ctx = SpringApplication.run(SpringApp.class, args);
        KafkaTemplate<String, String> kafkaTemplate = ctx.getBean("kafkaTemplate", KafkaTemplate.class);
        kafkaTemplate.send("test", 0, "hello kafka");
        System.out.println("kafka");
        PollableChannel fromKafka = ctx.getBean("received", PollableChannel.class);
        Message<?> received = fromKafka.receive(10000);
        while (received != null) {
            System.out.println(received);
            received = fromKafka.receive(10000);
        }
    }

}
I read this document:
http://docs.spring.io/spring-kafka/docs/1.0.0.M1/reference/htmlsingle/
and tried this; I also can't get messages.
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
SimpleKafkaListenerContainerFactory<String, String> factory =
new SimpleKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
return factory;
}
@KafkaListener(topics = "test")
public void listen(String data) {
System.out.println("received a message");
System.out.println(data);
}
Adding container.setResetStrategy(ContainerOffsetResetStrategy.EARLIEST); makes the test happy.
And I don't need a Thread.sleep() any more!
Any clues?
See: spring-cloud/spring-cloud-stream#542
Currently, the new container always commits regardless of whether an exception is caught.
It's not a bug report, more like a consultancy request... maybe an idea for a feature.
While playing with kafka-console-consumer I've spotted that sometimes, after consumer rebalancing, the same message is received a second time. I checked Google and read this article:
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
As I understand it, it's not a bug, but a kind of limitation.
But we all know that in many cases message uniqueness is important and the same message should not be processed a second time. So is there any already-implemented approach to achieve such uniqueness?
As of now, I see the following fairly easy mechanism, which may help:
What do you think?
KafkaOperations and KafkaTemplate do not currently provide a way to perform truly non-blocking operations, since they use java.util.concurrent.Future, where the only way to retrieve the result is the blocking get() method.
Providing a Reactive alternative to these classes using Reactive Streams and Reactor Core types, like in our new Reactive HTTP client (which is a Reactive alternative to RestTemplate), in the Reactive Spring Data work that is about to start (see this ReactiveMongoOperations draft) or in the new Cloud Foundry Java client, would enable truly async, non-blocking, and Reactive usage of spring-kafka, and would be a huge improvement in terms of scalability, especially when used with Reactive Spring Web or Reactive Spring Data support, or even with the upcoming Reactive support in Spring Cloud Stream 1.1.
Based on all the work we have done on this topic on Spring Reactive to prepare the Reactive support of Spring Framework 5, I would advise to use:
- Publisher for input parameters, to accept any Reactive Streams implementation like Reactor Mono / Flux, Akka Streams types, or the upcoming RxJava 2 Flowable type.
- Mono as the return type for async single values (it implements Reactive Streams Publisher and can be easily converted to a blocking value with get(), to a Java 8 CompletableFuture, or to an RxJava Single).
- Flux for streams that will emit multiple values and where we want to be able to subscribe (it also implements Reactive Streams Publisher and can be easily converted into RxJava types).
I am available with @smaldini and the Spring Reactive team to help bridge the Kafka client API with Flux and Mono types. I think it should be doable (and efficient) by using the Kafka 0.9 new Producer API and new Consumer API.
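As a starting point, a sketch of bridging the callback-based 0.9 producer into a Mono (assuming Reactor Core's Mono.fromFuture; the adapter class is hypothetical):
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import reactor.core.publisher.Mono;

public final class ReactiveSendAdapter {

    private ReactiveSendAdapter() {
    }

    // Adapt Producer.send()'s callback into a Mono<RecordMetadata> so callers
    // can compose the result instead of blocking on Future.get().
    public static <K, V> Mono<RecordMetadata> send(Producer<K, V> producer,
            ProducerRecord<K, V> record) {
        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                future.completeExceptionally(exception);
            }
            else {
                future.complete(metadata);
            }
        });
        return Mono.fromFuture(future);
    }

}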
Port from s-i-k
While working on a pull request I've spotted several issues with formatting:
In favor of consumer properties; continue to support recent, however.
References the old convertAndSend methods.
This is more a feature request than an issue: While working with this project and the spring-integration-kafka project, I faced the problem that the KafkaConsumer can fetch messages much faster than I can consume them. With the current implementation, the batch of messages is read and each one is pushed downstream for processing. However, if it takes more than session.timeout.ms to process the batch (which can be thousands of messages if there is a large Kafka lag), a rebalance will be triggered, because the KafkaConsumer was not able to send heartbeats (that happens while calling poll()). This could be solved by placing a QueueChannel after the KafkaMessageListenerContainer, but that just moves the problem, as at some point that queue is going to fill up or I run out of memory.
So my suggestion is this: add support for some flow control in the KafkaMessageListenerContainer so I can pause/resume the KafkaConsumer when my downstream processing pipe is backed up, to prevent filling up all the buffers. The KafkaConsumer supports pause/resume out of the box, so it allows pausing the consumer while it keeps sending heartbeats (with poll()) but does not receive more messages until they can be consumed downstream. This could then be wrapped in an integration component that manages the flow control based on MessageDeliveryExceptions or similar.
I did an implementation like the above, but it required copying the existing KafkaMessageListenerContainer to make it work, because most of the logic is in the inner class.
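A sketch of the pause/resume idea with the 0.9 consumer API (the downstreamBackedUp signal, the paused flag, and sendDownstream() are placeholders for the real loop wiring):
// Keep calling poll() so heartbeats are sent, but pause fetching while the
// downstream pipe is backed up; poll() returns no records while paused.
Set<TopicPartition> assignment = consumer.assignment();
TopicPartition[] partitions = assignment.toArray(new TopicPartition[assignment.size()]);
if (downstreamBackedUp && !paused) {
    consumer.pause(partitions);
    paused = true;
}
else if (!downstreamBackedUp && paused) {
    consumer.resume(partitions);
    paused = false;
}
ConsumerRecords<String, String> records = consumer.poll(100);
if (!records.isEmpty()) {
    sendDownstream(records); // placeholder for the processing pipe
}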
Separate inbound headers from outbound to prevent propagation by default - this allows the elimination of the enableHeaderRouting property in the s-i-k KPMH.
Looks like the manualOffsets are never cleared.
We add the topic and the Map of its offsets per partition in updateManualOffset:
if (!this.manualOffsets.containsKey(record.topic())) {
this.manualOffsets.putIfAbsent(record.topic(), new ConcurrentHashMap<Integer, Long>());
}
this.manualOffsets.get(record.topic()).put(record.partition(), record.offset());
We call commitAsync() for them in the next commitIfNecessary() method, but we never remove the entries which have already been committed.
The best way to fix this is to add iterator.remove(); into the while loop for the iterator.
Otherwise we commit them again on the next iteration.
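A sketch of that fix (the real loop in commitIfNecessary() may differ in detail; the types follow the snippet quoted above):
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
Iterator<Map.Entry<String, ConcurrentHashMap<Integer, Long>>> iterator =
        this.manualOffsets.entrySet().iterator();
while (iterator.hasNext()) {
    Map.Entry<String, ConcurrentHashMap<Integer, Long>> entry = iterator.next();
    for (Map.Entry<Integer, Long> offset : entry.getValue().entrySet()) {
        commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
                new OffsetAndMetadata(offset.getValue() + 1));
    }
    iterator.remove(); // don't commit these entries again on the next iteration
}
if (!commits.isEmpty()) {
    this.consumer.commitAsync(commits, null);
}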
Build fails on Java 1.7 due to the @Repeatable annotation on KafkaListener, but the documentation states it is 1.7 compatible.
:spring-kafka:compileJava
C:\Projects\spring-kafka\spring-kafka\src\main\java\org\springframework\kafka\annotation\KafkaListener.java:21: error: cannot find symbol
import java.lang.annotation.Repeatable;
^
symbol: class Repeatable
location: package java.lang.annotation
C:\Projects\spring-kafka\spring-kafka\src\main\java\org\springframework\kafka\annotation\KafkaListener.java:85: error: cannot find symbol
@Repeatable(KafkaListeners.class)
^
symbol: class Repeatable
2 errors
:spring-kafka:compileJava FAILED
FAILURE: Build failed with an exception.
Compilation failed; see the compiler error output for details.
As part of a Spring Integration test I want to use the EmbeddedKafka:
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(...);
The problem is that I can't figure out how to get the random port assigned to the broker and use it from the Java configuration:
@Configuration
public class KafkaConsumerConfiguration {
@Value("${kafka.broker.port}")
private String kafkaBrokerPort;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort);
...
return new DefaultKafkaConsumerFactory<>(props);
}
The example given in the documentation (http://docs.spring.io/spring-kafka/docs/1.0.0.M2/reference/htmlsingle/#kafka) doesn't use an application context but rather instantiates everything from the test.
And in the following test the configuration was embedded in the test class, so the configuration could easily access the instance of EmbeddedKafka: https://github.com/spring-projects/spring-kafka/blob/48845e1de043ec9caf6493c7e2320966e7986a9f/spring-kafka%2Fsrc%2Ftest%2Fjava%2Forg%2Fspringframework%2Fkafka%2Fannotation%2FEnableKafkaIntegrationTests.java. But we can't really do that in a "real" application.
My problem is similar to what this person has with the server.port property: http://stackoverflow.com/questions/31058489/override-default-spring-boot-application-properties-settings-in-junit-test-with
I can't think of a good solution right now. I'm not really familiar with the execution order of the @ClassRule and the test context. In any case the test context can be cached across test suites, so I don't see how we could inject a dynamic port.
The only solution I see is giving an option to assign a static port to the EmbeddedKafka. I don't think it's too bad in a test environment.
What do you think?
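One workaround sketch: since a @ClassRule wraps the whole class (including @BeforeClass), the random broker address could be exposed as a system property before the context starts; this assumes KafkaEmbedded's getBrokersAsString() and a config that reads the full address rather than concatenating "localhost:" with a port:
@BeforeClass
public static void brokerProperties() {
    // Runs after the @ClassRule has started the broker, before the context loads.
    System.setProperty("kafka.broker.address", embeddedKafka.getBrokersAsString());
}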
Default just logs.