
bakdata / fluent-kafka-streams-tests


Fluent Kafka Streams Test with Java

Home Page: https://medium.com/bakdata/fluent-kafka-streams-tests-e641785171ec

License: MIT License

Java 100.00%
avro java kafka-streams test-framework testing testing-framework

fluent-kafka-streams-tests's People

Contributors

aheise, bakdata-bot, jakobedding, lawben, linuxidefix, milindmantri, onecricketeer, philipp94831, raminqaf, torbsto, yannick-roeder


fluent-kafka-streams-tests's Issues

Support topic pattern

Hi,

Would it be possible to add support for topic patterns, to be able to test behaviour like this:
builder.stream(Pattern.compile(".*")).to("destination-topic");
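
For illustration, a hedged sketch of what such a test could look like once patterns are supported (input(topic) and streamOutput(topic) are assumed overloads for topologies with multiple or undeclared topics; none of this works today, which is the point of the request):

// Hedged sketch of the desired test, assuming pattern subscriptions were supported
final TestTopology<String, String> testTopology =
        new TestTopology<>(builder.build(), properties)
                .withDefaultSerde(Serdes.String(), Serdes.String());
testTopology.start();

// any topic matching the pattern should feed the stream...
testTopology.input("some-topic").add("key", "value");

// ...and records should come out on the static destination topic
testTopology.streamOutput("destination-topic")
    .expectNextRecord().hasKey("key").and().hasValue("value");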

Bump kafka version to 2.2.0

Currently, we support Kafka 2.0.x. Unfortunately, Kafka also breaks its API in minor releases, so we need to make sure we catch up.

I suggest bumping our minor version as long as we do not introduce breaking API changes ourselves. We could mirror Kafka's minor version to make the two easier to correlate.

Upgrade Kafka, Confluent and Protobuf dependencies

Hello Team Bakdata!

We are upgrading the dependencies of our Kafka-based project.

  • Kafka 3.2 upgrades to a RocksDB build with native M1 support. (link)
  • Confluent 7.2.0 brings in Kafka 3.2. (link)
  • Protobuf has released an arm64-compatible protoc from v3.17.3 onwards. (link)

The versions currently used are Kafka 3.1 and Confluent 7.1.1. I would like to suggest upgrading, as the project would benefit from a smoother experience on arm64-based M1 machines.

I'll be putting up a PR (which I have tested locally using ./gradlew test). Is there more to testing a dependency change?

StateStore Configuration

If I add a state store with a SpecificAvroSerde as its value serde, how do I register it with the TestTopology or point it at the SchemaRegistryMock? When I try to store a value, I get:

Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl
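
One possible approach (a hedged sketch, not necessarily the intended API usage): manage the SchemaRegistryMock yourself so that its URL is already known when the store's serde is configured. SchemaRegistryMock.getUrl() and TestTopology#withSchemaRegistryMock appear elsewhere in this tracker; MyAvroValue is a placeholder for your Avro class.

// Hedged sketch: start the mock first so its URL exists before the topology is built.
final SchemaRegistryMock schemaRegistryMock = new SchemaRegistryMock();
schemaRegistryMock.start();

// configure the store's value serde against the mock's URL
final Serde<MyAvroValue> storeSerde = new SpecificAvroSerde<>();
storeSerde.configure(
        Collections.singletonMap("schema.registry.url", schemaRegistryMock.getUrl()),
        false); // false = value serde

final StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("my-store"),
        Serdes.String(),
        storeSerde));
// ... rest of the topology ...

// hand the same mock to the TestTopology so serialization and assertions share one registry
final TestTopology<String, MyAvroValue> testTopology =
        new TestTopology<>(builder.build(), properties)
                .withSchemaRegistryMock(schemaRegistryMock);
testTopology.start();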

NullPointerException thrown when using TopicNameExtractor for sinks in topology

I am currently trying to use this library and am finding that a NullPointerException is thrown when using a TopicNameExtractor.

I have two trivial topologies: one with a static topic name, the second using a TopicNameExtractor.

Running a simple passthrough test works for the staticTopology but throws an NPE for the dynamicTopology.

Is TopicNameExtractor unsupported? Is there any workaround? Is there any plan to support it in the future?

Here are the trivial classes, test cases, and stack trace.

package com.example.kstreams;

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;

public class KafkaStreamsTopology {
    private final Properties properties;
    String inputTopic = "input";
    String outputTopic = "output";

    public KafkaStreamsTopology(Properties properties) {
        this.properties = properties;
    }

    public Topology staticTopology() {
        var builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
            .to(outputTopic);
        return builder.build();
    }

    public Topology dynamicOutputTopology() {
        var builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
            .to((key, value, recordContext) -> outputTopic);
        return builder.build();
    }
}

package com.example.kstreams;

import com.bakdata.fluent_kafka_streams_tests.TestTopology;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

public class KafkaFluentTests {

    private TestTopology<String, String> testTopology;
    private String key = "key";
    private String value = "value";
    private Properties properties = new Properties() {{
        put("application.id", "testApp");
        put("bootstrap.servers", "dummy:1234");
    }};
    private KafkaStreamsTopology topologyComponent = new KafkaStreamsTopology(properties);
    private Topology staticTopology = topologyComponent.staticTopology();
    private Topology dynamicTopology = topologyComponent.dynamicOutputTopology();

    //@BeforeEach skipped, as a different topology is created in each test
    @AfterEach
    void tearDown() {
        testTopology.stop();
    }

    @Test
    void testStaticOutputTopicName() {
        testTopology = new TestTopology<>(staticTopology, properties).withDefaultSerde(Serdes.String(), Serdes.String());
        testTopology.start();
        testTopology.input().add(key, value);

        testTopology.streamOutput().expectNextRecord()
            .hasKey(key).and().hasValue(value);
        //Runs OK!
    }

    @Test
    void testDynamicOutputTopicName() {
        testTopology = new TestTopology<>(dynamicTopology, properties);

        testTopology.start();
        // start throws NullPointerException at com.bakdata.fluent_kafka_streams_tests.TestTopology.addExternalTopics(TestTopology.java:158)
        testTopology.input().add(key, value);

        testTopology.streamOutput().expectNextRecord()
            .hasKey(key).and().hasValue(value);
    }
}
// exception stacktrace
java.lang.NullPointerException
	at com.bakdata.fluent_kafka_streams_tests.TestTopology.addExternalTopics(TestTopology.java:158)
	at com.bakdata.fluent_kafka_streams_tests.TestTopology.start(TestTopology.java:334)
	at com.example.kstreams.KafkaFluentTests.testDynamicOutputTopicName(KafkaFluentTests.java:42)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
	at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
	at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
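
Until this is supported, a hedged workaround sketch: since TestTopology#start() itself fails for the dynamic topology, fall back to Kafka's own TopologyTestDriver from kafka-streams-test-utils, which handles dynamically named sinks.

// Hedged workaround: use plain kafka-streams-test-utils for topologies with
// dynamic sinks; createOutputTopic takes the resolved topic name directly.
try (final TopologyTestDriver driver = new TopologyTestDriver(dynamicTopology, properties)) {
    final TestInputTopic<String, String> input = driver.createInputTopic(
            "input", Serdes.String().serializer(), Serdes.String().serializer());
    input.pipeInput(key, value);

    final TestOutputTopic<String, String> output = driver.createOutputTopic(
            "output", Serdes.String().deserializer(), Serdes.String().deserializer());
    assertEquals(new KeyValue<>(key, value), output.readKeyValue());
}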

Allow checking output value contents using matchers (e.g. AssertJ matchers)

Hello, thanks for the wrapper. The output values from my topics are rather big (containing many keys), and for the purpose of testing I'd like to have a #hasValueMatching(Predicate<V> predicate) so that I don't need to construct full copies of my target objects just to check that some of the properties are fulfilled.

Maybe there's something wrong with this need itself; I'd like to hear from you why testing values only partially could possibly be a bad idea. Thanks.
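
Until something like that exists, a hedged workaround sketch: readOneRecord() is already part of the output API (it appears elsewhere in this tracker), so the value can be asserted partially with plain AssertJ. MyBigValue, getImportantField and getCount are hypothetical.

// Hedged workaround: read the record directly and assert on parts of the value
// with AssertJ instead of constructing a full expected object.
final ProducerRecord<String, MyBigValue> record =
        testTopology.streamOutput().readOneRecord();
assertThat(record.value())
        .isNotNull()
        .satisfies(value -> {
            assertThat(value.getImportantField()).isEqualTo("expected"); // hypothetical accessor
            assertThat(value.getCount()).isPositive();                   // hypothetical accessor
        });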

Add support for JUnit4

This is a follow-up ticket to #22 .

Currently, the framework is opinionated towards JUnit5. We could broaden the scope by refactoring out the JUnit5-specific parts.

However, I would first like to see actual demand, so please upvote if you need this.

[schema-registry] Use with Java 8 tests

I tried to use this library in Java 8 code, as most processes I work with still have not upgraded, but it turns out that including it would force me to compile against Java 11 myself.

I haven't researched this much, but is that something that adding modules can solve, or would there have to be a separately maintained release that compiles against Java 8?

Note: Kafka itself didn't move to Java 11 until the 2.2.0 release.

[schema-registry] MockedSchemaRegistry 5.2.1 returns Schema id -1 for some Schema registrations

After upgrading to Confluent version 5.2.1, we ran into some issues with our test code:

Some schemas were getting id -1 on registration, causing the wrong SpecificAvroSerdes to be used for deserialization.

The bug seems to have been introduced here, in Confluent schema-registry version 5.2.0:

confluentinc/schema-registry@a94cfd7

And fixed here, in Confluent schema-registry version 5.2.2-rc3:

confluentinc/schema-registry@f1d6d42#diff-e5caaf947bc9ff275003783d5d50eee6

I'd like to suggest downgrading the schema-registry dependency to 5.1.3 until 5.2.2 has been released, so users don't hit this issue out of the box.
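
In the meantime, users can pin the older client themselves. A hedged Maven sketch (the artifact that actually pulls in 5.2.x may differ per project):

<dependencyManagement>
  <dependencies>
    <!-- hedged workaround: force the pre-5.2.0 client until 5.2.2 is released -->
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-schema-registry-client</artifactId>
      <version>5.1.3</version>
    </dependency>
  </dependencies>
</dependencyManagement>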

Schema Registry List and Get subject versions

I would like to be able to register multiple schemas under a subject and know about their evolution.

Add the ability to perform:

GET /subjects/:name/versions/
GET /subjects/:name/versions/:versionId
GET /subjects/:name/versions/latest
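
A hedged sketch of the requested behaviour (registerValueSchema is assumed from the mock's existing helpers; schemaV1/schemaV2 are hypothetical Avro schemas, and the endpoints follow the Confluent Schema Registry REST spec):

// register two versions of the same subject on the mock
schemaRegistryMock.registerValueSchema("my-topic", schemaV1);
schemaRegistryMock.registerValueSchema("my-topic", schemaV2); // evolved schema

// desired queries against schemaRegistryMock.getUrl():
//   GET /subjects/my-topic-value/versions        -> [1, 2]
//   GET /subjects/my-topic-value/versions/1      -> schemaV1
//   GET /subjects/my-topic-value/versions/latest -> schemaV2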

No records returned when using Suppression

In a simple use of suppression on a KTable, the output stream returns no results.

My stream design

        final KStream<String, InputEvent> input = kStreamBuilder.stream(inputTopic,
                Consumed.with(Serdes.String(), InputEventSerde()));
        final KStream<String, AggEvent> myKstream = input
                // re-key and convert to the aggregate type
                .map((key, value) -> KeyValue.pair(value.getKey(), AggEvent.fromValue(value)))
                // make the KTable
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5L)).grace(graceDuration))
                .reduce(AggEvent::add, Materialized.<String, AggEvent, WindowStore<Bytes, byte[]>>as("aggregated_events")
                        .withCachingDisabled()
                        .withKeySerde(Serdes.String())
                        .withValueSerde(AggEventSerde())
                        .withRetention(Duration.ofMinutes(10L).plus(graceDuration)))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                // unwrap the windowed key
                .map((k, v) -> KeyValue.pair(k.key(), v));

        myKstream.to(outputTopic, Produced.with(Serdes.String(), AggEventSerde()));

The objects

public class InputEvent {
    String s;
    public InputEvent(String s) {
        this.s = s;
    }

    public String getS() {
        return s;
    }
    // equals
    // hash
}

public class AggEvent {
    public Set<String> events = new HashSet<>();
    
    public AggEvent add(InputEvent i) {
        events.add(i.getS());
        return this;
    }

    public static AggEvent fromValue(InputEvent i ) {
        return new AggEvent().add(i);
    }
    // equals
    // hash
}

The test

When running a test as simple as the following, it throws an exception at the .expectNextRecord() due to "not having any record".

        var firstInputEvent = new InputEvent("a");
        var secondInputEvent = new InputEvent("b");

        var expectedEvent = new AggEvent().add(firstInputEvent).add(secondInputEvent);

        this.testTopology.input()
            .at(firstMinute).add(firstInputEvent)
            .at(secondMinute).add(secondInputEvent)
            .at(now).add(dummyInputForFlushingSuppression()); // just creates an empty message sent further in time, forcing the KTable to flush the messages it suppressed

        this.testTopology.streamOutput().withSerde(Serdes.String(), AggEventSerde())
            .expectNextRecord().hasValue(expectedEvent)
            .expectNoMoreRecord();

The same test using a plain implementation with TopologyTestDriver and TestInputTopic/TestOutputTopic works just fine for this case.
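
For context, the flush helper mentioned in the comment might look like this (hedged sketch; firstMinute and graceDuration as defined above):

// Hedged sketch: a record timestamped past windowEnd + grace advances stream
// time, so suppress() finally emits the closed window.
final long windowEndMs = firstMinute + Duration.ofMinutes(5L).toMillis();
final long flushTimeMs = windowEndMs + graceDuration.toMillis() + 1;
this.testTopology.input()
    .at(flushTimeMs)
    .add(new InputEvent("flush")); // dummy event outside the asserted window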

500 errors on the server result in JSON serializer exceptions rather than the expected error, as the response is HTML

When attempting to call the mock server in a Kafka Streams integration test, I am receiving a 500 error from the server. It consists of the following headers:
HTTP/1.1 500 Server Error
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: text/html;charset=ISO-8859-1
Content-Length: 5820
Connection: close

The response body in the debugger (truncated) is as follows:

<head>
<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
<title>Error 500 java.lang.NoSuchMethodError: &apos;boolean com.fasterxml.jackson.annotation.JsonSubTypes.failOnRepeatedNames()&apos;</title>
</head>
<body><h2>HTTP ERROR 500 java.lang.NoSuchMethodError: &apos;boolean com.fasterxml.jackson.annotation.JsonSubTypes.failOnRepeatedNames()&apos;</h2>
<table>
<tr><th>URI:</th><td>/subjects/public_systest_alerting_preferences_2-value/versions</td></tr>
<tr><th>STATUS:</th><td>500</td></tr>
<tr><th>MESSAGE:</th><td>java.lang.NoSuchMethodError: &apos;boolean com.fasterxml.jackson.annotation.JsonSubTypes.failOnRepeatedNames()&apos;</td></tr>
<tr><th>SERVLET:</th><td>com.github.tomakehurst.wiremock.servlet.WireMockHandlerDispatchingServlet-76304b46</td></tr>
<tr><th>CAUSED BY:</th><td>java.lang.NoSuchMethodError: &apos;boolean com.fasterxml.jackson.annotation.JsonSubTypes.failOnRepeatedNames()&apos;</td></tr>
</table>
<h3>Caused by:</h3><pre>java.lang.NoSuchMethodError: &apos;boolean com.fasterxml.jackson.annotation.JsonSubTypes.failOnRepeatedNames()&apos;
	at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.findSubtypes(JacksonAnnotationIntrospector.java:627)
	at com.fasterxml.jackson.databind.jsontype.impl.StdSubtypeResolver._collectAndResolve(StdSubtypeResolver.java:265)
	at com.fasterxml.jackson.databind.jsontype.impl.StdSubtypeResolver.collectAndResolveSubtypesByClass(StdSubtypeResolver.java:152)
...

This results in "Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false')" exceptions being reported back to the calling code from the schema registry library. This appears to be because the mock is returning HTML rather than the expected JSON.

This is using schema-registry-mock 2.8.1 against kafka-streams 7.3.0-ce.
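
For what it's worth, JsonSubTypes.failOnRepeatedNames() only exists from jackson-annotations 2.14 onwards, so this NoSuchMethodError usually means mixed Jackson versions on the test classpath. A hedged Maven sketch of the usual fix:

<dependencyManagement>
  <dependencies>
    <!-- hedged: align all Jackson artifacts on one version that has failOnRepeatedNames -->
    <dependency>
      <groupId>com.fasterxml.jackson</groupId>
      <artifactId>jackson-bom</artifactId>
      <version>2.14.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>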

java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMerge

I am facing an error while running a test case with JUnit 5 when using the schema registry mock:

[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.loconav.IgnitionAlertTest
[main] INFO org.eclipse.jetty.util.log - Logging initialized @1028ms
[main] INFO org.eclipse.jetty.server.Server - jetty-9.2.24.v20180105
[main] INFO org.eclipse.jetty.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@4566d049{/__admin,null,AVAILABLE}
[main] INFO org.eclipse.jetty.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@7f6f61c8{/,null,AVAILABLE}
[main] INFO org.eclipse.jetty.server.NetworkTrafficServerConnector - Started NetworkTrafficServerConnector@4a11eb84{HTTP/1.1}{0.0.0.0:51339}
[main] INFO org.eclipse.jetty.server.Server - Started @1225ms
[main] INFO io.confluent.kafka.serializers.KafkaAvroSerializerConfig - KafkaAvroSerializerConfig values: 
        schema.registry.url = [http://localhost:51339]
        basic.auth.user.info = [hidden]
        auto.register.schemas = true
        max.schemas.per.subject = 1000
        basic.auth.credentials.source = URL
        schema.registry.basic.auth.user.info = [hidden]
        value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
        key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

[main] INFO org.eclipse.jetty.server.NetworkTrafficServerConnector - Stopped NetworkTrafficServerConnector@4a11eb84{HTTP/1.1}{0.0.0.0:0}
[main] INFO org.eclipse.jetty.server.handler.ContextHandler - Stopped o.e.j.s.ServletContextHandler@7f6f61c8{/,null,UNAVAILABLE}
[main] INFO org.eclipse.jetty.server.handler.ContextHandler - Stopped o.e.j.s.ServletContextHandler@4566d049{/__admin,null,UNAVAILABLE}
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.823 s <<< FAILURE! - in com.loconav.IgnitionAlertTest
[ERROR] alertIsGeneratedWithCorrectTimestampForDeviceCoordinates  Time elapsed: 0.237 s  <<< ERROR!
java.lang.NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonMerge
        at com.loconav.IgnitionAlertTest.setup(IgnitionAlertTest.java:52)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMerge
        at com.loconav.IgnitionAlertTest.setup(IgnitionAlertTest.java:52)

[INFO] 
[INFO] Results:
[INFO] 
[ERROR] Errors: 
[ERROR]   IgnitionAlertTest.setup:52 ? NoClassDefFound com/fasterxml/jackson/annotation/...
[INFO] 
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0

The above-mentioned line number is where I retrieve the Topology from the main code. Lines 51-53 in the test:

50    IgnitionAlertService alertService = new IgnitionAlertService();
51    Topology topology =
52        alertService.generateTopology(
53            props.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG));

I am not using the JsonMerge annotation anywhere in my main code or test cases. Here is the dependency tree:

[INFO] Scanning for projects...
[INFO] 
[INFO] --------------------< com.loconav:ignition-alerts >---------------------
[INFO] Building ignition-alerts 1.0.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ ignition-alerts ---
[INFO] com.loconav:ignition-alerts:jar:1.0.0
[INFO] +- com.loconav:kafka-clients-core:jar:1.0.0:compile
[INFO] |  +- org.apache.kafka:kafka-clients:jar:2.1.0:compile
[INFO] |  |  +- com.github.luben:zstd-jni:jar:1.3.5-4:compile
[INFO] |  |  +- org.lz4:lz4-java:jar:1.5.0:compile
[INFO] |  |  +- org.xerial.snappy:snappy-java:jar:1.1.7.2:compile
[INFO] |  |  \- org.slf4j:slf4j-api:jar:1.7.25:compile
[INFO] |  +- org.apache.kafka:kafka-streams:jar:2.1.0:compile
[INFO] |  |  +- org.apache.kafka:connect-json:jar:2.1.0:compile
[INFO] |  |  |  \- org.apache.kafka:connect-api:jar:2.1.0:compile
[INFO] |  |  |     \- javax.ws.rs:javax.ws.rs-api:jar:2.1.1:compile
[INFO] |  |  \- org.rocksdb:rocksdbjni:jar:5.14.2:compile
[INFO] |  +- io.confluent:kafka-schema-registry-client:jar:5.1.2:compile
[INFO] |  |  +- io.confluent:common-config:jar:5.1.2:compile
[INFO] |  |  +- org.apache.avro:avro:jar:1.8.1:compile
[INFO] |  |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
[INFO] |  |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
[INFO] |  |  |  +- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
[INFO] |  |  |  +- org.apache.commons:commons-compress:jar:1.8.1:compile
[INFO] |  |  |  \- org.tukaani:xz:jar:1.5:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.6:compile
[INFO] |  |  \- io.confluent:common-utils:jar:5.1.2:compile
[INFO] |  |     +- org.apache.zookeeper:zookeeper:jar:3.4.13:compile
[INFO] |  |     |  +- jline:jline:jar:0.9.94:compile
[INFO] |  |     |  +- org.apache.yetus:audience-annotations:jar:0.5.0:compile
[INFO] |  |     |  \- io.netty:netty:jar:3.10.6.Final:compile
[INFO] |  |     \- com.101tec:zkclient:jar:0.10:compile
[INFO] |  +- com.loconav:kafka-schemas:jar:1.0.0:compile
[INFO] |  |  \- com.google.guava:guava:jar:27.1-jre:compile
[INFO] |  |     +- com.google.guava:failureaccess:jar:1.0.1:compile
[INFO] |  |     +- com.google.guava:listenablefuture:jar:9999.0-empty-to-avoid-conflict-with-guava:compile
[INFO] |  |     +- com.google.code.findbugs:jsr305:jar:3.0.2:compile
[INFO] |  |     +- org.checkerframework:checker-qual:jar:2.5.2:compile
[INFO] |  |     +- com.google.errorprone:error_prone_annotations:jar:2.2.0:compile
[INFO] |  |     +- com.google.j2objc:j2objc-annotations:jar:1.1:compile
[INFO] |  |     \- org.codehaus.mojo:animal-sniffer-annotations:jar:1.17:compile
[INFO] |  +- com.loconav:kafka-streams-services:jar:1.0.0:compile
[INFO] |  \- org.slf4j:slf4j-simple:jar:1.7.25:compile
[INFO] +- org.junit.jupiter:junit-jupiter-engine:jar:5.4.2:test
[INFO] |  +- org.apiguardian:apiguardian-api:jar:1.0.0:test
[INFO] |  \- org.junit.platform:junit-platform-engine:jar:1.4.2:test
[INFO] +- org.junit.jupiter:junit-jupiter-api:jar:5.4.2:test
[INFO] |  +- org.opentest4j:opentest4j:jar:1.1.1:test
[INFO] |  \- org.junit.platform:junit-platform-commons:jar:1.4.2:test
[INFO] +- com.bakdata.fluent-kafka-streams-tests:schema-registry-mock:jar:1.0.1:compile
[INFO] |  +- io.confluent:kafka-avro-serializer:jar:5.1.0:compile
[INFO] |  +- io.confluent:kafka-streams-avro-serde:jar:5.1.0:compile
[INFO] |  \- com.github.tomakehurst:wiremock:jar:2.20.0:runtime
[INFO] |     +- org.eclipse.jetty:jetty-server:jar:9.2.24.v20180105:runtime
[INFO] |     |  +- javax.servlet:javax.servlet-api:jar:3.1.0:runtime
[INFO] |     |  +- org.eclipse.jetty:jetty-http:jar:9.2.24.v20180105:runtime
[INFO] |     |  \- org.eclipse.jetty:jetty-io:jar:9.2.24.v20180105:runtime
[INFO] |     +- org.eclipse.jetty:jetty-servlet:jar:9.2.24.v20180105:runtime
[INFO] |     |  \- org.eclipse.jetty:jetty-security:jar:9.2.24.v20180105:runtime
[INFO] |     +- org.eclipse.jetty:jetty-servlets:jar:9.2.24.v20180105:runtime
[INFO] |     |  +- org.eclipse.jetty:jetty-continuation:jar:9.2.24.v20180105:runtime
[INFO] |     |  \- org.eclipse.jetty:jetty-util:jar:9.2.24.v20180105:runtime
[INFO] |     +- org.eclipse.jetty:jetty-webapp:jar:9.2.24.v20180105:runtime
[INFO] |     |  \- org.eclipse.jetty:jetty-xml:jar:9.2.24.v20180105:runtime
[INFO] |     +- com.fasterxml.jackson.core:jackson-core:jar:2.8.11:compile
[INFO] |     +- com.fasterxml.jackson.core:jackson-annotations:jar:2.8.11:compile
[INFO] |     +- org.apache.httpcomponents:httpclient:jar:4.5.5:runtime
[INFO] |     |  +- org.apache.httpcomponents:httpcore:jar:4.4.9:runtime
[INFO] |     |  +- commons-logging:commons-logging:jar:1.2:runtime
[INFO] |     |  \- commons-codec:commons-codec:jar:1.10:runtime
[INFO] |     +- org.xmlunit:xmlunit-core:jar:2.5.1:runtime
[INFO] |     +- org.xmlunit:xmlunit-legacy:jar:2.5.1:runtime
[INFO] |     +- com.jayway.jsonpath:json-path:jar:2.4.0:runtime
[INFO] |     |  \- net.minidev:json-smart:jar:2.3:runtime
[INFO] |     |     \- net.minidev:accessors-smart:jar:1.2:runtime
[INFO] |     |        \- org.ow2.asm:asm:jar:5.0.4:runtime
[INFO] |     +- net.sf.jopt-simple:jopt-simple:jar:5.0.3:runtime
[INFO] |     +- junit:junit:jar:4.12:runtime
[INFO] |     +- org.apache.commons:commons-lang3:jar:3.7:runtime
[INFO] |     +- com.flipkart.zjsonpatch:zjsonpatch:jar:0.4.4:runtime
[INFO] |     +- com.github.jknack:handlebars:jar:4.0.7:runtime
[INFO] |     |  \- org.antlr:antlr4-runtime:jar:4.7.1:runtime
[INFO] |     \- com.github.jknack:handlebars-helpers:jar:4.0.7:runtime
[INFO] +- org.apache.kafka:kafka-streams-test-utils:jar:2.1.0:compile
[INFO] +- org.hamcrest:hamcrest-all:jar:1.3:test
[INFO] \- joda-time:joda-time:jar:2.10.1:compile
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.876 s
[INFO] Finished at: 2019-04-09T14:53:00+05:30
[INFO] ------------------------------------------------------------------------

Here is the full debug log from running mvn -X test. This link contains the effective POM.
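
For reference, @JsonMerge only exists from jackson-annotations 2.9 onwards, and the tree above shows jackson-annotations 2.8.11 sitting next to jackson-databind 2.9.6. A hedged sketch of a fix is pinning the annotations artifact to match databind:

<!-- hedged: bring jackson-annotations up to the 2.9 line so @JsonMerge resolves -->
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-annotations</artifactId>
  <version>2.9.6</version>
</dependency>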

fluent-kafka-streams-tests-junit4 for JDK 8

Hi all,

My Kafka Streams application uses JDK 8. When I tried to use fluent-kafka-streams-tests-junit4, I got the error:

[ERROR] bad class file: /Users/tinnguyen/.m2/repository/com/bakdata/fluent-kafka-streams-tests/fluent-kafka-streams-tests-junit4/2.0.0/fluent-kafka-streams-tests-junit4-2.0.0.jar(com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.class)
[ERROR] class file has wrong version 55.0, should be 52.0

I know this is caused by the dependency being built for JDK 11. I'm wondering if there is any build for JDK 8 in the Maven repo?

How to enable SchemaRegistryMock when using Spring Kafka + @Autowired + manual overriding of Avro Serdes?

Hello, this is not an issue with the library; it's more of a request for help, because I have a slightly different code structure from the example code and I can't seem to figure it out.

I am facing an issue wherein I'm using a SpecificAvroSerde that is initialized like so:

// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                 "http://my-schema-registry:8081");
// `Foo` and `Bar` are Java classes generated from Avro schemas
final Serde<Foo> keySpecificAvroSerde = new SpecificAvroSerde<>();
keySpecificAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<Bar> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values

And I'm using @Autowired on my implementation.

So I think that during the startup of my unit test, the autowired bean is created first, and because of that it doesn't access the SchemaRegistryMock.

Example Code: My @Autowired Implementation

@Component
public class HourlyLoginCountProcessor {
    private static final String LOGIN_RAW_STREAM_TOPIC_NAME = "LOGIN_RAW_STREAM";
    private static final String LOGIN_REVERSE_GEOCODED_TOPIC_NAME = "LOGIN_REVERSE_GEOCODED";

    @Autowired
    public void buildPipeline(StreamsBuilder streamsBuilder) throws IOException {
        KStream<String, LoginRaw> loginRawStream = streamsBuilder.stream(LOGIN_RAW_STREAM_TOPIC_NAME,
                Consumed.with(Serdes.String(), this.getLoginRawSerde()));

        // groupBy is required when aggregating, in our case, only the Service name is common to all of them
        // so we're putting that as a key of the message
        KGroupedStream<String, LoginRaw> loginRawStreamGroupedByService = loginRawStream.groupBy(
            (key, value) -> value.getService().toString()
        );

        KTable<Windowed<String>, LoginCount> loginCountTable = loginRawStreamGroupedByService
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(3L)))
            .aggregate(
                /* initializer */
                () -> {
                    LoginCount loginCount = new LoginCount();

                    //todo on service
                    loginCount.setService("");
                    loginCount.setLoginCount(0L);
                    loginCount.setWithLocation(0L);

                    // blank placeholder to avoid runtime error
                    loginCount.setStartTime("");
                    loginCount.setEndTime("");

                    return loginCount;
                },
                /* aggregation portion */
                (key, loginRaw, loginCountAggregate) -> {
                    loginCountAggregate.setLoginCount(loginCountAggregate.getLoginCount() + 1);

                    // do an increment only if login data has latitude or longitude values.
                    if (loginRaw.getLatitude() != null) {
                        loginCountAggregate.setWithLocation(loginCountAggregate.getWithLocation() + 1);
                    }

                    String captureRate = this.calculateCaptureRate(loginCountAggregate);
                    loginCountAggregate.setCaptureRate(captureRate);

                    loginCountAggregate.setService(loginRaw.getService());

                    return loginCountAggregate;
                }
            );

        /* adding the Window startTime and endTime to the message */
        loginCountTable
            /* post a new entry ONLY at the end of the Window */
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            .map((window, loginCount) -> {
                String startTime = LocalDateTime.ofInstant(window.window().startTime(), ZoneId.of("Asia/Manila")).toString();
                String endTime = LocalDateTime.ofInstant(window.window().endTime(), ZoneId.of("Asia/Manila")).toString();

                loginCount.setStartTime(startTime);
                loginCount.setEndTime(endTime);

                return KeyValue.pair(window.key(), loginCount);
            })
            .to(LOGIN_REVERSE_GEOCODED_TOPIC_NAME, Produced.keySerde(new WindowedSerdes.TimeWindowedSerde(Serdes.String())));
    }

    private String calculateCaptureRate(LoginCount aggregate) {
        // omitted for brevity 
    }

    public Serde<LoginRaw> getLoginRawSerde() {
        final Map<String, String> config = new HashMap<>();
        final Serde<LoginRaw> serde = new SpecificAvroSerde<>();

        config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                this.schemaRegistryUrl); // not working but I just put this here anyway
        serde.configure(config, false);

        return serde;
    }
}

Sample: Unit Test code:

@SpringBootTest
public class HourlyLoginCountProcessorTests {
    private SchemaRegistryMock schemaRegistryMock = new SchemaRegistryMock();

    @Autowired
    HourlyLoginCountProcessor hourlyLoginCountProcessor;

    TestTopology<Object, Object> testTopology = null;

    @BeforeEach
    public void setUp() throws IOException {
        this.schemaRegistryMock.start();

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        this.hourlyLoginCountProcessor.buildPipeline(streamsBuilder);

        Topology topology = streamsBuilder.build();

        this.testTopology = new TestTopology<>(topology, this.getStreamsConfiguration("http://dummy"))
                .withSchemaRegistryMock(this.schemaRegistryMock);

        this.testTopology.start();
    }

    @Test
    void HourlyLoginCountProcessorTest() {
        LoginRaw loginRaw = this.generateLoginRawForSauyo();
        LoginRaw loginRawNullLocation = this.generateLoginRawWithNullLocation();

        // try inserting data into the LOGIN_RAW_STREAM stream / topic
        this.testTopology.input()
                .add("", loginRaw)
                .add("", loginRawNullLocation);

        this.testTopology.input()
                .at(TimeUnit.SECONDS.toMillis(1)).add("", loginRaw)
                .at(TimeUnit.SECONDS.toMillis(2)).add("", loginRawNullLocation);
//                .at(System.currentTimeMillis()).add(dummyInput); // flush KTable

        // see the output of the stream based on input
        ProducerRecord<Object, LoginCount> record =
                this.testTopology.tableOutput().withValueType(LoginCount.class).readOneRecord();

        // should use .toString() method because the field is in CharSequence
        System.out.println("haha");
    }

    private LoginRaw generateLoginRawWithNullLocation() {
        // omitted for brevity
    }

    private LoginRaw generateLoginRawForSauyo() {
        // omitted for brevity
    }

    private Properties getStreamsConfiguration(String url) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "mock:9092");
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        // giving any value to SCHEMA_REGISTRY_URL_CONFIG will activate a mocked Schema Registry.
        // actual value is ignored
        streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url);

        return streamsConfiguration;
    }
}

Any help would be greatly appreciated, like advice on how I can refactor my unit test and/or implementation class using @Autowired so that they work together without runtime issues in the unit test. Thanks so much! 🙏
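
A hedged refactoring sketch: make the schema registry URL an injectable property so the mock can supply it before the @Autowired bean builds its serde. @DynamicPropertySource is standard Spring (5.2.5+); the property name schema.registry.url is an assumption and must match whatever getLoginRawSerde() reads, e.g. via @Value("${schema.registry.url}").

// Hedged sketch: start the mock before the Spring context wires the processor,
// and publish its URL as a dynamic property the serde configuration can read.
@SpringBootTest
public class HourlyLoginCountProcessorTests {
    private static final SchemaRegistryMock schemaRegistryMock = new SchemaRegistryMock();

    @DynamicPropertySource
    static void schemaRegistryProperties(DynamicPropertyRegistry registry) {
        schemaRegistryMock.start(); // runs before the autowired bean is created
        registry.add("schema.registry.url", schemaRegistryMock::getUrl);
    }

    // ... the rest of the test can keep using withSchemaRegistryMock(schemaRegistryMock) ...
}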

Support JsonSchema and Protobuf providers on SchemaRegistryMock

Since Confluent version 5.5.0, Schema Registry supports JsonSchema and Protobuf as SchemaProviders for registering schemas. To do so, when creating a SchemaRegistryClient or MockSchemaRegistryClient, a list of providers can be passed, and the registry will use those.

The problem is that your TestTopology and SchemaRegistryMock classes offer no way to pass a list of providers to the MockSchemaRegistryClient you use internally.

Can anyone help me with that?
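
For reference, this is what the underlying Confluent client has supported since 5.5.0; the request is essentially for SchemaRegistryMock and TestTopology to expose an equivalent way to pass providers through (hedged sketch, provider class names from the Confluent packages):

// What MockSchemaRegistryClient accepts since Confluent 5.5.0; SchemaRegistryMock
// would need a constructor or setter that forwards such a list internally.
final List<SchemaProvider> providers = Arrays.asList(
        new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider());
final SchemaRegistryClient client = new MockSchemaRegistryClient(providers);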

[schema-registry] Topology with manually created SpecificAvroSerde needs to know the schema-registry URL upfront

I believe Confluent made a change to how SpecificAvroSerdes are initialized, which now requires the Schema Registry URL to be known while creating the Kafka Streams topology. With the current setup, the mock Schema Registry URL is not yet known at that point and can't be provided.

Details:

It used to be possible to create a SpecificAvroSerde as follows: Serdes.from(SpecificAvro.class). Now you must define them like this:

// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                 "http://my-schema-registry:8081");
// `Foo` and `Bar` are Java classes generated from Avro schemas
final Serde<Foo> keySpecificAvroSerde = new SpecificAvroSerde<>();
keySpecificAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<Bar> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values

Source: https://docs.confluent.io/current/streams/developer-guide/datatypes.html

This means that if you, for example, use the Kafka Streams Processor API to create a state store that works with specific Avro objects, you need to provide the Schema Registry URL when building the topology.

At that point the mocked Schema Registry has not yet started, and therefore its URL is not known yet.

Is it possible to somehow expose this URL before creating the topology that is passed to the TestTopology? Or alternatively, to provide the Schema Registry URL to the TestTopology upfront?
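
A hedged sketch of a possible workaround in the meantime: manage the mock yourself so the URL exists before the topology is built (SchemaRegistryMock.getUrl() and withSchemaRegistryMock(...) appear elsewhere in this tracker; buildTopologyWithStores is a hypothetical factory):

// Hedged sketch: starting the mock manually exposes its URL upfront.
final SchemaRegistryMock schemaRegistryMock = new SchemaRegistryMock();
schemaRegistryMock.start();
final String url = schemaRegistryMock.getUrl(); // known before the topology exists

final Topology topology = buildTopologyWithStores(url); // hypothetical factory that configures the serdes
final TestTopology<Foo, Bar> testTopology =
        new TestTopology<>(topology, properties)
                .withSchemaRegistryMock(schemaRegistryMock);
testTopology.start();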

Not listening on HTTP port. The WireMock server is most likely stopped

I am facing an error while running tests with the Schema Registry mock along with JUnit4:

alertIsGeneratedWithCorrectTimestampForDeviceCoordinates(com.loconav.IgnitionAlertTest)  Time elapsed: 0.038 sec  <<< ERROR!
java.lang.IllegalStateException: Not listening on HTTP port. The WireMock server is most likely stopped
        at com.google.common.base.Preconditions.checkState(Preconditions.java:508)
        at com.github.tomakehurst.wiremock.WireMockServer.port(WireMockServer.java:177)
        at com.bakdata.schemaregistrymock.SchemaRegistryMock.getUrl(SchemaRegistryMock.java:131)
        at com.loconav.IgnitionAlertTest.setup(IgnitionAlertTest.java:48)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)

alertIsGeneratedWithCorrectTimestampForDeviceCoordinates(com.loconav.IgnitionAlertTest)  Time elapsed: 0.038 sec  <<< ERROR!
java.lang.NullPointerException
        at com.loconav.IgnitionAlertTest.tearDown(IgnitionAlertTest.java:59)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)


Results :

Tests in error: 
  alertIsGeneratedWithCorrectTimestampForDeviceCoordinates(com.loconav.IgnitionAlertTest): Not listening on HTTP port. The WireMock server is most likely stopped
  alertIsGeneratedWithCorrectTimestampForDeviceCoordinates(com.loconav.IgnitionAlertTest)

Code looks like below:

public class IgnitionAlertTest {

  private TopologyTestDriver topologyTestDriver;

  @RegisterExtension final SchemaRegistryMock schemaRegistry = new SchemaRegistryMock();

  @Before
  public void setup() {

    // setup test driver
    Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    props.setProperty(
        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.topicPrefix(TopicConfig.COMPRESSION_TYPE_CONFIG), "snappy");
    props.setProperty(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry.getUrl());

    Topology topology =
        IgnitionAlertService.generateTopology(
            props.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG));

    topologyTestDriver = new TopologyTestDriver(topology, props);
  }

  @After
  public void tearDown() {
    topologyTestDriver.close();
  }

  @Test
  public void alertIsGeneratedWithCorrectTimestampForDeviceCoordinates() {

    Serializer<String> keySerializer = Topics.DEVICE_COORDINATES.keySerde().serializer();

    Serializer<DeviceLocations> valueSerializer =
        Topics.DEVICE_COORDINATES.valueSerde().serializer();

    long currentTime = System.currentTimeMillis();
    ConsumerRecordFactory<String, DeviceLocations> consumerRecordFactory =
        new ConsumerRecordFactory<>(
            Topics.DEVICE_COORDINATES.name(), keySerializer, valueSerializer, currentTime);

    // ...other data generation code to test
  }
}

Could there be some issue with dependency management, as this test is executing in a module that inherits its testing dependencies from the parent project?
I tried to run this using JUnit 5 and can see logs of the Jetty server running,
but it keeps giving me the error java.lang.NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonMerge while executing the test.
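
One thing that stands out (hedged observation): @RegisterExtension is a JUnit 5 annotation, so in a test driven by JUnit 4's @Before/@After it is never processed and the mock never starts, which would explain the "most likely stopped" message. Assuming the rule class shipped in the schema-registry-mock-junit4 artifact, the JUnit 4 equivalent would look roughly like this:

// Hedged sketch: with JUnit 4, start the mock via a @Rule instead of
// @RegisterExtension (which only JUnit 5 processes).
// SchemaRegistryMockRule is assumed from the schema-registry-mock-junit4 artifact.
public class IgnitionAlertTest {

  @Rule
  public final SchemaRegistryMockRule schemaRegistry = new SchemaRegistryMockRule();

  @Before
  public void setup() {
    // schemaRegistry.getUrl() is now safe to call: the rule started the server
    // ... build props and the topology as above ...
  }
}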

Version 2.8.0 no longer works with Spring Boot 3.x due to the wiremock dependency still using javax instead of jakarta

This may not be something you can do anything about right now, but due to the wiremock dependency, this library no longer works with Spring Boot 3.x:

java.lang.NoClassDefFoundError: javax/servlet/DispatcherType

	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:390)
	at java.base/java.lang.Class.forName(Class.java:381)
	at com.github.tomakehurst.wiremock.jetty9.JettyHttpServerFactory.getServerConstructor(JettyHttpServerFactory.java:35)
	at com.github.tomakehurst.wiremock.jetty9.JettyHttpServerFactory.<clinit>(JettyHttpServerFactory.java:30)
	at com.github.tomakehurst.wiremock.core.WireMockConfiguration.<init>(WireMockConfiguration.java:92)
	at com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig(WireMockConfiguration.java:125)
	at com.bakdata.schemaregistrymock.SchemaRegistryMock.<init>(SchemaRegistryMock.java:123)

That is the case even if I override the wiremock dependency to the latest beta version (edit: at this point I didn't realize I had to exclude the jre-8 version that bakdata uses):

<dependency>
	<groupId>com.github.tomakehurst</groupId>
	<artifactId>wiremock</artifactId>
	<version>3.0.0-beta-2</version>
	<scope>test</scope>
</dependency>
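
As the edit above hints, overriding alone is not enough because the transitive jre-8 artifact stays on the classpath. A hedged sketch of the exclusion (the artifact id is assumed to be wiremock-jre8):

<dependency>
	<groupId>com.bakdata.fluent-kafka-streams-tests</groupId>
	<artifactId>schema-registry-mock</artifactId>
	<version>2.8.0</version>
	<scope>test</scope>
	<exclusions>
		<!-- hedged: drop the javax-based WireMock so the jakarta-compatible one wins -->
		<exclusion>
			<groupId>com.github.tomakehurst</groupId>
			<artifactId>wiremock-jre8</artifactId>
		</exclusion>
	</exclusions>
</dependency>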
