ibmstreams / streamsx.messaging

This toolkit is focused on interacting with popular messaging systems such as Kafka, JMS, XMS, and MQTT. After release v5.4.2 the complete toolkit will be deprecated. See the README.md file for hints to alternative toolkits.

Home Page: http://ibmstreams.github.io/streamsx.messaging/

License: Apache License 2.0

Perl 9.81% C++ 3.37% Java 85.66% Makefile 1.15%
ibm-streams jms kafka xms mqtt iot-platform rabbitmq websphere-mq stream-processing

streamsx.messaging's Introduction

README -- IBMStreams/streamsx.messaging

The IBMStreams/streamsx.messaging toolkit project is an open source Streams toolkit project focused on the development of operators and functions that extend IBM InfoSphere Streams' ability to interact with messaging systems.


After release v5.4.2 the complete toolkit is deprecated. Please use the following alternatives to communicate with the various message queues:

Message Queue   New Toolkit
JMS             streamsx.jms
Kafka           streamsx.kafka
MQTT            streamsx.mqtt
RabbitMQ        streamsx.rabbitmq
XMS             no toolkit planned yet

Messaging Toolkit v5.1 is officially released to support InfoSphere Streams v4.2:

This toolkit contains support for:

  • Kafka
  • RabbitMQ
  • JMS
  • MQTT
  • XMS

Check out our SPLDOC here:

To get started with working with this toolkit:

To learn more about Streams:

streamsx.messaging's People

Contributors

alex-cook4, alexpogue, chanskw, conglisc, ddebrunner, dmhursh, ejpring, lcawley, natashadsilva, ovidiuparvu, petenicholls, rohitsw, schubon, schulz2, zollnapa

streamsx.messaging's Issues

JMS: Empty message should provide information about message

With the XMS operator, when the "empty" message class is used, the user can use an "output" function to assign values to the attributes of the output tuple from XMSSource.

With the JMS operator, JMSSource simply emits an empty tuple.

The empty message class exists to test whether a connection to the messaging server can be made. We should take advantage of it and print information about each received message when this message class is used.

Create operators to support Apache Kafka

Create operators to exchange data using Apache Kafka messaging system including:

  1. Producer to write messages to a Kafka topic
  2. Consumer to read messages from a Kafka topic

The Consumer should support consumer groups as well as both queuing and publish-subscribe models.

The Apache Kafka project is located here http://kafka.apache.org/

Support more than just blob with MQTT operators.

Not sure exactly what is supported today (the SPLDOC is not complete) but I tried to have an input stream with a single rstring attribute containing JSON to MQTTSink and it failed saying a blob attribute was needed.

It should be easy at least to pass rstring values into the operators and send them out as messages.
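
As a possible workaround until rstring is accepted directly, the text can be encoded into bytes before it reaches an operator that expects a blob. The following is only a minimal sketch of that conversion in plain Java; the class name PayloadCodec and the choice of UTF-8 are assumptions for illustration and are not part of the toolkit.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the toolkit: converts between text and
// the binary form that a blob-only message payload requires.
final class PayloadCodec {
    private PayloadCodec() {}

    // Encode text (for example a JSON document) as UTF-8 bytes.
    static byte[] toPayload(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a binary payload back into text, assuming it was UTF-8 encoded.
    static String fromPayload(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

Calling PayloadCodec.toPayload on a JSON string yields the bytes to publish, and fromPayload reverses the step on the receiving side.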

MQTT: Support for error output port

  1. Add an error output port for MQTTSource and MQTTSink
  2. Add a replacement for the getError() output function
  3. Determine how errors should really be handled and what should be sent to the error output port.

MQTTSink topic parameter, unable to use attribute as topic

Since we moved to implementing the MQTT operators in Java, the MQTTSink operator can no longer take the topic from an attribute of the data stream. This issue tracks the problem; we need to figure out how to lift this limitation.

Migration Documentation from InfoSphere Streams messaging toolkit to streamsx.messaging

  1. Toolkit and namespace change
  2. Removal of the format parameter in MQTTSource and MQTTSink
  3. MQTTSource now takes a list of comma-separated strings and qos... rather than an SPL list.
  4. MQTTSink - the topic parameter can no longer accept an attribute; to refer to an attribute, use the topicAttribute parameter (Issue #17)
  5. MQTTSource - the getTopic() output function is no longer supported; users need to specify the topicOutAttribute parameter
  6. qos is no longer an unsigned int; it needs to be a signed integer
  7. qos no longer accepts an attribute; to refer to an attribute, use qosAttribute (Issue #23)
  8. Issue #18 - the getError output function is no longer available for the MQTTSource and MQTTSink operators
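
To illustrate item 3, here is a rough sketch of how comma-separated topic and qos strings could be split into usable values. It is not the operators' actual parsing code: the class TopicListParser and its method names are hypothetical, and the exact parameter syntax accepted by MQTTSource is an assumption. The only protocol fact relied on is that MQTT QoS levels are 0, 1, and 2.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not taken from the toolkit.
final class TopicListParser {
    private TopicListParser() {}

    // Split "a/b, c/d" into individual topic strings, ignoring empty entries.
    static List<String> parseTopics(String csv) {
        List<String> topics = new ArrayList<>();
        for (String part : csv.split(",")) {
            String topic = part.trim();
            if (!topic.isEmpty()) {
                topics.add(topic);
            }
        }
        return topics;
    }

    // Split "0, 2" into signed ints and reject values outside 0..2.
    static int[] parseQos(String csv) {
        String[] parts = csv.split(",");
        int[] qos = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            int value = Integer.parseInt(parts[i].trim());
            if (value < 0 || value > 2) {
                throw new IllegalArgumentException("QoS must be 0, 1, or 2: " + value);
            }
            qos[i] = value;
        }
        return qos;
    }
}
```

A signed int is used for qos here, in line with item 6.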

KAFKA: Add tests

Add tests covering basic operations, various consumer configurations, etc.

Ant build set up conflicts with Studio

Problem:

  1. The ant build puts its output in impl/java/classes, while Studio puts its output in impl/java/bin by default. This causes lots of problems when we intermix Studio builds with ant builds. The build script is being changed to send output to impl/java/bin to be consistent.
  2. When running the build script, we end up with two copies of the *$StreamModel.class file in the toolkit directory. This causes problems with spl-make-toolkit, which will not index the toolkit and complains that processing of the *$StreamModel.class file is already done.

Solution:

  1. make build output to impl/java/bin - to be consistent with Studio
  2. exclude *$StreamModel.class file from the final jar file. Do not think they are needed at runtime, only needed for compile time.
  3. configure studio project to send generated source file to src-gen directory, so the generated source is not intermixed with actual source
  4. configure ant script to send generated source file to src-gen directory to be consistent with Studio.
  5. do not remove the impl/java/bin directory after jar-ing up the files. The bin directory will be needed for spl-make-toolkit to run.

With this setup, Studio and Ant work together more nicely. You can build either way without causing trouble.

MQTT: Determine how to replace output functions from old MQTT operators

In the C++ version of the MQTT operators, the operators have the following output functions:

  1. MQTTSource has getError(), getTopic()
  2. MQTTSink has getError()

Since the operators are now implemented in Java, we can no longer have output functions. Need to figure out how to replace these functions.

MQTTSink: Add retain parameter

Add retain parameter to allow clients to control if messages should be retained by MQTT providers when there is no active subscriber.

MQTTSink No reason given for failure to publish message

I'm trying to have a Streams application connect to an MQTT virtual machine appliance, and it failed with only this:

CDIST1554E Unable to publish message

Even tracing at INFO didn't add any useful information.

It would be useful if there were some confirmation that the connection to the server succeeded.

spldoc error in messaging toolkit

Found these in the build log.

spldoctoolkit:
[echo] Tookit to SPLDOC: /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaConsumerSample
[exec] CDISP0543W WARNING: The /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaConsumerSample/doc/spldoc/dita/toolkits/toolkits.xml file is not indexed because it is in a reserved directory.
[exec] CDISP0543W WARNING: The /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaConsumerSample/doc/spldoc/dita/tk$KafkaConsumerSample/tk$KafkaConsumerSample.xml file is not indexed because it is in a reserved directory.
[exec] The /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaConsumerSample/doc/spldoc output directory is being used.
[exec] CDISP0543W WARNING: The /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaConsumerSample/doc/spldoc/dita/toolkits/toolkits.xml file is not indexed because it is in a reserved directory.
[exec] CDISP0543W WARNING: The /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaConsumerSample/doc/spldoc/dita/tk$KafkaConsumerSample/tk$KafkaConsumerSample.xml file is not indexed because it is in a reserved directory.
[exec] The documents are being generated.
[exec] /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaConsumerSample/com.ibm.streamsx.messaging.sample.kafka/KafkaConsumerSample.spl:13:1: CDISP7109E ERROR: Unknown SPL artifact referenced by link "KafkaConsumer". Correct the reference to the SPL artifact in the SPLDOC link.
[exec] CDISP7097E ERROR: Generation complete with 1 errors and 0 warnings. Top-level document: /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaConsumerSample/doc/spldoc/html/index.html
[exec] CDISP9228E ERROR: The java command run by the spl-make-doc command failed. The return code is: 1.
[exec] Result: 1

spldoctoolkit:
[echo] Tookit to SPLDOC: /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaProducerSample
[exec] CDISP0543W WARNING: The /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaProducerSample/doc/spldoc/dita/toolkits/toolkits.xml file is not indexed because it is in a reserved directory.
[exec] CDISP0543W WARNING: The /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaProducerSample/doc/spldoc/dita/tk$KafkaProducerSample/tk$KafkaProducerSample.xml file is not indexed because it is in a reserved directory.
[exec] The /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaProducerSample/doc/spldoc output directory is being used.
[exec] CDISP0543W WARNING: The /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaProducerSample/doc/spldoc/dita/toolkits/toolkits.xml file is not indexed because it is in a reserved directory.
[exec] CDISP0543W WARNING: The /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaProducerSample/doc/spldoc/dita/tk$KafkaProducerSample/tk$KafkaProducerSample.xml file is not indexed because it is in a reserved directory.
[exec] The documents are being generated.
[exec] /Data/chanskw/gitrepos/gitspree/streamsx.messaging/samples/KafkaProducerSample/com.ibm.streamsx.messaging.sample.kafka/KafkaProducerSample.spl:15:1: CDISP7109E ERROR: Unknown SPL artifact referenced by link "KafkaProducer". Correct the reference to the SPL artifact in the SPLDOC link.

MQTT: Tests shows that MQTT operators can lose tuples

I reran the MQTT tests and they fail randomly. The same test passes in one run but fails in another.

It appears that sometimes the MQTT operators do not get the first "n" tuples after the application has started. In one case, I am always missing the first tuple; in other cases, the first hundreds of tuples are missed.

Possible issue:

  1. new mqtt client?
  2. spree?
  3. test case problem?

This problem seems to have appeared since we moved to the 1.0 version of the MQTT client.

Add MQTTPublish and MQTTSubscribe operators.

The verbs used by MQTT are publish and subscribe, not source and sink. The current operator names are confusing: I always have to stop and think about which one publishes messages.

I have an MQTTPublish operator that extends the MQTTSink operator Java class with these differences:
a) Removes the topicAttributeName parameter
b) Adds a topicAttribute parameter using the new attribute passing mechanism supported by Streams 3.2.2 beta.

This then becomes a more natural operator for MQTT users and for Streams users.
MQTTSink continues to work, but new code using 3.2.2 beta or later should be encouraged to use MQTTPublish.

I'll submit a pull request later today so others can see what it looks like.

Cannot build toolkit

Seems like you're missing a directory creation step. When I type ant at the top level, I get:

compile:
    [javac] Compiling 11 source files to /homes/hny2/hildrum/github/IBMStreams/streamsx.messaging/com.ibm.streamsx.messaging/impl/java/bin
    [javac] javac: directory not found: impl/java/src-gen
    [javac] Usage: javac <options> <source files>
    [javac] use -help for a list of possible options

BUILD FAILED
/homes/hny2/hildrum/github/IBMStreams/streamsx.messaging/build.xml:57: The following error occurred while executing this line:
/homes/hny2/hildrum/github/IBMStreams/streamsx.messaging/com.ibm.streamsx.messaging/build.xml:95: Compile failed; see the compiler error output for details.

Total time: 11 seconds

What format do the new MQTT operators use for the messaging?

The MQTT operators in the Streams 3.2.0 product allow the format used for messages to be specified: bin or block.

This parameter is not available in the new toolkit for the Streams 3.2.2 open beta. If I want messages that can be sent to either a 3.2.0 instance or a 3.2.2 beta instance, what format should I use with the 3.2.0 operators?

I assume it is block?

Enhance connection mechanism with Kafka operators

With the messaging toolkit operators, all operators support the following connection mechanism. I am wondering if it makes sense for the Kafka operators to also support these:

  1. support for connection document
  2. support for connection and reconnection policy
  3. dynamic update for connection - i.e. ability to switch Kafka servers on the fly with a control signal.

Add support for clean session as well as durable connections for MQTT operators.

For transient clients, the ability to have a clean session rather than a durable one would be useful for MQTTSink and MQTTSource.

http://mosquitto.org/man/mqtt-7.html

Clean session / Durable connections

On connection, a client sets the "clean session" flag, which is sometimes also known as the "clean start" flag. If clean session is set to false, then the connection is treated as durable. This means that when the client disconnects, any subscriptions it has will remain and any subsequent QoS 1 or 2 messages will be stored until it connects again in the future. If clean session is true, then all subscriptions will be removed for the client when it disconnects.
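
The behaviour described above can be made concrete with a toy in-memory model. This is only a sketch under stated assumptions: ToyBroker and all of its methods are hypothetical, it is neither a real broker nor the Paho client API, and every published message is treated as QoS 1 or 2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of broker-side session handling; hypothetical, for illustration only.
final class ToyBroker {
    // Per-client state; it survives a disconnect only for durable sessions.
    private static final class Session {
        final Set<String> subscriptions = new HashSet<>();
        final List<String> queued = new ArrayList<>();
        boolean connected;
        boolean cleanSession;
    }

    private final Map<String, Session> sessions = new HashMap<>();

    // Connect a client and return the messages stored while it was offline.
    // A clean session always starts from empty state.
    List<String> connect(String clientId, boolean cleanSession) {
        Session s = sessions.get(clientId);
        if (s == null || cleanSession) {
            s = new Session();
            sessions.put(clientId, s);
        }
        s.connected = true;
        s.cleanSession = cleanSession;
        List<String> pending = new ArrayList<>(s.queued);
        s.queued.clear();
        return pending;
    }

    // The client must already be connected.
    void subscribe(String clientId, String topic) {
        sessions.get(clientId).subscriptions.add(topic);
    }

    // Clean sessions lose their subscriptions; durable ones keep them.
    void disconnect(String clientId) {
        Session s = sessions.get(clientId);
        if (s.cleanSession) {
            sessions.remove(clientId);
        } else {
            s.connected = false;
        }
    }

    // Returns how many subscribers get the message, now or on reconnect.
    int publish(String topic, String message) {
        int receivers = 0;
        for (Session s : sessions.values()) {
            if (!s.subscriptions.contains(topic)) {
                continue;
            }
            if (!s.connected) {
                s.queued.add(message); // stored for a durable, offline client
            }
            receivers++;
        }
        return receivers;
    }
}
```

In this model, a client that connected with the clean session flag set to false receives messages published while it was away on its next connect, whereas a clean-session client has to subscribe again and receives nothing from the gap.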

Create MQTT Java operator

Create MQTT operators to:

  1. subscribe to and read messages from MQTT-compliant servers into Streams
  2. publish messages from Streams to MQTT-compliant servers

The MQTT operators will work with the MQTT Paho Client library:
http://www.eclipse.org/paho/

MQTT: Support for SSL connection

Allow for SSL connection from MQTT operators to MQTT providers.

  1. serverURI - need to allow for ssl:// scheme
  2. add trustStore, keyStore, and keyStorePassword parameters
  • applicable for both MQTTSource and MQTTSink operators
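
For item 1, one way the operators might decide whether a serverURI calls for SSL is to inspect its scheme. The sketch below is an illustration only: ServerUriCheck is a hypothetical class, and treating tcp:// as the only non-SSL scheme is an assumption. It uses java.net.URI from the standard library.

```java
import java.net.URI;

// Hypothetical helper, not part of the toolkit.
final class ServerUriCheck {
    private ServerUriCheck() {}

    // Returns true for ssl:// URIs, false for tcp:// URIs, and rejects
    // anything else so a misconfigured scheme fails early.
    static boolean requiresSsl(String serverUri) {
        String scheme = URI.create(serverUri).getScheme();
        if ("ssl".equalsIgnoreCase(scheme)) {
            return true;
        }
        if ("tcp".equalsIgnoreCase(scheme)) {
            return false;
        }
        throw new IllegalArgumentException("Unsupported scheme: " + scheme);
    }
}
```

When requiresSsl returns true, the trustStore, keyStore, and keyStorePassword parameters from item 2 would be the ones to validate next.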

MQTT operator support for MQTT 3.1.1 Spec

Investigate support for MQTT 3.1.1 in the MQTT operators.
MQTT 3.1.1 provides support for failover and HA, where we can pass a list of servers to the client. When a connection is lost, the client will attempt to reconnect and automatically try the other servers on the list.
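
The failover idea can be sketched independently of any client library: walk the server list in order and keep the first server that accepts a connection. FailoverConnector is a hypothetical name, and the Predicate stands in for a real connect attempt; this is not how the MQTT client implements it.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical helper illustrating ordered failover across a server list.
final class FailoverConnector {
    private FailoverConnector() {}

    // Try each server URI in order and return the first one for which the
    // connect attempt succeeds, or an empty Optional if all of them fail.
    static Optional<String> connectFirstAvailable(List<String> serverUris,
                                                  Predicate<String> tryConnect) {
        for (String uri : serverUris) {
            if (tryConnect.test(uri)) {
                return Optional.of(uri);
            }
        }
        return Optional.empty();
    }
}
```

On a lost connection, the same method could be called again with the same list to pick the next reachable server.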

Support for transaction for JMS

We should consider supporting transacted sessions in the JMS operators.
Transactions and persistent messages are designed and tuned to work together with WMQ.

Investigate transaction commit and rollback and what they mean for QoS in the JMS operators.
