ibmstreams / streamsx.kafka

Repository for integration with Apache Kafka

Home Page: https://ibmstreams.github.io/streamsx.kafka/

License: Apache License 2.0

Languages: Java 99.88%, Shell 0.12%
Topics: stream-processing, ibm-streams, kafka, apache-spark, messaging, toolkit

streamsx.kafka's Introduction

Kafka Toolkit

Welcome to the Kafka Toolkit. This toolkit enables SPL applications to integrate with Apache Kafka.

This version of the toolkit currently supports Apache Kafka 0.10.2, 0.11, 1.0, 1.1, and 2.0 through 2.7. The older Kafka versions 0.9 and 0.10.0 are no longer supported. Using the Kafka operators from the com.ibm.streamsx.messaging toolkit for these broker versions is not recommended.

Event Streams cloud service (Message Hub)

For building applications that connect to the IBM Cloud Event Streams service, it is recommended that the com.ibm.streamsx.messagehub toolkit be used. This toolkit provides functionality on top of the Kafka toolkit to simplify the configuration needed to connect to the public cloud service.

Migrating from com.ibm.streamsx.messaging

To migrate applications using the old Kafka operators in the com.ibm.streamsx.messaging toolkit, refer to the Migration Document.

Documentation

Documentation on GitHub

Build the toolkit

cd com.ibm.streamsx.kafka
../gradlew build

Build SPLDoc

./gradlew spldoc

NOTE: The SPLDoc documentation will be generated in the docs/spldoc directory.

Release

./gradlew release

NOTE: The release will be available in the build/release/output folder.

Test

cd tests/KafkaTests
./setup.sh -b <list_of_bootstrap_servers>
../../gradlew test

NOTE 1: setup.sh will add a domain-level app config called "kafka-tests", as well as create a properties file containing the bootstrap.servers property.

NOTE 2: Tests will run using the local domain specified by the STREAMS_DOMAIN_ID env var. All tests run in Distributed mode.

The following topics are required for the tests:

topic name    number of partitions
test          3
other1        1
other2        1
position      1

Create them manually in advance, or make sure that they are created automatically with the required number of partitions.

Hint: The test case kafkaStartPositionTest requires that the position topic is re-created before every test run.

Samples

Each sample contains a build.gradle file and a Makefile. The samples can be built/compiled by running ../../gradlew build or make from the sample directory.

streamsx.kafka's People

Contributors

anouri, chanskw, ddebrunner, natashadsilva, schulz2, thomas-mattsson


streamsx.kafka's Issues

KafkaConsumer: Proposal to (optionally) output offset, partition and timestamp values with each message

Use Case

Currently, the operator is capable of populating tuple attributes with the message, key and topic values for a given record. The operator should also be capable of populating tuple attributes with the offset, partition and timestamp values.

Proposal

In order to support the above use case, the following operator parameters will be added:

outputOffsetAttributeName (attribute SPL type: int64, default attribute name: 'offset'): Specifies the output attribute name that should contain the offset. If not specified, the operator will attempt to store the offset in an attribute named 'offset'.
outputPartitionAttributeName (attribute SPL type: int32, default attribute name: 'partition'): Specifies the output attribute name that should contain the partition. If not specified, the operator will attempt to store the partition in an attribute named 'partition'.
outputTimestampAttributeName (attribute SPL type: int64, default attribute name: 'messageTimestamp'): Specifies the output attribute name that should contain the record's timestamp. If not specified, the operator will attempt to store the timestamp in an attribute named 'messageTimestamp'.
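
For illustration, the following is a minimal SPL sketch of how a consumer invocation might look once the proposed parameters are added. The output schema and the explicit parameter settings are assumptions for the example; with the default attribute names listed above, the three parameters could simply be omitted.

// Sketch only: assumes the proposed parameters exist with the names and defaults listed above.
(stream<rstring message, rstring key, int64 offset, int32 partition, int64 messageTimestamp> Messages)
    as ConsumerOp = KafkaConsumer()
{
    param
        topic : "test";
        propertiesFile : getThisToolkitDir() + "/etc/consumer.properties";
        // Proposed parameters, shown here with their proposed default values.
        outputOffsetAttributeName : "offset";
        outputPartitionAttributeName : "partition";
        outputTimestampAttributeName : "messageTimestamp";
}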

Compatibility

The proposed changes will not break compatibility with existing applications.

KafkaConsumer NPE when consistent region is enabled

04 Oct 2017 07:14:22.213 [8302] ERROR #splapptrc,J[10],P[67],KafkaStream M[AbstractKafkaConsumerOperator.java:com.ibm.streamsx.kafka.operators.AbstractKafkaConsumerOperator$1.run:387]  - Operator error
04 Oct 2017 07:14:22.214 [8302] ERROR #splapptrc,J[10],P[67],KafkaStream M[?:?:0]  - java.lang.RuntimeException: java.lang.NullPointerException
04 Oct 2017 07:14:22.215 [8302] ERROR #splapptrc,J[10],P[67],KafkaStream M[?:?:0]  - 	at com.ibm.streamsx.kafka.operators.AbstractKafkaConsumerOperator.produceTuples(AbstractKafkaConsumerOperator.java:440)
04 Oct 2017 07:14:22.215 [8302] ERROR #splapptrc,J[10],P[67],KafkaStream M[?:?:0]  - 	at com.ibm.streamsx.kafka.operators.AbstractKafkaConsumerOperator.access$000(AbstractKafkaConsumerOperator.java:45)
04 Oct 2017 07:14:22.215 [8302] ERROR #splapptrc,J[10],P[67],KafkaStream M[?:?:0]  - 	at com.ibm.streamsx.kafka.operators.AbstractKafkaConsumerOperator$1.run(AbstractKafkaConsumerOperator.java:385)
04 Oct 2017 07:14:22.216 [8302] ERROR #splapptrc,J[10],P[67],KafkaStream M[?:?:0]  - 	at java.lang.Thread.run(Thread.java:785)
04 Oct 2017 07:14:22.216 [8302] ERROR #splapptrc,J[10],P[67],KafkaStream M[?:?:0]  - 	at com.ibm.streams.operator.internal.runtime.OperatorThreadFactory$2.run(OperatorThreadFactory.java:137)
04 Oct 2017 07:14:22.216 [8302] ERROR #splapptrc,J[10],P[67],KafkaStream M[?:?:0]  - Caused by: java.lang.NullPointerException
04 Oct 2017 07:14:22.216 [8302] ERROR #splapptrc,J[10],P[67],KafkaStream M[?:?:0]  - 	at com.ibm.streamsx.kafka.operators.AbstractKafkaConsumerOperator.produceTuples(AbstractKafkaConsumerOperator.java:428)
04 Oct 2017 07:14:22.217 [8302] ERROR #splapptrc,J[10],P[67],KafkaStream M[?:?:0]  - 	... 4 more

KafkaProducer seems to expect (mandatorily) a key now

When running our "KafkaNoKeyAttribute" regression test, we receive the following exception:

Exception in thread "Thread-13" java.lang.NullPointerException
at com.ibm.streamsx.kafka.clients.AbstractKafkaClient.getSerializer(AbstractKafkaClient.java:28)
at com.ibm.streamsx.kafka.clients.producer.KafkaProducerClient.<init>(KafkaProducerClient.java:38)
at com.ibm.streamsx.kafka.clients.producer.AtLeastOnceKafkaProducerClient.<init>(AtLeastOnceKafkaProducerClient.java:16)
at com.ibm.streamsx.kafka.operators.AbstractKafkaProducerOperator.initProducer(AbstractKafkaProducerOperator.java:273)
at com.ibm.streamsx.kafka.operators.AbstractKafkaProducerOperator.initialize(AbstractKafkaProducerOperator.java:257)
at com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.initialize(OperatorAdapter.java:736)
at com.ibm.streams.operator.internal.jni.JNIBridge.<init>(JNIBridge.java:274)

Looking at the code leads to lines where a 'keyType' member is accessed that is tied to the handling of 'key' attributes. The current implementation therefore seems to always expect a 'key' attribute, while the current Kafka documentation states:

public class ProducerRecord<K,V>
extends Object

A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value.

If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.

The record also has an associated timestamp. If the user did not provide a timestamp, the producer will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for the topic.
...

Provide option to skip messages that fail to be parsed

In Kafka v0.11, support is being added to "skip" over messages that fail to be parsed (either due to SerializationException or some other exception). This functionality is being added here: KAFKA-4740.

The operator should provide the user with the option to specify how these sorts of errors should be handled.

KafkaConsumer: Add documentation for the common usage scenarios.

I think there are two common modes for the consumer that are essential to document, and then two "more advanced" modes that could also be documented. This would make it easier for folks who are not Kafka experts to use the operators, e.g. when using Message Hub.

By document I mean explaining the model of how topics/partitions are matched to consumer operators, with example SPL invocations for those modes.

Simple modes:

  • Single consumer consuming all partitions.
  • Consumer in a UDP region with each individual consumer being automatically assigned partitions.

More advanced

  • UDP region with manually assigned partitions - Does this mean the width must not change?
  • Manual invocation of multiple operators (non-UDP) with manually assigned partitions.

We should probably push the simple approaches as the way to go.

Also some documentation on handling keyed partitions and maintaining the key through SPL UDP regions.

Explanations and examples could be added to namespace-info.spl; see #37.
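
To make the second simple mode concrete, a minimal SPL sketch of a consumer replicated in a UDP region could look like the following. The width and the reliance on a shared consumer group (e.g. a group.id set in consumer.properties) are assumptions of the sketch, not a statement of the operator's current behaviour.

// Sketch only: each parallel channel runs its own KafkaConsumer instance.
// Partition assignment is assumed to be handled by Kafka's group management,
// with all channels sharing a group id (e.g. group.id in consumer.properties).
@parallel(width = 3)
(stream<rstring message> Messages) as ParallelConsumer = KafkaConsumer()
{
    param
        topic : "test";
        propertiesFile : getThisToolkitDir() + "/etc/consumer.properties";
}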

Add support for including headers in Kafka records

Kafka v0.11 will introduce the ability to add record headers to individual records. This support is being added via KIP-82.

The operators should be enhanced to allow users to specify headers that should be added to the records.

Migration document

Need a migration document that outlines how developers should migrate their existing applications to use this new toolkit.

Some things to keep in mind when writing this:

  • operator names
  • namespace
  • parameter names
  • behavioural changes (I don't think there are any)

Format Java code.

The Java code seems to be a mix of spaces and tabs, or at least inconsistently indented.

Before there is too much work on it, would it be possible to have a commit that formats the code to use spaces only, with 4-space indentation? E.g. select all code in Eclipse and correct the formatting.

Please add the ability to specify target partition to kafka producer

We should be able to optionally specify which partition a message should go to when producing messages. I believe the ProducerRecord API can be used for this. Partition-based parallelism is heavily used in our customer engagement, and our producers will have to publish specific keys to specific partitions in this very stateful application. We would appreciate this feature very much.

Support kafkaProperty parameters

The deprecated Kafka operators in streamsx.messaging supported providing properties as SPL parameters as well as via a properties file. This allowed using a single properties file for multiple consumers and overriding specific properties statically or at runtime, for example the consumer group ID. Please consider supporting that in these operators through the addition of such a parameter.

NPE during reset() when partitions param is used

The following exception gets thrown during reset() if the partitions parameter is being used. The problem is related to the fact that the reset operation is attempting to retrieve the offsets for all partitions, rather than just those that were specified via the param.

java.lang.NullPointerException
	at com.ibm.streamsx.kafka.clients.OffsetManager.getOffset(OffsetManager.java:70)
	at com.ibm.streamsx.kafka.clients.consumer.KafkaConsumerClient.lambda$refreshFromCluster$4(KafkaConsumerClient.java:320)
	at com.ibm.streamsx.kafka.clients.consumer.KafkaConsumerClient$$Lambda$9.000000008C0DFC50.accept(Unknown Source)
	at java.util.ArrayList.forEach(ArrayList.java:1260)
	at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1091)
	at com.ibm.streamsx.kafka.clients.consumer.KafkaConsumerClient.refreshFromCluster(KafkaConsumerClient.java:318)
	at com.ibm.streamsx.kafka.clients.consumer.KafkaConsumerClient.reset(KafkaConsumerClient.java:369)
	at com.ibm.streamsx.kafka.clients.consumer.KafkaConsumerClient.startEventLoop(KafkaConsumerClient.java:250)
	at com.ibm.streamsx.kafka.clients.consumer.KafkaConsumerClient$1.run(KafkaConsumerClient.java:109)
	at java.lang.Thread.run(Thread.java:785)
	at com.ibm.streams.operator.internal.runtime.OperatorThreadFactory$2.run(OperatorThreadFactory.java:137)

Problem with Consistent Cut

While migrating code to the new Kafka toolkit I came across a problem with Consistent Cut and the KafkaConsumer. Attached you'll find example code and the logs generated when this code is run on my machine.

In the delivered configuration the app runs into exceptions that lead to no output data being generated (the data that went through Kafka). When the '@consistent' annotation is commented out, the application works as expected.

consistentRegionInKafka.zip

ConnectionState.java:org.apache.curator.ConnectionState.checkState - Authentication failed

Kafka version: 0.10 (Kafka's ZooKeeper uses Kerberos authentication)
Streams version: 4.2.1.2
com.ibm.streamsx.kafka version: 1.1.1 (I added code to handle Kerberos)

The KafkaConsumer and KafkaProducer functions work well, but when using the KafkaConsumer, the following error appears in pe.out.

SPL:

(stream<rstring message> KafkaStream_GnData_Out) as KafkaStream_GnData = KafkaConsumer()
{
    param
        topic : "B_EBU_GSM_NEW";
        propertiesFile : "/home/streamimcdad/conf/consumer.properties";
}

() as SinkOp = Custom(KafkaStream_GnData_Out)
{
    logic
        onTuple KafkaStream_GnData_Out :
        {
            println("message-->" + message);
        }
}

12 Oct 2017 17:43:06.704 [32515] ERROR #splapptrc,J[42],P[67],SinkOp M[ConnectionState.java:org.apache.curator.ConnectionState.checkState:245] - Authentication failed
12 Oct 2017 17:43:07.753 [32524] ERROR #splapptrc,J[42],P[67],SinkOp M[ConnectionState.java:org.apache.curator.ConnectionState.checkState:245] - Authentication failed
12 Oct 2017 17:43:08.807 [32534] ERROR #splapptrc,J[42],P[67],SinkOp M[ConnectionState.java:org.apache.curator.ConnectionState.checkState:245] - Authentication failed
12 Oct 2017 17:43:09.859 [32540] ERROR #splapptrc,J[42],P[67],SinkOp M[ConnectionState.java:org.apache.curator.ConnectionState.checkState:245] - Authentication failed
12 Oct 2017 17:43:10.907 [32547] ERROR #splapptrc,J[42],P[67],SinkOp M[ConnectionState.java:org.apache.curator.ConnectionState.checkState:245] - Authentication failed
12 Oct 2017 17:43:11.939 [32558] ERROR #splapptrc,J[42],P[67],SinkOp M[ConnectionState.java:org.apache.curator.ConnectionState.checkState:245] - Authentication failed
12 Oct 2017 17:43:12.973 [32563] ERROR #splapptrc,J[42],P[67],SinkOp M[ConnectionState.java:org.apache.curator.ConnectionState.checkState:245] - Authentication failed
12 Oct 2017 17:43:14.015 [32571] ERROR #splapptrc,J[42],P[67],SinkOp M[ConnectionState.java:org.apache.curator.ConnectionState.checkState:245] - Authentication failed
12 Oct 2017 17:43:15.070 [32590] ERROR #splapptrc,J[42],P[67],SinkOp M[ConnectionState.java:org.apache.curator.ConnectionState.checkState:245] - Authentication failed
12 Oct 2017 17:43:16.130 [32643] ERROR #splapptrc,J[42],P[67],SinkOp M[ConnectionState.java:org.apache.curator.ConnectionState.checkState:245] - Authentication failed

Add additional tests

Need tests for the following:

  • each of the supported message and key attribute types (int, blob, float64, etc)
  • app config vs. properties file
  • startPosition param (KafkaConsumer)
  • each of the different attribute parameters (topicAttrName, outputMessageAttrName, etc)

Support for offsets as startPosition

From IBMStreams/streamsx.messagehub#19:

The startPosition parameter currently supports begin/end and time. For jobs that need to restart from a known position, there is a requirement for offsets, similar to time. While time is similar to offset, there is a concern that it is not as accurate/explicit as offsets for high-rate feeds that need to guarantee processing all data after a restart, where millisecond precision is not sufficient. James had raised that offsets are partition-based, so we will need to take that into account.

Proposal: Add ability to send and receive message timestamp data

Use case

Every message published to a Kafka topic contains a timestamp value. By default, the underlying Kafka API will append the record "creation" time to the Kafka message. However, there are instances where users will want to set their own timestamp when publishing messages.

Likewise, when consuming messages, being able to retrieve the timestamp information is important.

Proposal

KafkaProducer

In order to support the above use case, the following parameters will be added to the operator to allow users to specify a message timestamp.

timestampAttribute (attribute SPL type: int64, default attribute name: 'messageTimestamp'): Specifies the attribute on the input port that contains the timestamp for the message. If not specified, the operator will look for an input attribute named messageTimestamp. If this parameter is not specified and there is no input attribute named messageTimestamp, the operator will use the timestamp provided by the underlying Kafka API.
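
A minimal SPL sketch of a producer invocation under this proposal could look like the following. The input stream, its eventTime attribute, and the exact form of the parameter value (attribute reference vs. attribute name) are assumptions of the sketch.

// Sketch only: OutMessages is assumed to carry an int64 attribute 'eventTime'
// holding the application-provided timestamp in milliseconds.
() as ProducerOp = KafkaProducer(OutMessages)
{
    param
        topic : "test";
        propertiesFile : getThisToolkitDir() + "/etc/producer.properties";
        timestampAttribute : eventTime;   // the proposed parameter
}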

KafkaConsumer

In order to support the above use case, the following parameter will be added to the operator to allow users to retrieve a message timestamp.

outputTimestampAttributeName (attribute SPL type: int64, default attribute name: 'messageTimestamp'): Specifies the output attribute name that should contain the record's timestamp. If not specified, the operator will attempt to store the timestamp in an attribute named 'messageTimestamp'.

Compatibility

The above proposal will not break compatibility with existing applications.

KafkaProducer not throwing exception

The KafkaProducer operator is not propagating exceptions received by the Kafka API. The offending code is here: https://github.com/IBMStreams/streamsx.kafka/blob/develop/com.ibm.streamsx.kafka/impl/java/src/com/ibm/streamsx/kafka/clients/producer/AtLeastOnceKafkaProducerClient.java#L23

    public boolean processTuple(ProducerRecord producerRecord) throws Exception {
        //logger.trace("Sending: " + producerRecord);
        producer.send(producerRecord);
        return true;
    }

Instead of calling the super class's send() method, it calls directly into Kafka's KafkaProducer send() method, skipping the error-handling and callback code.

Toolkit version is not in adherence with guidelines

The toolkit version specified in info.xml does not adhere to the guidelines: https://github.com/IBMStreams/administration/wiki/Toolkit-Versions
The toolkit has to specify 3 digits only: major, minor, and micro. The 4th qualifier is reserved for the build identity. It can be set with spl-make-toolkit -i . -v <major.minor.micro.qualifier> during the build/release process if required.
Please reduce the number of digits in the version attribute that you provide in info.xml.

KafkaConsumer does not restart if exception is thrown during initialize()

The Consumer object is created in a background thread that is run from the initialize() method. A lock is used to wait for the Consumer object to finish being created before continuing with initialize(). In the event of a problem, a RuntimeException is thrown from the background thread. According to the docs for operatorContext.getThreadFactory():

Any uncaught exception thrown by the Runnable passed to the ThreadFactory.newThread(Runnable) will cause the processing element containing the operator to terminate.

The undocumented caveat here is that the initialize() method needs to complete before the runtime terminates the operator. Unfortunately, the lock is not released if an exception is thrown, which results in the initialize() method never completing, and thus the operator does not terminate/restart.

Add support for transactional messaging

Kafka v0.11 will add support for transactional messaging. More information on this support can be found in KIP-98.

The operator should be enhanced to take advantage of this functionality.

KafkaConsumer never updates offsets in a periodic consistent region.

Changes made for #45 changed the way offsets are saved through OffsetManager for a consistent region in operatorDriven mode, so that the saved offset is that of the last submitted message. This also removed the drain processing that previously submitted all the messages currently on messageQueue.

However, when the consistent region is periodic, there is no mechanism to update the OffsetManager, so it remains fixed at the earliest offset. Thus, while this still results in "at least once" behavior, it is really inefficient and might break once the initial offset has disappeared (maybe not, would have to check). It is inefficient because every reset will cause a re-read of all messages since the start, not just those since the last successful cut.
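
For context, the scenario in question is a periodic consistent region containing the consumer, as in the following sketch (the period value is arbitrary and the schema is simplified):

// Sketch only: a periodic consistent region driven by the runtime, not by the operator.
// In this configuration the OffsetManager is never updated, so every reset
// re-reads from the earliest saved offset.
@consistent(trigger = periodic, period = 30.0)
(stream<rstring message> Messages) as ConsumerOp = KafkaConsumer()
{
    param
        topic : "test";
        propertiesFile : getThisToolkitDir() + "/etc/consumer.properties";
}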

Fix could be one of:

Need additional samples

The following samples should be added for the v1.0.0 release:

  • Simple sample
  • Samples demonstrating how to use non-string key and message values
  • Sample demonstrating how to load balance KafkaConsumers while in a UDP region
  • Sample demonstrating how to use a custom partitioner
  • Sample showing how to use appConfig param
  • Sample showing how to use the "attributeName" properties

All samples should also contain a README.md that describes the sample and explains how to run it.

Rename `appConfig` param to `appConfigName`

The previous set of operators contained a parameter called appConfigName. To reduce the amount of work required to migrate to the new toolkit, the current parameter name should be the same.

Please provide option to pass in client.id as a parameter

In our customer engagement, we have a very parallel and stateful Streams application. We achieve parallelism using UDP, with a Kafka consumer inside each UDP channel subscribing to a particular Kafka partition, all on the same topic. Without specifying a client ID we are running into problems: Kafka doesn't honor client offsets in this situation. Each time we start the Streams application, consumers start from the very beginning of the topic. Specifying client.id resolved this problem. Please add the ability to specify an optional client.id. Thank you.
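
The following SPL sketch illustrates the requested usage. The clientId parameter name is hypothetical (it is the parameter requested in this issue), and the per-channel partition assignment shown is only meant to illustrate the scenario described above.

// Sketch only: one consumer per UDP channel, each reading a single partition.
// 'clientId' is the hypothetical parameter requested in this issue.
@parallel(width = 4)
(stream<rstring message> Messages) as PartitionedConsumer = KafkaConsumer()
{
    param
        topic : "test";
        partition : getChannel();                                  // one partition per channel
        propertiesFile : getThisToolkitDir() + "/etc/consumer.properties";
        clientId : "consumer-channel-" + (rstring) getChannel();   // hypothetical parameter
}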

Review and complete NLS support for operators

A full review needs to be performed on the operators to ensure that all messages intended for users have been externalized.

Once that is complete, the message file(s) need to be translated into the different languages.

Bad bounds check can cause exception

The bounds check here isn't sufficient for the next step:

It confirms there is space for at least 1 element, but then pulls a complete bundle. If there is not enough space (e.g. high-rate streams, or backpressure from downstream), an exception is thrown and the operator crashes and restarts. Probably the check needs to be improved, or possibly the returned items need to be cached when there is not enough space.

Discussion: Specifying Kafka configurations

The producer and consumer APIs provided by Apache Kafka contain numerous configuration options that can change how the operators interact with the Kafka cluster. How these configuration options are specified is a vital part of using these operators within a Streams application.

Goals

In this post, I would like to propose a solution for allowing users to specify Kafka configuration within the context of a Kafka operator. This proposal attempts to make it possible for the operators to be configured in any of the following scenarios:

  1. Allow the user to provide a set of Kafka properties via a file
  2. Allow the user to configure an operator to connect to MessageHub with as little effort as possible
  3. Allow the user to provide Kafka configurations via Streams application configurations
    • Support specifying multiple app configs, so different configs can be shared
    • Support specifying MessageHub credentials via an app config
  4. Allow the user to specify Kafka configurations at submission-time via a parameter
  5. (Forward-looking) When running on Bluemix, if the MessageHub service is bound to the Streaming Analytics service (this is not possible as of this writing), operators should automatically be able to determine the MessageHub credentials from the VCAP env var and connect to MessageHub.

Solution

In order to achieve the above goals, the following options for specifying the configurations will be available in the operator:

  • The propertiesFile parameter will be used to specify a properties file containing kafka configurations
    • Multiple properties files can be specified to allow for having "shared" configurations between different Kafka operators in the same application
  • The messageHubCredFile parameter will be used to specify the file containing the credentials for a MessageHub service
  • The appConfigNames parameter will be used to specify one or more application configurations containing Kafka properties
    • All properties stored in the application configurations specified will be loaded as Kafka configurations. This means that the property names must match the names of the Kafka configurations.
    • Multiple application configurations can be specified to enable having both shared and application-specific app configs
    • A special "message_hub.credentials" property can be specified in an app config to allow users to easily create a connection to a MessageHub service.
  • The kafkaProperties parameter will be used to allow the user to specify configurations at submission-time (the user could also hard-code configurations using this property)
  • (Forward-looking) The operator will be able to retrieve the MessageHub credentials from the VCAP system environment variable and connect to that MessageHub service.
    • Since it is not currently possible to bind a MessageHub service to a Streaming Analytics service, no further details will be provided at this time for how this will be done. This option is being included here so that if this type of binding ever becomes possible, the operators can be updated with minimal effort to support this functionality.

Running in Bluemix

One of the primary goals of this document is to make it as simple as possible to run these operators in a Bluemix environment. To that end, the operators will provide two options for specifying the MessageHub credentials:

  • Via the messageHubCredFile parameter
    • Points to a file containing the entire contents of the MessageHub credentials, pasted verbatim
  • Via the message_hub.credentials app config property
    • The value of this property will contain the entire contents of the MessageHub credentials, pasted verbatim

The operator will behave in the following way when MessageHub credentials are read in:

  • The operator will parse out the necessary information from these credentials and set the appropriate configuration options to allow the operator to connect.
  • At a minimum, the user will not need to provide any additional configurations to allow the operators to connect
    • If necessary, additional configuration options can be specified using any of the other methods (i.e. via app config, properties files, etc)

Property Precedence

Kafka configurations will be read from the various locations in descending order of precedence, starting with the location "most easily changed at submission time". This means that during operator initialization, the Kafka configurations will be loaded from the following locations in the given order. If duplicate properties are found, the value from the first property read will be used.

  1. Submission-time value
  2. Application configuration (if running distributed)
  3. VCAP system env variable
  4. MessageHub credentials file
  5. Properties files

SASL/JAAS

Kafka v0.10 introduced a new property called sasl.jaas.config, which allows users to pass in their JAAS credentials as a property value rather than using a Java system property to point to a file. This property will enable multiple Kafka operators to run within the same JVM but potentially connect to different secured Kafka clusters.

The new set of Kafka operators will eliminate the jaasFile and jaasFilePropName parameters and encourage users to specify their JAAS credentials via this property.

Furthermore, when connecting to a Bluemix environment using the MessageHub credentials, the Kafka operators will construct the necessary value for the sasl.jaas.config property, eliminating the need for users to build and manage their own JAAS files.
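
To make the proposal concrete, the following hypothetical SPL sketch combines several of the options above. The parameter names (propertiesFile, appConfigNames, kafkaProperties) are the proposed ones; whether they can be mixed exactly like this, and the app config names shown, are assumptions of the sketch.

// Sketch only: illustrates the proposed configuration options, not a final interface.
(stream<rstring message> Messages) as ConsumerOp = KafkaConsumer()
{
    param
        topic : "test";
        // Shared defaults packaged with the application:
        propertiesFile : getThisToolkitDir() + "/etc/consumer.properties";
        // One or more application configurations holding Kafka properties
        // (the property names must match Kafka configuration names):
        appConfigNames : "kafka.shared", "kafka.myapp";
        // Submission-time overrides, which take the highest precedence per the list above:
        kafkaProperties : getSubmissionTimeValue("kafkaProperties", "");
}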

Too many libs

The project's sub-directory ./opt/downloaded contains too many libraries. The following libs are downloaded but not really used:
gson-2.8.0.jar
jopt-simple-5.0.3.jar
kafka_2.10-0.10.2.0.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
metrics-core-2.2.0.jar
scala-library-2.10.6.jar
slf4j-api-1.7.21.jar
slf4j-log4j12-1.7.21.jar
snappy-java-1.1.2.6.jar
zkclient-0.10.jar
zookeeper-3.4.9.jar

FileNotFound exception being returned if properties file is not present

Operators should log an error stating that the properties file does not exist, but this should not cause the operator to restart.

Exception in thread "Thread-11" java.io.FileNotFoundException: /home/streamsadmin/.streams/var/Streams.sab_ALZ-StreamsDomain-StreamsInstance/9e35385b-fd9c-4107-b019-39234e80b6ac/3919e8e769c403e5051b4cea8beec2da1bb72ce351b42fa3c86e62b4/BuildConfig1/toolkits/com.ibm.streamsx.health.ingest.kafka/etc/consumer.properties (No such file or directory)
	at java.io.FileInputStream.open(FileInputStream.java:212)
	at java.io.FileInputStream.<init>(FileInputStream.java:152)
	at com.google.common.io.Files$FileByteSource.openStream(Files.java:120)
	at com.google.common.io.Files$FileByteSource.openStream(Files.java:110)
	at com.google.common.io.ByteSource$AsCharSource.openStream(ByteSource.java:456)
	at com.google.common.io.CharSource.read(CharSource.java:232)
	at com.google.common.io.Files.toString(Files.java:261)
	at com.ibm.streamsx.kafka.operators.AbstractKafkaOperator.loadFromPropertiesFile(AbstractKafkaOperator.java:130)
	at com.ibm.streamsx.kafka.operators.AbstractKafkaOperator.loadProperties(AbstractKafkaOperator.java:116)

KafkaConsumer: triggerCount should specify the exact value for initiating a checkpoint

In the current implementation of the triggerCount parameter, the value specified by the user will be the minimum number of tuples that will be submitted prior to initiating a checkpoint. This implies that the actual number of tuples that get submitted before the operator is checkpointed is not deterministic.

The KafkaConsumer operator needs to be enhanced so that the value specified for triggerCount represents the exact number of tuples that will be submitted prior to a checkpoint.
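
For reference, triggerCount applies in an operator-driven consistent region such as the following sketch (the placement of the region on the consumer and the value 1000 are arbitrary choices for the example):

// Sketch only: the KafkaConsumer starts an operator-driven consistent region and
// triggers the checkpoints itself. With the proposed change, exactly 1000 tuples
// would be submitted between checkpoints instead of "at least 1000".
@consistent(trigger = operatorDriven)
(stream<rstring message> Messages) as ConsumerOp = KafkaConsumer()
{
    param
        topic : "test";
        propertiesFile : getThisToolkitDir() + "/etc/consumer.properties";
        triggerCount : 1000;
}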

KafkaConsumer: Proposal to add optional input port to allow for dynamically specifying the topic-partition offset to consume from

Use Case

When the KafkaConsumer operator starts, it has the ability to read messages either from the beginning of a topic or from the end (via the startPosition parameter). Further, by enabling consistent region support, the operator can begin consuming from the offset position that was saved during a checkpoint operation.

However, the operator does not currently provide the ability for an application to explicitly specify the offsets that the operator should begin consuming from. This feature is useful when the application is restarted and needs to continue consuming from a previously known offset. In this scenario, the application will need to have stored the previous offset in an external data store (e.g. a database).

Proposal

To support this, an optional input port will be added to the KafkaConsumer operator to allow applications to dynamically add a list of topic-partitions to consume from, or to remove a list of topic-partitions that should no longer be consumed from. Furthermore, adding and removing topic-partitions will be supported at any point during the application's runtime (not just at startup).

Adding topic-partitions

  • In the case where a user wants to add topic partitions, they must provide the following information:
    • topic - topic to subscribe to
    • partition - partition to be assigned to
    • offset - offset to begin consuming messages from
  • In the event that the offset provided by the user does not exist, the operator will leave it up to the underlying KafkaConsumer API to decide what to do
    • By default, the KafkaConsumer API will use the auto.offset.reset config to either instruct the consumer to begin reading from the beginning of the topic (earliest message), the end of the topic (latest message) or to throw an OffsetOutOfRangeException
  • If the consumer is already assigned to a topic-partition that gets submitted to the input port, the consumer will seek to the new offset (or put another way, the current offset that the consumer is reading from will be overridden by the value submitted via the input port)

Remove topic-partitions

  • In the case where a user wants to remove topic partitions, they must provide the following information:
    • topic - topic to unsubscribe from
    • partition - partition to unsubscribe from
  • If the consumer is not assigned to a topic-partition that the user is attempting to remove, then no action is taken

Input port schema

The input port schema must contain a single rstring attribute that accepts a JSON message in order to add or remove a topic-partition. The JSON message will have the following schema:

{
  "action" : ["ADD" | "REMOVE"]
  "topicPartitionOffsets" : [
    {
      "topic" : "topic-name",
      "partition" : <partition_number>,
      "offset" : <offset_number>
    },
    ...
  ]
}

Convenience functions will be provided to aid in creating the messages:

/* Creates the JSON message to add a single topic-partition to the operator and to begin consuming at the specified offset. */
rstring addTopicPartitionMessage(rstring topic, int32 partition, int64 offset);

/* Creates the JSON message to add multiple topic-partitions to the operator. The operator will begin consuming from the topic-partitions at the specified offset */
rstring addTopicPartitionMessage(list<tuple<rstring topic, int32 partition, int64 offset>> topicPartitionsToAdd);

/* Creates the JSON message to remove a single topic-partition from the operator. */
rstring removeTopicPartitionMessage(rstring topic, int32 partition);

/* Creates the JSON message to remove multiple topic-partitions from the operator. */
rstring removeTopicPartitionMessage(list<tuple<rstring topic, int32 partition>> topicPartitionsToRemove);

Limitations

The following limitations will apply:

  • When the input port is specified, the topic, partitions and startPosition parameters should not be defined (a warning will be displayed during compile-time that these parameter values will be ignored). Topics and partitions to consume from must be specified by submitting a tuple to the input port.
  • When the input port is specified, the operator will not begin consuming messages until at least one tuple is received on the input port that adds a topic-partition

Compatibility

The above proposal will not break compatibility with existing applications.

SPL Examples

The following are examples of possible SPL applications that add or remove topic-partitions:

Add topic partitions

        (stream<TopicPartitionsOffsetUpdate> TopicPartitionUpdateStream) as TopicPartitionUpdater = Custom()
        {
            logic
                onProcess:
                {
                    rstring addSingleTopicMessage = addTopicPartitionMessage("test1", 0, 90l);
                    submit({jsonString = addSingleTopicMessage}, TopicPartitionUpdateStream);

                    rstring addMultipleTopicsMessage = addTopicPartitionMessage([
                      {topic="test1", partition=1, offset=100l},
                      {topic="test2", partition=0, offset=0l}
                    ]);
                    submit({jsonString = addMultipleTopicsMessage}, TopicPartitionUpdateStream);               
                }
        }
    
        (stream<rstring message> MessageOutStream) as KafkaConsumerOp =
            KafkaConsumer(TopicPartitionUpdateStream)
        {
            param
                propertiesFile : getThisToolkitDir() +
                    "/etc/consumer.properties" ;
        }

Remove topic-partitions

NOTE: The offset value doesn't matter since the topic-partitions are being removed.

        (stream<TopicPartitionsOffsetUpdate> TopicPartitionUpdateStream) as TopicPartitionUpdater = Custom()
        {
            logic
                onProcess:
                {
                    rstring removeSingleTopicMessage = removeTopicPartitionMessage("test1", 0);
                    submit({jsonString = removeSingleTopicMessage}, TopicPartitionUpdateStream);

                    rstring removeMultipleTopicsMessage = removeTopicPartitionMessage([
                      {topic="test1", partition=1},
                      {topic="test2", partition=0}
                    ]);
                    submit({jsonString = removeMultipleTopicsMessage}, TopicPartitionUpdateStream);  
                }
        }
    
        (stream<rstring message> MessageOutStream) as KafkaConsumerOp =
            KafkaConsumer(TopicPartitionUpdateStream)
        {
            param
                propertiesFile : getThisToolkitDir() +
                    "/etc/consumer.properties" ;
        }

Kafka Toolkit v1.3.0

The Kafka Toolkit v2.0.0 will support Apache Kafka v0.11.x. The following enhancements to the toolkit need to be performed:

  • Add transactional messaging support (new in v0.11)
  • Enhance the operator to better handle records that fail to be parsed (new in v0.11)
    • Will likely result in a new parameter (errorPolicy?)
  • Add support for including headers in records (new in v0.11)

Kafka Toolkit v1.0.0 - Release Checklist

The following tasks should be completed before publishing v1.0.0 of the Kafka toolkit:

  • [Issue #6] Tests
  • [Issue #7] Samples
  • [Issue #8] Add NLS language files
  • [Issue #9] Complete and review any missing SPLDoc
