salesforce / mirus
Mirus is a cross data-center data replication tool for Apache Kafka
License: BSD 3-Clause "New" or "Revised" License
By default, the Kafka consumer's auto.offset.reset policy is latest. The implementation in this connector appears to do the opposite: unless the policy is configured explicitly, it starts from earliest.
mirus/src/main/java/com/salesforce/mirus/MirusSourceTask.java
Lines 161 to 167 in 4b48244
And that value is extracted from the consumer prefixed ones:
Would it make sense to make the default the same as the normal connector to keep it consistent with normal consumer groups?
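To make the question concrete, here is a minimal sketch of how a fallback for auto.offset.reset could be resolved from consumer-prefixed properties. The class and method names are hypothetical and this is not the Mirus code; only the consumer. prefix and the auto.offset.reset key come from the discussion above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Mirus or Kafka.
final class OffsetResetSketch {
  static final String KEY = "consumer.auto.offset.reset";

  // Returns the configured policy, or the supplied fallback when the key is absent.
  static String effectivePolicy(Map<String, String> connectorProps, String fallback) {
    return connectorProps.getOrDefault(KEY, fallback);
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    // Nothing configured: the fallback decides where consumption starts.
    System.out.println(effectivePolicy(props, "earliest"));
    // Explicit setting: the fallback is ignored.
    props.put(KEY, "latest");
    System.out.println(effectivePolicy(props, "earliest"));
  }
}
```

Whichever fallback is chosen, setting consumer.auto.offset.reset explicitly in the connector config removes the ambiguity.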
That KIP was added a few months back and is based on ideas similar to the ones here, but was built by Comcast: https://github.com/Comcast/MirrorTool-for-Kafka-Connect
Curious whether anyone has input on whether this is "better" due to its integration with monitoring, dynamic cluster support, etc.
Hi,
We have an issue with Mirus version 0.6.9: the logs contain an excessive number of entries for an exception thrown while calling task.commitRecord().
Exception thrown while calling task.commitRecord() (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.lang.NullPointerException
at com.salesforce.mirus.metrics.MirrorJmxReporter.recordMirrorLatency(MirrorJmxReporter.java:162)
at com.salesforce.mirus.MirusSourceTask.commitRecord(MirusSourceTask.java:241)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:470)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$sendRecords$4(WorkerSourceTask.java:387)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:228)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:684)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:655)
at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:574)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:561)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$3(Sender.java:785)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
at java.base/java.lang.Thread.run(Thread.java:829)
From what I can see, the actual issue is that the topic initialized in mirrorJmxReporter and the topic passed to the recordMirrorLatency method are not the same.
Topics for mirrorJmxReporter are initialized here
But the topic sent to recordMirrorLatency is taken from sourceRecord. When the SourceRecord is built, the topic is prefixed/suffixed, so it becomes the destination topic.
So when the record is finally passed to mirrorJmxReporter, it tries to look something up for that topic and breaks.
After compiling Mirus with the topic for recordMirrorLatency changed to something like this
mirrorJmxReporter.recordMirrorLatency(sourceRecord.sourcePartition().get("topic").toString(), latency) the problem goes away (this would not be the correct way to solve the problem; I tried it just to see if it helps).
From what I can see, the problem was introduced in Mirus in PR #89.
This should be fixed, since there is no config option or similar to disable this behavior.
BR,
Hrvoje
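The failure mode described in this report can be reproduced in isolation. The sketch below is a hypothetical stand-in for a per-topic latency registry, not the Mirus MirrorJmxReporter: it registers sensors under source topic names and is then queried with a suffixed destination name. The ".mirror" suffix is only an example value.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a per-topic latency reporter; not the Mirus class.
final class LatencySketch {
  private final Map<String, AtomicLong> latencyByTopic = new HashMap<>();

  // Sensors are registered under the SOURCE topic names.
  LatencySketch(Iterable<String> sourceTopics) {
    for (String topic : sourceTopics) {
      latencyByTopic.put(topic, new AtomicLong());
    }
  }

  // Unguarded lookup: throws NullPointerException for an unregistered topic.
  void recordUnguarded(String topic, long millis) {
    latencyByTopic.get(topic).set(millis);
  }

  // Guarded lookup: skips unknown topics and reports whether a value was recorded.
  boolean recordGuarded(String topic, long millis) {
    AtomicLong sensor = latencyByTopic.get(topic);
    if (sensor == null) {
      return false;
    }
    sensor.set(millis);
    return true;
  }

  public static void main(String[] args) {
    LatencySketch reporter = new LatencySketch(Arrays.asList("test"));
    System.out.println(reporter.recordGuarded("test", 12L));        // source name is known
    System.out.println(reporter.recordGuarded("test.mirror", 12L)); // destination name is not
  }
}
```

A null guard only hides the mismatch; recording against the source topic name, or registering sensors under the destination name, would be the design decisions to weigh in a real fix.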
Hi Team,
I had a Mirus pipeline working from a source Kafka cluster to a destination server, using a DNS name in the connection string.
I then spun up the destination Kafka cluster on new servers in order to upgrade Kafka from 1.1 to 2.1.
Since then, Mirus is failing to start with the below exception:
[2019-03-27 09:21:21,458] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'mirus-offsets'
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:174)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:215)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
... 11 more
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
[2019-03-27 09:21:21,480] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session)
Mirus stops after writing the above error in the logs.
I also verified that the mirus-* topics have the required ACLs.
Please advise on how to fix it.
When I do a GET to display task status, I see passwords in plain text. Is there any way to edit or mask config parameters (such as passwords, in my case) in the GET response?
Below is a sample:
curl localhost:8083/connectors/source-2-destination/tasks | jq
[
{
"id": {
"connector": "source-2-destination",
"task": 0
},
"config": {
"partitions": "[{\"topic\":\"topicname\",\"partition\":0}]",
"consumer.ssl.truststore.password": "<plaintext password>",
"consumer.ssl.endpoint.identification.algorithm": "",
"consumer.group.id": "mirus-source-2-destination-grp",
"consumer.ssl.truststore.location": "/path/.jks",
"consumer.bootstrap.servers": "destination-kafka-cluster:9093",
"task.class": "com.salesforce.mirus.MirusSourceTask",
"consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"consumer.security.protocol": "SSL",
"consumer.ssl.keystore.location": "/path/keystore.jks",
"consumer.ssl.keystore.password": "<plaintext password>",
"consumer.client.id": "source-2-destinationsource-2-destination-0",
"consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
},
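One client-side workaround is to mask sensitive values before a config is logged or displayed. The sketch below is an illustration under stated assumptions: the class name, the "[hidden]" placeholder, and the rule "any key ending in password" are all hypothetical, and it does not change what the REST endpoint itself returns.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper for illustration; not part of Mirus or Kafka Connect.
final class ConfigMaskSketch {
  static final String PLACEHOLDER = "[hidden]";

  // Returns a copy of the config with the value of every "*password" key replaced.
  static Map<String, String> maskSecrets(Map<String, String> config) {
    Map<String, String> masked = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : config.entrySet()) {
      boolean secret = entry.getKey().toLowerCase(Locale.ROOT).endsWith("password");
      masked.put(entry.getKey(), secret ? PLACEHOLDER : entry.getValue());
    }
    return masked;
  }

  public static void main(String[] args) {
    Map<String, String> config = new LinkedHashMap<>();
    config.put("consumer.security.protocol", "SSL");
    config.put("consumer.ssl.keystore.password", "example-secret");
    System.out.println(maskSecrets(config));
  }
}
```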
Hi Team,
We are seeing some of the tasks go to the "FAILED" state (while a few tasks remain running) with the error details below:
"id": 1,
"state": "FAILED",
"worker_id": "<ip>:<port>",
"trace": "java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy\n\tat org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)\n\tat org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466)\n\tat java.io.DataInputStream.readByte(DataInputStream.java:265)\n\tat org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:168)\n\tat org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:292)\n\tat org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(DefaultRecordBatch.java:263)\n\tat org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:568)\n\tat org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:537)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1210)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1245)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1099)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:545)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)\n\tat com.salesforce.mirus.MirusSourceTask.poll(MirusSourceTask.java:173)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
},
We tried installing libc6-compat in our pods, but it didn't fix the problem. Please advise on how to fix it.
Dear Mirus team,
We followed the quick-start and ran it successfully.
We then followed the Kafka Connect distributed-mode steps: the worker and the connector are created and run with no errors, but events are not replicated. We also ran "mirus-start.sh" with the same result.
Is there a guide for running in "distributed mode"?
Regards,
Pablo.
Currently both cvsList and jsonList end their Strings with \n:
"connector-id,topic,1,123\n",
This causes the tests to fail on Windows.
The tests should either not include the line separator or take System.lineSeparator() into account.
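Both options can be sketched in a few lines. The helper names below are hypothetical and are not taken from the Mirus test suite; the row content reuses the example string quoted above.

```java
// Hypothetical helpers for illustration; not taken from the Mirus tests.
final class LineSeparatorSketch {
  // Option 1: normalize platform line endings before comparing with "\n"-terminated expectations.
  static String normalize(String text) {
    return text.replace("\r\n", "\n");
  }

  // Option 2: build the expectation with the platform separator instead of a hard-coded "\n".
  static String expectedRow(String... fields) {
    return String.join(",", fields) + System.lineSeparator();
  }

  public static void main(String[] args) {
    String row = expectedRow("connector-id", "topic", "1", "123");
    // After normalization the row matches the "\n"-terminated form on any platform.
    System.out.println(normalize(row).equals("connector-id,topic,1,123\n"));
  }
}
```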
We already have a Kafka Connect cluster, so it would be ideal to run Mirus as yet another connector in our cluster instead of running it as a separate server. Is this possible?
Thanks.
AFAICT, the prefix and suffix features can be implemented with an SMT.
For example:
transforms=TopicRename
transforms.TopicRename.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.TopicRename.regex=(.*)
transforms.TopicRename.replacement=mirrored.$1.some-suffix
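To see what that regex and replacement produce, the renaming rule can be tried with plain java.util.regex. This is only a sketch of the matching behavior (whole-name match, then substitution of $1); the class name is hypothetical and it does not call the Kafka Connect transform itself.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of regex-based topic renaming; not the Kafka Connect class.
final class TopicRenameSketch {
  // Renames the topic only when the regex matches the entire topic name.
  static String rename(String topic, String regex, String replacement) {
    Matcher matcher = Pattern.compile(regex).matcher(topic);
    return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
  }

  public static void main(String[] args) {
    // prints "mirrored.test.some-suffix"
    System.out.println(rename("test", "(.*)", "mirrored.$1.some-suffix"));
  }
}
```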
Dear Mirus Team,
First of all, thanks for this excellent project.
We started a Connect cluster (three worker servers), copied the Mirus jar to the plugin directory, and successfully replicated topics from one cluster (origin) to another (destination).
We would like to use JMX metrics to monitor Mirus status in the Connect cluster.
In the project source we found, in the AbstractMirusJmxReporter class:
metrics.addReporter(new JmxReporter("mirus"));
and in the ConnectorJmxReporter class:
private static final String CONNECTOR_JMX_GROUP_NAME = "connector-metrics";
We tried to find the MBean path using VisualVM, but couldn't find the mirus path among the JVM MBeans.
Do we need to enable something in the configuration?
Regards,
Pablo
It'd be great to have this deployed on Maven Central. Let me know if you need help.
Dear Mirus Team,
We found that when the Kafka server returns an "org.apache.kafka.common.errors.RecordTooLargeException", Mirus flushes data, fails, and retries the flush over and over, which fills the disk very quickly.
Could you please check this?
Thanks,
Pablo.
[2019-05-31 15:47:32,630] ERROR WorkerSourceTask{id=test_flight_deduplication-1} failed to send record to flight-deduplication-openstack-openstack: {} (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
[2019-05-31 15:47:33,796] INFO WorkerSourceTask{id=test_flight_deduplication-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-05-31 15:47:33,797] INFO WorkerSourceTask{id=test_flight_deduplication-1} flushing 73 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-05-31 15:47:34,082] INFO WorkerSourceTask{id=test_flight_deduplication-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-05-31 15:47:34,083] INFO WorkerSourceTask{id=test_flight_deduplication-0} flushing 69 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
I had a look around, and I believe the tool can check whether the partition count of the source and destination topics is the same.
Are there any plans to replicate other topic metadata such as retention, max message size, etc.?
Thanks.
Dear Mirus team,
We would like to monitor how far a topic replicated by Mirus lags behind between one cluster and another. With MirrorMaker we use the following to monitor "lag":
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mirroring-data --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
where LAG is the log-end-offset (high watermark) minus the current-offset.
How can we do something similar using Mirus? We have access to the Mirus JMX values.
Thanks,
Pablo.
In Kafka Connect, the latest offsets can be read from the Connect offsets topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic mirus-offsets
There's also a work-in-progress KIP, KIP-196, "Add metrics to Kafka Connect framework".
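The lag formula discussed in this thread (log-end-offset minus current-offset, per partition) can be sketched as below. How the two offset maps are obtained is left open; the class name is hypothetical, and treating a partition with no recorded offset as offset 0 is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of per-partition lag arithmetic; not part of Mirus.
final class LagSketch {
  // lag = log-end-offset - current-offset, never negative.
  static Map<Integer, Long> lagByPartition(
      Map<Integer, Long> logEndOffsets, Map<Integer, Long> currentOffsets) {
    Map<Integer, Long> lag = new TreeMap<>();
    for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
      // Assumption: a partition without a recorded offset has not been replicated at all.
      long current = currentOffsets.getOrDefault(entry.getKey(), 0L);
      lag.put(entry.getKey(), Math.max(0L, entry.getValue() - current));
    }
    return lag;
  }

  public static void main(String[] args) {
    Map<Integer, Long> logEnd = new HashMap<>();
    logEnd.put(0, 100L);
    logEnd.put(1, 50L);
    Map<Integer, Long> current = new HashMap<>();
    current.put(0, 90L);
    // prints {0=10, 1=50}
    System.out.println(lagByPartition(logEnd, current));
  }
}
```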
Hi Mirus team,
I'm able to mirror data from source to destination for a topic with open (*) permissions on port 9092, but I'm not able to set up mirroring over an SSL connection. Can you help with the steps or documentation (if any) for setting up SSL connectivity?
Regards,
Hari
Hey guys,
After starting Kafka locally and starting Mirus, I get to this state:
curl localhost:8083/connectors/mirus-quickstart-source/status
{"name":"mirus-quickstart-source","connector":{"state":"RUNNING","worker_id":"10.126.22.44:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"10.126.22.44:8083"}],"type":"source"}
$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
This is my first event
$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
This is my first event
Processed a total of 1 messages
--- sad story starts here:
$ bin/kafka-console-consumer.sh --topic test.mirror --from-beginning --bootstrap-server localhost:9092
^CProcessed a total of 0 messages
I was using all default configs, thanks~
How should I resolve the issue?
I'm new to Kafka and Mirus and have been experimenting with the quickstarts for both of them. It was pretty easy to get the default configuration going (hats off to the Mirus team) on a CentOS VM, but now I'd like to set up replication from one instance of Kafka to another on a separate VM. Changing destination.bootstrap.servers to the IP of the second VM had no effect. Assuming this is possible with the Mirus quickstart, can someone advise on my next steps? Thank you.
I don't see a reason to include your own when Kafka Connect has provided that converter since at least version 0.11
Dear Mirus Team,
Setting the options "mirus.connector.auto.restart.enabled" and "mirus.task.auto.restart.enabled" in the connect-distributed.properties configuration file does not seem to work. Tasks always "RESTART", the connector is created, and the logs show "The configuration 'mirus.connector.auto.restart.enabled' / 'mirus.task.auto.restart.enabled' was supplied but isn't a known config.", with no line that sets a default or configured value.
The default value is true (com/salesforce/mirus/config/MirusConfigDefinition.java)
Could you please check?
Thanks,
Pablo.
Hello,
First of all thanks for this tool, it looks super nice!
I've been trying a different setup in which I apply a transform during mirroring. The problem is that my transform receives a null in the record passed to apply (the method defined by the Kafka Connect interface). Here is the example:
kafka-connect_1 | Caused by: java.lang.NullPointerException
kafka-connect_1 | at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:187)
kafka-connect_1 | at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
kafka-connect_1 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
kafka-connect_1 | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
configuration
{
"connector.class": "com.salesforce.mirus.MirusSourceConnector",
"tasks.max": "1",
"topics.whitelist": "legacy_data_syncer.op.accounts",
"destination.topic.name.suffix": ".mirror",
"destination.bootstrap.servers": "kafka:9092",
"consumer.bootstrap.servers": "kafka:9092",
"connector.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"connector.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"consumer.client.id": "mirus-legacy_data_syncer.op.accounts1",
"consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
"transforms.unwrap.array.encoding":"document",
"transforms.unwrap.flatten.struct":"false"
}
Am I trying to do something that's not supported, or is there more going on?
Thanks for any help!
Hi,
This may not be a bug but something I'm not doing right.
I'm adding a header to Kafka message as shown below.
final ProducerRecord rec = new ProducerRecord(topic, key, message);
rec.headers().add(new RecordHeader("testHeader", "HeaderValue".getBytes(StandardCharsets.UTF_8)));
final RecordMetadata metadata = (RecordMetadata) producer.send(rec).get();
And then at the consumer side I'm reading the header.
for (final ConsumerRecord<String, String> record : records)
{
    final Headers sourceHeaders = record.headers();
    if (sourceHeaders != null)
    {
        // lastHeader returns null when no header with this key exists
        final Header header = sourceHeaders.lastHeader("testHeader");
        if (header != null)
        {
            System.out.printf("key = %s, value = %s%n", header.key(), new String(header.value(), StandardCharsets.UTF_8));
        }
    }
}
If I consume from the same topic I get the value
key = testHeader, value = HeaderValue
If I consume from the replicated topic I get the value
key = testHeader, value = SGVhZGVyVmFsdWU=
I see that mirus does header conversion using ByteArrayConverter. So how do I get the original string value from the replicated topic?
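The replicated value shown above, SGVhZGVyVmFsdWU=, is the Base64 encoding of the original bytes. As a consumer-side workaround, and assuming the replicated header bytes are exactly that Base64 text, the original string can be recovered with the JDK decoder. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical consumer-side helper; assumes the header bytes hold Base64 text.
final class HeaderDecodeSketch {
  static String decodeHeader(byte[] replicatedValue) {
    byte[] original = Base64.getDecoder().decode(replicatedValue);
    return new String(original, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] replicated = "SGVhZGVyVmFsdWU=".getBytes(StandardCharsets.UTF_8);
    // prints "HeaderValue"
    System.out.println(decodeHeader(replicated));
  }
}
```

Decoding in every consumer is only a stopgap; whether the encoding can be avoided at the source depends on the header converter configuration, which this sketch does not address.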
Our Mirus service running in a K8s pod is failing with "java.lang.OutOfMemoryError". Below is the error we see in the Mirus logs.
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /tmp/heapdump.bin ...
Heap dump file created [2116271685 bytes in 3.475 secs]
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Thread-2"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-33"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-14"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-13"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-1"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "server-timer"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-3"
*** java.lang.instrument ASSERTION FAILED ***: "!errorOutstanding" with message can't create name string at JPLISAgent.c line: 807
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-2"
*** java.lang.instrument ASSERTION FAILED ***: "!errorOutstanding" with message can't create name string at JPLISAgent.c line: 807
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-24"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-23"
We tried increasing the max heap size from 2 GB to 5 GB, but the issue persists. When we checked the heap dumps, we found that topic message data occupies most of the heap. We also tried configuring the producer with the settings below:
producer.buffer.memory=10000000
producer.batch.size=1000
producer.acks=1
Please suggest how to overcome this issue.
Is there any endpoint for Mirus that can be used as a service health check?
Basically, I have deployed Mirus on K8s and am looking for ways to monitor the health of my service.
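One possible approach, sketched here under assumptions, is to poll the Kafka Connect REST status endpoint (GET /connectors/<name>/status) and treat the service as healthy only when the connector and every task report RUNNING. The class below only shows the check on an already-fetched response body; the name is hypothetical, and the regex scan stands in for a real JSON parser to keep the example dependency-free.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical health-check helper; a real probe would use a JSON parser.
final class StatusCheckSketch {
  private static final Pattern STATE = Pattern.compile("\"state\"\\s*:\\s*\"([A-Z_]+)\"");

  // True only if at least one state is present and every state is RUNNING.
  static boolean allRunning(String statusJson) {
    Matcher matcher = STATE.matcher(statusJson);
    boolean sawState = false;
    while (matcher.find()) {
      sawState = true;
      if (!"RUNNING".equals(matcher.group(1))) {
        return false;
      }
    }
    return sawState;
  }

  public static void main(String[] args) {
    String body = "{\"name\":\"c\",\"connector\":{\"state\":\"RUNNING\"},"
        + "\"tasks\":[{\"state\":\"FAILED\",\"id\":0}]}";
    // One failed task makes the whole check fail.
    System.out.println(allRunning(body));
  }
}
```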
ReplayPolicy.IGNORE filters out all messages: both duplicates and new ones.
This policy is the default, so Mirus simply ignores all messages from the source cluster, which makes it useless.
See:
#65
#64 (comment)
https://github.com/salesforce/mirus/blob/master/src/main/java/com/salesforce/mirus/MirusSourceTask.java#L256
Regarding the Medium post
Mirus completely replaced Mirror Maker across all production data-centers at Salesforce in April 2018. Since then our data volumes have continued to grow.
For those who are running MirrorMaker with an active consumer group offset for their data and who would prefer not to get duplicates after starting Mirus, is there migration documentation available, or a runbook that Salesforce used for the replacement?
Hello,
I am evaluating different tools for Kafka data replication.
I've read great comments about Mirus, so I decided to give it a try.
My scenario is replicating data from k1 (no SSL) --> k2 (SSL enabled).
What properties should I apply in the worker to get those connections working?
This is my worker config file:
`# Kafka broker bootstrap server - this is Source cluster
bootstrap.servers=k1Ips:9092
group.id=mirus
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
config.storage.topic=mirus-config
status.storage.topic=mirus-status
offset.storage.topic=mirus-offsets
producer.security.protocol=SSL
producer.ssl.endpoint.identification.algorithm=""
producer.ssl.truststore.location= trustore.jks
producer.ssl.truststore.password=*************
producer.ssl.keystore.location=keystore.jks
producer.ssl.keystore.password=**********
producer.ssl.key.password=************
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
My connector looks like this:
{
"name": "mirus-source",
"connector.class": "com.salesforce.mirus.MirusSourceConnector",
"tasks.max": "5",
"topics.regex.list": "^(.pattern).",
"destination.topic.name.suffix": "",
"enable.destination.topic.checking": "false",
"destination.consumer.bootstrap.servers": "k2Ips:9092",
"consumer.bootstrap.servers": "k1IPs",
"consumer.client.id": "mirus-ot2",
"consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}`
Am I missing something?
Many thanks in advance.
I tried Mirus but could not run the quick start example; it always fails at step 4:
curl localhost:8083
curl: (7) Failed to connect to localhost port 8083: Connection refused
In the source code of Mirus.java I could not find any code that starts the REST server. Is there anything I missed?
BTW, I can run the quick start example after adding some code to Mirus.java to start the REST server.
I have 2 clusters running on my machine and have created a connector with the details below:
curl localhost:8083/connectors/mirus-quickstart-source-1/config \
-X PUT \
-H 'Content-Type: application/json' \
-d '{
"name": "mirus-quickstart-source-1",
"connector.class": "com.salesforce.mirus.MirusSourceConnector",
"tasks.max": "5",
"topics.whitelist": "test-mir",
"destination.topic.name.suffix": ".mirror",
"destination.bootstrap.servers": "localhost:9093",
"consumer.bootstrap.servers": "localhost:9092",
"consumer.client.id": "mirus-quickstart",
"consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}'
However, the mirroring is not happening.
One broker runs with listener port 9092, with ZooKeeper on port 2181;
another broker runs with listener port 9093, with ZooKeeper on port 2182.
source broker -- one with listener port 9092
destination broker -- one with listener port 9092
Is my connector configuration correct? What am I missing?