
mirus's People

Contributors

adammelliott, arabelle, d4v1de, dalassi1, ddbrodsky, jeremybolster, parafiend, pdavidson100, renatomefi, seratch, snyk-bot, srahimeen, svc-scm, yonggang

mirus's Issues

Default consumer offset reset policy - earliest

By default, the Kafka consumer's auto.offset.reset policy is latest. The implementation in this connector appears to be the reverse: unless the property is configured, it starts from earliest.

String offsetReset = (String) consumerProperties.get("auto.offset.reset");
if ("latest".equalsIgnoreCase(offsetReset)) {
    logger.trace("Seeking to end");
    consumer.seekToEnd(Collections.singletonList(tp));
} else {
    // Also reached when auto.offset.reset is not set at all
    logger.trace("Seeking to beginning");
    consumer.seekToBeginning(Collections.singletonList(tp));
}

That value is read from the properties prefixed with consumer.:

return simpleConfig.originalsWithPrefix("consumer.");

Would it make sense to use the same default as a regular Kafka consumer, to keep the behavior consistent with normal consumer groups?
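To make the proposal concrete, here is a minimal sketch of resolving the policy so that an unset property falls back to latest, as a plain Kafka consumer does. The class and method names are hypothetical and not part of Mirus; only the auto.offset.reset key and the consumerProperties map come from the snippet above.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of Mirus: picks the reset policy the way a
// plain Kafka consumer would, i.e. "latest" unless configured otherwise.
final class OffsetResetDefault {
  static String resolve(Map<String, Object> consumerProperties) {
    Object configured = consumerProperties.get("auto.offset.reset");
    if (configured == null) {
      return "latest"; // Kafka consumer default when the property is unset
    }
    return configured.toString().toLowerCase(Locale.ROOT);
  }

  // True when the task should seek to the end of the partition.
  static boolean seeksToEnd(Map<String, Object> consumerProperties) {
    return "latest".equals(resolve(consumerProperties));
  }
}
```

With this, an empty property map leads to a seek to the end, while an explicit earliest keeps the current behavior.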

Excessive logging (Exception thrown while calling task.commitRecord())

Hi,

We have an issue with Mirus version 0.6.9: the logs are flooded with the error "Exception thrown while calling task.commitRecord()".

Exception thrown while calling task.commitRecord() (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.lang.NullPointerException
at com.salesforce.mirus.metrics.MirrorJmxReporter.recordMirrorLatency(MirrorJmxReporter.java:162)
at com.salesforce.mirus.MirusSourceTask.commitRecord(MirusSourceTask.java:241)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:470)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$sendRecords$4(WorkerSourceTask.java:387)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:228)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:684)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:655)
at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:574)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:561)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$3(Sender.java:785)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
at java.base/java.lang.Thread.run(Thread.java:829)

From what I can see, the actual issue is that the topic registered in mirrorJmxReporter and the topic passed to the recordMirrorLatency method are not the same.
Topics for mirrorJmxReporter are registered here:

this.mirrorJmxReporter.addTopics(topicPartitionList);
and this is the source topic.

But the topic passed to recordMirrorLatency

mirrorJmxReporter.recordMirrorLatency(sourceRecord.topic(), latency);
is taken from sourceRecord.

When the SourceRecord is built, the topic is prefixed/suffixed, so it becomes the destination topic:

String topic = destinationTopicNamePrefix + consumerRecord.topic() + destinationTopicNameSuffix;

So when the record is finally passed to mirrorJmxReporter, the lookup for that topic finds nothing and the call fails.

After recompiling Mirus with the topic argument of recordMirrorLatency changed to something like
mirrorJmxReporter.recordMirrorLatency(sourceRecord.sourcePartition().get("topic").toString(), latency)
the problem goes away (this is not the correct way to solve the problem; I tried it only to see whether it helps).

From what I can see, the problem was introduced in Mirus in PR #89.

This should be fixed, since there is no config option or similar way to disable this behavior.
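To illustrate the mismatch outside of Mirus, here is a small self-contained sketch. It assumes the reporter keeps per-topic state in a map keyed by the source topic; that is my reading of the stack trace, not the actual MirrorJmxReporter code, and every name below is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the reporter; only the lookup pattern is meant to match.
final class LatencyLookupSketch {
  private final Map<String, long[]> latencyByTopic = new HashMap<>();

  // Registers a topic, as addTopics does with the source topics.
  void addTopic(String sourceTopic) {
    latencyByTopic.put(sourceTopic, new long[1]);
  }

  // Unguarded lookup: throws NullPointerException for an unregistered topic,
  // e.g. a destination name such as "orders.mirror".
  void recordUnguarded(String topic, long latencyMs) {
    latencyByTopic.get(topic)[0] = latencyMs;
  }

  // Guarded lookup: skips unknown topics and reports whether it recorded.
  boolean recordGuarded(String topic, long latencyMs) {
    long[] slot = latencyByTopic.get(topic);
    if (slot == null) {
      return false;
    }
    slot[0] = latencyMs;
    return true;
  }
}
```

Registering "orders" and then recording against "orders.mirror" reproduces the failure in the unguarded variant. A null guard would only stop the log flood; passing the source topic is what would make the metric correct.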

BR,
Hrvoje

Mirus failing to start when brokers are deployed to new IPs

Hi Team,

I had a Mirus pipeline working from a source Kafka cluster to a destination server, and I used a DNS-based URL in the connection string.

I spun up the destination Kafka cluster on new servers to upgrade Kafka from 1.1 to 2.1.

Since then, Mirus has been failing to start with the exception below:

[2019-03-27 09:21:21,458] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'mirus-offsets'
	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
	at org.apache.kafka.connect.runtime.Worker.start(Worker.java:174)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:215)
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
	... 11 more
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
[2019-03-27 09:21:21,480] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session)

Mirus stops after writing the above error to the logs.

I also verified that the mirus-* topics have the required ACLs.

Please advise on how to fix it.

Can we filter or mask config parameters (e.g. password configuration) during the GET call?

When I do a GET to display task status, I see passwords in plain text. Is there any way to edit or mask config parameters (passwords, in my case) in the GET response?

Below is an example:

curl localhost:8083/connectors/source-2-destination/tasks | jq

[
  {
    "id": {
      "connector": "source-2-destination",
      "task": 0
    },
    "config": {
      "partitions": "[{\"topic\":\"topicname\",\"partition\":0}]",
      "consumer.ssl.truststore.password": "<plaintext password>",
      "consumer.ssl.endpoint.identification.algorithm": "",
      "consumer.group.id": "mirus-source-2-destination-grp",
      "consumer.ssl.truststore.location": "/path/.jks",
      "consumer.bootstrap.servers": "destination-kafka-cluster:9093",
      "task.class": "com.salesforce.mirus.MirusSourceTask",
      "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "consumer.security.protocol": "SSL",
      "consumer.ssl.keystore.location": "/path/keystore.jks",
      "consumer.ssl.keystore.password": "<plaintext password>",
      "consumer.client.id": "source-2-destinationsource-2-destination-0",
      "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    }
  },
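As a possible stopgap on the client side, the config map can be masked after it has been fetched and parsed. The sketch below assumes the task config is already available as a map of strings and that secrets are identified by keys ending in ".password"; the class name and the mask text are hypothetical, and none of this is a Mirus or Kafka Connect feature.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical post-processing step applied to an already parsed task config.
final class ConfigMasker {
  static final String MASK = "[hidden]";

  // Returns a copy of the config with every "*.password" value replaced.
  static Map<String, String> maskSecrets(Map<String, String> config) {
    Map<String, String> masked = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : config.entrySet()) {
      boolean secret = e.getKey().endsWith(".password");
      masked.put(e.getKey(), secret ? MASK : e.getValue());
    }
    return masked;
  }
}
```

This only hides the values from whoever reads the masked copy; the REST endpoint itself still returns the plain text.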

Mirus tasks failing with error "Could not initialize class org.xerial.snappy.Snappy"

Hi Team,

We are seeing some of the tasks go to the "FAILED" state (while a few tasks stay in the running state) with the error details below:

          "id": 1,
            "state": "FAILED",
            "worker_id": "<ip>:<port>",
            "trace": "java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy\n\tat org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)\n\tat org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466)\n\tat java.io.DataInputStream.readByte(DataInputStream.java:265)\n\tat org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:168)\n\tat org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:292)\n\tat org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(DefaultRecordBatch.java:263)\n\tat org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:568)\n\tat org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:537)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1210)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1245)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1099)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:545)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)\n\tat com.salesforce.mirus.MirusSourceTask.poll(MirusSourceTask.java:173)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
        },

We tried installing libc6-compat in our pods, but it didn't fix the problem. Please advise on how to fix it.

Connector in distributed mode

Dear Mirus team,

We followed the quick-start and ran it successfully.
We then followed the Kafka Connect distributed-mode steps: the worker and the connector are created and run with no errors, but events are not replicated. We also ran "mirus-start.sh" with the same result.

Is there a guide for running in "distributed mode"?

Regards,
Pablo.

OffsetSerDeTest fails on Windows due to line separator

Currently both cvsList and jsonList end their Strings with \n:

"connector-id,topic,1,123\n",

This causes the tests to fail on Windows.

The tests should either not include the line separator or take into account the System.lineSeparator()
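One way to do the latter is to normalize line endings before comparing, so the expected strings can keep their \n. This is a minimal sketch with a hypothetical helper name; it is not taken from OffsetSerDeTest.

```java
// Hypothetical test helper: makes string comparisons independent of the
// platform line separator.
final class LineEndings {
  // Converts CRLF and lone CR to LF.
  static String normalize(String s) {
    return s.replace("\r\n", "\n").replace('\r', '\n');
  }
}
```

A test would then compare LineEndings.normalize(actual) against the existing "connector-id,topic,1,123\n" literal on every platform.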

Can Mirus run as a Kafka Connect connector?

We already have a Kafka Connect cluster, so it would be ideal to run Mirus as yet another connector in our cluster instead of running it as a separate server. Is this possible?

Thanks.

Deprecate Prefix & Suffix settings in favor of RegexRouter

AFAICT, the prefix and suffix features can be implemented with an SMT (Single Message Transform).

For example:

transforms=TopicRename

transforms.TopicRename.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.TopicRename.regex=(.*)
transforms.TopicRename.replacement=mirrored.$1.some-suffix
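To show what that configuration would do to a topic name, here is a plain-Java approximation. It assumes RegexRouter requires the whole topic name to match and then substitutes the replacement, which is my understanding rather than a copy of the Kafka code; the class name is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Approximation of a regex-based topic rename; not the Kafka implementation.
final class TopicRenameSketch {
  // Renames the topic when the whole name matches, otherwise returns it as is.
  static String rename(String topic, String regex, String replacement) {
    Matcher m = Pattern.compile(regex).matcher(topic);
    return m.matches() ? m.replaceFirst(replacement) : topic;
  }
}
```

With regex (.*) and replacement mirrored.$1.some-suffix, a topic named orders would become mirrored.orders.some-suffix, which covers both the prefix and the suffix setting.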

Getting Mirus JMX Metrics

Dear Mirus Team,

First of all, thanks for this excellent project.

We started a connector cluster (consisting of 3 connector worker servers), copied the Mirus jar to the plugin directory, and successfully replicated topics from one cluster (origin) to another (destination).

We would like to use JMX metrics to monitor Mirus status in the connector cluster.

In the project source we found, in class AbstractMirusJmxReporter:
metrics.addReporter(new JmxReporter("mirus"));

and in ConnectorJmxReporter class:
private static final String CONNECTOR_JMX_GROUP_NAME = "connector-metrics";

We tried to find the MBean path using VisualVM, but we couldn't find the mirus path among the JVM MBeans.
Do we need to enable something in the configuration?

Regards,
Pablo

org.apache.kafka.common.errors.RecordTooLargeException produces duplicated messages

Dear Mirus Team,

We found that when the Kafka server returns "org.apache.kafka.common.errors.RecordTooLargeException", Mirus flushes data, fails, and retries the flush again, which fills the disk considerably fast.

Could you please check this?

Thanks,
Pablo.

[2019-05-31 15:47:32,630] ERROR WorkerSourceTask{id=test_flight_deduplication-1} failed to send record to flight-deduplication-openstack-openstack: {} (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

[2019-05-31 15:47:33,796] INFO WorkerSourceTask{id=test_flight_deduplication-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-05-31 15:47:33,797] INFO WorkerSourceTask{id=test_flight_deduplication-1} flushing 73 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-05-31 15:47:34,082] INFO WorkerSourceTask{id=test_flight_deduplication-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-05-31 15:47:34,083] INFO WorkerSourceTask{id=test_flight_deduplication-0} flushing 69 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)

Plans to migrate topic metadata beyond partition count

I had a look around, and I believe the tool is able to identify whether the partition count of the source and destination topics is the same.

Are there any plans to replicate other topic metadata, such as retention, max message size, etc.?

Thanks.

Getting a function similar to "measuring lag" (MirrorMaker) in Mirus

Dear Mirus team,

We would like to monitor how far behind a topic replica is when mirroring from one cluster to another with Mirus. With MirrorMaker we use the following to monitor "lag":

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mirroring-data --describe

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

Where LAG is obtained from log-end-offset (highwatermark) minus current-offset.
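The same arithmetic can be sketched in Java, assuming the log-end offsets and the current offsets have already been collected per partition by some other means. The sketch does not fetch anything from Kafka or Mirus, and the class and method names are hypothetical.

```java
import java.util.Map;

// Hypothetical lag calculation over offsets that were obtained elsewhere.
final class LagSketch {
  // Lag for one partition: log-end-offset minus current-offset.
  static long lag(long logEndOffset, long currentOffset) {
    return logEndOffset - currentOffset;
  }

  // Sums the lag over partitions present in both maps (partition -> offset).
  static long totalLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> currentOffsets) {
    long total = 0L;
    for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
      Long current = currentOffsets.get(e.getKey());
      if (current != null) {
        total += lag(e.getValue(), current);
      }
    }
    return total;
  }
}
```

Partitions without a known current offset are skipped here rather than counted as fully lagging; that is a design choice of the sketch, not something the tooling above prescribes.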

How can we do something similar with Mirus? We have access to the Mirus JMX values.

Thanks,
Pablo.

In Kafka Connect we can read the latest offsets from the Connect offsets topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic mirus-offsets

There is also a KIP in Kafka, KIP-196 (Add metrics to Kafka Connect framework), which is a work in progress.

Enabling SSL connectivity

Hi Mirus team,

I'm able to mirror data from source to destination for a topic with open (*) permissions on port 9092, but I'm not able to set up mirroring over an SSL connection. Can you help with the steps or a document (if any) for setting up SSL connectivity?

Regards,
Hari

Message produced to topic test does not appear on test.mirror

Hey guys,
After starting Kafka locally and starting Mirus, I get to this state:
curl localhost:8083/connectors/mirus-quickstart-source/status
{"name":"mirus-quickstart-source","connector":{"state":"RUNNING","worker_id":"10.126.22.44:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"10.126.22.44:8083"}],"type":"source"}

$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
This is my first event

$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
This is my first event
Processed a total of 1 messages

--- sad story starts here:
$ bin/kafka-console-consumer.sh --topic test.mirror --from-beginning --bootstrap-server localhost:9092
^CProcessed a total of 0 messages

I was using all default configs, thanks~

How should I resolve the issue?

Configuring the Mirus quickstart to replicate from one system to another

I'm new to Kafka and Mirus and have been experimenting with the quickstarts for both of them. It was pretty easy to get the default configuration going (hats off to the Mirus team) on a CentOS VM, but now I'd like to set up replication from one instance of Kafka to another on a separate VM. Changing destination.bootstrap.servers to the IP of the second VM had no effect. Assuming this is possible with the Mirus quickstart, can someone advise on my next steps? Thank you.

mirus.connector.auto.restart.enabled doesn't work

Dear Mirus Team,

Setting the options "mirus.connector.auto.restart.enabled" and "mirus.task.auto.restart.enabled" in the connect-distributed.properties configuration file does not seem to work. Tasks always restart, the connector is created, and the logs show "The configuration 'mirus.connector.auto.restart.enabled' / 'mirus.task.auto.restart.enabled' was supplied but isn't a known config.", with no line showing that a default or configured value was set.
The default value is true (com/salesforce/mirus/config/MirusConfigDefinition.java).
Could you please check?

Thanks,
Pablo.

Applying transforms

Hello,

First of all thanks for this tool, it looks super nice!

I've been trying a different setup in which I apply a transform as part of the mirroring. The problem is that my transform receives a null in the record passed to apply, the method defined by the Kafka Connect interface. Here is the example:

kafka-connect_1            | Caused by: java.lang.NullPointerException
kafka-connect_1            | 	at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:187)
kafka-connect_1            | 	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
kafka-connect_1            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
kafka-connect_1            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

configuration

{
   "connector.class": "com.salesforce.mirus.MirusSourceConnector",
   "tasks.max": "1",
   "topics.whitelist": "legacy_data_syncer.op.accounts",
   "destination.topic.name.suffix": ".mirror",
   "destination.bootstrap.servers": "kafka:9092",
   "consumer.bootstrap.servers": "kafka:9092",
   "connector.key.converter": "org.apache.kafka.connect.json.JsonConverter",
   "connector.value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "consumer.client.id": "mirus-legacy_data_syncer.op.accounts1",
   "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
   "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
   "transforms":"unwrap",
   "transforms.unwrap.type":"io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
   "transforms.unwrap.array.encoding":"document",
   "transforms.unwrap.flatten.struct":"false"
}

Am I trying to do something that is not supported, or is there more going on?

Thanks for any help!

Not able to convert header value of replicated topic from ByteArray to String

Hi,

This may not be a bug but something I'm not doing right.

I'm adding a header to Kafka message as shown below.

 final ProducerRecord rec = new ProducerRecord(topic, key, message);
 rec.headers().add(new RecordHeader("testHeader", "HeaderValue".getBytes(StandardCharsets.UTF_8)));
 final RecordMetadata metadata = (RecordMetadata) producer.send(rec).get();

And then at the consumer side I'm reading the header.

            for (final ConsumerRecord<String, String> record : records) {
                final Headers sourceHeaders = record.headers();
                if (sourceHeaders != null) {
                    final Header header = sourceHeaders.lastHeader("testHeader");
                    if (header != null) {
                        // Decode with an explicit charset, matching the producer side
                        System.out.printf("key = %s, value = %s%n", header.key(), new String(header.value(), StandardCharsets.UTF_8));
                    }
                }
            }

If I consume from the same topic I get the value
key = testHeader, value = HeaderValue

If I consume from the replicated topic I get the value
key = testHeader, value = SGVhZGVyVmFsdWU=

I see that mirus does header conversion using ByteArrayConverter. So how do I get the original string value from the replicated topic?
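For what it's worth, the value read from the replicated topic looks like the Base64 encoding of the original bytes, so one possible consumer-side workaround is to decode it. This sketch assumes the replicated header is always Base64 text, which I have only inferred from the output above; the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical consumer-side helper for headers that arrive Base64-encoded.
final class HeaderDecodeSketch {
  // Decodes Base64 header bytes back into the original UTF-8 string.
  static String decodeBase64Header(byte[] headerValue) {
    byte[] raw = Base64.getDecoder().decode(headerValue);
    return new String(raw, StandardCharsets.UTF_8);
  }
}
```

Passing the bytes of SGVhZGVyVmFsdWU= to decodeBase64Header yields HeaderValue. A header that is not valid Base64 would make the decoder throw IllegalArgumentException, so this should not be applied to headers read from the source topic.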

Mirus deployed in k8s pod is failing with "java.lang.OutOfMemoryError"

Our Mirus service running in a K8s pod is failing with "java.lang.OutOfMemoryError". Below is the error that we are seeing in our Mirus logs.

java.lang.OutOfMemoryError: Java heap space
Dumping heap to /tmp/heapdump.bin ...
Heap dump file created [2116271685 bytes in 3.475 secs]

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Thread-2"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-33"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-14"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-13"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-1"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "server-timer"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-3"
*** java.lang.instrument ASSERTION FAILED ***: "!errorOutstanding" with message can't create name string at JPLISAgent.c line: 807

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-2"
*** java.lang.instrument ASSERTION FAILED ***: "!errorOutstanding" with message can't create name string at JPLISAgent.c line: 807

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-24"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-23"

We tried increasing the max heap size from 2 GB to 5 GB, but the issue is still not fixed. When we checked the heap dumps, we found that topic message data occupies most of the heap space. We also tried configuring the producer with the settings below:

    producer.buffer.memory=10000000
    producer.batch.size=1000
    producer.acks=1

Please suggest how to overcome this issue.

Mirror not correct

Hi all,
I have a problem:
image

When mirroring to the destination, the message is changed. How can I resolve this?

Thanks all.

API end point for health check

Is there an endpoint in Mirus that can be used as a service health check?

Basically, I have deployed Mirus on K8s and am looking for ways to monitor the health of my service.

MirrorMaker migration documentation

Regarding the Medium post

Mirus completely replaced Mirror Maker across all production data-centers at Salesforce in April 2018. Since then our data volumes have continued to grow.

For those who are running MirrorMaker with active consumer group offsets for their data and would prefer not to get duplicates after starting Mirus: is there migration documentation available, or a runbook that Salesforce used for the replacement?

SSL configuration

Hello,
I am evaluating different tools for Kafka data replication.
I've read great comments about the Mirus tool, so I decided to give it a try.
My scenario is replicating data from k1 (no SSL) --> k2 (SSL enabled).
Which properties should I set in the worker to get those connections working?
This is my worker config file:

# Kafka broker bootstrap server - this is Source cluster
bootstrap.servers=k1Ips:9092
group.id=mirus
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
config.storage.topic=mirus-config
status.storage.topic=mirus-status
offset.storage.topic=mirus-offsets

producer.security.protocol=SSL
producer.ssl.endpoint.identification.algorithm=""
producer.ssl.truststore.location= trustore.jks
producer.ssl.truststore.password=*************
producer.ssl.keystore.location=keystore.jks
producer.ssl.keystore.password=**********
producer.ssl.key.password=************

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

My connector looks like this:

{
  "name": "mirus-source",
  "connector.class": "com.salesforce.mirus.MirusSourceConnector",
  "tasks.max": "5",
  "topics.regex.list": "^(.pattern).",
  "destination.topic.name.suffix": "",
  "enable.destination.topic.checking": "false",
  "destination.consumer.bootstrap.servers": "k2Ips:9092",
  "consumer.bootstrap.servers": "k1IPs",
  "consumer.client.id": "mirus-ot2",
  "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
  "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}

Am I missing something?

Many thanks in advance.

Failed to run the quick start example

I tried Mirus but failed to run the quick start example. It always fails at step 4:
curl localhost:8083
curl: (7) Failed to connect to localhost port 8083: Connection refused

Looking at the source code in Mirus.java, I can't find any code that starts the REST server. Is there anything I missed?

BTW, I can run the quick start example after adding some code in Mirus.java to start the rest server.

Mirroring not working

I have 2 clusters running on my machine and have created a connector with the details below:

curl localhost:8083/connectors/mirus-quickstart-source-1/config \
      -X PUT \
      -H 'Content-Type: application/json' \
      -d '{
           "name": "mirus-quickstart-source-1",
           "connector.class": "com.salesforce.mirus.MirusSourceConnector",
           "tasks.max": "5",
           "topics.whitelist": "test-mir",
           "destination.topic.name.suffix": ".mirror",
           "destination.bootstrap.servers": "localhost:9093",
           "consumer.bootstrap.servers": "localhost:9092",
           "consumer.client.id": "mirus-quickstart",
           "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
           "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
       }'

However, the mirroring is not happening.
One broker runs with listener port 9092, with ZooKeeper on port 2181.
The other broker runs with listener port 9093, with ZooKeeper on port 2182.
source broker -- one with listener port 9092
destination broker -- one with listener port 9092

Is my connector configuration correct? What am I missing?
