gamussa / reactor-ksqldb

A wrapper around ksqlDB Java Client using Project Reactor https://projectreactor.io/

Home Page: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/

License: MIT License

Java 100.00%
kafka ksqldb reactive-streams reactor

reactor-ksqldb's Introduction

Project Reactor wrapper for the reactive ksqlDB Java Client

Setting context

TBD - add YT versions

Getting binaries

Note
While the library is at 0.1-SNAPSHOT, binaries are hosted on JitPack.
project {
    repositories {
        jcenter()
        // and other repositories

        // jitpack repo for github artifacts
        maven { url 'https://jitpack.io' }

        // temp repositories for ksqlDB artifacts
        maven { url "https://ksqldb-maven.s3.amazonaws.com/maven/" }
        maven { url "https://jenkins-confluent-packages-beta-maven.s3.amazonaws.com/6.1.0-beta200825192044/1/maven/" }
    }

    dependencies {
        // ksqlDB Reactor Client
        implementation 'com.github.gAmUssA:reactor-ksqldb:master-SNAPSHOT'

        // ksqlDB Java client, excluding the slf4j-log4j12 logger binding
        implementation("io.confluent.ksql:ksqldb-api-client:0.12.0") {
            exclude group: 'org.slf4j', module: 'slf4j-log4j12'
        }
    }
}

License

See LICENSE

Copyright (c) 2020 Viktor Gamov

reactor-ksqldb's Issues

EventDrivenMicroservice crashed and blocked

Here is what I did (the source code is here: https://github.com/survivant/reactor-ksqldb/tree/test).

In a terminal:

docker-compose up

In IntelliJ, I ran the application EventDrivenMicroservice.

In the IntelliJ console, I see these errors:

> Task :EventDrivenMicroservice.main()
2021-04-12 13:39:33 [INFO] (EventDrivenMicroservice.java:33) - CLEANUP PREVIOUS DATA
2021-04-12 13:39:37 [INFO] (EventDrivenMicroservice.java:44) - CLEANUP DONE
2021-04-12 13:39:37 [INFO] (EventDrivenMicroservice.java:46) - CREATE STREAM
2021-04-12 13:39:37 [INFO] (EventDrivenMicroservice.java:53) - INSERT TRANSACTION DATA
2021-04-12 13:39:37 [INFO] (EventDrivenMicroservice.java:60) - KSQL Select
2021-04-12 13:39:37 [INFO] (EventDrivenMicroservice.java:68) - DONE
2021-04-12 13:39:37 [ERROR] (EventDrivenMicroservice.java:58) - Can't create stream
io.confluent.ksql.api.client.exception.KsqlClientException: Received 400 response from server: Cannot insert values into an unknown stream: `TRANSACTIONS`. Error code: 40001
	at io.confluent.ksql.api.client.impl.ClientImpl.lambda$handleErrorResponse$17(ClientImpl.java:452)
	at io.vertx.core.http.impl.HttpClientResponseImpl$BodyHandler.notifyHandler(HttpClientResponseImpl.java:292)
	at io.vertx.core.http.impl.HttpClientResponseImpl.lambda$bodyHandler$0(HttpClientResponseImpl.java:193)
	at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:248)
	at io.vertx.core.http.impl.Http2ClientConnection$Http2ClientStream.handleEnd(Http2ClientConnection.java:260)
	at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:68)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
	at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:107)
	at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:100)
	at io.vertx.core.http.impl.Http2ConnectionBase.lambda$onDataRead$10(Http2ConnectionBase.java:333)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:221)
	at io.vertx.core.http.impl.Http2ConnectionBase.onDataRead(Http2ConnectionBase.java:333)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:292)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelRead(VertxHttp2ConnectionHandler.java:418)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)
2021-04-12 13:39:37 [ERROR] (EventDrivenMicroservice.java:66) - Push query request failed: io.confluent.ksql.api.client.exception.KsqlClientException: Received 400 response from server: Exception while preparing statement: POSSIBLE_ANOMALIES does not exist.
Statement: SELECT * FROM possible_anomalies EMIT CHANGES;. Error code: 40001
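
One plausible reading of this log: all four steps are logged within the same second and the insert fails with "unknown stream", which suggests the asynchronous calls were started without waiting for CREATE STREAM to complete. The sketch below illustrates the ordering idea only. It uses plain `CompletableFuture` and a hypothetical in-memory stand-in for the server (`STREAMS`, `createStream`, `insertInto` are illustrative names, not the ksqlDB or ReactorClient API); with Reactor the analogous composition would typically use `then(...)`.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class SequencingSketch {

    // Hypothetical in-memory stand-in for the streams known to a server.
    static final Set<String> STREAMS = ConcurrentHashMap.newKeySet();

    // Simulates an asynchronous CREATE STREAM.
    static CompletableFuture<Void> createStream(String name) {
        return CompletableFuture.runAsync(() -> STREAMS.add(name));
    }

    // Simulates an asynchronous insert that fails when the stream is unknown.
    static CompletableFuture<Void> insertInto(String name) {
        return CompletableFuture.runAsync(() -> {
            if (!STREAMS.contains(name)) {
                throw new IllegalStateException(
                    "Cannot insert values into an unknown stream: " + name);
            }
        });
    }

    // The insert is only started after the create step has completed.
    static CompletableFuture<Void> createThenInsert(String name) {
        return createStream(name).thenCompose(ignored -> insertInto(name));
    }

    public static void main(String[] args) {
        createThenInsert("TRANSACTIONS").join();
        System.out.println("insert completed after stream creation");
    }
}
```

Calling `createStream(name)` and `insertInto(name)` as two independent statements would leave their relative order undefined, which is the situation the chained version avoids.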

A few errors in the logs, but the tests passed

I'm not sure if the tests are really OK.

I see errors like this:

2021-04-12 08:25:11 [ERROR] (ReactorClient.java:125) - Insert failed
io.confluent.ksql.api.client.exception.KsqlClientException: Received 400 response from server: Cannot insert values into an unknown stream: `CHEESE_SHIPMENTS`. Error code: 40001

> Task :compileJava UP-TO-DATE
> Task :processResources UP-TO-DATE
> Task :classes UP-TO-DATE
> Task :compileTestJava UP-TO-DATE
> Task :processTestResources NO-SOURCE
> Task :testClasses UP-TO-DATE
> Task :test
2021-04-12 08:24:28 [INFO] (DockerClientProviderStrategy.java:136) - Loaded org.testcontainers.dockerclient.NpipeSocketClientProviderStrategy from ~/.testcontainers.properties, will try it first
2021-04-12 08:24:29 [INFO] (DockerClientProviderStrategy.java:158) - Found Docker environment with local Npipe socket (npipe:////./pipe/docker_engine)
2021-04-12 08:24:29 [INFO] (DockerClientFactory.java:180) - Docker host IP address is localhost
2021-04-12 08:24:29 [INFO] (DockerClientFactory.java:192) - Connected to docker: 
  Server Version: 20.10.5
  API Version: 1.41
  Operating System: Docker Desktop
  Total Memory: 25187 MB
2021-04-12 08:24:29 [INFO] (ImageNameSubstitutor.java:50) - Image name substitution will be performed by: DefaultImageNameSubstitutor (composite of 'ConfigurationFileImageNameSubstitutor' and 'PrefixingImageNameSubstitutor')
2021-04-12 08:24:31 [INFO] (DockerClientFactory.java:210) - Ryuk started - will monitor and terminate Testcontainers containers on JVM exit
2021-04-12 08:24:31 [INFO] (DockerClientFactory.java:221) - Checking the system...
2021-04-12 08:24:31 [INFO] (DockerClientFactory.java:295) - ?? Docker server version should be at least 1.6.0
2021-04-12 08:24:31 [INFO] (DockerClientFactory.java:295) - ?? Docker environment should have more than 2GB free disk space
2021-04-12 08:24:31 [INFO] (GenericContainer.java:357) - Creating container for image: confluentinc/cp-kafka:5.5.1
2021-04-12 08:24:31 [INFO] (GenericContainer.java:357) - Creating container for image: confluentinc/cp-schema-registry:5.5.1
2021-04-12 08:24:31 [WARN] (GenericContainer.java:390) - Reuse was requested but the environment does not support the reuse of containers
To enable reuse of containers, you must set 'testcontainers.reuse.enable=true' in a file located at C:\Users\sd003526\.testcontainers.properties
2021-04-12 08:24:31 [INFO] (GenericContainer.java:418) - Starting container with ID: cc3f467533e4e1ba4677cca5ab177edd9e1f865007b712cd8a6b88343d6fed80
2021-04-12 08:24:31 [INFO] (GenericContainer.java:418) - Starting container with ID: 6649f243ec77efed8dad4670bffb3191b1071f089e04e02d115857a14b61358c
2021-04-12 08:24:31 [INFO] (GenericContainer.java:422) - Container confluentinc/cp-schema-registry:5.5.1 is starting: cc3f467533e4e1ba4677cca5ab177edd9e1f865007b712cd8a6b88343d6fed80
2021-04-12 08:24:32 [INFO] (GenericContainer.java:422) - Container confluentinc/cp-kafka:5.5.1 is starting: 6649f243ec77efed8dad4670bffb3191b1071f089e04e02d115857a14b61358c
2021-04-12 08:24:41 [INFO] (GenericContainer.java:475) - Container confluentinc/cp-kafka:5.5.1 started in PT10.2928901S
2021-04-12 08:24:41 [INFO] (GenericContainer.java:357) - Creating container for image: confluentinc/ksqldb-server:0.12.0
2021-04-12 08:24:41 [INFO] (GenericContainer.java:418) - Starting container with ID: 0272026bfc8f54b44f14629e9405d13a06d2ce9d054a561ba9190cc3b164819d
2021-04-12 08:24:42 [INFO] (GenericContainer.java:422) - Container confluentinc/ksqldb-server:0.12.0 is starting: 0272026bfc8f54b44f14629e9405d13a06d2ce9d054a561ba9190cc3b164819d
2021-04-12 08:24:51 [INFO] (GenericContainer.java:475) - Container confluentinc/cp-schema-registry:5.5.1 started in PT20.0270866S
2021-04-12 08:24:57 [INFO] (GenericContainer.java:475) - Container confluentinc/ksqldb-server:0.12.0 started in PT16.5095082S
2021-04-12 08:25:11 [INFO] (Loggers.java:274) - onSubscribe(SerializedSubscriber)
2021-04-12 08:25:11 [INFO] (Loggers.java:274) - request(unbounded)
2021-04-12 08:25:11 [INFO] (Loggers.java:274) - | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2021-04-12 08:25:11 [INFO] (Loggers.java:274) - | request(unbounded)
2021-04-12 08:25:11 [ERROR] (ReactorClient.java:125) - Insert failed
io.confluent.ksql.api.client.exception.KsqlClientException: Received 400 response from server: Cannot insert values into an unknown stream: `CHEESE_SHIPMENTS`. Error code: 40001
	at io.confluent.ksql.api.client.impl.ClientImpl.lambda$handleErrorResponse$17(ClientImpl.java:452)
	at io.vertx.core.http.impl.HttpClientResponseImpl$BodyHandler.notifyHandler(HttpClientResponseImpl.java:292)
	at io.vertx.core.http.impl.HttpClientResponseImpl.lambda$bodyHandler$0(HttpClientResponseImpl.java:193)
	at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:248)
	at io.vertx.core.http.impl.Http2ClientConnection$Http2ClientStream.handleEnd(Http2ClientConnection.java:260)
	at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:68)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
	at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:107)
	at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:100)
	at io.vertx.core.http.impl.Http2ConnectionBase.lambda$onDataRead$10(Http2ConnectionBase.java:333)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:221)
	at io.vertx.core.http.impl.Http2ConnectionBase.onDataRead(Http2ConnectionBase.java:333)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:292)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelRead(VertxHttp2ConnectionHandler.java:418)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)
2021-04-12 08:25:11 [ERROR] (Loggers.java:314) - | onError(io.confluent.ksql.api.client.exception.KsqlClientException: Received 400 response from server: Cannot insert values into an unknown stream: `CHEESE_SHIPMENTS`. Error code: 40001)
2021-04-12 08:25:11 [ERROR] (Loggers.java:319) - 
io.confluent.ksql.api.client.exception.KsqlClientException: Received 400 response from server: Cannot insert values into an unknown stream: `CHEESE_SHIPMENTS`. Error code: 40001
	at io.confluent.ksql.api.client.impl.ClientImpl.lambda$handleErrorResponse$17(ClientImpl.java:452)
	at io.vertx.core.http.impl.HttpClientResponseImpl$BodyHandler.notifyHandler(HttpClientResponseImpl.java:292)
	at io.vertx.core.http.impl.HttpClientResponseImpl.lambda$bodyHandler$0(HttpClientResponseImpl.java:193)
	at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:248)
	at io.vertx.core.http.impl.Http2ClientConnection$Http2ClientStream.handleEnd(Http2ClientConnection.java:260)
	at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:68)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
	at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:107)
	at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:100)
	at io.vertx.core.http.impl.Http2ConnectionBase.lambda$onDataRead$10(Http2ConnectionBase.java:333)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:221)
	at io.vertx.core.http.impl.Http2ConnectionBase.onDataRead(Http2ConnectionBase.java:333)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:292)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelRead(VertxHttp2ConnectionHandler.java:418)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)
2021-04-12 08:25:11 [INFO] (Loggers.java:274) - | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2021-04-12 08:25:11 [INFO] (Loggers.java:274) - | request(unbounded)
2021-04-12 08:25:11 [INFO] (Loggers.java:274) - | cancel()
2021-04-12 08:25:11 [INFO] (Loggers.java:274) - onComplete()
2021-04-12 08:25:13 [ERROR] (ReactorClient.java:125) - Insert failed
io.confluent.ksql.api.client.exception.KsqlClientException: Received error from /inserts-stream. Error code: 40000. Message: Can't coerce a field of type class java.lang.String (bad shipment id) into type INTEGER
	at io.confluent.ksql.api.client.impl.InsertIntoResponseHandler.doHandleBodyBuffer(InsertIntoResponseHandler.java:41)
	at io.confluent.ksql.api.client.impl.ResponseHandler.handleBodyBuffer(ResponseHandler.java:39)
	at io.confluent.ksql.api.client.impl.InsertIntoResponseHandler.handleBodyBuffer(InsertIntoResponseHandler.java:25)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handleParsing(RecordParserImpl.java:214)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:285)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:27)
	at io.vertx.core.http.impl.HttpClientResponseImpl.handleChunk(HttpClientResponseImpl.java:232)
	at io.vertx.core.http.impl.Http2ClientConnection$Http2ClientStream.handleData(Http2ClientConnection.java:265)
	at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:72)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
	at io.vertx.core.http.impl.VertxHttp2Stream.onDataRead(VertxHttp2Stream.java:85)
	at io.vertx.core.http.impl.Http2ConnectionBase.lambda$onDataRead$9(Http2ConnectionBase.java:328)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:221)
	at io.vertx.core.http.impl.Http2ConnectionBase.onDataRead(Http2ConnectionBase.java:326)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:292)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelRead(VertxHttp2ConnectionHandler.java:418)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)
2021-04-12 08:25:13 [INFO] (ReactorClient.java:65) - Result column names: [SHIPMENTID, CHEESE, SHIPMENTTIMESTAMP]
BUILD SUCCESSFUL in 48s
4 actionable tasks: 1 executed, 3 up-to-date
08:25:14: Task execution finished ':test --tests "io.confluent.developer.ksqldb.reactor.ReactorClientTest"'.

enhancement: be able to consume messages from topics

I created two streams, orders and returns, backed by topics with the same names.

If I issue a pull query like this:

List<Row> rows = reactorClient.executeQueryFromBeginning("select * from orders;").block();
rows.forEach(System.out::println);

I receive this error message:

io.confluent.ksql.api.client.exception.KsqlClientException: Received 400 response from server: Can't pull from `ORDERS` as it's not a materialized table. See https://cnfl.io/queries for more info.
Add EMIT CHANGES if you intended to issue a push query.
Statement: select * from orders;. Error code: 40001

I was expecting to receive all the items that are in the orders topic.

I could create a table on top of the orders stream, but I don't think a table should be mandatory just to query it.

Is there a way, from the ksqlDB api-client, to retrieve the messages from a topic starting at a given offset (in my case, from the beginning)?
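
As the error message itself hints, a stream can be read with a push query by adding EMIT CHANGES, and ksqlDB's `auto.offset.reset` property set to `earliest` asks the query to start from the beginning of the topic. The sketch below only builds the statement text and the property map; the helper names (`toPushQuery`, `fromBeginning`) are hypothetical, and passing the results to the client's streaming query call is left as an assumption about how the wrapper would be used.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

final class PushQuerySketch {

    // Appends EMIT CHANGES to a statement unless it is already a push query.
    static String toPushQuery(String sql) {
        String statement = sql.trim();
        if (statement.endsWith(";")) {
            statement = statement.substring(0, statement.length() - 1).trim();
        }
        if (statement.toUpperCase(Locale.ROOT).endsWith("EMIT CHANGES")) {
            return statement + ";";
        }
        return statement + " EMIT CHANGES;";
    }

    // Query property asking ksqlDB to read the topic from the earliest offset.
    static Map<String, Object> fromBeginning() {
        return Collections.singletonMap("auto.offset.reset", "earliest");
    }

    public static void main(String[] args) {
        System.out.println(toPushQuery("select * from orders;"));
        System.out.println(fromBeginning());
    }
}
```

Note that a push query does not terminate on its own, so collecting it into a `List<Row>` with `block()` would need a LIMIT clause or an explicit cancellation.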

How to post POJO objects into streams?

I have a POC in Spring and Quarkus applications that I want to migrate to ksqlDB instead of a plain subscriber/publisher.

I'll create a new POC, and I want to use this project to publish new events into the stream.

In Spring Boot I was using KafkaTemplate to publish my POJO.
In Quarkus I was using: Emitter emitter;

My POJO:

public class Movie {

    public String title;
    public int year;

    @Override
    public String toString() {
        return "Movie{" +
                "title='" + title + '\'' +
                ", year=" + year +
                '}';
    }
}

Your project has these methods, which use KsqlObject:

public Flux<InsertAck> streamInserts(String streamName, Publisher<KsqlObject> insertsPublisher) {
    return fromFuture(() -> this.ksqlDbClient.streamInserts(streamName, insertsPublisher))
        .flatMapMany(acksPublisher -> acksPublisher);
}
....
public Mono<Void> insertInto(String streamName, KsqlObject row) {
    return fromFuture(() -> this.ksqlDbClient.insertInto(streamName, row))
        .doOnError(throwable -> log.error("Insert failed", throwable));
}

For my POC, I need a materialized view, so ksqlDB is perfect for that, but I also need to publish and subscribe to topics/streams, and I'm not sure whether I can use ksqldb-api-client for that.
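
One possible bridge between a POJO and the KsqlObject-based methods above is to turn the POJO into a column-to-value map first. The sketch below does that with reflection over public fields. `PojoRowSketch` and `toRow` are hypothetical names, and wrapping the resulting map in a `KsqlObject` (which, as far as I know, can be constructed from a `Map<String, Object>`) is an assumption left out of the code so that it runs without the client library.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

final class PojoRowSketch {

    // Same shape as the Movie POJO above.
    static class Movie {
        public String title;
        public int year;
    }

    // Copies every public instance field into a column-name -> value map.
    static Map<String, Object> toRow(Object pojo) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Field field : pojo.getClass().getFields()) {
            if (Modifier.isStatic(field.getModifiers())) {
                continue; // constants are not row data
            }
            try {
                row.put(field.getName(), field.get(pojo));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        Movie movie = new Movie();
        movie.title = "Dune";
        movie.year = 1984;
        // Assumption: this map would then be wrapped in a KsqlObject and
        // passed to insertInto(streamName, row).
        System.out.println(toRow(movie));
    }
}
```

Whether the keys must match the stream's column names exactly, including case, depends on the server and is not verified here.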

ksqldb docker image

Why is the image used in the tests and in docker-compose this one:

confluentinc/ksqldb-server

instead of

confluentinc/cp-ksqldb-server

The same probably applies to this:

image: confluentinc/ksqldb-cli:0.15.0

enhancement: create stream from POJO

@gAmUssA

I have Java model objects (POJOs) that I will send and receive through Kafka. We could create the streams dynamically from the POJO structure.

For example:

@Data
@Builder
public class Orders {
    // KEY : UUID.toString()
    String orderId;
    String product;
    String orderTimestamp;
    
    // CREATED, SENT, RECEIVED, RETURNED 
    String status;
}

@Data
@Builder
public class Returns {
    // KEY : UUID.toString()
    String returnId;
    String orderId;
    String returnTimestamp;
}

instead of

    String ORDER_TOPIC = "orders";
    String RETURN_TOPIC = "returns";
    
    String CREATE_STREAM_ORDERS = String.format("CREATE STREAM %s (orderId VARCHAR KEY, product VARCHAR, orderTimestamp VARCHAR, status VARCHAR) "
                                  + " WITH (kafka_topic='%s', partitions=1, value_format='json');", ORDER_TOPIC, ORDER_TOPIC);

    String CREATE_STREAM_RETURNS = String.format("CREATE STREAM %s (returnId VARCHAR KEY, orderId VARCHAR, returnTimestamp VARCHAR) "
                                   + " WITH (kafka_topic='%s', partitions=1, value_format='json');", RETURN_TOPIC, RETURN_TOPIC);

    reactorClient
              .executeStatement(CREATE_STREAM_ORDERS)
              .then(reactorClient.listStreams())
              .block();
  
    reactorClient
              .executeStatement(CREATE_STREAM_RETURNS)
              .then(reactorClient.listStreams())
              .block();

I could have something like this:

.createStream(stream name, POJO class, key field to use in the POJO, properties...), where the defaults could be: stream name, 1, json

    reactorClient
              .createStream(ORDER_TOPIC, Orders.class, "orderId", of("kafka_topic", ORDER_TOPIC,
                                                                     "partitions", 1,
                                                                     "value_format", "json"))
              .then(reactorClient.listStreams())
              .block();

    // key is returnId, matching the KEY comment in the Returns POJO and the CREATE STREAM statement above
    reactorClient
              .createStream(RETURN_TOPIC, Returns.class, "returnId", of("kafka_topic", RETURN_TOPIC,
                                                                        "partitions", 1,
                                                                        "value_format", "json"))
              .then(reactorClient.listStreams())
              .block();
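
A minimal sketch of how such a createStream could derive the statement from a class, assuming a simple field-type mapping. Everything here is hypothetical (`PojoStreamDdl`, `createStreamSql`, `sqlType`); it only builds the CREATE STREAM text, which could then be handed to the existing executeStatement. The nested Orders class is a plain stand-in for the Lombok-annotated one, and the key column is emitted first to match the hand-written statements.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PojoStreamDdl {

    // Plain stand-in for the Lombok-annotated Orders class.
    static class Orders {
        String orderId;
        String product;
        String orderTimestamp;
        String status;
    }

    // Assumed mapping from Java field types to ksqlDB column types.
    static String sqlType(Class<?> type) {
        if (type == String.class) return "VARCHAR";
        if (type == int.class || type == Integer.class) return "INTEGER";
        if (type == long.class || type == Long.class) return "BIGINT";
        if (type == double.class || type == Double.class) return "DOUBLE";
        if (type == boolean.class || type == Boolean.class) return "BOOLEAN";
        throw new IllegalArgumentException("Unsupported field type: " + type.getName());
    }

    static String createStreamSql(String streamName, Class<?> pojo, String keyField,
                                  Map<String, Object> properties) {
        List<String> columns = new ArrayList<>();
        boolean keyFound = false;
        for (Field field : pojo.getDeclaredFields()) {
            if (field.isSynthetic() || Modifier.isStatic(field.getModifiers())) {
                continue; // skip compiler-generated and static fields
            }
            String column = field.getName() + " " + sqlType(field.getType());
            if (field.getName().equals(keyField)) {
                columns.add(0, column + " KEY"); // key column goes first
                keyFound = true;
            } else {
                columns.add(column);
            }
        }
        if (!keyFound) {
            throw new IllegalArgumentException(
                "No field named " + keyField + " in " + pojo.getSimpleName());
        }
        List<String> props = new ArrayList<>();
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            Object value = entry.getValue();
            // numbers are written bare, everything else as a quoted string
            props.add(entry.getKey() + "="
                + (value instanceof Number ? value.toString() : "'" + value + "'"));
        }
        return "CREATE STREAM " + streamName + " (" + String.join(", ", columns)
            + ") WITH (" + String.join(", ", props) + ");";
    }

    public static void main(String[] args) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("kafka_topic", "orders");
        props.put("partitions", 1);
        props.put("value_format", "json");
        System.out.println(createStreamSql("orders", Orders.class, "orderId", props));
    }
}
```

The order of the non-key columns follows whatever `getDeclaredFields()` returns, which the JDK does not guarantee to be declaration order, so a real implementation might want an explicit ordering annotation.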
