
confluent-tutorial

How to set up Confluent Kafka

  1. Account Setup
  2. Cluster Setup
  3. Kafka Topic
  4. Obtain Cloud secrets
  5. Obtain Schema Registry secrets

Create a conda environment:

conda create -p venv python=3.7 -y

Activate the conda environment. Because it was created with -p (a path rather than a name), activate it by path:

conda activate ./venv

To use Confluent Kafka, you need the following details from the Confluent Cloud dashboard:

confluentClusterName = ""
confluentBootstrapServers = ""
confluentTopicName = ""
confluentApiKey = ""
confluentSecret = ""
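Rather than hard-coding these values in the script, you can read them from environment variables so the API key and secret stay out of source control. A minimal sketch; the environment variable names (CONFLUENT_BOOTSTRAP_SERVERS and so on) and the helper load_confluent_config are hypothetical, so rename them to fit your setup.

```python
import os


def load_confluent_config():
    """Read the Confluent connection details from environment variables.

    The environment variable names are hypothetical; adjust them to your setup.
    Raises KeyError for the first variable that is missing or empty.
    """
    names = {
        "confluentClusterName": "CONFLUENT_CLUSTER_NAME",
        "confluentBootstrapServers": "CONFLUENT_BOOTSTRAP_SERVERS",
        "confluentTopicName": "CONFLUENT_TOPIC_NAME",
        "confluentApiKey": "CONFLUENT_API_KEY",
        "confluentSecret": "CONFLUENT_SECRET",
    }
    config = {}
    for key, env_var in names.items():
        value = os.environ.get(env_var)
        if not value:
            raise KeyError(f"environment variable {env_var} is not set")
        config[key] = value
    return config


# Usage: unpack into the variable names used in the rest of this tutorial.
# cfg = load_confluent_config()
# confluentBootstrapServers = cfg["confluentBootstrapServers"]
# confluentTopicName = cfg["confluentTopicName"]
```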

Add the following libraries to requirements.txt and install them with pip install -r requirements.txt:

confluent-kafka[avro,json,protobuf]
pyspark==3.2.1
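The requirements include confluent-kafka, which the Spark code in this tutorial doesn't call directly. One way to use it is to push a few test messages into the topic before starting the stream. A minimal sketch, assuming the connection variables above are filled in; make_producer_config and send_test_messages are hypothetical helper names, while Producer, produce, poll and flush are the confluent-kafka client's own API.

```python
import json


def make_producer_config(bootstrap_servers, api_key, secret):
    """librdkafka settings for SASL_SSL + PLAIN, the same auth the Spark examples use."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": api_key,
        "sasl.password": secret,
    }


def send_test_messages(config, topic, records):
    """Produce each dict in `records` as a JSON-encoded message keyed by its index."""
    # Imported here so make_producer_config works even without the package installed.
    from confluent_kafka import Producer

    producer = Producer(config)
    for i, record in enumerate(records):
        producer.produce(topic, key=str(i), value=json.dumps(record))
        producer.poll(0)  # serve delivery events so the local queue keeps draining
    # Wait up to 30 s for delivery; returns how many messages are still undelivered.
    return producer.flush(30)


# Usage:
# conf = make_producer_config(confluentBootstrapServers, confluentApiKey, confluentSecret)
# send_test_messages(conf, confluentTopicName, [{"id": 1}, {"id": 2}])
```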

Read data from a Kafka topic

Import the necessary packages:

from pyspark.sql import SparkSession

Create a SparkSession object using the snippet below:

spark_session = SparkSession.builder.master("local[*]").appName("Confluent").getOrCreate()

Subscribe to the topic as a streaming DataFrame, authenticating to Confluent Cloud with SASL_SSL and the PLAIN mechanism:

df = (spark_session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", confluentBootstrapServers)
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      # the API key and secret are sent as the SASL PLAIN username and password
      .option("kafka.sasl.jaas.config",
              "org.apache.kafka.common.security.plain.PlainLoginModule required "
              "username='{}' password='{}';".format(confluentApiKey, confluentSecret))
      .option("kafka.ssl.endpoint.identification.algorithm", "https")
      .option("subscribe", confluentTopicName)
      # start from the oldest available offset when no checkpoint exists yet
      .option("startingOffsets", "earliest")
      # don't fail the query if some offsets are no longer available (e.g. removed by retention)
      .option("failOnDataLoss", "false")
      .load()
      )
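The same five authentication options appear again in the Kafka sink further down, so it can help to build them once. A minimal sketch; kafka_sasl_options is a hypothetical helper, and it relies on DataStreamReader.options (and DataStreamWriter.options) accepting keyword arguments.

```python
def kafka_sasl_options(bootstrap_servers, api_key, secret):
    """Spark Kafka source/sink options for Confluent Cloud (SASL_SSL + PLAIN)."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        "username='{}' password='{}';".format(api_key, secret)
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
        "kafka.ssl.endpoint.identification.algorithm": "https",
    }


# Usage with the reader above; the dotted keys are passed through **kwargs unchanged.
# auth = kafka_sasl_options(confluentBootstrapServers, confluentApiKey, confluentSecret)
# df = (spark_session.readStream.format("kafka")
#       .options(**auth)
#       .option("subscribe", confluentTopicName)
#       .load())
```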

Process the data read from the Kafka topic. Kafka delivers key and value as binary, so cast both to strings:

df = (df.withColumn("key_str", df["key"].cast("string"))
        .withColumn("value_str", df["value"].cast("string")).drop("key"))

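Write data to JSON files. Each write example ends with query.awaitTermination(), which blocks until the query stops, so run one sink per script.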

query = (df.selectExpr("value_str").writeStream
         .format("json")
         .outputMode("append")
         .trigger(processingTime="5 seconds")
         # every streaming query needs its own checkpoint directory
         .option("checkpointLocation", "json_checkpoint")
         .option("path", "json")
         .start()
         )
query.awaitTermination()
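To spot-check what the JSON sink has written, you can read the part files with the standard library. A minimal sketch, assuming Spark's default output naming (part-*.json, no compression) under the json directory used above; read_json_sink is a hypothetical helper. Files that are still being written may be incomplete, so for anything beyond a quick look spark_session.read.json("json") is the safer choice, since it honours the sink's _spark_metadata log.

```python
import glob
import json
import os


def read_json_sink(path="json"):
    """Load every record found in the JSON sink's part files under `path`.

    Spark writes newline-delimited JSON; the _spark_metadata folder in the
    same directory does not match the glob and is skipped.
    """
    records = []
    for part in sorted(glob.glob(os.path.join(path, "part-*.json"))):
        with open(part, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if line:  # ignore blank lines
                    records.append(json.loads(line))
    return records


# Usage, after the query has run for a few triggers:
# for record in read_json_sink("json"):
#     print(record["value_str"])
```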

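Write data to CSV files. The CSV sink can't store binary columns such as the raw value column, so select the string columns only: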

query = (df.selectExpr("key_str", "value_str").writeStream
         .format("csv")
         .outputMode("append")
         .trigger(processingTime="5 seconds")
         .option("checkpointLocation", "csv_checkpoint")
         .option("path", "csv")
         .start()
         )
query.awaitTermination()

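Write data to a Kafka topic. The Kafka sink reads a value column (key is optional), so rename the string columns accordingly. If topic is the same topic the stream subscribes to, the query will keep re-reading its own output; in practice, point it at a separate output topic.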

query = (df.selectExpr("key_str AS key", "value_str AS value")
         .writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", confluentBootstrapServers)
         .option("kafka.security.protocol", "SASL_SSL")
         .option("kafka.sasl.mechanism", "PLAIN")
         .option("kafka.sasl.jaas.config",
                 "org.apache.kafka.common.security.plain.PlainLoginModule required "
                 "username='{}' password='{}';".format(confluentApiKey, confluentSecret))
         .option("kafka.ssl.endpoint.identification.algorithm", "https")
         .option("checkpointLocation", "kafka_checkpoint")
         # replace with a different topic to avoid consuming this query's own output
         .option("topic", confluentTopicName)
         .start()
         )
query.awaitTermination()

Note: Don't run the script with the plain python command. The Kafka connector isn't bundled with PySpark, so run the script with spark-submit and pull in the connector with --packages.


To run the Python script:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 <script_name.py>
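The connector coordinate has to match your Spark version (3.2.1 here) and Scala build (2.12). If you prefer to launch with plain python anyway, a commonly used alternative (an assumption on our part, not part of the original tutorial) is to set the standard spark.jars.packages configuration before the session is created. A minimal sketch; kafka_package and build_session_with_kafka are hypothetical helper names.

```python
def kafka_package(spark_version="3.2.1", scala_version="2.12"):
    """Maven coordinate of the Spark Kafka connector for a given Spark/Scala build."""
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(scala_version, spark_version)


def build_session_with_kafka(app_name="Confluent"):
    """Create a SparkSession that downloads the Kafka connector at startup.

    spark.jars.packages only takes effect if it is set before the JVM starts,
    i.e. on the first SparkSession created in the process.
    """
    from pyspark.sql import SparkSession  # imported lazily so kafka_package needs no PySpark

    return (SparkSession.builder
            .master("local[*]")
            .appName(app_name)
            .config("spark.jars.packages", kafka_package())
            .getOrCreate())


# Usage, replacing the SparkSession snippet earlier in this tutorial:
# spark_session = build_session_with_kafka()
```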
