qubole / streaminglens

Qubole Streaminglens tool for tuning Spark Structured Streaming Pipelines

Home Page: http://www.qubole.com

License: Apache License 2.0

Languages: Scala 100.00%
Topics: spark, scala, structured-streaming, sparklens, streaming-pipeline, streaming, spark-streaming, micro-batches, cluster-management, sla

streaminglens's Introduction

Streaminglens


Streaminglens is a profiling tool for Spark Structured Streaming applications running in micro-batch mode. Since the execution plan for each micro-batch is identical, we can continuously learn from previous micro-batches to predict the ideal Spark cluster configuration for the next micro-batch. Streaminglens analyzes the execution of the last micro-batch every five minutes to give an overall idea of the health of the streaming pipeline.

During this analysis, Streaminglens calculates the critical time to complete a micro-batch. Critical time is the minimum time a Spark job would take to complete if it were run with infinite executors. For more information on how critical time is calculated, see our blog on the Spark tuning tool. Streaminglens also takes the expected micro-batch SLA as input and expects every micro-batch to complete within that SLA.

Based on the comparison of critical time, actual batch running time and expected micro-batch SLA, Streaminglens classifies the state of the streaming pipeline as Optimum, Underprovisioned, Overprovisioned or Unhealthy and gives appropriate recommendations to tune the Spark cluster (see the sketch after the table below).

Streaming Pipeline States

State Description
No New Batches No new data in the pipeline. If this state persists despite data ingestion, a batch may be stuck; check the Spark UI.
Overprovisioned Batch completion time is far less than the expected SLA, so you can downscale the cluster to reduce costs. If you see this state but the stream is lagging, lower the trigger interval or check the specific recommendations.
Optimum The streaming pipeline is meeting its SLA comfortably.
Underprovisioned Upscale the cluster to meet the expected micro-batch SLA.
Unhealthy Increase ingestion at the source level by altering Spark cluster or pipeline configurations; check the specific recommendations.
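
To make these thresholds concrete, below is a minimal sketch of the classification logic, assuming the default percentages documented in the Configuring Streaminglens Options section (30% for over-provisioning, 70% for under-provisioning and for the critical path); this illustrates the idea and is not the project's actual implementation (the No New Batches state applies separately, when no data arrives):

object PipelineState extends Enumeration {
  val Optimum, Overprovisioned, Underprovisioned, Unhealthy = Value
}

// Classify a micro-batch from its running time, critical time and expected SLA
// (all in milliseconds), using the documented default thresholds.
def classify(batchRunningTimeMs: Long,
             criticalTimeMs: Long,
             expectedSlaMs: Long): PipelineState.Value = {
  if (criticalTimeMs > 0.7 * expectedSlaMs)
    PipelineState.Unhealthy          // even infinite executors would breach the SLA budget
  else if (batchRunningTimeMs > 0.7 * expectedSlaMs)
    PipelineState.Underprovisioned   // close to the SLA: upscale the cluster
  else if (batchRunningTimeMs < 0.3 * expectedSlaMs)
    PipelineState.Overprovisioned    // far below the SLA: room to downscale
  else
    PipelineState.Optimum
}

For the sample insight shown later (batch running time of about 2s against a 10s SLA, roughly 20% of the SLA), this logic returns Overprovisioned, which matches the reported state.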

How are insights reported?

Streaminglens reports insights in the following ways:

  • Insights are printed in Driver logs of Spark Application
  • Streaminglens publishes its metrics through Dropwizard. You can view these metrics through any of the supported metrics sinks in Apache Spark. For more details on various metrics sinks, visit Apache Spark documentation.
  • You can use your own custom reporter to see the aggregated health of the streaming pipeline and recommendations every hour. For more details, see the Using Custom Reporters section.

What insights can you expect?

Streaminglens reports the following insights after analyzing the execution of the last micro-batch:

  • Comparison of Batch Running Time, Critical Time & Trigger Interval
  • State of streaming pipeline as Underprovisioned, Optimum, Overprovisioned or Unhealthy.
  • Aggregated state of the streaming pipeline every hour, along with recommendations in case the pipeline is not optimum.

A sample insight printed in the driver log is shown below:

|||||||||||||||||| StreamingLens Insights |||||||||||||||||||||||||
BatchId: 8
Analysis Time: 00s 013ms
Expected Micro Batch SLA: 10s 000ms
Batch Running Time: 02s 094ms
Critical Time: 02s 047ms
Streaming Query State: OVERPROVISIONED
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

An example of aggregated state and recommendations:

Streaming Query State: OVERPROVISIONED
Recommendations:
~ Tune source configurations to ingest more data
~ Decrease the value of trigger Interval to process latest data
~ You can decrease the number of executors to reduce cost

How to build Streaminglens?

Streaminglens is built using Apache Maven (http://maven.apache.org/).

To build Streaminglens, clone this repository and run:

mvn -DskipTests clean package

This will create the target/spark-streaminglens_2.11-0.5.0.jar file, which contains the streaminglens code and associated dependencies. Make sure the Scala and Java versions correspond to those required by your Spark cluster. We have tested it with Java 7/8, Scala 2.11 and Spark 2.4.0.

How to use Streaminglens?

1. Adding streaminglens jar to spark application

You can add Streaminglens jar to Spark jobs launched through spark-shell or spark-submit by using the --packages command line option. For example, to include it when starting the spark shell:

$ bin/spark-shell --packages com.qubole:spark-streaminglens_2.11:0.5.3

Alternatively, if you have the streaminglens jar available, you can add it to your spark-submit command line using the --jars option:

--jars /path/to/spark-streaminglens_2.11-0.5.3.jar

You could also add this to your cluster's spark-defaults.conf so that it is automatically available to all applications.
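
For example, a spark-defaults.conf entry along these lines should work (the jar path is illustrative):

spark.jars /path/to/spark-streaminglens_2.11-0.5.3.jar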

2. Initializing streaminglens

You need to initialize streaminglens by adding the following lines to your code:

import com.qubole.spark.streaminglens.StreamingLens

val streamingLens = new StreamingLens(sparkSession, options)

Streaminglens requires the SparkSession object and a Map[String, String] of options to be passed as parameters. For more details about the various options, see the Configuring Streaminglens Options section below.
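
For example, a minimal sketch that sets the expected micro-batch SLA and the reporting interval (the values are illustrative; both keys are documented in the options table below):

val options = Map(
  "streamingLens.expectedMicroBatchSLAMillis" -> "10000", // expect each micro-batch to finish within 10 seconds
  "streamingLens.reporter.intervalMinutes" -> "60"        // report aggregated health every hour
)
val streamingLens = new StreamingLens(sparkSession, options)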

3. Using custom reporters

You can use your own custom reporter to report the aggregated health of the streaming pipeline and the associated recommendations. You just need to extend StreamingLensEventsReporterInterface, pass the class name with the streamingLens.reporter.className option and set streamingLens.reporter.enabled to true.

You can also configure the reporting frequency through the streamingLens.reporter.intervalMinutes option. By default, aggregated health and recommendations are reported every hour.

If you need to pass any additional options to your StreamingLens reporter class, prefix them with streamingLens.reporter and pass them along with the other streaminglens parameters.
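
A sketch of the wiring, assuming a hypothetical com.example.MyReporter class that implements StreamingLensEventsReporterInterface (the class name and the extra endpoint option are illustrative):

val options = Map(
  "streamingLens.reporter.enabled" -> "true",
  "streamingLens.reporter.className" -> "com.example.MyReporter", // hypothetical reporter class
  "streamingLens.reporter.intervalMinutes" -> "30",               // report every 30 minutes instead of hourly
  "streamingLens.reporter.endpoint" -> "https://example.com/sink" // hypothetical reporter-specific option, prefixed as required
)
val streamingLens = new StreamingLens(sparkSession, options)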

4. Configuring Streaminglens Options

Streaminglens supports various configuration options.

Name Default Meaning
streamingLens.analysisIntervalMinutes 5 mins Frequency of micro-batch analysis
streamingLens.criticalPathAnalysis.overProvisionPercentage 0.3 (or 30%) Threshold below which the Spark cluster is considered over-provisioned; for example, if batch running time is less than 30% of the expected micro-batch SLA, the cluster is considered over-provisioned
streamingLens.criticalPathAnalysis.underProvisionPercentage 0.7 (or 70%) Threshold above which the Spark cluster is considered under-provisioned; for example, if batch running time is more than 70% of the expected micro-batch SLA, the cluster is considered under-provisioned
streamingLens.criticalPathAnalysis.criticalPathPercentage 0.7 (or 70%) Threshold above which the Spark application is considered misconfigured; for example, if critical time is more than 70% of the expected micro-batch SLA, the pipeline is unhealthy and the Spark cluster is improperly configured
streamingLens.minBatches 1 Minimum number of batches that must complete before the next analysis
streamingLens.maxResultsRetention 30 Number of analysis results retained in memory
streamingLens.maxBatchesRetention 10 Number of batches for which metrics are retained in memory
streamingLens.maxAnalysisTimeSeconds 5 Number of seconds to wait before timing out an analysis
streamingLens.maxRetries 3 Number of retries if an error occurs during analysis
streamingLens.shouldLogResults true Whether to print analysis results in the Spark driver logs
streamingLens.reporter.enabled false Whether to send analysis results to a custom reporter
streamingLens.expectedMicroBatchSLAMillis 1000 * 60 * 2 (2 minutes) Expected micro-batch SLA in milliseconds
streamingLens.reporter.className (none) Fully resolved class name of the reporter class
streamingLens.reporter.discountFactor 0.95 Exponential factor by which earlier micro-batches are discounted when computing the aggregated state
streamingLens.reporter.intervalMinutes 60 Frequency of reporting the health of the streaming query
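
For example, to analyze more frequently and apply a stricter over-provisioning threshold, you could pass (a sketch; all options are supplied through the Map[String, String], so the values are given as strings, and the exact accepted formats are an assumption):

val options = Map(
  "streamingLens.analysisIntervalMinutes" -> "2",                       // analyze every 2 minutes instead of 5
  "streamingLens.criticalPathAnalysis.overProvisionPercentage" -> "0.2" // flag over-provisioning only below 20% of the SLA
)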

5. Setting expected micro-batch SLA

You can set the expected micro-batch SLA (in milliseconds) for a particular streaming query or for the whole Spark streaming application. To set the expected micro-batch SLA for the whole application, pass it in the options map with the key streamingLens.expectedMicroBatchSLAMillis while starting Streaminglens.

To set the expected micro-batch SLA for a single streaming query, use the StreamingLens API below:

streaminglens.updateExpectedMicroBatchSLA(queryName: String, sla: Long)

If the expected micro-batch SLA is not set, the default value of streamingLens.expectedMicroBatchSLAMillis is used for all streaming queries.
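
For example, to require the query registered as Query_name to finish each micro-batch within 10 seconds (the query name is illustrative):

streamingLens.updateExpectedMicroBatchSLA("Query_name", 10000L)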

6. Stopping Streaminglens

You can stop streaminglens in a running Spark application using the following StreamingLens API:

streamingLens.stop()
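
If your application also stops the SparkSession on shutdown, a reasonable ordering is to stop Streaminglens first (a sketch; the surrounding shutdown logic is illustrative):

// stop Streaminglens before tearing down the SparkSession it observes
streamingLens.stop()
spark.stop()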

Acknowledgement

The Streaminglens implementation would not have been possible without referencing the implementation of Sparklens.

streaminglens's People

Contributors

abhishekd0907, indit-qubole

streaminglens's Issues

StreamingLens cannot be cast to scala.runtime.Nothing

While running a Qubole pipeline, I am facing the following exception:

App > java.lang.ClassCastException: com.qubole.spark.streaminglens.StreamingLens cannot be cast to scala.runtime.Nothing$
App > at scala.Option.foreach(Option.scala:407)
App > at org.apache.spark.sql.streaming.StreamingQueryManager.updateInsightManagerTrigger(StreamingQueryManager.scala:508)
App > at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:489)
App > at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:319)

What could be the possible reasons for this, and what is the solution?

StreamingLens Insights always showing "Streaming Query State: NONEWBATCHES" in Logs.

Hi All,

I am using StreamingLens in my Spark Structured Streaming application, but it always shows the same log output. The BatchId is updated, but Streaming Query State: NONEWBATCHES stays the same.
Can someone suggest why the state and recommendations are not updating in the logs?

|||||||||||||||||| StreamingLens Insights |||||||||||||||||||||||||
BatchId: 344
Analysis Time: 00s 000ms
Expected Micro Batch SLA: 120s 000ms
Batch Running Time: 00s 000ms
Critical Time: 00s 000ms
Streaming Query State: NONEWBATCHES
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

21/10/01 15:50:04 WARN QueryInsightsManager: Streaming Lens failed key not found: BatchDescription(e68c3c2c-6d5f-469e-864a-)

Spark Submit Command:

spark-submit \
--verbose \
--name SparkStreamingLens \
--num-executors 1 \
--conf streamingLens.reporter.intervalMinutes=1 \
--jars /home/abc/jars/spark-streaminglens_2.11-0.5.3.jar,\
/home/abc/jars/kafka-clients-0.10.2.1.jar \
--master yarn \
--deploy-mode cluster \
--driver-cores 1 --driver-memory 2G --executor-cores 1 --executor-memory 2G \
--supervise --class com.data.datalake.SparkStreamingLens \
/home/abc/jar/SparkStreamingLens-spark-utility_2.11-1.0.jar

@abhishekd0907 @itsvikramagr @shubhamtagra @jsensarma @mjose007 @akumarb2010 @itsvikramagr @Indu-sharma
@akumarb2010 @iamrohit @beriaanirudh @mayurdb @michaelmior @rishitesh @emlyn @vrajat @fdemesmaeker @indit-qubole Kindly Suggest.

Kindly advise if anything needs to change here.

https://github.com/qubole/streaminglens/blob/master/src/main/scala/com/qubole/spark/streaminglens/common/results/AggregateStateResults.scala

https://github.com/qubole/streaminglens/blob/master/src/main/scala/com/qubole/spark/streaminglens/common/results/StreamingCriticalPathResults.scala

In the project, com.qubole.spark.streaminglens.QueryInsightsManager contains the code below to print the insights:

| |||||||||||||||||| StreamingLens Inisights |||||||||||||||||||||||||
| BatchId: ${results.batchId}
| Analysis Time: ${pd(results.analysisTime)}
| Expected Micro Batch SLA: ${pd(results.streamingCriticalPathResults.expectedMicroBatchSLA)}
| Batch Running Time: ${pd(results.streamingCriticalPathResults.batchRunningTime)}
| Critical Time: ${pd(results.streamingCriticalPathResults.criticalTime)}
| Streaming Query State: ${results.streamingCriticalPathResults.streamingQueryState.toString}
| ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""".stripMargin)

Here we are taking all the details from streamingCriticalPathResults, and the only default provided is the NONEWBATCHES state:

case class StreamingCriticalPathResults(
    expectedMicroBatchSLA: Long = 0,
    batchRunningTime: Long = 0,
    criticalTime: Long = 0,
    streamingQueryState: StreamingState.Value = StreamingState.NONEWBATCHES)

Also, in com.qubole.spark.streaminglens.common.results, AggregateStateResults.scala contains the code below:

package com.qubole.spark.streaminglens.common.results

case class AggregateStateResults(
    state: String = "NO NEW BATCHES",
    recommendation: String = "Streaming Query State: NO NEW BATCHES<br>")

Kindly suggest.

Not able to run streaminglens in IntelliJ IDEA

I am running my code locally in IntelliJ IDEA with the streaminglens Maven dependency.
I am getting the error below and no output; let me know what I am doing wrong here.


package com.manu.sstreaming;

import com.qubole.spark.streaminglens.StreamingLens;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.streaming.Trigger;
import scala.Predef;
import scala.collection.JavaConversions.*;
import scala.collection.JavaConverters;
import scala.collection.Seq;


/**
 * @author Manu Jose
 * create on : 16/04/20
 */
public class SStreamingNC {
    public static void main(String[] args) throws Exception {


        String host = "localhost";
        int port = 9999;
        //int port = Integer.parseInt(args[0]);


        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordCount")
                .master("local")
                .getOrCreate();


        Map<String, String> options = new HashMap<>();
        options.put("streamingLens.reporter.intervalMinutes", "1");

        scala.collection.immutable.Map<String, String> scalaMap = JavaConverters.mapAsScalaMapConverter(options).asScala().toMap(
                Predef.conforms());
        StreamingLens streamingLens = new StreamingLens(spark, scalaMap);
        streamingLens.registerListeners();




        // Create DataFrame representing the stream of input lines from connection to host:port
        spark.sql("SET spark.sql.streaming.metricsEnabled=true");
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", host)
                .option("port", port)
                .load();

        // Split the lines into words
        Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
                (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
                Encoders.STRING());

        // Generate running word count
        Dataset<Row> wordCounts = words.groupBy("value").count();


        // Start running the query that prints the running counts to the console
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("update")
                .format("console")
                .queryName("Query_name")
                .trigger(Trigger.ProcessingTime(2 * 1000))
                .start();

       spark.streams().awaitAnyTermination();

    }

}
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@11ddc5d8
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@c1c5bf
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@62726145

Streaming Lens failed key not found: BatchDescription

Can someone suggest what the issue is here? I am getting the logs below while executing a Spark Structured Streaming application with StreamingLens. It is not generating any recommendations or state.

21/10/01 11:30:07 WARN QueryInsightsManager: Streaming Lens failed key not found: BatchDescription(62618e99-5175-4106-90f1-0b2ef0b49fda,2)
21/10/01 11:30:07 INFO QueryInsightsManager: Max retries reached. Attempting to stop StreamingLens
21/10/01 11:30:07 INFO QueryInsightsManager: Successfully shutdown StreamingLens

Spark submit command:

spark-submit \
--verbose \
--name SparkStreamingLens \
--num-executors 1 \
--conf streamingLens.reporter.intervalMinutes=1 \
--jars /home/abc/jars/spark-streaminglens_2.11-0.5.3.jar,\
/home/abc/jars/kafka-clients-0.10.2.1.jar \
--master yarn \
--deploy-mode cluster \
--driver-cores 1 --driver-memory 2G --executor-cores 1 --executor-memory 2G \
--supervise --class com.data.datalake.SparkStreamingLens \
/home/abc/jar/SparkStreamingLens-spark-utility_2.11-1.0.jar

Kindly help.

streaminglens did not print recommendations?

I have added the code below to my Spark Structured Streaming program, but it does not print recommendations in the driver log. Why?

var options: Map[String, String] = Map()

options += ("streamingLens.expectedMicroBatchSLAMillis" -> "30000")

options += ("streamingLens.reporter.intervalMinutes" -> "2")

val streamingLens = new StreamingLens(spark, options)

Not able to see recommendations for StreamingLens in logs?

Hi,

I am trying to implement StreamingLens in my existing streaming application. The application is working fine and loads data from one Kafka topic to another. But in Ambari I am not able to see the StreamingLens reports. When I did this for a batch application using Sparklens, I could see the logs generated by Sparklens with all the resource information, but I cannot see the same report for the streaming application. Can someone suggest whether I need additional code, and where I should look for the reports generated by StreamingLens?

My sample code:

      class SparkStreamingLens(spark: SparkSession, options: RequestBuilder)
      object SparkStreamingLens {
      def main(args: Array[String]): Unit = {
      println(" Spark Parameters are :")
      val igniteDetails = args(0)  
      val applicationName = args(1)
      val argumentTable = args(2)
      // options.addParameter("streamingLens.reporter.intervalMinutes", "1")
      val spark = SparkSession
      .builder()
      .appName(applicationName)
      .getOrCreate()
      val streamingLens = new SparkStreamingLens(spark, options)
      // Remaining Code to Read from Kafka and write Into Kafka(Streaming Data)
      }
      }

  spark-submit Command:

  spark-submit \
  --verbose \
  --name SparkStreamingLens \
  --num-executors 1  \
  --conf streamingLens.reporter.intervalMinutes=1  \
  --jars /home/abc/jars/spark-streaminglens_2.11-0.5.3.jar,\
 /home/abc/jars/kafka-clients-0.10.2.1.jar \
  --master yarn \
  --deploy-mode cluster \
  --driver-cores 1  --driver-memory 2G  --executor-cores 1  --executor-memory 2G \
  --supervise --class com.data.datalake.SparkStreamingLens \
 /home/abc/jar/SparkStreamingLens-spark-utility_2.11-1.0.jar  \
 "jdbc:ignite:thin://00.000.00.00:00000;distributedJoins=true;user=aaaaaa;password=aaaaaaa;"  \
 SparkStreamingLens \
 argumentTable
