
Comments (51)

rayjang commented on May 18, 2024

I think the first method is not viable: I got a memory error.
I will try the other method.

rayjang commented on May 18, 2024

The first method is not feasible since the dataset is too big: VectorAssembler seems to take forever.

JoeriHermans commented on May 18, 2024

Hi Rayjang,

Sorry for not replying sooner. At the end of the week I'll have some time to look into this issue. I'll keep you posted if I come up with some other ideas.

Joeri

rayjang commented on May 18, 2024

Update.
My image data's shape is (224, 224, 3) and there are 800 samples in total.

raw_dataset_train = reader.read.format('com.databricks.spark.csv') \
    .options(header='false', inferSchema='true', maxColumns='1000000') \
    .load(path_train)

This code works (I increased maxColumns since I have 224*224*3 columns), but the code below does not; it seems to run forever:

vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
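(For context, the features list passed to VectorAssembler would just be the CSV column names, something along these lines; whether to drop a label column depends on the CSV layout:)

# Use all CSV columns as input features (drop a label column here if the CSV has one).
features = raw_dataset_train.columns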

If you have some time, please share an idea or some code for an efficient way to do distributed DL on high-dimensional image data.

rayjang commented on May 18, 2024

pairs = [(x, y) for x, y in zip(features, labels)]
sc.parallelize(pairs)
I also tried this approach, but I got an "out of memory" error.

rayjang commented on May 18, 2024

Update.
pairs = [(x, y) for x, y in zip(features, labels)]
sc.parallelize(pairs)
This is working now (I didn't set up more driver memory).
I am still looking for a way to use the numpy features and numpy labels directly for distributed training.
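Something like the following is what I have in mind (a rough, untested sketch; it assumes features and labels are the numpy arrays described above, a SQLContext/SparkSession named sqlContext, and the pyspark.ml.linalg vector type):

from pyspark.ml.linalg import DenseVector
from pyspark.sql import Row

# One Row per sample: a dense vector of pixel values plus an integer label.
rows = [Row(features=DenseVector(x.astype(float).ravel()), label=int(y))
        for x, y in zip(features, labels)]

# Distribute the rows and build a Spark DataFrame from them.
dataset_train = sqlContext.createDataFrame(sc.parallelize(rows))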

JoeriHermans commented on May 18, 2024

This method requires that all the pairs fit into memory, right? I find it really strange that VectorAssembler takes so much time...

Joeri

rayjang commented on May 18, 2024

I think it was related to my driver memory. My research has some constraints on memory size, so I had limited it. After I increased the memory and added more machines, it worked well in a distributed way. (To sum up: the problem came from my memory configuration, but under that configuration there was no error, just seemingly endless run time.)
BUT I am still looking for a better way to train on my image (.png) files and labels.
Converting png -> csv -> DataFrame -> Spark vector seems overly complicated in practice.

rayjang commented on May 18, 2024

VectorAssembler is still slow.
My setup:
1 master with 6 GB driver memory,
3 workers, each with 1 CPU core, 6 GB worker memory, and a Titan X GPU.

I don't know why VectorAssembler takes so long (800 rows seem to take forever; even 3 rows take a long time).
I am not sure whether this comes from a shortage of memory or cores, or from a fault in my code.
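For reference, these are the standard spark-submit flags for sizing those resources (the values just mirror the setup above; the master host and script name are placeholders):

spark-submit \
  --master spark://<master-host>:7077 \
  --driver-memory 6g \
  --executor-memory 6g \
  --executor-cores 1 \
  my_training_script.py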

(screenshot)

My csv file's columns look like this:
(screenshot)

JoeriHermans commented on May 18, 2024

Hmmm. What if you wrote a "custom" vector assembler? For example, something like this:

def to_vector(row):
    # Replace these with the actual feature column names of your DataFrame.
    columns = ['your', 'columns', 'here']
    new_row = []
    for column in columns:
        # Append the column's value from the row, not the column name itself.
        new_row.append(row[column])

    return new_row

Of course, this snippet could be improved much more.
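As a rough usage sketch (untested; it assumes df is the CSV DataFrame loaded earlier, that it also has a label column, and that the vector type comes from pyspark.ml.linalg):

from pyspark.ml.linalg import DenseVector
from pyspark.sql import Row

# Run the custom assembler over every row and go back to a DataFrame.
assembled = df.rdd.map(
    lambda row: Row(features=DenseVector(to_vector(row)), label=row['label'])
).toDF()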

Joeri

rayjang commented on May 18, 2024

What I am doing now is building the 'feature' dense vector directly from a numpy array:

x_flatten = x_train.reshape((800,-1))
z_train = np.empty((800, 224*224*3))

for i in range(800):
    z_train[i,:] = x_flatten[i,:]

df = pd.DataFrame(columns=['feature','label'])

for i in range(799):
    df.set_value(i,'feature',[z_train[i]])
    df.set_value(i,'label',[y_train[i]])

df['feature'] = df['feature'].apply(lambda x: x[0])
df['label'] = df['label'].apply(lambda x: x[0])
df['feature'] = df['feature'].apply(lambda x: x.tolist())
df['feature'] = df['feature'].apply(lambda x: DenseVector(x))

dataset_train = reader.createDataFrame(df)

nb_classes = 100
encoder = OneHotTransformer(nb_classes, input_col="label", output_col="label_encoded")
dataset_train = encoder.transform(dataset_train)
dataset_train = dataset_train.select( "feature","label", "label_encoded")
reshape_transformer = ReshapeTransformer("feature", "matrix", (224, 224, 3))
dataset_train = reshape_transformer.transform(dataset_train)

dataset_train = dataset_train.select("feature", "matrix","label", "label_encoded")
dataset_train.repartition(num_workers)

trainer = DOWNPOUR(keras_model=model, worker_optimizer=sgd, loss='categorical_crossentropy', num_workers=1,
                   batch_size=4, communication_window=5, num_epoch=1,
                   features_col="feature", label_col="label_encoded")
trained_model = trainer.train(training_set)
print("Training time: " + str(trainer.get_training_time()))

This is the schema:
(screenshot)

I got this error:

Traceback (most recent call last):
  File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 148, in dump
    return Pickler.dump(self, obj)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 408, in dump
    self.save(obj)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 740, in save_tuple
    save(element)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 445, in save_instancemethod
    self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
  File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 582, in save_reduce
    save(args)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 725, in save_tuple
    save(element)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
    save(v)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
    save(v)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
    save(v)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
    save(v)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
    save(v)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 770, in save_list
    self._batch_appends(obj)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 794, in _batch_appends
    save(x)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 740, in save_tuple
    save(element)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
    save(v)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
    save(v)
  File "/HOME/anaconda3/lib/python3.5/pickle.py", line 506, in save
    self.save_global(obj, rv)
  File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 372, in save_global
    if obj.__module__ == "__builtin__" or obj.__module__ == "builtins":
AttributeError: 'NotImplementedType' object has no attribute '__module__'
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in dump(self, obj)
    147         try:
--> 148             return Pickler.dump(self, obj)
    149         except RuntimeError as e:

/HOME/anaconda3/lib/python3.5/pickle.py in dump(self, obj)
    407             self.framer.start_framing()
--> 408         self.save(obj)
    409         self.write(STOP)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_tuple(self, obj)
    739         for element in obj:
--> 740             save(element)
    741 

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_instancemethod(self, obj)
    444         if PY3:
--> 445             self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
    446         else:

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    581             save(func)
--> 582             save(args)
    583             write(pickle.REDUCE)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_tuple(self, obj)
    724             for element in obj:
--> 725                 save(element)
    726             # Subtle.  Same as in the big comment below.

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    519         # Save the reduce() output and finally memoize the object
--> 520         self.save_reduce(obj=obj, *rv)
    521 

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    599         if state is not None:
--> 600             save(state)
    601             write(pickle.BUILD)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
    809         self.memoize(obj)
--> 810         self._batch_setitems(obj.items())
    811 

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
    835                     save(k)
--> 836                     save(v)
    837                 write(SETITEMS)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    519         # Save the reduce() output and finally memoize the object
--> 520         self.save_reduce(obj=obj, *rv)
    521 

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    599         if state is not None:
--> 600             save(state)
    601             write(pickle.BUILD)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
    809         self.memoize(obj)
--> 810         self._batch_setitems(obj.items())
    811 

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
    835                     save(k)
--> 836                     save(v)
    837                 write(SETITEMS)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    519         # Save the reduce() output and finally memoize the object
--> 520         self.save_reduce(obj=obj, *rv)
    521 

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    599         if state is not None:
--> 600             save(state)
    601             write(pickle.BUILD)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
    809         self.memoize(obj)
--> 810         self._batch_setitems(obj.items())
    811 

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
    835                     save(k)
--> 836                     save(v)
    837                 write(SETITEMS)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    519         # Save the reduce() output and finally memoize the object
--> 520         self.save_reduce(obj=obj, *rv)
    521 

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    599         if state is not None:
--> 600             save(state)
    601             write(pickle.BUILD)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
    809         self.memoize(obj)
--> 810         self._batch_setitems(obj.items())
    811 

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
    835                     save(k)
--> 836                     save(v)
    837                 write(SETITEMS)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    519         # Save the reduce() output and finally memoize the object
--> 520         self.save_reduce(obj=obj, *rv)
    521 

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    599         if state is not None:
--> 600             save(state)
    601             write(pickle.BUILD)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
    809         self.memoize(obj)
--> 810         self._batch_setitems(obj.items())
    811 

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
    835                     save(k)
--> 836                     save(v)
    837                 write(SETITEMS)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_list(self, obj)
    769         self.memoize(obj)
--> 770         self._batch_appends(obj)
    771 

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_appends(self, items)
    793                 for x in tmp:
--> 794                     save(x)
    795                 write(APPENDS)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_tuple(self, obj)
    739         for element in obj:
--> 740             save(element)
    741 

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
    809         self.memoize(obj)
--> 810         self._batch_setitems(obj.items())
    811 

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
    835                     save(k)
--> 836                     save(v)
    837                 write(SETITEMS)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    474         if f is not None:
--> 475             f(self, obj) # Call unbound method with explicit self
    476             return

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
    809         self.memoize(obj)
--> 810         self._batch_setitems(obj.items())
    811 

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
    835                     save(k)
--> 836                     save(v)
    837                 write(SETITEMS)

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
    505         if isinstance(rv, str):
--> 506             self.save_global(obj, rv)
    507             return

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
    371     def save_global(self, obj, name=None, pack=struct.pack):
--> 372         if obj.__module__ == "__builtin__" or obj.__module__ == "builtins":
    373             if obj in _BUILTIN_TYPE_NAMES:

AttributeError: 'NotImplementedType' object has no attribute '__module__'

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
<ipython-input-15-c89868bf416d> in <module>()
      2                    batch_size=4, communication_window=5, num_epoch=1,
      3                    features_col="feature", label_col="label_encoded")
----> 4 trained_model = trainer.train(training_set)
      5 print("Training time: " + str(trainer.get_training_time()))
      6 

/HOME/anaconda3/lib/python3.5/site-packages/distkeras/trainers.py in train(self, dataframe, shuffle)
    636         self.record_training_start()
    637         # Iterate through the epochs.
--> 638         self.history = dataset.rdd.mapPartitionsWithIndex(worker.train).collect()
    639         # End the training procedure.
    640         self.record_training_end()

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/rdd.py in collect(self)
    807         """
    808         with SCCallSiteSync(self.context) as css:
--> 809             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    810         return list(_load_from_socket(port, self._jrdd_deserializer))
    811 

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/rdd.py in _jrdd(self)
   2453 
   2454         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2455                                       self._jrdd_deserializer, profiler)
   2456         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
   2457                                              self.preservesPartitioning)

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
   2386     assert serializer, "serializer should not be empty"
   2387     command = (func, profiler, deserializer, serializer)
-> 2388     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
   2389     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
   2390                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
   2372     # the serialized command will be compressed by broadcast
   2373     ser = CloudPickleSerializer()
-> 2374     pickled_command = ser.dumps(command)
   2375     if len(pickled_command) > (1 << 20):  # 1M
   2376         # The broadcast will have same life cycle as created PythonRDD

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/serializers.py in dumps(self, obj)
    458 
    459     def dumps(self, obj):
--> 460         return cloudpickle.dumps(obj, 2)
    461 
    462 

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in dumps(obj, protocol)
    702 
    703     cp = CloudPickler(file,protocol)
--> 704     cp.dump(obj)
    705 
    706     return file.getvalue()

/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in dump(self, obj)
    160                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    161             print_exec(sys.stderr)
--> 162             raise pickle.PicklingError(msg)
    163 
    164     def save_memoryview(self, obj):

PicklingError: Could not serialize object: AttributeError: 'NotImplementedType' object has no attribute '__module__'

JoeriHermans commented on May 18, 2024

Hi,

That's a possibility as well, but this simply won't scale. How do you read the data from disk? And in what format is it?

Furthermore, when you increase the number of workers, don't use DOWNPOUR; use ADAG (communication window - 1) with the Adam optimizer (which is actually Accumulated Gradient Normalization, see https://arxiv.org/abs/1710.02368). It has a better convergence rate, while using the bandwidth more optimally.
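A minimal sketch of that suggestion, reusing the trainer arguments and column names that appear elsewhere in this thread (the model, num_workers and dataset variables are assumed to be defined as in those snippets):

from distkeras.trainers import ADAG
from keras.optimizers import Adam

# The Adam instance is passed the same way the SGD instance is passed in the other snippets.
trainer = ADAG(keras_model=model, worker_optimizer=Adam(), loss='categorical_crossentropy',
               num_workers=num_workers, batch_size=4, communication_window=1,
               num_epoch=1, features_col="features", label_col="label_encoded")
trained_model = trainer.train(dataset_train)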

Joeri

rayjang commented on May 18, 2024

My code loads the image files and converts each .png to an ndarray, and I build the labels manually in ndarray format. Their format is essentially the same as "(X_train, Y_train), (X_valid, Y_valid) = cifar10.load_data()" from "keras.datasets":

        img = image.load_img(img_path, target_size=(224, 224))
        x = image.img_to_array(img)
        x = np.expand_dims(x, axis=0)
        x = preprocess_input(x)
        images.append(x)
        labels.append(np.array(i, ndmin=1, copy=False))

My problem is how that ndarray-format data, like the output of "(X_train, Y_train), (X_valid, Y_valid) = cifar10.load_data()" from "from keras.datasets import cifar10", can be converted directly into the input format of the dist-keras trainer.

rayjang commented on May 18, 2024

I got a pickle error, so it probably comes from serialization. I am quite new to Spark, though; maybe my input is not being converted to an RDD correctly?

rayjang commented on May 18, 2024

I don't know what this error is:
TypeError: can't pickle _cffi_backend.CTypeDescr objects

Traceback (most recent call last): File "/HOME/rayjang/cnn_finetune/./pdDF.py", line 199, in <module> trained_model = trainer.train(training_set) File "/usr/local/lib/python2.7/dist-packages/distkeras/trainers.py", line 638, in train self.history = dataset.rdd.mapPartitionsWithIndex(worker.train).collect() File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 776, in collect File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump File "/usr/lib/python2.7/pickle.py", line 224, in dump self.save(obj) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple save(element) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 401, in save_instancemethod File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 535, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 554, in save_tuple save(element) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce 
File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 606, in save_list self._batch_appends(iter(obj)) File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends save(x) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 606, in save_list self._batch_appends(iter(obj)) File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends save(x) File 
"/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple save(element) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 606, in save_list self._batch_appends(iter(obj)) File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends save(x) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple save(element) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 692, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple save(element) File "/usr/lib/python2.7/pickle.py", line 306, in save rv = reduce(self.proto) TypeError: can't pickle _cffi_backend.CTypeDescr objects

rayjang commented on May 18, 2024

The output of my dataset_train.printSchema() and the output of your MnistExample's printSchema() look the same.
I am still struggling to find out where the pickle (serialization) problem comes from.
(screenshot)
The TypeError makes me guess that my input type triggers the problem.
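As a sanity check (just an idea, I have not verified this yet), I could confirm that the DataFrame rows themselves serialize and collect fine, which would point at the trainer closure rather than the data:

# If these succeed, the rows pickle fine and the problem is more likely
# in what the trainer's closure captures (model, session, etc.).
print(dataset_train.rdd.take(1))
print(dataset_train.count())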

JoeriHermans commented on May 18, 2024

Yes, but I load the images in a different manner.

What you could do (which is more memory efficient as well): get a list of the paths of all your images, parallelize that list (thus making an RDD of those paths), and then do something like:

def load_images(iterator):
    for path in iterator:
        row = {}
        # read_image and get_label are placeholders for your own loading / labelling code.
        image = read_image(path)
        row['image'] = image
        row['label'] = get_label(path)
        yield row

new_df = df.mapPartitions(load_images)

This should prevent any memory errors, and serialization issues.
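A more concrete variant of that idea (rough and untested; paths_rdd is the parallelized list of paths described above, keras.preprocessing.image and a preprocess_input matching the chosen network are assumed, and the label is assumed to be the parent directory name, i.e. .../<label>/<file>.png):

import numpy as np
from keras.preprocessing import image
from keras.applications.imagenet_utils import preprocess_input
from pyspark.ml.linalg import DenseVector
from pyspark.sql import Row

def load_images(iterator):
    for path in iterator:
        # Load and preprocess one image lazily, inside the partition.
        img = image.load_img(path, target_size=(224, 224))
        x = preprocess_input(np.expand_dims(image.img_to_array(img), axis=0))
        # Plain Python floats / a DenseVector keep Spark's schema inference happy.
        yield Row(features=DenseVector(x.flatten().astype(float)),
                  label=int(path.split("/")[-2]))

dataset_train = paths_rdd.mapPartitions(load_images).toDF()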

Joeri

JoeriHermans commented on May 18, 2024

In your first code snippet, try changing this:

dataset_train = df.mapPartitions(load_images)

to

dataset_train = df.mapPartitions(load_images).toDF()

Joeri

rayjang commented on May 18, 2024

I got this error:
TypeError: Can not infer schema for type: <class 'numpy.float32'>

(screenshot)

from keras import backend as K
import cv2
from keras.utils import np_utils
classes = 100

path_list=[]
i=0
raw_path = '../data/ImageData/'
while i < classes:
    path = raw_path + str(i)
    for img_path in glob.glob(os.path.join(path, '*.png')):
        path_list.append(img_path)
    i+=1
df = sc.parallelize(path_list)

def load_images(iterator):
    for path in iterator:
        a,b,c,image_label,d = img_path.split("/")
        row = {}
        img = image.load_img(path, target_size=(224, 224))
        x = image.img_to_array(img)
        x = np.expand_dims(x, axis=0)
        x = preprocess_input(x)
        x = x.flatten()
        x_list = list(x)
        row['feature'] = x_list
        row['label'] = int(image_label)
        yield row

dataset_train = df.mapPartitions(load_images).toDF()

rayjang commented on May 18, 2024

Update!!
I added the code below, and that error is gone:

_x_list = list(x)
x_list = [float(v) for v in _x_list]

I got another error. I will work on it and update soon.
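For what it's worth, I think the original schema error happened because Spark cannot infer a type for numpy.float32 values, and plain Python floats avoid it. An equivalent one-liner (untested, just a guess) would be:

# x was already flattened above; ndarray.tolist() yields plain Python floats.
x_list = x.tolist()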

rayjang commented on May 18, 2024

When I run "dense_transformer = DenseTransformer(input_col="features", output_col="features_dense")", I still get an error.

def load_images(iterator):
    for path in iterator:
        a,b,c,image_label,d = img_path.split("/")
        row = {}
        img = image.load_img(path, target_size=(224, 224))
        x = image.img_to_array(img)
        x = np.expand_dims(x, axis=0)
        x = preprocess_input(x)
        x = x.flatten()
        _x_list = list(x)
        x_list = [float(v) for v in _x_list]
        #x_list = DenseVector(x_list)
        row['features'] = x_list
        row['label'] = int(image_label)
        yield row

dataset_train = df.mapPartitions(load_images).toDF()

This produced this error:
(screenshot)
And when I trained the model, I got an error like "PicklingError: Could not pickle object as excessively deep recursion required."

So I compared the types in my Spark dataframe with the types in your MNIST dataframe and found that your 'features' column is a Vector. I tried to change my feature list to a DenseVector, but then I got a different error.
This error comes from:

trainer = ADAG(keras_model=model, worker_optimizer=sgd, loss=loss_mlp, num_workers=num_workers,
               batch_size=4, communication_window=15, num_epoch=1,
               features_col="matrix", label_col="label_encoded")

This is the error: "PicklingError: Could not serialize object: TypeError: can't pickle _hashlib.HASH objects"

I am getting more and more confused about RDDs.

rayjang commented on May 18, 2024

What is the output type of VectorAssembler? I am trying to read the VectorAssembler source to understand the difference between my input's type and VectorAssembler's output type.
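A tiny check like the following (just a sketch, assuming a SQLContext/SparkSession named sqlContext) should show the assembled column's type, which I expect to be a pyspark.ml.linalg vector (VectorUDT):

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import Row

# Minimal example: assemble two numeric columns into one vector column.
small_df = sqlContext.createDataFrame([Row(a=1.0, b=2.0), Row(a=3.0, b=4.0)])
assembled = VectorAssembler(inputCols=["a", "b"], outputCol="features").transform(small_df)
assembled.printSchema()                       # features: vector
print(type(assembled.first()["features"]))    # e.g. pyspark.ml.linalg.DenseVector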

JoeriHermans commented on May 18, 2024

Could it be that your model is too big? How many trainable parameters does it have?

rayjang commented on May 18, 2024

I am using a MobileNet. I will try fine-tuning with the convolution layers frozen.
According to the paper, there are 3.3 million parameters, plus one more FC layer (1000) for fine-tuning.

JoeriHermans commented on May 18, 2024

Should be doable; I have trained models with 200M+ parameters, so that should be fine. Do you have the code for the Keras model?

rayjang commented on May 18, 2024

This is the test Keras model I have tried. I will try to fine-tune the pre-trained model.
When I use your distributed training, model.compile() is not needed, right? I tested both ways, but I got the error either way.

img_rows, img_cols = 224, 224 # Resolution of inputs
channel = 3
classes = 100
batch_size = 10 
nb_epoch = 20

# Load our model
model = MobileNet(input_shape=(img_rows, img_cols, channel))

model.outputs = model.layers[-3].output
#x = Dropout(0.2)(model.outputs)
x = Conv2D(1000, (1, 1), padding='same', name='features')(model.outputs)
x = Conv2D(classes, (1, 1), padding='same', name='conv_preds2')(x)
x = Activation('softmax', name='act_softmax')(x) 
x = Reshape((classes,), name='reshape_2')(x)
model = Model(model.inputs, x)


#Freeze layers
for layer in model.layers[:-7]:
    layer.trainable = False

    
# Start Fine-tuning
sgd = SGD(lr=1e-3, decay=1e-6, momentum=0.9, nesterov=True)

rayjang commented on May 18, 2024

I ran the training with a different model (your convnet model from mnist.py) and got the same pickle error.
I suspect my input features are the problem.

# Construct the model.
convnet = Sequential()
convnet.add(Convolution2D(nb_filters, kernel_size[0], kernel_size[1],
                          border_mode='valid',
                          input_shape=input_shape))
convnet.add(Activation('relu'))
convnet.add(Convolution2D(nb_filters, kernel_size[0], kernel_size[1]))
convnet.add(Activation('relu'))
convnet.add(MaxPooling2D(pool_size=pool_size))
convnet.add(Flatten())
convnet.add(Dense(225))
convnet.add(Activation('relu'))
convnet.add(Dense(nb_classes))
convnet.add(Activation('softmax'))

JoeriHermans commented on May 18, 2024

Could you give me the full stack trace of the error, so I know exactly where it originates?

rayjang commented on May 18, 2024

This is the code for building the Spark DataFrame:

from keras import backend as K
import cv2
from keras.utils import np_utils
from pyspark.mllib.linalg import Vectors
classes = 100

path_list=[]
i=0
raw_path = '../data/cifar100/'
while i < classes:
    path = raw_path + str(i)
    for img_path in glob.glob(os.path.join(path, '*.png')):
        path_list.append(img_path)
    i+=1
df = sc.parallelize(path_list)

def load_images(iterator):
    for path in iterator:
        a,b,c,image_label,d = img_path.split("/")
        row = {}
        img = image.load_img(path, target_size=(224, 224))
        x = image.img_to_array(img)
        x = np.expand_dims(x, axis=0)
        x = preprocess_input(x)
        x = x.flatten()
        _x_list = list(x)
        x_list = [float(v) for v in _x_list]
        #x_list = DenseVector(x_list)
        row['features'] = x_list
        row['label'] = int(image_label)
        yield row

dataset_train = df.mapPartitions(load_images)

This is the full stack trace of the error:

root@user-node2:/HOME/rayjang/cnn_finetune# ../spark-2.0.0-bin-hadoop2.7/bin/spark-submit --master spark://143.248.80.197:7077 ./1029.py
Using TensorFlow backend.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/10/29 23:57:17 INFO SparkContext: Running Spark version 2.0.0
17/10/29 23:57:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/10/29 23:57:17 INFO SecurityManager: Changing view acls to: root
17/10/29 23:57:17 INFO SecurityManager: Changing modify acls to: root
17/10/29 23:57:17 INFO SecurityManager: Changing view acls groups to:
17/10/29 23:57:17 INFO SecurityManager: Changing modify acls groups to:
17/10/29 23:57:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
17/10/29 23:57:17 INFO Utils: Successfully started service 'sparkDriver' on port 44338.
17/10/29 23:57:17 INFO SparkEnv: Registering MapOutputTracker
17/10/29 23:57:17 INFO SparkEnv: Registering BlockManagerMaster
17/10/29 23:57:17 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8a2b1aef-1606-49e6-9c39-70b70127e6bd
17/10/29 23:57:17 INFO MemoryStore: MemoryStore started with capacity 3.0 GB
17/10/29 23:57:18 INFO SparkEnv: Registering OutputCommitCoordinator
17/10/29 23:57:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/10/29 23:57:18 INFO Utils: Successfully started service 'SparkUI' on port 4041.
17/10/29 23:57:18 INFO SparkUI: Bound SparkUI to 143.248.80.197, and started at http://143.248.80.197:4041
17/10/29 23:57:18 INFO Utils: Copying /HOME/rayjang/cnn_finetune/./1029.py to /tmp/spark-0f3568e8-18bb-4631-8b76-fc6439c697cd/userFiles-2be57920-b807-40f5-838a-dd4ede4c3d85/1029.py
17/10/29 23:57:18 INFO SparkContext: Added file file:/HOME/rayjang/cnn_finetune/./1029.py at file:/HOME/rayjang/cnn_finetune/./1029.py with timestamp 1509289038226
17/10/29 23:57:18 INFO Executor: Starting executor ID driver on host localhost
17/10/29 23:57:18 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37483.
17/10/29 23:57:18 INFO NettyBlockTransferService: Server created on 143.248.80.197:37483
17/10/29 23:57:18 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 143.248.80.197, 37483)
17/10/29 23:57:18 INFO BlockManagerMasterEndpoint: Registering block manager 143.248.80.197:37483 with 3.0 GB RAM, BlockManagerId(driver, 143.248.80.197, 37483)
17/10/29 23:57:18 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 143.248.80.197, 37483)
17/10/29 23:57:18 INFO SparkContext: Starting job: runJob at PythonRDD.scala:441
17/10/29 23:57:18 INFO DAGScheduler: Got job 0 (runJob at PythonRDD.scala:441) with 1 output partitions
17/10/29 23:57:18 INFO DAGScheduler: Final stage: ResultStage 0 (runJob at PythonRDD.scala:441)
17/10/29 23:57:18 INFO DAGScheduler: Parents of final stage: List()
17/10/29 23:57:18 INFO DAGScheduler: Missing parents: List()
17/10/29 23:57:18 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[1] at RDD at PythonRDD.scala:48), which has no missing parents
17/10/29 23:57:18 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.3 KB, free 3.0 GB)
17/10/29 23:57:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.6 KB, free 3.0 GB)
17/10/29 23:57:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 143.248.80.197:37483 (size: 3.6 KB, free: 3.0 GB)
17/10/29 23:57:18 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1012
17/10/29 23:57:18 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (PythonRDD[1] at RDD at PythonRDD.scala:48)
17/10/29 23:57:18 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/10/29 23:57:18 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 8752 bytes)
17/10/29 23:57:18 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/10/29 23:57:18 INFO Executor: Fetching file:/HOME/rayjang/cnn_finetune/./1029.py with timestamp 1509289038226
17/10/29 23:57:18 INFO Utils: /HOME/rayjang/cnn_finetune/./1029.py has been previously copied to /tmp/spark-0f3568e8-18bb-4631-8b76-fc6439c697cd/userFiles-2be57920-b807-40f5-838a-dd4ede4c3d85/1029.py
Using TensorFlow backend.
17/10/29 23:57:19 INFO PythonRunner: Times: total = 813, boot = 138, init = 652, finish = 23
17/10/29 23:57:19 INFO MemoryStore: Block taskresult_0 stored as bytes in memory (estimated size 1331.1 KB, free 3.0 GB)
17/10/29 23:57:19 INFO BlockManagerInfo: Added taskresult_0 in memory on 143.248.80.197:37483 (size: 1331.1 KB, free: 3.0 GB)
17/10/29 23:57:19 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1363049 bytes result sent via BlockManager)
17/10/29 23:57:19 INFO TransportClientFactory: Successfully created connection to /143.248.80.197:37483 after 14 ms (0 ms spent in bootstraps)
17/10/29 23:57:19 INFO BlockManagerInfo: Removed taskresult_0 on 143.248.80.197:37483 in memory (size: 1331.1 KB, free: 3.0 GB)
17/10/29 23:57:19 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1004 ms on localhost (1/1)
17/10/29 23:57:19 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/10/29 23:57:19 INFO DAGScheduler: ResultStage 0 (runJob at PythonRDD.scala:441) finished in 1.017 s
17/10/29 23:57:19 INFO DAGScheduler: Job 0 finished: runJob at PythonRDD.scala:441, took 1.256326 s
/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
17/10/29 23:57:20 INFO SharedState: Warehouse path is 'file:/HOME/rayjang/cnn_finetune/spark-warehouse'.
17/10/29 23:57:20 INFO SparkContext: Starting job: runJob at PythonRDD.scala:441
17/10/29 23:57:20 INFO DAGScheduler: Got job 1 (runJob at PythonRDD.scala:441) with 1 output partitions
17/10/29 23:57:20 INFO DAGScheduler: Final stage: ResultStage 1 (runJob at PythonRDD.scala:441)
17/10/29 23:57:20 INFO DAGScheduler: Parents of final stage: List()
17/10/29 23:57:20 INFO DAGScheduler: Missing parents: List()
17/10/29 23:57:20 INFO DAGScheduler: Submitting ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:48), which has no missing parents
17/10/29 23:57:20 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 14.9 KB, free 3.0 GB)
17/10/29 23:57:20 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.3 KB, free 3.0 GB)
17/10/29 23:57:20 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 143.248.80.197:37483 (size: 8.3 KB, free: 3.0 GB)
17/10/29 23:57:20 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
17/10/29 23:57:20 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:48)
17/10/29 23:57:20 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
17/10/29 23:57:20 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0, PROCESS_LOCAL, 8752 bytes)
17/10/29 23:57:20 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
Using TensorFlow backend.
17/10/29 23:57:21 INFO CodeGenerator: Code generated in 110.023497 ms
17/10/29 23:57:21 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 143.248.80.197:37483 in memory (size: 3.6 KB, free: 3.0 GB)
Using TensorFlow backend.
17/10/29 23:57:22 INFO PythonRunner: Times: total = 921, boot = 1, init = 910, finish = 10
17/10/29 23:57:22 INFO MemoryStore: Block taskresult_1 stored as bytes in memory (estimated size 1332.4 KB, free 3.0 GB)
17/10/29 23:57:22 INFO BlockManagerInfo: Added taskresult_1 in memory on 143.248.80.197:37483 (size: 1332.4 KB, free: 3.0 GB)
17/10/29 23:57:22 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1364372 bytes result sent via BlockManager)
17/10/29 23:57:22 INFO BlockManagerInfo: Removed taskresult_1 on 143.248.80.197:37483 in memory (size: 1332.4 KB, free: 3.0 GB)
17/10/29 23:57:22 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 1785 ms on localhost (1/1)
17/10/29 23:57:22 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/10/29 23:57:22 INFO DAGScheduler: ResultStage 1 (runJob at PythonRDD.scala:441) finished in 1.788 s
17/10/29 23:57:22 INFO DAGScheduler: Job 1 finished: runJob at PythonRDD.scala:441, took 1.811145 s
17/10/29 23:57:22 INFO SparkContext: Starting job: runJob at PythonRDD.scala:441
17/10/29 23:57:22 INFO DAGScheduler: Got job 2 (runJob at PythonRDD.scala:441) with 1 output partitions
17/10/29 23:57:22 INFO DAGScheduler: Final stage: ResultStage 2 (runJob at PythonRDD.scala:441)
17/10/29 23:57:22 INFO DAGScheduler: Parents of final stage: List()
17/10/29 23:57:22 INFO DAGScheduler: Missing parents: List()
17/10/29 23:57:22 INFO DAGScheduler: Submitting ResultStage 2 (PythonRDD[17] at RDD at PythonRDD.scala:48), which has no missing parents
17/10/29 23:57:22 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 21.3 KB, free 3.0 GB)
17/10/29 23:57:22 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 9.7 KB, free 3.0 GB)
17/10/29 23:57:22 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 143.248.80.197:37483 (size: 9.7 KB, free: 3.0 GB)
17/10/29 23:57:22 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1012
17/10/29 23:57:22 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (PythonRDD[17] at RDD at PythonRDD.scala:48)
17/10/29 23:57:22 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
17/10/29 23:57:22 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0, PROCESS_LOCAL, 8752 bytes)
17/10/29 23:57:22 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
Using TensorFlow backend.
Using TensorFlow backend.
17/10/29 23:57:24 INFO CodeGenerator: Code generated in 14.185637 ms
Using TensorFlow backend.
17/10/29 23:57:25 INFO PythonRunner: Times: total = 950, boot = 1, init = 865, finish = 84
17/10/29 23:57:25 INFO MemoryStore: Block taskresult_2 stored as bytes in memory (estimated size 3.0 MB, free 3.0 GB)
17/10/29 23:57:25 INFO BlockManagerInfo: Added taskresult_2 in memory on 143.248.80.197:37483 (size: 3.0 MB, free: 3.0 GB)
17/10/29 23:57:25 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 3130261 bytes result sent via BlockManager)
17/10/29 23:57:25 INFO BlockManagerInfo: Removed taskresult_2 on 143.248.80.197:37483 in memory (size: 3.0 MB, free: 3.0 GB)
17/10/29 23:57:25 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 2482 ms on localhost (1/1)
17/10/29 23:57:25 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
17/10/29 23:57:25 INFO DAGScheduler: ResultStage 2 (runJob at PythonRDD.scala:441) finished in 2.485 s
17/10/29 23:57:25 INFO DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:441, took 2.492799 s
/HOME/rayjang/cnn_finetune/./1029.py:172: UserWarning: Update your `Conv2D` call to the Keras 2 API: `Conv2D(3, (3, 3), padding="valid", input_shape=(224, 224,...)`
  input_shape=input_shape))
/HOME/rayjang/cnn_finetune/./1029.py:174: UserWarning: Update your `Conv2D` call to the Keras 2 API: `Conv2D(3, (3, 3))`
  convnet.add(Convolution2D(nb_filters, kernel_size[0], kernel_size[1]))
root
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- matrix: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: double (containsNull = true)
 |-- label: long (nullable = true)
 |-- label_encoded: array (nullable = true)
 |    |-- element: double (containsNull = true)

17/10/29 23:57:25 INFO CodeGenerator: Code generated in 22.765937 ms
2017-10-29 23:57:25.519880: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-29 23:57:25.519900: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-29 23:57:25.519904: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-10-29 23:57:25.519907: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-29 23:57:25.519910: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
2017-10-29 23:57:25.619291: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:893] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2017-10-29 23:57:25.619505: I tensorflow/core/common_runtime/gpu/gpu_device.cc:955] Found device 0 with properties:
name: TITAN X (Pascal)
major: 6 minor: 1 memoryClockRate (GHz) 1.531
pciBusID 0000:01:00.0
Total memory: 11.90GiB
Free memory: 11.69GiB
2017-10-29 23:57:25.619517: I tensorflow/core/common_runtime/gpu/gpu_device.cc:976] DMA: 0
2017-10-29 23:57:25.619521: I tensorflow/core/common_runtime/gpu/gpu_device.cc:986] 0:   Y
2017-10-29 23:57:25.619526: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1045] Creating TensorFlow device (/gpu:0) -> (device: 0, name: TITAN X (Pascal), pci bus id: 0000:01:00.0)
Traceback (most recent call last):
  File "/HOME/rayjang/cnn_finetune/./1029.py", line 210, in <module>
    trained_model = trainer.train(training_set)
  File "/usr/local/lib/python2.7/dist-packages/distkeras/trainers.py", line 638, in train
    self.history = dataset.rdd.mapPartitionsWithIndex(worker.train).collect()
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 776, in collect
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 401, in save_instancemethod
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 535, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 554, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 692, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle _cffi_backend.CTypeDescr objects

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

I deleted what I posted a minute ago because I made some simple mistakes while testing... I am still lost.

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

This one is from your mnist example.
I built the 'features' column with VectorAssembler to check the type of each cell in 'features'.
It is a DenseVector. Should I convert my numpy arrays to DenseVector as well?

image
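Concretely, the conversion I am trying looks roughly like this (a simplified sketch of my own code; 'features' and 'labels' are my in-memory numpy arrays and sqlContext is my SQLContext):

import numpy as np
from pyspark.ml.linalg import DenseVector
from pyspark.sql import Row

def to_row(image, label):
    # image: numpy array of shape (224, 224, 3); flatten it into one vector
    return Row(features=DenseVector(np.asarray(image, dtype=float).ravel()),
               label=int(label))

rows = [to_row(x, y) for x, y in zip(features, labels)]
dataset_train = sqlContext.createDataFrame(rows)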

Update: I added "x_list = DenseVector(x_list)" when building the 'dataset_train' DataFrame, but I still get a pickle error, just a different one.

---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
in ()
5 features_col="matrix", label_col="label_encoded")
6 trainer.set_parallelism_factor(1)
----> 7 trained_model = trainer.train(training_set)
8
9 print("Training time: " + str(trainer.get_training_time()))

/HOME/anaconda3/lib/python3.5/site-packages/distkeras/trainers.py in train(self, dataframe, shuffle)
636 self.record_training_start()
637 # Iterate through the epochs.
--> 638 self.history = dataset.rdd.mapPartitionsWithIndex(worker.train).collect()
639 # End the training procedure.
640 self.record_training_end()

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/rdd.py in collect(self)
774 """
775 with SCCallSiteSync(self.context) as css:
--> 776 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
777 return list(_load_from_socket(port, self._jrdd_deserializer))
778

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/rdd.py in _jrdd(self)
2401
2402 wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2403 self._jrdd_deserializer, profiler)
2404 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
2405 self.preservesPartitioning)

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
2334 assert serializer, "serializer should not be empty"
2335 command = (func, profiler, deserializer, serializer)
-> 2336 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
2337 return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
2338 sc.pythonVer, broadcast_vars, sc._javaAccumulator)

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
2313 # the serialized command will be compressed by broadcast
2314 ser = CloudPickleSerializer()
-> 2315 pickled_command = ser.dumps(command)
2316 if len(pickled_command) > (1 << 20): # 1M
2317 # The broadcast will have same life cycle as created PythonRDD

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/serializers.py in dumps(self, obj)
426
427 def dumps(self, obj):
--> 428 return cloudpickle.dumps(obj, 2)
429
430

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in dumps(obj, protocol)
655
656 cp = CloudPickler(file,protocol)
--> 657 cp.dump(obj)
658
659 return file.getvalue()

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in dump(self, obj)
105 self.inject_addons()
106 try:
--> 107 return Pickler.dump(self, obj)
108 except RuntimeError as e:
109 if 'recursion' in e.args[0]:

/HOME/anaconda3/lib/python3.5/pickle.py in dump(self, obj)
406 if self.proto >= 4:
407 self.framer.start_framing()
--> 408 self.save(obj)
409 self.write(STOP)
410 self.framer.end_framing()

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_tuple(self, obj)
738 write(MARK)
739 for element in obj:
--> 740 save(element)
741
742 if id(obj) in memo:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_instancemethod(self, obj)
396 # Memoization rarely is ever useful due to python bounding
397 if PY3:
--> 398 self.save_reduce(types.MethodType, (obj.func, obj.self), obj=obj)
399 else:
400 self.save_reduce(types.MethodType, (obj.func, obj.self, obj.self.class),

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
533 else:
534 save(func)
--> 535 save(args)
536 write(pickle.REDUCE)
537

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_tuple(self, obj)
723 if n <= 3 and self.proto >= 2:
724 for element in obj:
--> 725 save(element)
726 # Subtle. Same as in the big comment below.
727 if id(obj) in memo:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
518
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
522 def persistent_id(self, obj):

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
551
552 if state is not None:
--> 553 save(state)
554 write(pickle.BUILD)
555

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
808
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
812 dispatch[dict] = save_dict

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
834 for k, v in tmp:
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
838 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
518
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
522 def persistent_id(self, obj):

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
551
552 if state is not None:
--> 553 save(state)
554 write(pickle.BUILD)
555

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
808
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
812 dispatch[dict] = save_dict

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
834 for k, v in tmp:
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
838 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
518
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
522 def persistent_id(self, obj):

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
551
552 if state is not None:
--> 553 save(state)
554 write(pickle.BUILD)
555

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
808
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
812 dispatch[dict] = save_dict

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
834 for k, v in tmp:
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
838 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
518
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
522 def persistent_id(self, obj):

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
551
552 if state is not None:
--> 553 save(state)
554 write(pickle.BUILD)
555

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
808
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
812 dispatch[dict] = save_dict

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
834 for k, v in tmp:
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
838 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_list(self, obj)
768
769 self.memoize(obj)
--> 770 self._batch_appends(obj)
771
772 dispatch[list] = save_list

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_appends(self, items)
792 write(MARK)
793 for x in tmp:
--> 794 save(x)
795 write(APPENDS)
796 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_tuple(self, obj)
738 write(MARK)
739 for element in obj:
--> 740 save(element)
741
742 if id(obj) in memo:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
808
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
812 dispatch[dict] = save_dict

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
834 for k, v in tmp:
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
838 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
808
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
812 dispatch[dict] = save_dict

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
834 for k, v in tmp:
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
838 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_instancemethod(self, obj)
396 # Memoization rarely is ever useful due to python bounding
397 if PY3:
--> 398 self.save_reduce(types.MethodType, (obj.func, obj.self), obj=obj)
399 else:
400 self.save_reduce(types.MethodType, (obj.func, obj.self, obj.self.class),

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
533 else:
534 save(func)
--> 535 save(args)
536 write(pickle.REDUCE)
537

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_tuple(self, obj)
723 if n <= 3 and self.proto >= 2:
724 for element in obj:
--> 725 save(element)
726 # Subtle. Same as in the big comment below.
727 if id(obj) in memo:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
518
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
522 def persistent_id(self, obj):

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
551
552 if state is not None:
--> 553 save(state)
554 write(pickle.BUILD)
555

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
808
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
812 dispatch[dict] = save_dict

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
834 for k, v in tmp:
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
838 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
808
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
812 dispatch[dict] = save_dict

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
834 for k, v in tmp:
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
838 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_list(self, obj)
768
769 self.memoize(obj)
--> 770 self._batch_appends(obj)
771
772 dispatch[list] = save_list

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_appends(self, items)
792 write(MARK)
793 for x in tmp:
--> 794 save(x)
795 write(APPENDS)
796 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
518
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
522 def persistent_id(self, obj):

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
551
552 if state is not None:
--> 553 save(state)
554 write(pickle.BUILD)
555

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
808
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
812 dispatch[dict] = save_dict

/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
834 for k, v in tmp:
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
838 elif n:

/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_builtin_function(self, obj)
319 if obj.module is "builtin":
320 return self.save_global(obj)
--> 321 return self.save_function(obj)
322 dispatch[types.BuiltinFunctionType] = save_builtin_function
323

/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_function(self, obj, name)
194 # we'll pickle the actual function object rather than simply saving a
195 # reference (as is done in default pickler), via save_function_tuple.
--> 196 if islambda(obj) or obj.code.co_filename == '' or themodule is None:
197 #print("save global", islambda(obj), obj.code.co_filename, modname, themodule)
198 self.save_function_tuple(obj)

AttributeError: 'builtin_function_or_method' object has no attribute 'code'

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

## Another try ##
When I checked the type of 'features' produced by VectorAssembler in mnist.py, it was SparseVector,
so I changed my code accordingly:
image

BUT I got a different error:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
image
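As far as I understand the message, SPARK-5063 is raised when the function passed into a transformation closes over the SparkContext, an RDD, or a DataFrame; only plain Python values can be captured in the closure. A rough illustration of the difference (names like df2 and 'value' are just placeholders):

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# BAD: the lambda closes over another DataFrame (df2), so Spark cannot ship it
# to the executors and pickling fails with the SPARK-5063 error.
# bad_udf = udf(lambda x: x * df2.count(), DoubleType())

# OK: compute the driver-side value first, then close over the plain number.
scale = float(df2.count())
ok_udf = udf(lambda x: x * scale, DoubleType())
df = df.withColumn("scaled", ok_udf(df["value"]))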

from dist-keras.

JoeriHermans avatar JoeriHermans commented on May 18, 2024

Why encode it as a SparseVector? Can't you just convert it to DenseVector? If I recall correctly I wrote some utility methods which do just this: to_dense_vector.
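If that helper is not available in your version, a hand-rolled UDF along these lines should do the same thing (just a sketch; column names are illustrative):

from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql.functions import udf

# convert a SparseVector (or any Vector) column into a DenseVector column
to_dense = udf(lambda v: DenseVector(v.toArray()), VectorUDT())
dataset = dataset.withColumn("features_dense", to_dense(dataset["features"]))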

Joeri

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

My model is a CNN based on MobileNet.
In your code, a 'matrix' column is needed to train the convnet with ADAG.
As I understand it, the 'matrix' column is built from the output of VectorAssembler's 'features' column, and the type of 'features' is SparseVector.
image

I have one more question. What I need are the 'matrix' and 'label_encoded' columns of the PySpark DataFrame.
Is it possible to build the 'matrix' column using to_dense_vector?
I looked at the code of transformers.py; if I understand correctly, the 'matrix' column is a DenseVector, right?
Sorry for the confusion.
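To make the question concrete, the kind of reshape I have in mind is roughly this (a sketch only; the (128, 128, 3) shape and column names are just my setting):

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

image_shape = (128, 128, 3)  # my setting; adjust to the data

def reshape_features(v):
    # accept either a pyspark Vector or a plain list/array
    arr = v.toArray() if hasattr(v, "toArray") else np.asarray(v, dtype=float)
    return arr.reshape(image_shape).tolist()

matrix_type = ArrayType(ArrayType(ArrayType(DoubleType())))
to_matrix = udf(reshape_features, matrix_type)
dataset_train = dataset_train.withColumn("matrix", to_matrix(dataset_train["features"]))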

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

Where is the 'to_dense_vector' method? I looked through every file you wrote but I can't find it.

I also tried again to encode it as DenseVector and ran it, and I got the same error:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

I decided to follow the approach in your mnist example and make my data smaller. I changed my csv: originally I normalized before saving to csv, so every cell was a float.
I now normalize after loading the data as a Spark DataFrame, as you did (I also decreased the image dimensions from 224x224 to 128x128).
I guess VectorAssembler works now because every value is an int (so less memory is needed).
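The normalization I moved after the load is essentially a scale-by-255 step, roughly like this (sketch; column names are mine):

from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql.functions import udf

# scale raw int pixel values in the assembled vector to [0, 1]
normalize = udf(lambda v: DenseVector(v.toArray() / 255.0), VectorUDT())
dataset_train = dataset_train.withColumn("features_normalized",
                                         normalize(dataset_train["features"]))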
BUT my code gets stuck when I run the one-hot encoding (label -> label_encoded).
When I run it in Jupyter, the kernel stops and shuts down.
If I run it as a *.py from the command line, I get a weird error:
Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 7, localhost): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: java.lang.NegativeArraySizeException

from dist-keras.

JoeriHermans avatar JoeriHermans commented on May 18, 2024

java.lang.NegativeArraySizeException

Euhm, how?

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

That error came from
encoder = OneHotTransformer(nb_classes, input_col="label", output_col="label_encoded")
dataset_train = encoder.transform(raw_dataset_train)

I followed your example with my own data.
My tentative conclusion is that your code does not work with big images, i.e. data with many columns.
I ran your code on a toy example with only a few columns and it worked, BUT with my own data (128*128*3 columns) it does not. I guess the problem comes from the DataFrame being slow compared to a plain array or list.
I am trying to figure out how to solve this issue.

from dist-keras.

JoeriHermans avatar JoeriHermans commented on May 18, 2024

How can this OneHotEncoder not work under your setting? It basically takes an integer input, computes the max value, and makes a vector out of those values.
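Conceptually it boils down to something like this (plain NumPy sketch):

import numpy as np

def one_hot(label, nb_classes):
    # build a zero vector and set the position of the label to one
    vector = np.zeros(nb_classes)
    vector[label] = 1.0
    return vector

one_hot(3, 10)  # -> [0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]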

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

The type is also int, so I can't understand why it doesn't work. When I check the source code of the transformer, the one-hot encoding is not that heavy.
Can I send you my data? Could you test it (just load it and run it)?
I don't think it will take long.

from dist-keras.

JoeriHermans avatar JoeriHermans commented on May 18, 2024

Can you provide me a shareable link? Or is the data not public? I have time to check it in detail on Tuesday (I'm working on a project with a deadline atm).

Joeri

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

This is my data. If you download it, please let me know; after you download it, I will delete the link.

Thanks. I need to train my own model in a distributed way, BUT I am still stuck on this problem... I will keep trying to find a way to solve it anyway.

from dist-keras.

JoeriHermans avatar JoeriHermans commented on May 18, 2024

I got the data. Could you send me your code to [email protected], so I can inspect it myself?

Joeri

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

https://github.com/cerndb/dist-keras/blob/master/examples/mnist.ipynb

I use my data as the training data in your mnist.ipynb.

I tried to modify my data so it could be fed to the trainer without VectorAssembler and your custom transformer, but I couldn't find a way. So I decided to follow the approach of your mnist example, making my data smaller and converting it to integers.

from dist-keras.

JoeriHermans avatar JoeriHermans commented on May 18, 2024

Ok, I'll keep you posted. But you are also using the same model?

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

I will use a different model after I have tested my own data.
For now I use your simple convnet model to test my data.
I got the error before training the model, while I was preparing the input data.
I will test my own model too; I will send it to you later, after I clean up my messy model code.

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

The only difference from your mnist example code is the number of classes: MNIST has 10 classes, mine has 100.

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

I will send my code soon. I realized that I added some extra configuration because my data has so many dimensions.

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

I e-mailed you.

from dist-keras.

rayjang avatar rayjang commented on May 18, 2024

After the VectorAssembler job finished, I tried to save the DataFrame to csv.
That also caused an error like the one below:
java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: java.io.EOFException
When I did the same with a toy example (just 20 columns), it worked well.
I still suspect the problem is that a DataFrame cannot handle this many columns.
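One thing I might try next is writing the assembled DataFrame to Parquet instead of csv, since Parquet keeps the vector column as a single field instead of spilling it back into tens of thousands of flat columns (sketch; paths and column names are mine):

# keep only the assembled vector and the label, then persist as Parquet
dataset_train.select("features", "label") \
             .write.mode("overwrite") \
             .parquet("dataset_train.parquet")

# later, reload without re-running VectorAssembler
dataset_train = sqlContext.read.parquet("dataset_train.parquet")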

from dist-keras.

JoeriHermans avatar JoeriHermans commented on May 18, 2024

Solved in e-mail thread.

from dist-keras.
