src-d / jgit-spark-connector


jgit-spark-connector is a library for running scalable data retrieval pipelines that process any number of Git repositories for source code analysis.

License: Apache License 2.0

Languages: Scala 75.07%, Shell 6.79%, Makefile 1.69%, Python 12.35%, Jupyter Notebook 3.40%, Dockerfile 0.70%
Topics: spark, pyspark, scala, python, git, datasource

jgit-spark-connector's Issues

Guava version at bblfsh/scala-client

On a clean Ubuntu 16.04 environment, using

bin/spark-shell --packages tech.sourced:engine:0.1.2 --repositories "https://jitpack.io"

(--repositories is needed for siva-java, which is not included in the 0.1.2 uber-jar)

and running the example with UAST extraction, I got a Guava version mismatch at runtime:

17/10/26 13:59:37 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 10, localhost, executor driver): java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
        at io.grpc.internal.ClientCallImpl.<init>(ClientCallImpl.java:104)
        at io.grpc.internal.ManagedChannelImpl$RealChannel.newCall(ManagedChannelImpl.java:554)
        at io.grpc.internal.ManagedChannelImpl.newCall(ManagedChannelImpl.java:533)
        at gopkg.in.bblfsh.sdk.v1.protocol.generated.ProtocolServiceGrpc$ProtocolServiceBlockingStub.parse(ProtocolServiceGrpc.scala:50)
        at org.bblfsh.client.BblfshClient.parse(BblfshClient.scala:29)
        at tech.sourced.engine.udf.ExtractUASTsUDF$.extractUsingBblfsh(ExtractUASTsUDF.scala:110)
        at tech.sourced.engine.udf.ExtractUASTsUDF$.extractUAST(ExtractUASTsUDF.scala:92)
        at tech.sourced.engine.udf.ExtractUASTsUDF$.extractUASTsWithLang(ExtractUASTsUDF.scala:69)

[DS] Model iterators

We need a way to transform a model (repository, reference, commit, blob) into an Iterator[Row].
It should be an abstract class containing the generic code shared by all the models, plus model-specific code to generate the rows for each model.

This iterator should be able to process more than one repository in a lazy way, as sketched below.
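A minimal sketch of what this could look like (names and type parameters are illustrative, not the actual implementation):

import org.apache.spark.sql.Row

// Illustrative sketch: R is whatever repository handle the data source
// provides; T is the model item (reference, commit, blob, ...).
abstract class ModelIterator[R, T](repositories: Iterator[R]) extends Iterator[Row] {

  // Model-specific: produce the items of a single repository.
  protected def itemsOf(repo: R): Iterator[T]

  // Model-specific: turn one item into a Row following the model schema.
  protected def toRow(item: T): Row

  private var current: Iterator[T] = Iterator.empty

  // Generic, shared by all models: advance lazily across repositories.
  override def hasNext: Boolean = {
    while (!current.hasNext && repositories.hasNext) {
      current = itemsOf(repositories.next())
    }
    current.hasNext
  }

  override def next(): Row = toRow(current.next())
}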

Rename head_ref to head, master_ref to master

I do understand that this has something to do with the Scala API, but Python users should not suffer because of a name collision in a different language. references.head_ref is a duplication.

Cleanup un-packed .siva files

After copying from HDFS and unpacking the .siva files at the start of the job, the local FS on the workers has to be cleaned up when the job ends.

The current implementation does not take care of this.

There are two ways to do that:

  • CompletionIterator / InterruptibleIterator (preferable, see example)
  • TaskContext listener

This issue is about picking the simplest one that fits the Spark API use cases and implementing it; a sketch of the TaskContext-listener option follows below.
The unpacked .siva file optimization needs to be taken into account.
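A minimal sketch of the TaskContext-listener option (localUnpackDir is an assumed per-task directory where the .siva contents were unpacked):

import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.spark.TaskContext

// Illustrative sketch: remove the locally unpacked .siva contents when the
// task finishes, whether it succeeded or failed.
def cleanupOnTaskCompletion(localUnpackDir: File): Unit = {
  Option(TaskContext.get()).foreach { ctx =>
    ctx.addTaskCompletionListener { _ =>
      FileUtils.deleteQuietly(localUnpackDir)
    }
  }
}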

Handle releases correctly

If we create a new tag defining a version on the spark-api repository, we should:

  • Execute all the tests
  • Generate an uber-jar using sbt assembly
  • Generate a new container version including the new uber-jar
  • Upload the assembly jar to Spark Packages
  • Upload the Python code to PyPI

Error reporting: check existence of the path to siva files

If the example from our README https://github.com/src-d/spark-api#pyspark-api-usage is tried literally

from sourced.spark import API as SparkAPI
from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
api = SparkAPI(spark, '/path/to/siva/files')
api.repositories.filter("id = 'github.com/mawag/faq-xiyoulinux'").references.filter("name = 'refs/heads/HEAD'").show()

with a non-existent path to the .siva files, it will result in:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-7f67465f882f> in <module>()
      4 spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
      5 api = SparkAPI(spark, '/path/to/siva/files')
----> 6 api.repositories.filter("id = 'github.com/mawag/faq-xiyoulinux'").references.filter("name = 'refs/heads/HEAD'").show()

/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    334         """
    335         if isinstance(truncate, bool) and truncate:
--> 336             print(self._jdf.showString(n, 20))
    337         else:
    338             print(self._jdf.showString(n, int(truncate)))

/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o39.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(repository_id#11, 200)
+- *Filter (((isnotnull(name#12) && (name#12 = refs/heads/HEAD)) && isnotnull(repository_id#11)) && (repository_id#11 = github.com/mawag/faq-xiyoulinux))
   +- *Scan GitRelation(org.apache.spark.sql.SQLContext@4faae818,references,/path/to/siva/files,/tmp) [repository_id#11,name#12,hash#13] PushedFilters: [IsNotNull(name), EqualTo(name,refs/heads/HEAD), IsNotNull(repository_id), EqualTo(repository_id,..., ReadSchema: struct<repository_id:string,name:string,hash:string>

which is not a very clear error message.

This can be fixed by implementing a proper existence check and error reporting in the Scala part when the given path does not exist, as sketched below.
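A minimal sketch of such a check on the Scala side (method name and message are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative sketch: fail fast with a clear message when the configured
// path to .siva files does not exist, instead of failing later inside a
// Spark plan with a TreeNodeException.
def checkSivaPath(pathStr: String, hadoopConf: Configuration): Unit = {
  val path = new Path(pathStr)
  val fs = path.getFileSystem(hadoopConf)
  if (!fs.exists(path)) {
    throw new IllegalArgumentException(
      s"the given path to .siva files does not exist: $pathStr")
  }
}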

[API] Create a new method to get files directly from repositories df

We need a new method that gets files from repositories applying several filters, to improve performance.

The first step is to add two new columns to the files relation:

  • repository_id
  • reference_name

Then, we should create a new API method, something like:

def getFiles(repositoriesIds: Seq[String], referenceNames: Seq[String], commitHashes: Seq[String]): DataFrame
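A hypothetical call with the proposed signature would then look like:

// Hypothetical usage of the proposed method; assuming an empty Seq means "no filter".
val files = api.getFiles(
  repositoriesIds = Seq("github.com/foo/bar"),
  referenceNames = Seq("refs/heads/HEAD"),
  commitHashes = Seq.empty)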

Race condition, tmp siva files deleted too soon

While executing the Python tests I noticed that they randomly fail because the tmp siva files cannot be found, so this looks like a race condition somewhere (it could very well be a Py4J/pyspark thing, so if you think this is not related to our code I can investigate further to see what's going on on the Python side).

Stacktrace of the crash:

17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/29/temp_shuffle_2fe14279-9444-431b-9392-4bf715b284d9
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/29/temp_shuffle_2fe14279-9444-431b-9392-4bf715b284d9 (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/29/temp_shuffle_2fe14279-9444-431b-9392-4bf715b284d9
17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/28/temp_shuffle_649e8ea0-e7ff-4e72-b5d5-f2eec176dd46
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/28/temp_shuffle_649e8ea0-e7ff-4e72-b5d5-f2eec176dd46 (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/28/temp_shuffle_649e8ea0-e7ff-4e72-b5d5-f2eec176dd46
17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/3e/temp_shuffle_247a46ff-753f-4a69-8c6b-fed143266a01
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/3e/temp_shuffle_247a46ff-753f-4a69-8c6b-fed143266a01 (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/3e/temp_shuffle_247a46ff-753f-4a69-8c6b-fed143266a01
17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/02/temp_shuffle_15d14d72-f4f0-44e4-8c32-4998fa803dff
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/02/temp_shuffle_15d14d72-f4f0-44e4-8c32-4998fa803dff (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/02/temp_shuffle_15d14d72-f4f0-44e4-8c32-4998fa803dff
17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/38/temp_shuffle_dd1b5a5a-7226-4578-aaa9-1086dd67594e
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/38/temp_shuffle_dd1b5a5a-7226-4578-aaa9-1086dd67594e (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/38/temp_shuffle_dd1b5a5a-7226-4578-aaa9-1086dd67594e
17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/3d/temp_shuffle_ef432a77-5e65-4f6d-96ed-b5b5965e58f8
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/3d/temp_shuffle_ef432a77-5e65-4f6d-96ed-b5b5965e58f8 (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/3d/temp_shuffle_ef432a77-5e65-4f6d-96ed-b5b5965e58f8
17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/29/temp_shuffle_69c5fa7d-a6f6-4d9d-a943-9816db1d5518
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/29/temp_shuffle_69c5fa7d-a6f6-4d9d-a943-9816db1d5518 (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/29/temp_shuffle_69c5fa7d-a6f6-4d9d-a943-9816db1d5518
17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/20/temp_shuffle_692f2eae-7e3b-43e5-9e7e-1fc7310787fb
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/20/temp_shuffle_692f2eae-7e3b-43e5-9e7e-1fc7310787fb (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/20/temp_shuffle_692f2eae-7e3b-43e5-9e7e-1fc7310787fb
17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/33/temp_shuffle_7e094f01-8975-4983-a60c-2c6b13fa28dc
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/33/temp_shuffle_7e094f01-8975-4983-a60c-2c6b13fa28dc (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/33/temp_shuffle_7e094f01-8975-4983-a60c-2c6b13fa28dc
17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/11/temp_shuffle_99570036-ad3f-40c1-8971-f8aa09263446
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/11/temp_shuffle_99570036-ad3f-40c1-8971-f8aa09263446 (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/11/temp_shuffle_99570036-ad3f-40c1-8971-f8aa09263446
17/09/19 15:24:30 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/24/temp_shuffle_28e739f9-a3d0-462d-b71f-4196351aefef
java.io.FileNotFoundException: /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/24/temp_shuffle_28e739f9-a3d0-462d-b71f-4196351aefef (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$2.apply$mcV$sp(DiskBlockObjectWriter.scala:215)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
	at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:212)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:237)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
17/09/19 15:24:30 ERROR BypassMergeSortShuffleWriter: Error while deleting file /tmp/blockmgr-86a0b5f1-6b06-4e0f-8cb3-a7994b0c6f2e/24/temp_shuffle_28e739f9-a3d0-462d-b71f-4196351aefef

Add integration tests

Check common use-case queries in integration tests to avoid the same errors coming back over time.

Make release process more automatic

Right now, to release a new version we need to:

  • Change the project version in build.sbt
  • Push the changes to master
  • Create a tag pointing to that commit
  • Push to master, moving the version to the next -SNAPSHOT one

Use some plugin to automate this process.

Spark: investigate a suspicious warning

Running the following over ./examples/siva-files

make clean build
SPARK_HOME="" ./spark/bin/spark-shell --jars ./target/scala-2.11/engine-uber.jar
val engine = Engine(spark, "./examples/siva-files")

engine.getRepositories.getHEAD.getFiles.classifyLanguages.where('lang === "Python").extractUASTs.queryUAST("//*[@roleIdentifier]", "uast", "result").extractTokens("result", "tokens").select('path, 'lang, 'uast, 'tokens).show

results in a suspicious warning:

17/10/18 18:25:01 WARN Executor: Managed memory leak detected; size = 4456448 bytes, TID = 1034

[API] Add convenience wrappers for Python

Right now, there are a number of steps the user needs to take to make the Spark API work in PySpark:

  • add the jar
  • configure the DataSource
  • select/join (see the example notebook)

We want a convenience API in PySpark that does this for us, delegating through py4j to the appropriate methods in Scala.

So this issue is twofold:

  • make sure that Scala API matches the proposed one
  • make PySpark wrappers for it

[DS] Service to provide repositories by folder string

There should be only one repository instance per path per JVM. To do this we will need a singleton that provides a repository given the path as a key (see the sketch after the list). It should:

  • Copy all the files from HDFS to the local FS once.
  • Be able to check whether it is a bare repository or not and create the repository instance accordingly.
  • Generate the repository index by checking the URLs in the config file.
  • Provide a close method that removes all local temporary files and frees the repository instance.
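A minimal sketch of such a singleton, assuming JGit's Repository is the instance being cached (the actual open logic is elided):

import scala.collection.mutable
import org.eclipse.jgit.lib.Repository

// Illustrative sketch: one repository instance per path and per JVM.
object RepositoryProvider {
  private val repositories = mutable.Map.empty[String, Repository]

  // Returns the cached instance for the given path, creating it on first use.
  def get(path: String): Repository = synchronized {
    repositories.getOrElseUpdate(path, open(path))
  }

  // Copy the files from HDFS to the local FS, detect bare vs. non-bare,
  // and build the repository index from the config URLs.
  private def open(path: String): Repository = ???

  // Frees the instance and removes the local temporary files.
  def close(path: String): Unit = synchronized {
    repositories.remove(path).foreach(_.close())
  }
}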

Improvement proposal

Hi,

I think we can simplify the default usage of Engine a bit: let's make it possible to use pure Python instead of pyspark.
There are at least a couple of ways this can be done:

  1. Some function that will find pyspark: https://stackoverflow.com/questions/23256536/importing-pyspark-in-python-shell
  2. or add pyspark to PYTHONPATH during installation: export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
    so it will be possible to import pyspark and so on.

I think the ideal situation for a data scientist is:

from sourced.engine import Engine
engine = Engine(siva_folder, **parameters_for_SparkSession)

and the pyspark session will be initialized in the background.

What do you think?
@bzz , @ajnavarro, @erizocosmico, @mcarmonaa

[API] Query UAST using xpath

This depends on #7 and on the status of making libuast available in the Scala client, but we want an API for making XPath queries against the UASTs.

Overall API proposal

Create an overall API proposal. Initially this is expected to:

  • Load repositories from rooted repository storage (borges-style)
  • Load repositories from URL list
  • Extract files
  • Detect language
  • Parse files into UAST with Babelfish

[DOC] Installation instructions

Document an easy method of installation:

  • not a container release (demo convenience container would be nice to have though)
  • packaging in 3 lines (download Spark, fetch deps, run spark-shell)

Docs should consist of:

  • very brief installation/usage in the README
  • a link to the markdown-based GitBook that @erizocosmico knows best
  • make sure the public API has ScalaDoc

[API] Prepare "repository index" and incorporate it in API

Right now, in order to get the URLs of the repositories, or their languages, we need to traverse all .siva files and extract the original repo URLs from the config.

This issue is about speeding that up by pre-computing a dataset of (original repo URL, init hash, languages).

It has two parts:

  • prepare the dataset, using the Spark API
  • incorporate it into the Spark API so it is used for filtering, instead of traversing .siva files

[DS] Array[Filter] to list of properties to filter

Having a list of org.apache.spark.sql.sources.Filter, we should transform them into two lists per column: include and exclude values.

For example, if we want to get the repository_id filters, we should be able to do something like:

def processFilters(filters: Array[Filter]): Map[String, (Seq[Any], Seq[Any])]
val repoFilters = processFilters(filters).getOrElse("repository_id", (Seq(), Seq()))

For now, we should only take these filters into account (a sketch of processFilters follows the lists below):

  • And
  • Or
  • [Not]Equals
  • In

We should ignore for now:

  • StringStartsWith
  • StringEndsWith
  • StringContains
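A minimal sketch of how processFilters could handle the supported cases (illustrative only; Or is treated here as contributing extra include values, which is a simplification):

import org.apache.spark.sql.sources._

// Illustrative sketch: collapse the supported filters into per-column
// (include, exclude) value lists; unsupported filters are skipped.
def processFilters(filters: Array[Filter]): Map[String, (Seq[Any], Seq[Any])] = {
  def collect(f: Filter): Seq[(String, Boolean, Seq[Any])] = f match {
    case EqualTo(attr, value)      => Seq((attr, true, Seq(value)))
    case In(attr, values)          => Seq((attr, true, values.toSeq))
    case Not(EqualTo(attr, value)) => Seq((attr, false, Seq(value)))
    case And(left, right)          => collect(left) ++ collect(right)
    case Or(left, right)           => collect(left) ++ collect(right) // simplification
    case _                         => Seq.empty // StringStartsWith & co. are ignored for now
  }

  filters.flatMap(collect).groupBy(_._1).mapValues { entries =>
    val (includes, excludes) = entries.partition(_._2)
    (includes.flatMap(_._3).toSeq, excludes.flatMap(_._3).toSeq)
  }
}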

[DS] Generate repositories partitions from HDFS blocks

Repositories will be in a specific folder. Example:

repositories/
├── repo1/
│   └── ...
├── repo2/
│   └── ...
└── repo3/
    └── ...

We should get all the files of each repository, get their block information, and group repositories by the datanodes holding the most blocks of each repository.
With this information we need to create a new class called RepositoryPartition, extending the trait org.apache.spark.Partition, that will include a list of repository folders.

These partitions will be sent to each relation to create RDD partitions correctly, depending on locality. A sketch of the partition class follows below.
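A minimal sketch of the partition class (field names are illustrative):

import org.apache.spark.Partition

// Illustrative sketch: a partition grouping the repository folders whose
// blocks live mostly on the same datanode.
class RepositoryPartition(
    override val index: Int,
    val repositoryFolders: Seq[String],
    val preferredHosts: Seq[String]) extends Partition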

[DS] Mock Datasource

We need a first DataSource approach to check the viability of the API.
This DataSource should return the following relations (a sketch follows the list):

  • Repositories source relation
  • References source relation
  • Commits source relation
  • Files source relation
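A minimal sketch of what such a mock could look like (the schemas and the "table" option name are assumptions, used only to validate the API shape):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}

// Illustrative sketch: each table is an empty relation with a fixed schema.
class MockRelation(val sqlContext: SQLContext, val schema: StructType)
  extends BaseRelation with TableScan {
  override def buildScan(): RDD[Row] = sqlContext.sparkContext.emptyRDD[Row]
}

class DefaultSource extends RelationProvider {
  private val schemas = Map(
    "repositories" -> StructType(Seq(StructField("id", StringType))),
    "references"   -> StructType(Seq(StructField("repository_id", StringType),
                                     StructField("name", StringType))),
    "commits"      -> StructType(Seq(StructField("hash", StringType))),
    "files"        -> StructType(Seq(StructField("path", StringType)))
  )

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new MockRelation(sqlContext, schemas(parameters.getOrElse("table", "repositories")))
}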

Prepare a Docker image with Jupyter and the Spark API

This would provide:

  • something that everyone can run to get the latest state of the Spark API
  • a Docker image with hostPath-mounted siva files
  • one notebook with an example of PySpark using the Spark API

Push down JOIN conditions if possible to the GitDatasource

Currently, if we have a query like this one:

api.getRepositories().where("id = 'github.com/foo/bar'").getHEAD.getFiles.extractUASTs().select("name", "path", "uast")

The optimized plan will be (simplified):

Project
+-Join[commit_hash = hash]
   :   +- GitRelation[FILES]
   +- Join[repository_id = id]
       :   +- Filter[name="refs/heads/HEAD",repository_id="github.com/foo/bar"]
       :       +- GitRelation[REFERENCES]
       +- Filter[id="github.com/foo/bar"]
           +- GitRelation[REPOSITORIES]

As we can see, no filter is pushed down to the files relation, so in this case all the files over all the revisions of all the repositories have to be read.
We should implement a rule to transform the optimized plan above into:

Project
+- Filter[commit_hash = hash, repository_id = id, repository_id = "github.com/foo/bar"]
    +-GitRelation[FILES]

Then, the unresolved conditions (repository_id = id in this case) should be handled by the iterator.

[DS] hadoop configuration on RDD

The Hadoop org.apache.hadoop.conf.Configuration class is not serializable. To send this information to all the RDD partitions, we need to broadcast it. Spark uses org.apache.spark.util.SerializableConfiguration, but it is package-private. It can be used in our code by creating a wrapper in the org.apache.spark package to make it public:

package org.apache.spark

import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.SerializableConfiguration

class SCWrapper(conf: Configuration) extends SerializableConfiguration(conf)
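The wrapped configuration can then be broadcast, e.g. (illustrative usage, assuming spark is the active SparkSession):

import org.apache.spark.SCWrapper

// Illustrative usage: broadcast the wrapper so every partition can read the
// Hadoop Configuration through broadcastConf.value.value.
val broadcastConf = spark.sparkContext.broadcast(
  new SCWrapper(spark.sparkContext.hadoopConfiguration))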
