linkedin / brooklin
An extensible distributed system for reliable nearline data streaming at scale
License: BSD 2-Clause "Simplified" License
I need to understand what happens when a datastream is deleted and then the same datastream is recreated.
Does it try to reprocess all the messages that have already been replicated?
Since the group's offsets are maintained in the Brooklin Kafka consumer group, will it resume and replicate only the remaining messages?
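For context, one way to check whether the committed offsets survive the delete/recreate cycle (a sketch; the consumer group name derived from the datastream is an assumption here) is Kafka's own tooling:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <datastream-consumer-group>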
After data from a source partition is sent to the destination, it is distributed randomly across destination partitions. I expect data from source partition 1 to be sent to partition 1 of the destination topic.
I deploy Brooklin with Kubernetes to migrate data from one Kafka cluster to another.
For some reason the container dies and restarts during the migration. At the end of the migration I noticed that there are more messages in the target cluster than in the source cluster; most of the messages are duplicated.
I'd like to know how I can ensure that no message is duplicated in the same datastream, even when the pod restarts after a failure.
The datastream:
${BROOKLIN_HOME}/bin/brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ -n test-mirroring-stream -s "kafkassl://kafka.kafka-non-prod:9092/.*" -c kafkaMirroringC -t kafkaTP -m '{"owner":"test-user","system.reuseExistingDestination":"false"}'
Config:
############################# Server Basics #############################
brooklin.server.coordinator.cluster=brooklin-cluster
brooklin.server.coordinator.zkAddress=zookeeper:2181/brooklin
brooklin.server.httpPort=${HTTP_PORT}
brooklin.server.connectorNames=kafkaC,kafkaMirroringC
brooklin.server.transportProviderNames=kafkaTP
brooklin.server.csvMetricsDir=/tmp/brooklin-example/
########################### Kafka connector Configs ######################
brooklin.server.connector.kafkaC.factoryClassName=com.linkedin.datastream.connectors.kafka.KafkaConnectorFactory
brooklin.server.connector.kafkaC.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
brooklin.server.connector.kafkaC.consumer.enable.auto.commit=false
brooklin.server.connector.kafkaC.consumer.auto.offset.reset=latest
brooklin.server.connector.kafkaC.retryCount=2147483647
brooklin.server.connector.kafkaC.retrySleepDurationMs=2500
########################### Kafka Mirroring connector Configs ###################### (Source properties)
brooklin.server.connector.kafkaMirroringC.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory
brooklin.server.connector.kafkaMirroringC.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
brooklin.server.connector.kafkaMirroringC.isFlushlessModeEnabled=true
brooklin.server.connector.kafkaMirroringC.flowControlEnabled=true
brooklin.server.connector.kafkaMirroringC.maxInFlightMessagesThreshold=1
brooklin.server.connector.kafkaMirroringC.minInFlightMessagesThreshold=1
########################### Kafka transport provider configs ###################### (Target properties)
brooklin.server.transportProvider.kafkaTP.factoryClassName=com.linkedin.datastream.kafka.KafkaTransportProviderAdminFactory
brooklin.server.transportProvider.kafkaTP.bootstrap.servers=kafka-headless.da-saas-rss-env-region.svc.cluster.local:9092
brooklin.server.transportProvider.kafkaTP.zookeeper.connect=zookeeper:2181/kafka
brooklin.server.transportProvider.kafkaTP.client.id=datastream-producer
brooklin.server.transportProvider.kafkaTP.retries=2147483647
brooklin.server.transportProvider.kafkaTP.acks=all
...
The datastream should resume without duplicating any message that has already been copied.
The datastream doesn't resume where it left off; most of the messages are duplicated.
There is a corner case where two datastreams could share a task prefix & consumer group despite expecting to output to two different topics.
Tell us how to reproduce this issue.
Create a datastream named "stream-1".
Create a datastream named "dedupe-1". (dedupe-1 will now carry the taskPrefix "stream-1".)
Delete "stream-1".
Recreate "stream-1" with metadata "system.reuseExistingDestination=false".
I would expect both streams to have different taskPrefixes (maybe through some uuid if system.reuseExistingDestination is present), different destination topics, and different consumer groups.
Both datastreams will still have taskPrefix "stream-1" despite the fact that with system.reuseExistingDestination=false both streams now have completely different destination topics.
The GroupIdConstructor logic will then assign the same consumer group to both streams and thus only one will actually get data.
Brooklin 5.x cannot connect to ZooKeeper at startup, but version 4.x can.
1. Starting the service:
./bin/brooklin-server-start.sh config/server.properties
reports an error:
[2023-11-01 18:25:30,609] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=com.linkedin.datastream.common.zk.ZkClient@20322d26 (org.apache.zookeeper.ZooKeeper)
[2023-11-01 18:25:30,611] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2023-11-01 18:25:30,614] INFO jute.maxbuffer value is 1048575 Bytes (org.apache.zookeeper.ClientCnxnSocket)
[2023-11-01 18:25:30,618] INFO zookeeper.request.timeout value is 0. feature enabled=false (org.apache.zookeeper.ClientCnxn)
[2023-11-01 18:25:30,622] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-11-01 18:25:30,622] INFO SASL config status: Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2023-11-01 18:25:30,627] INFO Socket connection established, initiating session, client: /127.0.0.1:61573, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-11-01 18:25:30,631] INFO Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x100152bc54b0005, negotiated timeout = 30000 (org.apache.zookeeper.ClientCnxn)
[2023-11-01 18:25:30,632] INFO zkclient 0, zookeeper state changed ( SyncConnected ) (org.apache.helix.zookeeper.zkclient.ZkClient)
[2023-11-01 18:25:30,638] INFO zkclient 0, sycnOnNewSession with sessionID 100152bc54b0005 async return code: OK and proceeds (org.apache.helix.zookeeper.zkclient.ZkClient)
[2023-11-01 18:25:30,638] INFO Pagination config zk.getChildren.pagination.disabled=false, method to be invoked: getAllChildrenPaginated (org.apache.helix.zookeeper.zkclient.ZkConnection)
[2023-11-01 18:25:30,641] WARN Session 0x100152bc54b0005 for sever localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
EndOfStreamException: Unable to read additional data from server sessionid 0x100152bc54b0005, likely server has closed socket
at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:77)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)
[2023-11-01 18:25:30,643] WARN Paginated getChildren is unimplemented in ZK server! Falling back to non-paginated getChildren (org.apache.helix.zookeeper.zkclient.ZkConnection)
Exception in thread "main" org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException: Operation cannot be retried because of retry timeout (-1 milli seconds). Retry was caused by CONNECTIONLOSS
at org.apache.helix.zookeeper.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1700)
at org.apache.helix.zookeeper.zkclient.ZkClient.getChildren(ZkClient.java:1037)
at com.linkedin.datastream.common.zk.ZkClient.getChildren(ZkClient.java:96)
at com.linkedin.datastream.server.CachedDatastreamReader.fetchAllDatastreamNamesFromZk(CachedDatastreamReader.java:190)
at com.linkedin.datastream.server.CachedDatastreamReader.<init>(CachedDatastreamReader.java:59)
at com.linkedin.datastream.server.DatastreamServer.<init>(DatastreamServer.java:159)
at com.linkedin.datastream.server.DatastreamServer.main(DatastreamServer.java:441)
2. ZooKeeper error log:
2023-11-01 18:25:30,639 [myid:] - WARN [RequestThrottler:ZooKeeperServer@1145] - Received packet at server of unknown type 71
2023-11-01 18:26:07,095 [myid:] - INFO [SessionTracker:ZooKeeperServer@610] - Expiring session 0x100152bc54b0005, timeout of 30000ms exceeded
Brooklin and ZooKeeper report errors and cannot connect.
Brooklin should be able to connect to ZooKeeper normally.
I'm trying to stream a topic with the above cleanup policy and I get this weird exception.
Nothing is streamed and I get these exceptions in the log (full log is attached):
[2021-03-09 14:02:36,965] INFO ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [ita-kafka01:9092, ita-kafka02:9092, ita-kafka03:9092]
buffer.memory = 33554432
client.id = datastream-producer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,362] WARN The configuration 'producer.acks' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'zookeeper.connect' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'producer.max.block.ms' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'producer.max.in.flight.requests.per.connection' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] WARN The configuration 'factoryClassName' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-09 14:02:37,363] INFO Kafka version : 2.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-09 14:02:37,363] INFO Kafka commitId : fa14705e51bd2ce5 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-09 14:02:37,366] INFO Cluster ID: YdnuOPmjQ4KavUKJgY4aZw (org.apache.kafka.clients.Metadata)
[2021-03-09 14:02:37,375] WARN Unexpected error code: 87. (org.apache.kafka.common.protocol.Errors)
[2021-03-09 14:02:37,375] WARN sent failure, restart producer, exception: (class com.linkedin.datastream.kafka.KafkaTransportProvider:kafkaMirroringConnector:0)
org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,376] INFO [Producer clientId=datastream-producer] Closing the Kafka producer with timeoutMillis = 2000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] WARN [Producer clientId=datastream-producer] Overriding close timeout 2000 ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] INFO [Producer clientId=datastream-producer] Proceeding to force close the producer since pending requests could not be completed within timeout 2000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-03-09 14:02:37,376] ERROR Sending a message with source checkpoint testcompact/0/1 to topic testcompact partition 0 for datastream task second-mirroring-stream_f06ad4ca-1620-4035-9186-006f3abd18d1(kafkaMirroringConnector), partitionsV2=, partitions=[0], dependencies=[] threw an exception. (KafkaTransportProvider)
com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at com.linkedin.datastream.kafka.KafkaProducerWrapper.generateSendFailure(KafkaProducerWrapper.java:251)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,377] WARN Detect exception being thrown from callback for src partition: testcompact-0 while sending, metadata: Checkpoint: testcompact/0/1, Topic: testcompact, Partition: 0 , exception: (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
com.linkedin.datastream.server.api.transport.SendFailedException: com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at com.linkedin.datastream.server.EventProducer.onSendCallback(EventProducer.java:293)
at com.linkedin.datastream.server.EventProducer.lambda$send$0(EventProducer.java:194)
at com.linkedin.datastream.kafka.KafkaTransportProvider.doOnSendCallback(KafkaTransportProvider.java:189)
at com.linkedin.datastream.kafka.KafkaTransportProvider.lambda$send$0(KafkaTransportProvider.java:155)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.linkedin.datastream.common.DatastreamRuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at com.linkedin.datastream.kafka.KafkaProducerWrapper.generateSendFailure(KafkaProducerWrapper.java:251)
... 16 more
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
[2021-03-09 14:02:37,378] ERROR updateErrorRate with 1. Look for error logs right before this message to see what happened (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
[2021-03-09 14:02:37,378] INFO Trying to seek to previous checkpoint for partitions: [testcompact-0] (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
[2021-03-09 14:02:37,378] ERROR Partition rewind failed due to (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2199)
at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1701)
at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1676)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.lambda$seekToLastCheckpoint$4(AbstractKafkaBasedConnectorTask.java:605)
at java.util.Collections$SingletonSet.forEach(Collections.java:4769)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.seekToLastCheckpoint(AbstractKafkaBasedConnectorTask.java:604)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.rewindAndPausePartitionOnException(AbstractKafkaBasedConnectorTask.java:251)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.lambda$sendDatastreamProducerRecord$1(AbstractKafkaBasedConnectorTask.java:283)
at com.linkedin.datastream.server.EventProducer.onSendCallback(EventProducer.java:303)
at com.linkedin.datastream.server.EventProducer.lambda$send$0(EventProducer.java:194)
at com.linkedin.datastream.kafka.KafkaTransportProvider.doOnSendCallback(KafkaTransportProvider.java:189)
at com.linkedin.datastream.kafka.KafkaTransportProvider.lambda$send$0(KafkaTransportProvider.java:155)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.lambda$null$0(KafkaProducerWrapper.java:198)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
[2021-03-09 14:02:41,953] INFO Trying to flush the producer and commit offsets. (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask)
Is your feature request related to a problem? Please describe.
Similarly to the file connector, ingesting data from S3 would be fantastic.
S3 can emit notifications of new files onto SQS, Kinesis, etc. so it may be beneficial to hook in there.
Essentially, it would be great if Brooklin could be notified of new S3 files and then ingest the actual files, so we can output them onto Kafka.
It may be necessary to differentiate between different types of files.
Finally, using java.util.zip.GZIPInputStream and ZipInputStream, files could be unarchived on-the-fly.
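A minimal sketch of the on-the-fly unarchiving idea using plain JDK classes (the raw InputStream standing in for an S3 object body is an assumption):
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public final class GzipStreamSketch {
    // Wrap a raw stream (e.g. an S3 object body) so lines can be read while downloading;
    // each line could then be emitted as a separate Kafka message.
    static void readGzippedLines(InputStream raw) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(raw), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}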
Describe the solution you'd like
Provide the system with an S3 bucket and credentials.
New S3 files will be streamed into the data sink (the AWS REST API allows actual streaming of files). Depending on the type of file, apply different logic to unarchive/read (see above).
I'd like to have the file streamed into separate Kafka messages depending on the above logic.
For example: foo.tar.gz is written to S3; via GZIPInputStream it yields 1.json and 2.json, which have pretty-printed JSON objects inside.
Describe alternatives you've considered
Additional context
This would be an extremely valuable connector when working with systems that can export their data feeds to S3.
If the group coordinator becomes unreachable for a Kafka mirroring task (on the consumer end), it triggers a rebalance and causes duplicated records.
No duplicates
Duplicated data
Describe your issue here.
Brooklin is unable to connect to the ZooKeeper server with the Java 21 runtime, whereas it can connect with the Java 11 runtime.
Tell us how to reproduce this issue.
Deploy Brooklin with the Java 21 runtime.
Tell us what happens instead
Brooklin is unable to connect to the ZooKeeper server and gives the error below:
2023-10-17 12:16:56.628 INFO 1 --- [main-SendThread(zk-cs:2181)] org.apache.zookeeper.ClientCnxn$SendThread : Opening socket connection to server zk-cs/:2181. Will not attempt to authenticate using SASL (unknown error)
2023-10-17 12:16:56.629 WARN 1 --- [main-SendThread(zk-cs:2181)] org.apache.zookeeper.ClientCnxn$SendThread : Session 0x0 for server zk-cs/:2181, unexpected error, closing socket connection and attempting reconnect
java.nio.channels.UnresolvedAddressException
at java.base/sun.nio.ch.Net.checkAddress(Unknown Source)
at java.base/sun.nio.ch.Net.checkAddress(Unknown Source)
at java.base/sun.nio.ch.SocketChannelImpl.checkRemote(Unknown Source)
at java.base/sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
at org.apache.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:277)
at org.apache.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:287)
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1021)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1064)
I am able to reach the ZooKeeper server from the Brooklin pod, so it must not be a connectivity issue; with the same setup I am able to connect using the Java 11 runtime.
Tell us what should happen
Brooklin should be able to connect to the ZooKeeper server with the Java 21 runtime.
Artifacts for the Checkstyle results show up in a directory tree, making it tedious to go over them. It would be nicer to have a flat directory with all reports from subprojects. Even better, have a single unified file if possible.
I tried this command, but there is something going on with the way {} works (and doesn't work).
find . -maxdepth 5 -name 'main.html' -path '*/checkstyle/*' -exec cp '{}' `echo '{}' | cut -c 3- | sed -e "s#/#-#g" -e "s#^#$CIRCLE_ARTIFACTS/checkstyle/#"` \;
Just piping commands yields correct target paths:
$ find . -maxdepth 5 -name 'main.html' -path '*/checkstyle/*' | cut -c 3- | sed -e "s#/#-#g" -e "s#^#$CIRCLE_ARTIFACTS/checkstyle/#"
circleci/checkstyle/datastream-testcommon-build-reports-checkstyle-main.html
circleci/checkstyle/datastream-common-build-reports-checkstyle-main.html
circleci/checkstyle/datastream-kafka-build-reports-checkstyle-main.html
circleci/checkstyle/datastream-server-api-build-reports-checkstyle-main.html
circleci/checkstyle/datastream-utils-build-reports-checkstyle-main.html
circleci/checkstyle/datastream-file-connector-build-reports-checkstyle-main.html
circleci/checkstyle/datastream-client-build-reports-checkstyle-main.html
circleci/checkstyle/datastream-server-build-reports-checkstyle-main.html
Using either -exec or | xargs to run the cp doesn't play well with '{}', as can be seen by replacing cp with echo:
$ find . -maxdepth 5 -name 'main.html' -path '*/checkstyle/*' -exec echo '{}' `echo '{}' | cut -c 3- | sed -e "s#/#-#g" -e "s#^#$CIRCLE_ARTIFACTS/checkstyle/#"` \;
./datastream-testcommon/build/reports/checkstyle/main.html circleci/checkstyle/
./datastream-common/build/reports/checkstyle/main.html circleci/checkstyle/
./datastream-kafka/build/reports/checkstyle/main.html circleci/checkstyle/
./datastream-server-api/build/reports/checkstyle/main.html circleci/checkstyle/
./datastream-utils/build/reports/checkstyle/main.html circleci/checkstyle/
./datastream-file-connector/build/reports/checkstyle/main.html circleci/checkstyle/
./datastream-client/build/reports/checkstyle/main.html circleci/checkstyle/
./datastream-server/build/reports/checkstyle/main.html circleci/checkstyle/
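The backticks in the -exec form are expanded by the shell once, before find runs, so '{}' inside them is never substituted per file. A sketch that avoids the problem by looping over the results instead:
find . -maxdepth 5 -name 'main.html' -path '*/checkstyle/*' | while read -r f; do
  cp "$f" "$CIRCLE_ARTIFACTS/checkstyle/$(echo "$f" | cut -c 3- | sed 's#/#-#g')"
done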
Brooklin does not allow creating multiple datastreams replicating from different source Kafka clusters/topics to the same destination Kafka cluster and topic.
Let's say we have the following Kafka clusters: K1, K2
We wish to replicate messages from topic T1 in K1 and K2 clusters to topic T2 in K1 cluster.
We manage to create the first datastream (K2-T1 -> K1-T2) but the creation of the second one (K1-T1 -> K1-T2) fails with the error:
Cannot create a BYOT datastream where the destination is being used by other datastream(s)
{
"name": "my-datastream-1",
"connectorName": "kafkaConnector",
"transportProviderName": "kafkaTransportProvider",
"source": {
"connectionString": "kafka://K2-broker001:9092,K2-broker002:9092,K2-broker003:9092/T1"
},
"destination": {
"connectionString": "kafka://K1-broker001:9092,K1-broker002:9092,K1-broker003:9092/T2"
},
"metadata": {
"group.id": "my_group",
"system.auto.offset.reset": "latest",
"system.reuseExistingDestination": "true",
"system.IsUserManagedDestination": "false",
"system.IsConnectorManagedDestination": "true"
}
}
Datastream is created successfully.
{
"name": "my-datastream-2",
"connectorName": "kafkaConnector",
"transportProviderName": "kafkaTransportProvider",
"source": {
"connectionString": "kafka://K1-broker001:9092,K1-broker002:9092,K1-broker003:9092/T1"
},
"destination": {
"connectionString": "kafka://K1-broker001:9092,K1-broker002:9092,K1-broker003:9092/T2"
},
"metadata": {
"group.id": "my_group",
"system.auto.offset.reset": "latest",
"system.reuseExistingDestination": "true",
"system.IsUserManagedDestination": "false",
"system.IsConnectorManagedDestination": "true"
}
}
We get the following error:
Cannot create a BYOT datastream where the destination is being used by other datastream(s)
{
"name": "my-datastream-2",
"connectorName": "kafkaConnector",
"transportProviderName": "kafkaTransportProvider",
"source": {
"connectionString": "kafka://K1-broker001:9092,K1-broker002:9092,K1-broker003:9092/T1"
},
"destination": {
"connectionString": "kafka://K1-broker002:9092,K1-broker001:9092,K1-broker003:9092/T2"
},
"metadata": {
"group.id": "my_group",
"system.auto.offset.reset": "latest",
"system.reuseExistingDestination": "true",
"system.IsUserManagedDestination": "false",
"system.IsConnectorManagedDestination": "true"
}
}
We expected that it would be possible to replicate from multiple source Kafka clusters/topics to the same Kafka destination cluster and topic.
The second DS fails when another DS with the same destination already exists. If we swap the broker order in the second DS destination connection string, we manage to successfully create the second DS.
The Git pre-commit validation script references #!/usr/bin/env python, which is likely missing on current systems.
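A minimal fix, assuming the script is compatible with Python 3, would be to update the shebang:
#!/usr/bin/env python3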
How do I configure a high-availability cluster?
Is it possible to connect an Oracle database to Brooklin?
Could you give a specific example for system.destination.identityPartitioningEnabled? Thanks.
I set it like this: destination.identityPartitioningEnabled=true, but it has no effect.
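For illustration, a sketch of a datastream creation that sets this flag (the stream, source, connector, and transport names here are hypothetical); note the metadata key needs the full "system.destination." prefix:
bin/brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ -n identity-mirror -s "kafka://source-broker:9092/mytopic" -c kafkaMirroringConnector -t kafkaTransportProvider -m '{"owner":"test-user","system.destination.identityPartitioningEnabled":"true"}'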
In addition to the Java interface, are interfaces provided for other languages, and is there detailed interface documentation?
I have a few issues: I can't create a datastream.
Config (server.properties):
############################# Server Basics #############################
brooklin.server.coordinator.cluster=brooklin-cluster
brooklin.server.coordinator.zkAddress=192.168.1.6:2181
brooklin.server.csvMetricsDir=/tmp/brooklin-example/
brooklin.server.httpPort=32311
########################### Kafka Mirroring connector Configs ######################
brooklin.server.connectorNames=kafkaMirroringConnector
brooklin.server.connector.kafkaMirroringConnector.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory
brooklin.server.connector.kafkaMirroringConnector.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
########################### Transport provider configs ######################
brooklin.server.transportProviderNames=kafkaTransportProvider
brooklin.server.transportProvider.kafkaTransportProvider.factoryClassName=com.linkedin.datastream.kafka.KafkaTransportProviderAdminFactory
brooklin.server.transportProvider.kafkaTransportProvider.bootstrap.servers=192.168.5.6:9092
brooklin.server.transportProvider.kafkaTransportProvider.zookeeper.connect=192.168.5.6:2181
brooklin.server.transportProvider.kafkaTransportProvider.client.id=datastream-producer
brooklin.server.transportProvider.kafkaTransportProvider.producersPerTask=1
brooklin.server.transportProvider.kafkaTransportProvider.numProducersPerConnector=5
Curl command:
curl -XPOST http://192.168.6.6:32311/datastream -H "Content-Type: application/json" -d ' { "name" : "first-mirroring-stream", "connectorName" : "kafkaMirroringConnector", "transportProviderName" : "kafkaTransportProvider", "source" : { "connectionString" : "kafka://192.168.8.6:9092/testtopic" }, "destination" : { "connectionString" : "kafka://192.168.5.6:9092/" }, "metadata" : { "owner" : "test", "system.IsConnectorManagedDestination" : "true", "system.reuseExistingDestination" : "false", "system.destination.KafkaBrokers" : "192.168.5.6:9092" } }'
Returned result:
msg=Failed to initialize Datastream: {Status=INITIALIZING, metadata={owner=dalu, system.reuseExistingDestination=false, system.destination.KafkaBrokers=
cause=BYOT is not allowed for connector kafkaMirroringConnector. Datastream: {Status=INITIALIZING, m
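A possible fix (a sketch, not verified): supplying a destination connection string marks the datastream as BYOT (bring your own topic), which the mirroring connector rejects, so omitting the destination block and letting the connector manage the destination may work:
curl -XPOST http://192.168.6.6:32311/datastream -H "Content-Type: application/json" -d '{ "name": "first-mirroring-stream", "connectorName": "kafkaMirroringConnector", "transportProviderName": "kafkaTransportProvider", "source": { "connectionString": "kafka://192.168.8.6:9092/testtopic" }, "metadata": { "owner": "test", "system.IsConnectorManagedDestination": "true", "system.reuseExistingDestination": "false", "system.destination.KafkaBrokers": "192.168.5.6:9092" } }'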
I have created 2 Kafka clusters, 7-a and 7-b, each with 3 Kafka broker nodes and 3 ZooKeeper nodes. I started the Brooklin server on all 3 nodes of the 7-a cluster and created the stream as follows:
But I cannot see the topics in the 7-b cluster. Here is the configuration used:
Can you provide some pointers to debug/resolve the issue?
(base) [jw@cn05 brooklin]$ ./gradlew clean build
...
Gradle suite > Gradle test > com.linkedin.datastream.kafka.factory.TestSimpleKafkaProducerFactory.testOverrideDefaultProducerConfigs PASSED
Task :datastream-kafka_2.11:validateSchemaAnnotation NO-SOURCE
Task :datastream-kafka_2.11:validateTestSchemaAnnotation NO-SOURCE
Task :datastream-kafka_2.11:check
Task :datastream-kafka_2.11:build
Task :datastream-server:assemble
Task :datastream-server:changedFilesReport NO-SOURCE
Task :datastream-server:checkstyleMain
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java:121: error: Line is longer than 160 characters (found 248). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java:124: error: Line is longer than 160 characters (found 164). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java:127: error: Line is longer than 160 characters (found 164). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java:135: error: Line is longer than 160 characters (found 174). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java:141: error: Line is longer than 160 characters (found 200). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java:142: error: Line is longer than 160 characters (found 216). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:63: error: Line is longer than 160 characters (found 241). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:66: error: Line is longer than 160 characters (found 193). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:67: error: Line is longer than 160 characters (found 176). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:68: error: Line is longer than 160 characters (found 204). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:69: error: Line is longer than 160 characters (found 242). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:70: error: Line is longer than 160 characters (found 195). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:71: error: Line is longer than 160 characters (found 231). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:72: error: Line is longer than 160 characters (found 253). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:73: error: Line is longer than 160 characters (found 192). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:74: error: Line is longer than 160 characters (found 208). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:75: error: Line is longer than 160 characters (found 168). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:76: error: Line is longer than 160 characters (found 177). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:78: error: Line is longer than 160 characters (found 177). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:79: error: Line is longer than 160 characters (found 165). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:81: error: Line is longer than 160 characters (found 165). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:82: error: Line is longer than 160 characters (found 186). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:84: error: Line is longer than 160 characters (found 166). Try to keep lines under 120 characters.
[ant:checkstyle] /data/jw/brooklin/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java:85: error: Line is longer than 160 characters (found 241). Try to keep lines under 120 characters.
Task :datastream-server:checkstyleMain FAILED
FAILURE: Build failed with an exception.
Checkstyle rule violations were found. See the report at: file:///data/jw/brooklin/datastream-server/build/reports/checkstyle/main.html
Checkstyle files with violations: 2
Checkstyle violations by severity: [error:24]
Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.
Get more help at https://help.gradle.org
Deprecated Gradle features were used in this build, making it incompatible with Gradle 5.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/4.10.3/userguide/command_line_interface.html#sec:command_line_warnings
BUILD FAILED in 16m 12s
136 actionable tasks: 124 executed, 12 up-to-date
(base) [jw@cn05 brooklin]$
Similar to Kafka, Apache Pulsar pub/sub should also be supported as one of the connector types.
Based on GHSA-jfh8-c2jp-5v3q
"Log4j versions prior to 2.16.0 are subject to a remote code execution vulnerability via the ldap JNDI parser."
In addition, according to CVE-2021-4104, Log4j 1.2 is subject to similar RCE when JMSAppenders are used, and Log4j 1.x is well out of support at this time.
It's advised to migrate to Log4j 2.16.0 at this time. Given the severity of the issue, we request that this be dealt with via a backport approach if at all possible. Would it be possible to migrate to Log4j 2.16.0 to mitigate the risk to anyone using this application?
Thank you!
When creating Kafka mirroring tasks there's an exception regarding JMX. It seems to be a race condition because it doesn't always happen, and it tends to happen more when maxTasks is higher.
[2019-11-27 14:35:37,129] WARN Error registering AppInfo mbean (org.apache.kafka.common.utils.AppInfoParser)
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=brooklin-producer-1
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:451)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:304)
at com.linkedin.datastream.kafka.factory.SimpleKafkaProducerFactory.createProducer(SimpleKafkaProducerFactory.java:27)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.initializeProducer(KafkaProducerWrapper.java:176)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.maybeGetKafkaProducer(KafkaProducerWrapper.java:147)
at com.linkedin.datastream.kafka.KafkaProducerWrapper.send(KafkaProducerWrapper.java:194)
at com.linkedin.datastream.kafka.KafkaTransportProvider.send(KafkaTransportProvider.java:151)
at com.linkedin.datastream.server.EventProducer.send(EventProducer.java:191)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.sendDatastreamProducerRecord(AbstractKafkaBasedConnectorTask.java:274)
at com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask.sendDatastreamProducerRecord(KafkaMirrorMakerConnectorTask.java:246)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.translateAndSendBatch(AbstractKafkaBasedConnectorTask.java:229)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.processRecords(AbstractKafkaBasedConnectorTask.java:481)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.run(AbstractKafkaBasedConnectorTask.java:315)
at java.lang.Thread.run(Thread.java:748)
I am using the docker image dongjinleekr/brooklin:1.0.0-2, which internally uses Brooklin 1.0.0.
OS: 4.9.0-11-amd64 #1 SMP Debian 4.9.189-3+deb9u1 (2019-09-20) x86_64 Linux
Brooklin version: 1.0.0
Java version: openjdk version "1.8.0_212"
Kafka version: 2.1.0
ZooKeeper version: 3.4.10
Start a mirror task with maxTasks set to 50 (other values could also work; my guess is that the higher the value, the bigger the chance).
For example:
bin/brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ -n mirror -s "kafka://x.x.x.x.:9092/topic5" -c kafkaMirroringConnector -t kafkaTransportProvider -m '{"owner":"test-user","system.reuseExistingDestination":"false","maxTasks":"50"}' 2>/dev/null
There should not be an error in the log, and the MBean(s) should register successfully.
There is an exception in the log, and I'm guessing (but didn't check) that the MBean is not properly registered.
When I used Brooklin, there was a problem with the destination Kafka: after data was written from the source Kafka, the destination Kafka could not synchronize it.
Here's what happens: after creating several datastreams, the destination Kafka no longer receives data and I need to restart it. Do you have any general optimization suggestions?
What databases are officially supported?
Is MySQL the only database on the roadmap?
Is your feature request related to a problem? Please describe.
I am trying to mirror Kafka clusters, using the same setup as the tutorial, but Brooklin misses mirroring the Kafka headers.
Describe the solution you'd like
The Kafka headers should be mirrored as well.
Describe alternatives you've considered
I checked KafkaMirrorMakerConnectorTask.java. I think we could add Kafka header copy logic in the translate method, as in the sketch below.
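For example, a minimal sketch of the idea using the Kafka client API (not the project's actual translate signature):
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public final class HeaderCopySketch {
    // Build a destination record that preserves the source record's headers.
    static ProducerRecord<byte[], byte[]> translate(ConsumerRecord<byte[], byte[]> src, String destTopic) {
        ProducerRecord<byte[], byte[]> dest =
            new ProducerRecord<>(destTopic, null, src.timestamp(), src.key(), src.value());
        for (Header h : src.headers()) {
            dest.headers().add(h); // copy each header onto the outgoing record
        }
        return dest;
    }
}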
Hey team, I have a question regarding the 1.1.0 BMM version. Can someone please share what has changed in the latest release of BMM (https://github.com/linkedin/brooklin/releases/tag/1.1.0) from the previous version? If someone could give me a few pointers, that would be great. Thanks!
After stopping a kafka mirroring task I noticed this NPE in the log.
[2019-11-27 14:12:00,896] INFO Stopping the DurableScheduledService for mirror-_test_replicator_1574862570_fc2f511c-af4f-43d1-8a35-28c3fe9627b2 (com.linkedin.datastream.common.DurableScheduledService)
[2019-11-27 14:12:00,907] WARN Task mirror-_test_replicator_1574862570_d3b0ffd9-6d41-4217-93e7-574186923cc8(kafkaMirroringC), partitions=[0] took longer than 30000 ms to stop. Interrupting the thread. (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnector)
[2019-11-27 14:12:00,907] WARN Failed to check status of kafka connector tasks. (com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnector)
java.lang.NullPointerException
at com.linkedin.datastream.connectors.kafka.AbstractKafkaConnector.stopTask(AbstractKafkaConnector.java:216)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaConnector.restartIfNotRunning(AbstractKafkaConnector.java:196)
at java.util.concurrent.ConcurrentHashMap$KeySetView.forEach(ConcurrentHashMap.java:4649)
at com.linkedin.datastream.connectors.kafka.AbstractKafkaConnector.lambda$start$1(AbstractKafkaConnector.java:176)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I am using the docker image dongjinleekr/brooklin:1.0.0-2, which internally uses Brooklin 1.0.0.
OS: 4.9.0-11-amd64 #1 SMP Debian 4.9.189-3+deb9u1 (2019-09-20) x86_64 Linux
Brooklin version: 1.0.0
Java version: openjdk version "1.8.0_212"
Kafka version: 2.1.0
ZooKeeper version: 3.4.10
It doesn't always repro, but in my case I started replicating a Kafka topic and then stopped it after several minutes.
maxTasks was set to 50.
There should not be an NPE.
There's an NPE in the log. It didn't seem to affect the worker, though; the only visible effect was in the log.
How can I solve this problem: the source Kafka topic has 5 partitions, but the destination Kafka topic has only 1?
Brooklin takes a long time to recover from errors in the destination cluster. Sometimes there are multiple rebalance cycles and a complete halt of replication during that time.
I have run some tests; they are documented here: https://github.com/AppsFlyer/kafka-mirror-tester/blob/master/results-brooklin.md
OS: 4.9.0-11-amd64 #1 SMP Debian 4.9.189-3+deb9u1 (2019-09-20) x86_64 Linux
Brooklin version: 1.0.2
Java version: openjdk version "1.8.0_212"
Kafka version: 2.1.0
ZooKeeper version: 3.4.10
As described in this document https://github.com/AppsFlyer/kafka-mirror-tester/blob/master/results-brooklin.md, there are several scenarios in which that may happen.
One such scenario is to restart a broker at the destination cluster.
This results in errors for as long as the broker is down, which is understandable. But even long after the broker is back up, Brooklin continues to err, up to a complete halt of replication to the entire cluster (not only to that failed broker).
We expect Brooklin to recover gracefully and not halt replication during the rebalance cycle.
We expect to see just a single, hopefully short, rebalance; instead we see multiple cycles that sometimes take quite long (10-15 minutes).
Brooklin takes a long time (10-15 minutes, sometimes more) to recover. During that time we see cycles of replication, then a complete halt, then replication again, and then another halt.
Describe your issue here.
I am evaluating Brooklin and I used this link: https://github.com/linkedin/brooklin/wiki/test-driving-brooklin. I followed the steps to install Kafka and unzip Brooklin. I ran the following command:
(base) stlwm224773:brooklin-1.0.2 ei6201$ bin/brooklin-server-start.sh config/server.properties >/dev/null 2>&1 &
[1] 49693
and when I ran the command from https://github.com/linkedin/brooklin/wiki/Streaming-Text-Files-to-Kafka#3-create-a-datastream
bin/brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ -n first-file-datastream -s NOTICE -c file -p 1 -t kafkaTransportProvider -m '{"owner":"test-user"}'
I am getting the following error.
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-07-09 11:31:18,330] ERROR Create Datastream {metadata={owner=test-user, datastreamUUID=f4830b02-c9b2-4b4b-8a96-d728e4b1262f}, transportProviderName=kafkaTransportProvider, name=first-file-datastream, destination={}, connectorName=file, source={connectionString=NOTICE, partitions=1}} failed with error. (com.linkedin.datastream.DatastreamRestClient)
com.linkedin.r2.RemoteInvocationException: com.linkedin.r2.RemoteInvocationException: Failed to get response from server for URI http://localhost:32311/datastream
at com.linkedin.restli.internal.client.ExceptionUtil.wrapThrowable(ExceptionUtil.java:135)
at com.linkedin.restli.internal.client.ResponseFutureImpl.getResponseImpl(ResponseFutureImpl.java:130)
Tell us how to reproduce this issue.
Follow steps in https://github.com/linkedin/brooklin/wiki/Streaming-Text-Files-to-Kafka#3-create-a-datastream
Tell us what should happen
Brooklin should start and transfer the file
Tell us what happens instead
It's throwing a connection request exception.
Can you address a couple of instances of language in the project where the wording can be made more inclusive (example: whitelist -> allowlist)? We are trying to make the code in all LinkedIn projects more inclusive. Could you please examine whether these need to be updated, and make the changes? For suggested replacements see go/inclusivelanguage or Google. Thank you!
After restarting a broker / broker failures (anything that triggers a leader election), it seems some Brooklin TransportProviders can't self-heal and get stuck in a loop.
Brooklin is set with "pausePartitionOnError": "true" -
a flag indicating whether to auto-pause a topic partition if dispatching its data for delivery to the destination system fails.
When the Brooklin producer (AKA TransportProvider) receives an error, it pauses according to the configuration "pauseErrorPartitionDurationMs": "180000" (3 minutes).
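For reference, a sketch of the relevant connector settings as server.properties entries (the connector name kafkaMirroringC is an assumption):
brooklin.server.connector.kafkaMirroringC.pausePartitionOnError=true
brooklin.server.connector.kafkaMirroringC.pauseErrorPartitionDurationMs=180000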
Looking at the Brooklin logs, I found the following errors at the corresponding time of the issue:
"Flush interrupted."
"This server is not the leader for that topic-partition."
"Partition rewind failed due to"
This means that, at this moment, our Brooklin producer is trying to work against a non-leader partition.
Roughly 5 minutes later, I witnessed the following error message:
"Expiring 227 record(s) for <topic_name>-12: 302797 ms has passed since last append"
Comparing this with the Brooklin configuration, I spotted "request.timeout.ms": "300000", which is 5 minutes.
For the next 20 minutes we received NotLeaderForPartitionException, which means we did not produce data, and it seems we did not consume either.
Later on there is only one exception: "Producer is closed forcefully."
Reading a bit online, someone said it may be that producing can't keep up with consuming; "producersPerTask" and "numProducersPerConnector" in our configuration should do the job.
I was looking at the consumer group info, and it seems it stopped consuming as well.
At the same time, we have another datastream that replicates to the SAME cluster and topics, sharing the same configurations; the failing datastream has 8 more in maxTasks.
The source of the failing datastream is a remote Kafka cluster while the working one is a local Kafka cluster, and the local one does not fail at all, not even a single exception.
[local]Cluster A ---> Brooklin ---> Cluster C
[remote]Cluster B -----^
On the remote cluster, some (2-3) TransportProviders are failing.
brooklin configurations:
https://pastebin.com/raw/kHACqwcA
Ideas?
On high-load Brooklin clusters, once flushless mode is enabled I get tons of these errors;
producers get killed, but some partitions keep growing lag until I restart the service.
My default producer settings:
default['brooklin_v2']['default_provider_properties']['buffer.memory'] = '61600000'
default['brooklin_v2']['default_provider_properties']['batch.size'] = '80000'
default['brooklin_v2']['default_provider_properties']['linger.ms'] = '15000'
default['brooklin_v2']['default_provider_properties']['request.timeout.ms'] = '300000'
default['brooklin_v2']['default_provider_properties']['compression.type'] = 'gzip'
default['brooklin_v2']['default_provider_properties']['producersPerTask'] = '3'
default['brooklin_v2']['default_provider_properties']['numProducersPerConnector'] = '15'
default['brooklin_v2']['default_provider_properties']['producerRateLimiter'] = '0.05'
default['brooklin_v2']['default_provider_properties']['retries'] = '100'
default['brooklin_v2']['default_provider_properties']['retry.backoff.ms'] = '300'
default['brooklin_v2']['default_provider_properties']['enable.idempotence'] = 'true'
default['brooklin_v2']['default_provider_properties']['acks'] = 'all'
My default consumer settings:
default['brooklin_v2']['default_connector_properties']['consumer.partition.assignment.strategy'] = 'org.apache.kafka.clients.consumer.RoundRobinAssignor'
default['brooklin_v2']['default_connector_properties']['factoryClassName'] = 'com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory'
default['brooklin_v2']['default_connector_properties']['assignmentStrategyFactory'] = 'com.linkedin.datastream.server.assignment.BroadcastStrategyFactory'
default['brooklin_v2']['default_connector_properties']['consumer.receive.buffer.bytes'] = '4096000'
default['brooklin_v2']['default_connector_properties']['consumer.session.timeout.ms'] = '120000'
default['brooklin_v2']['default_connector_properties']['consumer.heartbeat.interval.ms'] = '9000'
default['brooklin_v2']['default_connector_properties']['consumer.request.timeout.ms'] = '60000'
default['brooklin_v2']['default_connector_properties']['consumer.auto.offset.reset'] = 'latest'
default['brooklin_v2']['default_connector_properties']['commitIntervalMs'] = '30000'
default['brooklin_v2']['default_connector_properties']['consumer.fetch.min.bytes'] = '500000'
default['brooklin_v2']['default_connector_properties']['consumer.fetch.max.bytes'] = '4000000'
default['brooklin_v2']['default_connector_properties']['consumer.fetch.max.wait.ms'] = '250'
default['brooklin_v2']['default_connector_properties']['pausePartitionOnError'] = 'true'
default['brooklin_v2']['default_connector_properties']['pauseErrorPartitionDurationMs'] = '180000'
Other than that, I've added "isFlushlessModeEnabled": "true" to my connector settings.
My Brooklin cluster is processing around 600 MBps with 3 datastreams, 450 consumers, and around 1000 producers.
I have 3 more clusters that are suffering from the same issues, which started about 2 weeks after the implementation.
A producer with OUT_OF_ORDER_SEQUENCE_NUMBER should retry and die at some point; a new producer should be raised and get a new PID. This is happening overall, but it seems that one or a few partitions don't heal.
There are no errors related to these partitions; it just seems that Brooklin "forgets" about them.
In this screenshot, you can see most of the partitions are getting consumed after the issue occurs, but there is one that returned to consumption only after a service restart. On the Brooklin side, the lag metric doesn't even show that there is lag, which is strange.
All partitions should heal after the producers spawn.
What is the correct set of configuration items for the KafkaTransportProvider to have it be a kafkassl://... connection?
I took a cursory look at the source code and it appears that the regular configuration for a Kafka Producer should apply:
brooklin.server.transportProviderNames=kafkaTransportProvider
...
brooklin.server.transportProvider.kafkaTransportProvider.bootstrap.servers=localhost:9095
brooklin.server.transportProvider.kafkaTransportProvider.zookeeper.connect=localhost:2181
brooklin.server.transportProvider.kafkaTransportProvider.client.id=datastream-producer
...
brooklin.server.transportProvider.kafkaTransportProvider.security.protocol=ssl
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.location=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.key.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.location=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.endpoint.identification.algorithm=
brooklin.server.transportProvider.kafkaTransportProvider.ssl.enabled.protocols=TLSv1.2
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.type=JKS
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.type=JKS
Using the ingest-text-to-Kafka tutorial, I get a non-SSL Kafka connection string:
{
"name": "first-file-datastream",
"connectorName": "file",
"transportProviderName": "kafkaTransportProvider",
"source": {
"connectionString": "./brooklin-1.0.0/NOTICE",
"partitions": 1
},
"Status": "INITIALIZING",
"destination": {
"connectionString": "kafka://localhost:9095/first-file-datastream_20190722111027",
"partitions": 1
},
"metadata": {
"datastreamUUID": "0bbd95ea-3bef-4b4e-9ef2-5e8d2496aef6",
"owner": "test-user",
"system.creation.ms": "1563808227090",
"system.destination.KafkaBrokers": "localhost:9095",
"system.taskPrefix": "first-file-datastream"
}
}
I'm sure I'm missing something simple. Thank you for any insight.
KafkaConnectorTask extends AbstractKafkaBasedConnectorTask, which enables users to set start offsets so that its Kafka consumer starts from those offsets. However, these start offsets take effect only after an initial poll failure throwing NoOffsetForPartitionException. In particular, AbstractKafkaBasedConnectorTask.pollRecords() calls handleNoOffsetForPartitionException(), which in turn seeks to the set start offsets, if present. The intuition is that a new consumer starts with no checkpoints and the very first poll will throw the NoOffsetForPartitionException. However, the Kafka consumer has a config, auto.offset.reset, which when set to 'earliest' or 'latest' will automatically seek to the beginning or end of the topic without throwing exceptions. This config has a default value of earliest. Therefore, if the auto.offset.reset config is omitted to use its default, or if it is set to a value other than 'none', no exception will be thrown and the start offsets won't be used.
Brooklin's KafkaConnector datastream should start consuming from the configured start offsets when they are set.
Start offsets are ignored.
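A sketch of the workaround implied above (the connector name kafkaC is an assumption): set the consumer's auto.offset.reset to none, so that a fresh consumer group throws NoOffsetForPartitionException and the configured start offsets are honored:
brooklin.server.connector.kafkaC.consumer.auto.offset.reset=none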