Comments (4)

JoeriHermans commented on May 18, 2024

Hi,

Sorry for the late answer. I prepared an example of how to deal with your initial problem of reading the numpy files into an RDD / DataFrame:

https://github.com/cerndb/dist-keras/blob/master/examples/distributed_numpy_parsing.ipynb
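For readers who cannot open the notebook, a minimal sketch of one way to do this is below (the glob pattern and app name are placeholders, not values from the notebook): read each file's raw bytes with binaryFiles, then parse them with numpy on the workers.

# Minimal sketch, assuming the .npy files live under a local or HDFS path;
# "data/*.npy" and the app name are placeholders, not values from the notebook.
import io
import numpy as np
from pyspark import SparkContext

sc = SparkContext(appName="numpy-parsing-sketch")

def parse_npy(path_and_bytes):
    path, raw = path_and_bytes
    arr = np.load(io.BytesIO(raw))   # reconstruct the ndarray from the raw file bytes
    return path, arr.tolist()        # plain lists keep Spark's type inference happy

# binaryFiles yields (path, bytes) pairs, one per file.
rdd = sc.binaryFiles("data/*.npy").map(parse_npy)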

Yes, if you change the notebook flag to local=True, it would have done that automatically :)


JoeriHermans commented on May 18, 2024

That's actually a good suggestion, thanks! I'll try to think of some examples. However, for large datasets this is not really possible (as you know).

I have an example for larger datasets in another repository (https://github.com/JoeriHermans/hep-track-reconstruction-ml/blob/master/notebooks/data_preprocessing.ipynb), which basically takes raw numpy files and converts them to a format I will use later.

For smaller (and also larger) datasets, it boils down to this: no matter what numpy shape you have, in Spark the data needs to be stored as a list, because otherwise you will run into type issues. This is especially annoying for convolutional networks. So before you call sc.parallelize on a list of numpy matrices / vectors, convert every element of that list to a plain Python list using ndarray.tolist(), and everything will work as expected.
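A minimal sketch of that conversion (the array shapes, count, and app name are illustrative, not taken from this thread):

# Sketch of the tolist() conversion described above; shapes and sizes are illustrative.
import numpy as np
from pyspark import SparkContext

sc = SparkContext(appName="tolist-sketch")

samples = [np.random.rand(28, 28) for _ in range(100)]   # e.g. image-like matrices

# Convert every ndarray to nested Python lists before parallelizing,
# so Spark does not trip over numpy's element types.
rdd = sc.parallelize([m.tolist() for m in samples])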

I hope this helps.

Joeri


kirk86 commented on May 18, 2024

For smaller (and also larger) datasets, it boils down to this: no matter what numpy shape you have, in Spark the data needs to be stored as a list, because otherwise you will run into type issues.

Thanks, that makes sense. Let me ask you a couple more questions if I may, since I have just started with Spark.

1.) In your MNIST examples you have something along the lines of
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'

This causes an error every time I create the Spark context object with sc = SparkContext(conf=conf):

17/04/11 17:25:25 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor).  This may indicate an error, since only one SparkContext may be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:422)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:236)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:745)
17/04/11 17:25:25 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
17/04/11 17:25:25 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Invalid master URL: spark://143.239.81.131
        at org.apache.spark.util.Utils$.extractHostPortFromSparkUrl(Utils.scala:2358)
        at org.apache.spark.rpc.RpcAddress$.fromSparkURL(RpcAddress.scala:47)
        at org.apache.spark.deploy.client.StandaloneAppClient$$anonfun$1.apply(StandaloneAppClient.scala:52)
        at org.apache.spark.deploy.client.StandaloneAppClient$$anonfun$1.apply(StandaloneAppClient.scala:52)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.deploy.client.StandaloneAppClient.<init>(StandaloneAppClient.scala:52)
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.start(StandaloneSchedulerBackend.scala:108)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:236)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
17/04/11 17:25:25 ERROR Utils: Uncaught exception in thread Thread-2
java.lang.NullPointerException
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.org$apache$spark$scheduler$cluster$StandaloneSchedulerBackend$$stop(StandaloneSchedulerBackend.scala:214)
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.stop(StandaloneSchedulerBackend.scala:116)
        at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:467)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1588)
        at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1826)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1825)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:587)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:236)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
17/04/11 17:25:25 WARN MetricsSystem: Stopping a MetricsSystem that is not running
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-70-6b825dbb354c> in <module>()
----> 1 sc = SparkContext(conf=conf)

/miniconda2/envs/dist_keras/lib/python2.7/site-packages/pyspark/context.pyc in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
    116         try:
    117             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
--> 118                           conf, jsc, profiler_cls)
    119         except:
    120             # If an error occurs, clean up in order to allow future SparkContext creation:

/miniconda2/envs/dist_keras/lib/python2.7/site-packages/pyspark/context.pyc in _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls)
    177
    178         # Create the Java SparkContext through Py4J
--> 179         self._jsc = jsc or self._initialize_context(self._conf._jconf)
    180         # Reset the SparkConf to the one actually used by the SparkContext in JVM.
    181         self._conf = SparkConf(_jconf=self._jsc.sc().conf())

/miniconda2/envs/dist_keras/lib/python2.7/site-packages/pyspark/context.pyc in _initialize_context(self, jconf)
    244         Initialize SparkContext in function to allow subclass specific initialization
    245         """
--> 246         return self._jvm.JavaSparkContext(jconf)
    247
    248     @classmethod

/miniconda2/envs/dist_keras/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1399         answer = self._gateway_client.send_command(command)
   1400         return_value = get_return_value(
-> 1401             answer, self._gateway_client, None, self._fqn)
   1402
   1403         for temp_arg in temp_args:

/miniconda2/envs/dist_keras/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Invalid master URL: spark://143.239.81.131
        at org.apache.spark.util.Utils$.extractHostPortFromSparkUrl(Utils.scala:2358)
        at org.apache.spark.rpc.RpcAddress$.fromSparkURL(RpcAddress.scala:47)
        at org.apache.spark.deploy.client.StandaloneAppClient$$anonfun$1.apply(StandaloneAppClient.scala:52)
        at org.apache.spark.deploy.client.StandaloneAppClient$$anonfun$1.apply(StandaloneAppClient.scala:52)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.deploy.client.StandaloneAppClient.<init>(StandaloneAppClient.scala:52)
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.start(StandaloneSchedulerBackend.scala:108)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:236)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)

I did some googling and it turns out I need to set up the master:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[45] pyspark-shell'
but it only works if
master = "local[*]"
and not if you try
master = "yarn-client"

2.) Let's assume a hypothetical scenario of a cluster with 5 nodes sharing the same storage, where node1 is the entry node and node2,...,node5 are worker nodes.

What is the proper way to set node1 as the master and all the rest as worker nodes for the computations to run on?

2.a) If possible, provide an example where we parallelize the model across nodes node2,...,node5.
2.b) Also, how do we achieve the same thing but this time distributing the data across those nodes instead of the model?

Thanks!


rayjang commented on May 18, 2024

Can I ask one more question related to this issue?

(X_train, y_train), (X_test, y_test) = mnist.load_data()

In the above case, the X ndarray and y ndarray are separate, but they are combined in your MNIST example code, which means the features and labels live in one and the same structure.
My data is also split into x (features) and y (labels); my x and y ndarrays have the same layout as the return value of mnist.load_data().
In this case, how can I transform x_train and y_train into the right format for your dist-keras code?
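For illustration, one possible way (not necessarily dist-keras's own preprocessing) to pair the features and labels row by row before handing them to Spark is sketched below; the column names "features" and "label" are assumptions, not confirmed dist-keras requirements.

# Sketch only: pair each sample with its label so a single row carries both.
import numpy as np
from keras.datasets import mnist
from pyspark import SparkContext
from pyspark.sql import SQLContext

(X_train, y_train), (X_test, y_test) = mnist.load_data()

sc = SparkContext(appName="mnist-pairing-sketch")
sqlContext = SQLContext(sc)

# Flatten each image to a plain list and keep its label in the same record.
rows = [(x.reshape(-1).tolist(), int(y)) for x, y in zip(X_train, y_train)]
df = sqlContext.createDataFrame(rows, ["features", "label"])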

