
This project forked from pelioniot/connector-for-apache-kafka


A source and sink Kafka Connector for interacting with Pelion IoT platform.

License: Apache License 2.0


Connector-for-Apache-Kafka®


Connector-for-Apache-Kafka is a Kafka Connect plugin that includes both a Source and a Sink connector. The Source connector consumes data from the Pelion IoT platform (device registrations, observations, and responses) and stores it in Apache Kafka. The Sink connector reads messages from a Kafka topic (device management requests) and forwards them to Pelion IoT for processing. Used in tandem, the two connectors let you communicate with IoT devices simply by posting and reading messages to and from Kafka topics. Combined with the many connectors for external systems already available for Apache Kafka (see Confluent Hub), the Pelion connector makes it easy to integrate with all those systems in a scalable and standardized way.

Please have a look at our demo-example guide to get started quickly: it bootstraps a local development Kafka cluster using Docker and installs the Pelion connector alongside the Elastic and Amazon S3 connectors to forward Pelion IoT messages to those stores. If you don't own Pelion Ready hardware yet, you can still try the platform by using our Pelion Virtual Demo to spin up a simulated device that can connect to the Pelion IoT platform.

Installation

Download the ZIP file and extract it into one of the directories listed in the Connect worker's plugin.path configuration property. This must be done on every installation where Connect will run. For more information, refer to the Confluent documentation page on installing connector plugins.
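For example, assuming the plugin was extracted under /usr/local/share/kafka/plugins (an illustrative path, not a requirement), the worker configuration would include a line such as:

```properties
# Connect worker configuration (e.g. connect-distributed.properties)
plugin.path=/usr/local/share/kafka/plugins
```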

Source Connector Configuration

com.pelion.connect.dm.source.PelionSourceConnector

An example template configuration of the source connector (source-quickstart-pelion.properties) can be found in the repository. You can use it as a starting guide and make any necessary adjustments for your own environment.

The table below outlines information about each configuration setting:

| Config | Value Type | Required | Default Value | Description |
|--------|------------|----------|---------------|-------------|
| tasks.max | Int | Yes | 1 | The number of tasks this connector will start. |
| key.converter | String | Yes | org.apache.kafka.connect.storage.StringConverter | The key converter to use when storing messages. |
| value.converter | String | Yes | io.confluent.connect.avro.AvroConverter | The value converter to use when storing messages. |
| pelion.api.host | String | No | api.us-east-1.mbedcloud.com | The Pelion API host. Not required unless you use an on-premise instance. |
| pelion.access.key.list | List | Yes | None | A list of Pelion Access Keys. The list should match the number of tasks configured, since each task is assigned an access key from the list. NOTE: each Access Key should belong to a different Pelion Application group. |
| topic.prefix | String | Yes | None | The prefix to use when constructing the topic names that store Pelion messages, following the scheme: $topic.prefix.{notifications,registrations,responses} |
| max.retries | Int | No | 10 | The maximum number of times to retry connecting to the notification channel before failing the task. |
| retry.backoff.ms | Int | No | 60000 (1 min) | The time in milliseconds to wait after the notification channel closes before a retry attempt is made. |
| resource.type.mapping | List | No | Empty | A list of resource IDs and their respective data types, in the form <resource_id>:{s:string, i:integer, d:double, b:bool}. If not defined, the payload is processed as a string. |
| subscriptions | List | Yes | None | A list of custom name aliases, one for each pre-subscription configuration. During the initial connector bootstrap, the list is distributed equally among the configured tasks.max. |
| subscriptions.$alias.endpoint-name | String | Yes | None | The endpoint ID (optionally ending with an * wildcard), e.g. "node-001" or "node*". |
| subscriptions.$alias.resource-path | List | Yes | None | A list of resources to pre-subscribe to (optionally ending with an * wildcard), e.g. "/3200/0/5501, /3303/*, ..". |
| subscriptions.$alias.endpoint-type | String | Yes | None | The endpoint type, e.g. "Sensor". |
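As an illustration of the resource.type.mapping format described above, the sketch below parses such a mapping string into a map of resource IDs to type names. This is a hypothetical helper written for this guide, not the connector's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: parses a "resource.type.mapping" value such as
// "5501:i,5700:d" into a resource-id -> type-name map, following the
// documented <resource_id>:{s,i,d,b} shorthand. Not the connector's own code.
public class ResourceTypeMappingSketch {

    public static Map<String, String> parse(String mapping) {
        Map<String, String> types = new HashMap<>();
        if (mapping == null || mapping.isEmpty()) {
            return types; // empty mapping: payloads default to string
        }
        for (String entry : mapping.split(",")) {
            String[] parts = entry.trim().split(":");
            String type;
            switch (parts[1]) {
                case "s": type = "string";  break;
                case "i": type = "integer"; break;
                case "d": type = "double";  break;
                case "b": type = "bool";    break;
                default:
                    throw new IllegalArgumentException("unknown type: " + parts[1]);
            }
            types.put(parts[0], type);
        }
        return types;
    }

    public static void main(String[] args) {
        Map<String, String> m = parse("5501:i,5700:d");
        System.out.println(m); // resource 5501 -> integer, 5700 -> double
    }
}
```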

NOTE: To protect the access keys and avoid specifying them in a properties file in a production environment, follow the Externalizing Secrets guide provided by Confluent to securely store and use secrets in connector configurations.
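Putting the settings above together, a minimal source configuration might look like the following sketch. The access key, topic prefix, and subscription values are placeholders, and the value.converter.schema.registry.url line follows the usual Confluent AvroConverter convention; adjust all of them for your environment:

```properties
name=pelion-source
connector.class=com.pelion.connect.dm.source.PelionSourceConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
pelion.access.key.list=<YOUR_ACCESS_KEY>
topic.prefix=mypelion
resource.type.mapping=5501:i,5700:d
subscriptions=presub1
subscriptions.presub1.endpoint-name=node*
subscriptions.presub1.resource-path=/3200/0/5501, /3303/*
subscriptions.presub1.endpoint-type=Sensor
```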

Consuming Pelion messages

```shell
kafka-avro-console-consumer \
   --bootstrap-server localhost:9092 \
   --property schema.registry.url=http://localhost:8081 \
   --topic ${topic.prefix}.registrations

kafka-avro-console-consumer \
   --bootstrap-server localhost:9092 \
   --property schema.registry.url=http://localhost:8081 \
   --topic ${topic.prefix}.notifications

kafka-avro-console-consumer \
   --bootstrap-server localhost:9092 \
   --property schema.registry.url=http://localhost:8081 \
   --topic ${topic.prefix}.responses
```

Sink Connector Configuration

com.pelion.connect.dm.sink.PelionSinkConnector

An example template configuration of the sink connector (sink-quickstart-pelion.properties) can be found in the repository. You can use it as a starting guide and make any necessary adjustments for your own environment.

The table below outlines information about each configuration setting:

| Config | Value Type | Required | Default Value | Description |
|--------|------------|----------|---------------|-------------|
| tasks.max | Int | Yes | 1 | The number of tasks this connector will start. If more than one, each task is assigned the same Pelion access key to invoke device management requests. |
| key.converter | String | Yes | org.apache.kafka.connect.storage.StringConverter | The key converter to use when storing messages. |
| value.converter | String | Yes | io.confluent.connect.avro.AvroConverter | The value converter to use when storing messages. |
| topics | List | Yes | None | A list of topics on which the connector listens for device management requests. |
| pelion.api.host | String | No | api.us-east-1.mbedcloud.com | The Pelion API host. Not required unless you use an on-premise instance. |
| pelion.access.key | String | Yes | None | The Pelion Access Key used to invoke device management requests. |
| max.retries | Int | No | 10 | The maximum number of times to retry on errors before failing the task. |
| retry.backoff.ms | Int | No | 3000 | The time in milliseconds to wait following an error before a retry attempt is made. |
| ignore.errors | Boolean | No | True | Whether the sink connector should ignore device request response errors and continue processing. |
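Putting the sink settings together, a minimal configuration might look like the following sketch. The access key and topic name are placeholders, and the value.converter.schema.registry.url line follows the usual Confluent AvroConverter convention; adjust them for your environment:

```properties
name=pelion-sink
connector.class=com.pelion.connect.dm.sink.PelionSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
topics=mypelion.requests
pelion.access.key=<YOUR_ACCESS_KEY>
```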

Example Sending a request

  1. Start a Kafka consumer to listen for responses from Pelion Device Management:

    kafka-avro-console-consumer \
       --bootstrap-server localhost:9092 \
       --property schema.registry.url=http://localhost:8081 \
       --topic ${topic.prefix}.responses
    
  2. Start a producer to send a device request. The message format should follow the request Avro schema:

    kafka-avro-console-producer --broker-list localhost:9092 \
    --property schema.registry.url=http://localhost:8081 --topic ${topic}.requests \
    --property value.schema="$(cat ./demo-example/configs/device-request-schema.avsc)"
    
    {"ep":"01795a4c18c60000000000010011c8c5","async_id":"my-async-id-get","retry":null,"expiry_seconds":null,"body": {"method":"GET","uri":"/3200/0/5501","accept":null,"content_type":null,"payload_b64":null}}
    
  3. After a short while, you should see the following message printed in the consumer console:

    {"id":"my-async-id-get","status":200,"error":null,"payload":"300","ct":{"string":"text/plain"},"max_age":{"int":0}}
    

Development

Building the Connector

At the root of the project issue:

```shell
cd kafka-connect-pelion
mvn clean package
```

Enabling Debug mode

Prior to installing a Source/Sink connector configuration, you can enable DEBUG mode to trace messages from the connector tasks by issuing the following command:

```shell
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/admin/loggers/com.pelion.connect.dm -d '{"level": "DEBUG"}' | jq '.'
```

 

"Apache and Kafka are registered trademarks of the Apache Software Foundation."
