pyspark-spy

Collect and aggregate Spark events for profitz. In a 🐍 way!

Installation

pip install pyspark-spy

How to

You register a listener

import pyspark_spy
listener = pyspark_spy.PersistingSparkListener()
pyspark_spy.register_listener(spark_context, listener)

Execute your spark job as usual

spark_context.range(1, 100).count()

And you have all metrics collected!

print(listener.stage_output_metrics_aggregate())
OutputMetrics(bytesWritten=12861, recordsWritten=2426)

Look Ma, no actions!

Tested on Python 3.5 - 3.7 and PySpark 2.3 - 3.0.

Available listeners

  • pyspark_spy.interface.SparkListener - the base listener class. It defines an on_spark_event(event_name, java_event) method that you can override for custom logic whenever any event is received.

  • LoggingSparkListener - logs received event names to a supplied (or automatically created) logger.

  • StdoutSparkListener - writes event names to stdout.

  • PersistingSparkListener - stores Spark events in an internal buffer.

  • ContextSparkListener - same as PersistingSparkListener, but also lets you record only the events that occur within a Python context manager scope. More on that below.
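To give a feel for the on_spark_event hook, here is a minimal sketch of a counting listener. In real use you would subclass pyspark_spy.interface.SparkListener so Spark actually delivers the events; the class below is illustrative only and just shows the shape of the hook.

```python
from collections import Counter

class EventCountingListener:
    """Illustrative sketch: in real use, subclass
    pyspark_spy.interface.SparkListener so Spark delivers the events.
    Here we only demonstrate the on_spark_event(event_name, java_event) hook."""

    def __init__(self):
        self.counts = Counter()

    def on_spark_event(self, event_name, java_event):
        # Called once per Spark event; java_event would be a py4j proxy object.
        self.counts[event_name] += 1

listener = EventCountingListener()
# Simulate a few events as Spark would deliver them:
for name in ["jobStart", "stageCompleted", "stageCompleted", "jobEnd"]:
    listener.on_spark_event(name, java_event=None)

print(listener.counts["stageCompleted"])  # -> 2
```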

PersistingSparkListener

Spark events collected (as Java objects):

  • applicationEnd
  • applicationStart
  • blockManagerRemoved
  • blockUpdated
  • environmentUpdate
  • executorAdded
  • executorMetricsUpdate
  • executorRemoved
  • jobEnd
  • jobStart
  • otherEvent
  • stageCompleted
  • stageSubmitted
  • taskEnd
  • taskGettingResult
  • taskStart
  • unpersistRDD
listener.java_events['executorMetricsUpdate'] # -> List of py4j java objects

See all possible Spark events and their fields at https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/scheduler/SparkListener.html

Events converted to Python objects:

  • jobEnd
  • stageCompleted
listener.python_events['jobEnd']  # -> List of java events converted to typed namedtuples.
listener.jobEnd  # same

Available aggregations

Available only in PersistingSparkListener and ContextSparkListener.

  • stage_input_metrics_aggregate - sums up all stageCompleted event inputMetrics into one
print(listener.stage_input_metrics_aggregate())
InputMetrics(bytesRead=21574, recordsRead=584)
  • stage_output_metrics_aggregate - sums up all stageCompleted event outputMetrics into one
print(listener.stage_output_metrics_aggregate())
OutputMetrics(bytesWritten=12861, recordsWritten=2426)
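The aggregates effectively sum the per-stage metrics field-wise across all stageCompleted events. A standalone sketch of that summation (the OutputMetrics field names are taken from the output above; the helper function is our own illustration, not the library's API):

```python
from typing import List
from collections import namedtuple

# Field names match the aggregate output shown above.
OutputMetrics = namedtuple("OutputMetrics", ["bytesWritten", "recordsWritten"])

def aggregate_output_metrics(per_stage: List[OutputMetrics]) -> OutputMetrics:
    """Sum per-stage output metrics field-wise, as the
    stage_output_metrics_aggregate() result suggests."""
    return OutputMetrics(
        bytesWritten=sum(m.bytesWritten for m in per_stage),
        recordsWritten=sum(m.recordsWritten for m in per_stage),
    )

stages = [OutputMetrics(10_000, 2_000), OutputMetrics(2_861, 426)]
print(aggregate_output_metrics(stages))
# OutputMetrics(bytesWritten=12861, recordsWritten=2426)
```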

ContextSparkListener

To collect events from different actions and to build separate aggregations, use ContextSparkListener.

listener = ContextSparkListener()
register_listener(sc, listener)

with listener as events: # events is basically another listener
    run_spark_job()
events.stage_output_metrics_aggregate()  # events collected only within context manager

with listener as events_2:
    run_other_spark_job()
events_2.stage_output_metrics_aggregate()  # metrics collected during second job

listener.stage_output_metrics_aggregate() # metrics collected for all jobs
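The scoping idea behind ContextSparkListener can be sketched in plain Python: every event goes to the global buffer, and additionally to whichever scope is currently open. This is an illustration of the concept, not the library's actual implementation (which uses `with listener as events` directly):

```python
from contextlib import contextmanager

class ScopedCollector:
    """Illustrative sketch of ContextSparkListener-style scoping:
    events always land in the global buffer, and also in any
    currently open scope. Not the real implementation."""

    def __init__(self):
        self.all_events = []
        self._active_scopes = []

    def record(self, event):
        self.all_events.append(event)
        for scope in self._active_scopes:
            scope.append(event)

    @contextmanager
    def scope(self):
        events = []
        self._active_scopes.append(events)
        try:
            yield events
        finally:
            self._active_scopes.remove(events)

collector = ScopedCollector()
with collector.scope() as events:
    collector.record("jobEnd")       # recorded in both buffers
collector.record("applicationEnd")   # recorded only globally

print(events)                # ['jobEnd']
print(collector.all_events)  # ['jobEnd', 'applicationEnd']
```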

pyspark-spy's People

Contributors

  • sashgorokhov

pyspark-spy's Issues

Py4JException: Error while obtaining a new communication channel after registering listener

Hi Alexander, I stumbled on this repo when looking for solutions to include a spark listener in pyspark.
Nice work.

However, I couldn't get it to work, so I cloned this repo and ran the tests directly. Whatever I do, it blocks on communication for the listeners:

ERROR AsyncEventQueue: Listener Proxy33 threw an exception
py4j.Py4JException: Error while obtaining a new communication channel
at py4j.CallbackClient.getConnectionLock(CallbackClient.java:257)
at py4j.CallbackClient.sendCommand(CallbackClient.java:377)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy33.onJobStart(Unknown Source)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at java.base/java.net.Socket.connect(Socket.java:558)
at java.base/java.net.Socket.(Socket.java:454)
at java.base/java.net.Socket.(Socket.java:264)
at java.base/javax.net.DefaultSocketFactory.createSocket(SocketFactory.java:277)
at py4j.PythonClient.startClientSocket(PythonClient.java:192)
at py4j.PythonClient.getConnection(PythonClient.java:213)
at py4j.CallbackClient.getConnectionLock(CallbackClient.java:250)

Starting a Spark session and context is not the issue (I tried that separately), but from the moment the listener is registered, the communication error occurs. I know it's been a while, but do you have any idea what could be going on? It could be on the Spark side or on the py4j side. Any pointers would be great. I'm happy to investigate and test this further.
