Hi,

I get a SparkException: Task not serializable when I try to use the Python Lucene analyzers for anything beyond the count() call that appears in the doctests. For example, if I do
from sparklingml.feature.lucene_analyzers import SpanishAnalyzerLucene
text = '''hola me llamo Bernardo y vengo de Cuenca.
Mi color favorito es el chocolate, y me relaciono bien con todo tipo de animales
One, dos, tres, cuatro, ... conozco todos los numeros'''
df = spark.createDataFrame([ (line, ) for line in text.split('\n')], ["lines"])
transformer = SpanishAnalyzerLucene()
transformer.setParams(inputCol="lines", outputCol="out")
result = transformer.transform(df)
result.count()
result.collect()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark/latest/python/pyspark/sql/dataframe.py", line 438, in collect
port = self._jdf.collectToPython()
File "/opt/spark/latest/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/opt/spark/latest/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/spark/latest/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o54.collectToPython.
: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2803)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2800)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.lucene.analysis.es.SpanishAnalyzer
Serialization stack:
- object not serializable (class: org.apache.lucene.analysis.es.SpanishAnalyzer, value: org.apache.lucene.analysis.es.SpanishAnalyzer@10b34f48)
- field (class: com.sparklingpandas.sparklingml.feature.LuceneTransformer$$anonfun$createTransformFunc$1, name: analyzer$1, type: class org.apache.lucene.analysis.Analyzer)
- object (class com.sparklingpandas.sparklingml.feature.LuceneTransformer$$anonfun$createTransformFunc$1, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[0, string, true]))
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 35 more
It looks like there is a serialization issue here: the analyzer is built on the driver and captured by the UDF closure, but org.apache.lucene.analysis.es.SpanishAnalyzer is not serializable. (count() passes in the doctests, presumably because Spark can prune the unused UDF column there, while collect() forces the UDF to run.) It might be solved with the following patch:
diff --git a/sparklingml/feature/lucene_analyzers.py b/sparklingml/feature/lucene_analyzers.py
index f2ce502..6965cba 100644
--- a/sparklingml/feature/lucene_analyzers.py
+++ b/sparklingml/feature/lucene_analyzers.py
@@ -776,6 +776,8 @@ class SpanishAnalyzerLucene(
>>> result = transformer.transform(df)
>>> result.count()
2
+ >>> result.head()
+ Row(vals=u'hi boo', out=[u'hi', u'boo'])
"""
package_name = "com.sparklingpandas.sparklingml.feature"
class_name = "SpanishAnalyzerLucene"
diff --git a/src/main/scala/com/sparklingpandas/sparklingml/feature/LuceneAnalyzer.scala b/src/main/scala/com/sparklingpandas/sparklingml/feature/LuceneAnalyzer.scala
index a035bef..76bc4a5 100644
--- a/src/main/scala/com/sparklingpandas/sparklingml/feature/LuceneAnalyzer.scala
+++ b/src/main/scala/com/sparklingpandas/sparklingml/feature/LuceneAnalyzer.scala
@@ -27,8 +27,8 @@ trait LuceneTransformer[T <:LuceneTransformer[T]]
}
override def createTransformFunc: String => Array[String] = {
- val analyzer = buildAnalyzer()
- (inputText: String) => {
+ (inputText: String) => {
+ val analyzer = buildAnalyzer()
val inputStream = analyzer.tokenStream($(inputCol), inputText)
val builder = Array.newBuilder[String]
val charTermAttr = inputStream.addAttribute(classOf[CharTermAttribute])
After applying this patch I can do
>>> result.collect()
[Row(lines=u'hola me llamo Bernardo y vengo de Cuenca.', out=[u'hola', u'llam', u'bernard', u'veng', u'cuenc']), Row(lines=u'Mi color favorito es el chocolate, y me relaciono bien con todo tipo de animales', out=[u'color', u'favorit', u'chocolat', u'relacion', u'bien', u'tipo', u'animal']), Row(lines=u'One, dos, tres, cuatro, ... conozco todos los numeros', out=[u'one', u'dos', u'tres', u'cuatr', u'conozc', u'numer'])]
>>>
>>> result.show()
+--------------------+--------------------+
| lines| out|
+--------------------+--------------------+
|hola me llamo Ber...|[hola, llam, bern...|
|Mi color favorito...|[color, favorit, ...|
|One, dos, tres, c...|[one, dos, tres, ...|
+--------------------+--------------------+
Do you think this is a good solution, given that it now rebuilds the analyzer for every input string, or should serialization be handled in a different way here?
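For comparison, here is one alternative I can think of (only a sketch with hypothetical names, not code from sparklingml): return a serializable function object whose analyzer is a @transient lazy val. That also keeps the Lucene analyzer out of the serialized task, but rebuilds it once per executor JVM rather than once per input string. The token-stream loop below follows the shape of createTransformFunc in LuceneAnalyzer.scala; everything else is an assumption.

import org.apache.lucene.analysis.Analyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute

// Hypothetical wrapper class, not part of sparklingml.
class LuceneAnalyzeFunc(fieldName: String, makeAnalyzer: () => Analyzer)
    extends (String => Array[String]) with Serializable {

  // @transient keeps the non-serializable Analyzer out of the serialized
  // closure; lazy rebuilds it on first use after deserialization, so each
  // executor constructs it once instead of once per record.
  @transient private lazy val analyzer: Analyzer = makeAnalyzer()

  override def apply(inputText: String): Array[String] = {
    val inputStream = analyzer.tokenStream(fieldName, inputText)
    val builder = Array.newBuilder[String]
    val charTermAttr = inputStream.addAttribute(classOf[CharTermAttribute])
    inputStream.reset()
    while (inputStream.incrementToken()) {
      builder += charTermAttr.toString
    }
    inputStream.end()
    inputStream.close()
    builder.result()
  }
}

createTransformFunc could then just return new LuceneAnalyzeFunc($(inputCol), () => buildAnalyzer()). The factory closure captures the transformer itself, which I assume is fine since Spark ML's Params trait is already Serializable.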