Comments (4)
Hi,
Sorry for the late answer; I prepared an example of how to deal with your initial problem of reading the numpy files into an RDD / DataFrame.
https://github.com/cerndb/dist-keras/blob/master/examples/distributed_numpy_parsing.ipynb
Yes, if you change the notebook flag to local=True, then it would have done it automatically :)
from dist-keras.
That's actually a good suggestion, thanks! I'll try to think of some examples. However, for large datasets this is not really possible (as you know).
I have an example for larger datasets in another repository (https://github.com/JoeriHermans/hep-track-reconstruction-ml/blob/master/notebooks/data_preprocessing.ipynb) which basically takes raw numpy files, and converts them to a format I will use later.
For smaller (and larger) datasets, it boils down to this: no matter what shape your numpy arrays have, in Spark they need to be stored as lists, because otherwise you will run into issues with type inference. This is especially annoying in the case of convolutional layers. So before you call sc.parallelize on a list of numpy matrices / vectors, just convert every element of that list to a list using ndarray.tolist(), and everything will work as expected.
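A minimal sketch of that conversion (the matrices here are illustrative stand-ins; `sc` is assumed to be an existing SparkContext):

```python
import numpy as np

# A batch of small 2x2 "images" as numpy matrices.
matrices = [np.arange(4).reshape(2, 2) for _ in range(3)]

# Convert each ndarray to nested plain Python lists so Spark can
# serialize them and infer a consistent element type.
as_lists = [m.tolist() for m in matrices]

# as_lists now holds only built-in lists and ints, so it is safe to do
# e.g. rdd = sc.parallelize(as_lists)
```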
I hope this helps.
Joeri
> For smaller (also larger) datasets, it boils down to the fact that no matter what numpy shape you have, in Spark it needs to be stored as a list because in the other case you will encounter some issues regarding types.
Thanks, that makes sense. Let me ask you a couple more questions, if I may, since I just started with Spark.
1.) In your MNIST examples you have something along the lines of
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'
this causes an error every time I create the SparkContext object sc = SparkContext(conf=conf):
17/04/11 17:25:25 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext may be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:422)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:236)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:745)
17/04/11 17:25:25 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
17/04/11 17:25:25 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Invalid master URL: spark://143.239.81.131
at org.apache.spark.util.Utils$.extractHostPortFromSparkUrl(Utils.scala:2358)
at org.apache.spark.rpc.RpcAddress$.fromSparkURL(RpcAddress.scala:47)
at org.apache.spark.deploy.client.StandaloneAppClient$$anonfun$1.apply(StandaloneAppClient.scala:52)
at org.apache.spark.deploy.client.StandaloneAppClient$$anonfun$1.apply(StandaloneAppClient.scala:52)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.deploy.client.StandaloneAppClient.<init>(StandaloneAppClient.scala:52)
at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.start(StandaloneSchedulerBackend.scala:108)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:236)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
17/04/11 17:25:25 ERROR Utils: Uncaught exception in thread Thread-2
java.lang.NullPointerException
at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.org$apache$spark$scheduler$cluster$StandaloneSchedulerBackend$$stop(StandaloneSchedulerBackend.scala:214)
at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.stop(StandaloneSchedulerBackend.scala:116)
at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:467)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1588)
at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1826)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1825)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:587)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:236)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
17/04/11 17:25:25 WARN MetricsSystem: Stopping a MetricsSystem that is not running
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-70-6b825dbb354c> in <module>()
----> 1 sc = SparkContext(conf=conf)
/miniconda2/envs/dist_keras/lib/python2.7/site-packages/pyspark/context.pyc in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
116 try:
117 self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
--> 118 conf, jsc, profiler_cls)
119 except:
120 # If an error occurs, clean up in order to allow future SparkContext creation:
/miniconda2/envs/dist_keras/lib/python2.7/site-packages/pyspark/context.pyc in _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls)
177
178 # Create the Java SparkContext through Py4J
--> 179 self._jsc = jsc or self._initialize_context(self._conf._jconf)
180 # Reset the SparkConf to the one actually used by the SparkContext in JVM.
181 self._conf = SparkConf(_jconf=self._jsc.sc().conf())
/miniconda2/envs/dist_keras/lib/python2.7/site-packages/pyspark/context.pyc in _initialize_context(self, jconf)
244 Initialize SparkContext in function to allow subclass specific initialization
245 """
--> 246 return self._jvm.JavaSparkContext(jconf)
247
248 @classmethod
/miniconda2/envs/dist_keras/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
1399 answer = self._gateway_client.send_command(command)
1400 return_value = get_return_value(
-> 1401 answer, self._gateway_client, None, self._fqn)
1402
1403 for temp_arg in temp_args:
/miniconda2/envs/dist_keras/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Invalid master URL: spark://143.239.81.131
at org.apache.spark.util.Utils$.extractHostPortFromSparkUrl(Utils.scala:2358)
... (stack trace identical to the ERROR SparkContext trace above)
I did some googling and it turns out I need to set the master:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[45] pyspark-shell'
but it only works if
master = "local[*]"
and not if you try
master = "yarn-client"
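For reference, the "Invalid master URL: spark://143.239.81.131" in the trace above is usually caused by a missing port: a standalone master URL has the form spark://host:port (7077 by default). A hedged sketch of setting it up, assuming a standalone master is actually running on that host (for "yarn-client" you would additionally need HADOOP_CONF_DIR pointing at your cluster's Hadoop configuration):

```python
import os

# Standalone master URLs need an explicit port (Spark's default is 7077);
# "spark://143.239.81.131" without a port is rejected as invalid.
master = "spark://143.239.81.131:7077"

# Must be set *before* the first SparkContext is created in this process.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master {} pyspark-shell'.format(master)

# conf = SparkConf().setMaster(master).setAppName("example")
# sc = SparkContext(conf=conf)  # assuming no other SparkContext is alive
```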
2.) Let's assume a hypothetical scenario of a cluster with 5 nodes sharing the same storage:
node1 is the entry node and node2, ..., node5 are worker nodes.
What is the proper way to set node1 as the master and all the rest as worker nodes for the computations to take place on?
2.a) If possible, provide an example where we parallelize the model across nodes node2, ..., node5.
2.b) Also, how do we achieve the same thing, but this time distributing the data across the same nodes instead of the model?
Thanks!
Can I ask one more question related to this issue?
(X_train, y_train), (X_test, y_test) = mnist.load_data()
In the above case, the X ndarray and y ndarray are separate, but they are combined in your MNIST example code, meaning the features and labels live in one and the same structure.
My data is also separated into x (features) and y (labels); my x and y ndarrays have the same style as the return value of mnist.load_data().
In this case, how can I transform x_train and y_train into the right format for your dist-keras code?
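A minimal sketch of one way to pair the two arrays, assuming you want rows of (features, label) with the features flattened to plain lists as discussed earlier in this thread (the arrays below are random stand-ins for mnist.load_data() output):

```python
import numpy as np

# Stand-ins for (x_train, y_train): 4 samples of 28x28 features plus labels.
x_train = np.random.rand(4, 28, 28)
y_train = np.array([0, 1, 2, 3])

# Pair each flattened feature vector with its label as built-in Python
# types, so the result can be handed to Spark, e.g.
# df = sc.parallelize(rows).toDF(['features', 'label'])
rows = [(x.flatten().tolist(), int(y)) for x, y in zip(x_train, y_train)]
```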