
sparklingml's Introduction


SparklingPandas

SparklingPandas aims to make it easy to use the distributed computing power of PySpark to scale your data analysis with Pandas. SparklingPandas builds on Spark's DataFrame class to give you a polished, pythonic, and Pandas-like API.

Documentation

See SparklingPandas.com.

Videos

An early version of SparklingPandas was discussed in the talk "Sparkling Pandas - using Apache Spark to scale Pandas" by Holden Karau and Juliet Hougland.

Requirements

The primary requirements of SparklingPandas are a recent version of Spark (currently v1.4), available from http://spark.apache.org, and Python 2.7.

Using

Make sure you have the SPARK_HOME environment variable set correctly, as SparklingPandas uses this to locate the PySpark libraries.

Other than that, you can install SparklingPandas with pip and just import it, for example:
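
A minimal sketch of getting started (hedged: the PyPI package name, import path, and PSparkContext API below are assumptions based on the project name; check SparklingPandas.com for the actual API):

    # export SPARK_HOME=/path/to/spark-1.4   # used to locate the PySpark libraries
    # pip install sparklingpandas
    from pyspark import SparkContext
    from sparklingpandas.pcontext import PSparkContext  # assumed import path

    sc = SparkContext("local[2]", "sparkling-example")
    psc = PSparkContext(sc)              # assumed wrapper around the SparkContext
    df = psc.read_csv("people.csv")      # hypothetical call returning a Pandas-like distributed DataFrame
    print(df.collect())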

State

This is in early development. Feedback is taken seriously and is seriously appreciated. As you can tell, we SparklingPandas are a pretty serious bunch.

Support

Check out our Google group at https://groups.google.com/forum/#!forum/sparklingpandas

sparklingml's People

Contributors

holdenk, juanrh, mohamed-ali


sparklingml's Issues

Serialization exception when using Lucene analyzers from Python API

Hi,

I get a SparkException: Task not serializable when I try to use the Python Lucene analyzers beyond the count() call that appears in the doctests. For example, if I do:

from sparklingml.feature.lucene_analyzers import SpanishAnalyzerLucene

text = '''hola me llamo Bernardo y vengo de Cuenca.
Mi color favorito es el chocolate, y me relaciono bien con todo tipo de animales
One, dos, tres, cuatro, ... conozco todos los numeros'''
df = spark.createDataFrame([ (line, ) for line in text.split('\n')], ["lines"])
transformer = SpanishAnalyzerLucene()
transformer.setParams(inputCol="lines", outputCol="out")
result = transformer.transform(df)
result.count()

result.collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/latest/python/pyspark/sql/dataframe.py", line 438, in collect
    port = self._jdf.collectToPython()
  File "/opt/spark/latest/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/opt/spark/latest/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/latest/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o54.collectToPython.
: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2803)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2800)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.lucene.analysis.es.SpanishAnalyzer
Serialization stack:
	- object not serializable (class: org.apache.lucene.analysis.es.SpanishAnalyzer, value: org.apache.lucene.analysis.es.SpanishAnalyzer@10b34f48)
	- field (class: com.sparklingpandas.sparklingml.feature.LuceneTransformer$$anonfun$createTransformFunc$1, name: analyzer$1, type: class org.apache.lucene.analysis.Analyzer)
	- object (class com.sparklingpandas.sparklingml.feature.LuceneTransformer$$anonfun$createTransformFunc$1, <function1>)
	- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
	- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
	- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
	- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[0, string, true]))
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
	- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
	... 35 more

It looks like the Lucene Analyzer is being captured in the UDF closure even though it is not serializable; building the analyzer inside the transform function, so that it is constructed on the executors rather than shipped from the driver, might solve this:

diff --git a/sparklingml/feature/lucene_analyzers.py b/sparklingml/feature/lucene_analyzers.py
index f2ce502..6965cba 100644
--- a/sparklingml/feature/lucene_analyzers.py
+++ b/sparklingml/feature/lucene_analyzers.py
@@ -776,6 +776,8 @@ class SpanishAnalyzerLucene(
     >>> result = transformer.transform(df)
     >>> result.count()
     2
+    >>> result.head()
+    Row(vals=u'hi boo', out=[u'hi', u'boo'])
     """
     package_name = "com.sparklingpandas.sparklingml.feature"
     class_name = "SpanishAnalyzerLucene"
diff --git a/src/main/scala/com/sparklingpandas/sparklingml/feature/LuceneAnalyzer.scala b/src/main/scala/com/sparklingpandas/sparklingml/feature/LuceneAnalyzer.scala
index a035bef..76bc4a5 100644
--- a/src/main/scala/com/sparklingpandas/sparklingml/feature/LuceneAnalyzer.scala
+++ b/src/main/scala/com/sparklingpandas/sparklingml/feature/LuceneAnalyzer.scala
@@ -27,8 +27,8 @@ trait LuceneTransformer[T <:LuceneTransformer[T]]
   }
 
   override def createTransformFunc: String => Array[String] = {
-    val analyzer = buildAnalyzer()
-      (inputText: String) => {
+    (inputText: String) => {
+      val analyzer = buildAnalyzer()
       val inputStream = analyzer.tokenStream($(inputCol), inputText)
       val builder = Array.newBuilder[String]
       val charTermAttr = inputStream.addAttribute(classOf[CharTermAttribute])

After this, I can do:

>>> result.collect()
[Row(lines=u'hola me llamo Bernardo y vengo de Cuenca.', out=[u'hola', u'llam', u'bernard', u'veng', u'cuenc']), Row(lines=u'Mi color favorito es el chocolate, y me relaciono bien con todo tipo de animales', out=[u'color', u'favorit', u'chocolat', u'relacion', u'bien', u'tipo', u'animal']), Row(lines=u'One, dos, tres, cuatro, ... conozco todos los numeros', out=[u'one', u'dos', u'tres', u'cuatr', u'conozc', u'numer'])]
>>>
>>> result.show()
+--------------------+--------------------+
|               lines|                 out|
+--------------------+--------------------+
|hola me llamo Ber...|[hola, llam, bern...|
|Mi color favorito...|[color, favorit, ...|
|One, dos, tres, c...|[one, dos, tres, ...|
+--------------------+--------------------+

Do you think that this is a good solution, or should serialization be handled in a different way here?
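
For comparison, here is a minimal PySpark sketch (illustrative only, not project code) of the same pattern the patch applies: a non-serializable resource is built inside the function that runs on the executors instead of being captured from the driver's closure.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    lines = spark.sparkContext.parallelize(
        ["hola me llamo Bernardo", "conozco todos los numeros"])

    def tokenize_partition(rows):
        # Build the (stand-in) analyzer here, once per partition on the worker,
        # so it never has to be serialized from the driver.
        import re
        analyzer = re.compile(r"\w+")
        for row in rows:
            yield analyzer.findall(row)

    print(lines.mapPartitions(tokenize_partition).collect())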

Add user-guide/project documentation

It's still not very clear how and when to use sparklingml. It would help to either add a doc/ section or, at least for now, add links to @holdenk's video talks/slides on the project so that new users and contributors get a clearer picture of the project.

Ambiguous Java class inside startup.py/PythonRegistrationProvider/

The following code snippet from startup.py/PythonRegistrationProvider seems to have an indentation problem, and it also generates a PEP8 E305 warning.

    class Java:
        package = "com.sparklingpandas.sparklingml.util.python"
        className = "PythonRegisterationProvider"
        implements = [package + "." + className]

@holdenk can you confirm that? I can submit a PR to fix it if so.
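
For reference, py4j's callback mechanism expects the class Java declaration to be nested inside the implementing class, along the lines of this sketch (method bodies omitted; only the nesting and the py4j convention are shown, everything else is assumed):

    class PythonRegistrationProvider(object):
        # ... callback methods that implement the Java interface go here ...

        class Java:
            package = "com.sparklingpandas.sparklingml.util.python"
            className = "PythonRegisterationProvider"
            implements = [package + "." + className]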

Tests breaking on master

Could not resolve resource file No module named jar
Traceback (most recent call last):
File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
"main", fname, loader, pkg_name)
File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
File "/projects/sparklingml/sparklingml/startup.py", line 88, in
auto_convert=True)
File "/home/simone/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1632, in init
File "/home/simone/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1706, in start_callback_server
File "/home/simone/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 2017, in start
py4j.protocol.Py4JNetworkError: An error occurred while trying to start the callback server (127.0.0.1:25334)
Exception in thread "python-udf-registrationProvider-thread" java.lang.Exception: Exit code 1
at com.sparklingpandas.sparklingml.util.python.PythonRegistration$$anon$2.run(Initialize.scala:178)
at java.lang.Thread.run(Thread.java:748)
[info] - verify that the transformer runs *** FAILED ***
[info] java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
[info] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
[info] at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
[info] at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
[info] at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[info] at scala.concurrent.Await$.result(package.scala:190)
[info] at com.sparklingpandas.sparklingml.util.python.PythonTransformer$class.constructUDF(PythonTransformer.scala:25)
[info] at com.sparklingpandas.sparklingml.feature.StrLenPlusKPython.constructUDF(BasicPython.scala:10)
[info] at com.sparklingpandas.sparklingml.util.python.PythonTransformer$class.transform(PythonTransformer.scala:38)
[info] at com.sparklingpandas.sparklingml.feature.StrLenPlusKPython.transform(BasicPython.scala:10)
[info] at com.sparklingpandas.sparklingml.feature.StrLenPlusKPythonSuite$$anonfun$1.apply(BasicPython.scala:25)
[info] ...
[info] SpacyTokenizePythonSuite:
2017-08-19 19:28:23,121 [pool-1-thread-1] WARN SparkContext - Using an existing SparkContext; some configuration may not take effect.
[info] - verify spacy tokenization works *** FAILED ***
[info] java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
[info] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
[info] at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
[info] at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
[info] at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[info] at scala.concurrent.Await$.result(package.scala:190)
[info] at com.sparklingpandas.sparklingml.util.python.PythonTransformer$class.constructUDF(PythonTransformer.scala:25)
[info] at com.sparklingpandas.sparklingml.feature.SpacyTokenizePython.constructUDF(BasicPython.scala:39)
[info] at com.sparklingpandas.sparklingml.util.python.PythonTransformer$class.transform(PythonTransformer.scala:38)
[info] at com.sparklingpandas.sparklingml.feature.SpacyTokenizePython.transform(BasicPython.scala:39)
[info] at com.sparklingpandas.sparklingml.feature.SpacyTokenizePythonSuite$$anonfun$2.apply(BasicPython.scala:49)
[info] ...
[info] ScalaTest
[info] Run completed in 40 seconds, 343 milliseconds.
[info] Total number of tests run: 83
[info] Suites: completed 47, aborted 0
[info] Tests: succeeded 81, failed 2, canceled 0, ignored 0, pending 0
[info] *** 2 TESTS FAILED ***

Is this a known issue? Tested on Xubuntu 16.04 with a clean installation: Python 3.5.1, sbt 0.13.15, Scala 2.11.8, after successfully installing all the requirements.
