
cc-index-table's Introduction

Common Crawl Support Library

Overview

This library provides support code for the consumption of the Common Crawl Corpus RAW crawl data (ARC Files) stored on S3. More information about how to access the corpus can be found at https://commoncrawl.atlassian.net/wiki/display/CRWL/About+the+Data+Set .

You can take two primary routes to consuming the ARC File content:

(1) You can run a Hadoop cluster on EC2 or use EMR to run a Hadoop job. In this case, you can use the ARCFileInputFormat to drive data to your mappers/reducers. There are two versions of the InputFormat: one written to conform to the deprecated mapred package, located at org.commoncrawl.hadoop.io.mapred, and one written for the mapreduce package, located at org.commoncrawl.hadoop.io.mapreduce.

(2) You can decode data directly by feeding an InputStream to the ARCFileReader class located in the org.commoncrawl.util.shared package.

Both routes (InputFormat or direct ARCFileReader) produce a tuple consisting of a UTF-8 encoded URL (Text) and the raw content (BytesWritable), including the HTTP headers, that was downloaded by the crawler. The HTTP headers are UTF-8 encoded, and the headers and content are delimited by a consecutive set of CRLF tokens. The content itself, when it is of a text mime type, is encoded using the source text encoding.
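To illustrate the layout described above, here is a minimal Python sketch (not part of the library, which is Java) that splits such a raw content buffer into headers and body. It assumes the delimiter is the usual empty line (CRLF CRLF); the function name split_http_record and the sample bytes are made up for illustration.

```python
def split_http_record(raw: bytes):
    """Split raw crawler output into (status line, header dict, body bytes)."""
    # Assumption: the header block ends at the first empty line (CRLF CRLF).
    head, _, body = raw.partition(b"\r\n\r\n")
    lines = head.decode("utf-8", errors="replace").split("\r\n")
    status_line, header_lines = lines[0], lines[1:]
    headers = {}
    for line in header_lines:
        name, sep, value = line.partition(":")
        if sep:
            headers[name.strip().lower()] = value.strip()
    return status_line, headers, body


# Hand-made sample record, not real crawl data.
raw = (
    b"HTTP/1.1 200 OK\r\n"
    b"Content-Type: text/html; charset=ISO-8859-1\r\n"
    b"\r\n"
    b"<html>caf\xe9</html>"
)
status_line, headers, body = split_http_record(raw)

# The body stays in the source encoding, so decode it with the declared charset.
charset = headers["content-type"].split("charset=")[-1]
text = body.decode(charset)
print(status_line)
print(text)
```

Real pages often declare no charset or a wrong one, so production code would need a fallback such as charset detection.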

Build Notes:

  1. You need to define JAVA_HOME, and make sure you have Ant & Maven installed.
  2. Set hadoop.path (in build.properties) to point to your Hadoop distribution.

Sample Usage:

Once the commoncrawl.jar has been built, you can validate that the ARCFileReader works for you by executing the sample command line from the root of the commoncrawl source directory:

./bin/launcher.sh org.commoncrawl.util.shared.ARCFileReader --awsAccessKey <ACCESS KEY> --awsSecret <SECRET> --file s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690164240/1341819847375_4319.arc.gz

cc-index-table's People

Contributors

dependabot[bot], sebastian-nagel

cc-index-table's Issues

WARC files unreadable

ArchiveLoadFailed: Unknown archive format, first line: ['\\\x03$YgfÍZèÞç\x9cïËè\x1f1&Y\x995\x97÷\x9dç}\x9eç\x0bÝRÝ"Ñ\x0bz\x01\x1f¿\x15\x10\xa0Gñ\x0ey_£³!w©³6\x1aï\'\x81\x8dFÿ-\x84\x05
I cannot read the file and I get an error like this.

Downloading the relevant jar file?

Hi,

I have successfully downloaded a CSV via Amazon Athena and would like to perform a bulk download of the listed WARC files. After cloning the GitHub repository and setting my $SPARK_HOME to my download of pyspark in my virtual environment, I ran the following command:

> $SPARK_HOME/bin/spark-submit --class org.commoncrawl.spark.examples.CCIndexWarcExport $APPJAR \ --csv xyx ...

This returned the error: Failed to find Spark jars directory (xyz). Do you have any suggestions on how I can resolve this issue?
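One thing that may be worth checking: the Spark launcher scripts print this message when they cannot find a jars directory under $SPARK_HOME. The sketch below only performs that directory check. The helper name has_spark_jars is made up for illustration, and the idea that a pip-installed pyspark keeps its jars under site-packages/pyspark/jars is an assumption about the usual layout, not something confirmed in this thread.

```python
import os
import tempfile


def has_spark_jars(spark_home: str) -> bool:
    """Return True if spark_home contains a 'jars' directory."""
    return os.path.isdir(os.path.join(spark_home, "jars"))


# Check the directory the shell currently exports (empty string if unset).
print("SPARK_HOME has jars:", has_spark_jars(os.environ.get("SPARK_HOME", "")))

# Self-contained demonstration with a temporary directory.
with tempfile.TemporaryDirectory() as fake_home:
    before = has_spark_jars(fake_home)   # no jars/ yet
    os.mkdir(os.path.join(fake_home, "jars"))
    after = has_spark_jars(fake_home)    # jars/ now exists
print(before, after)
```

If the check fails, pointing $SPARK_HOME at the directory that actually contains bin/ and jars/ is the first thing to try.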

Thank you

spark-submit stopped suddenly

The following command is used to download files from Common Crawl, but after a few hours the process stopped and I received the following error. It may depend on the configuration and parameters of spark-submit. Could you please assist me?

script:

spark-submit --driver-memory 24g --class org.commoncrawl.spark.examples.CCIndexWarcExport $APPJAR --csv ../csvs/CC-MAIN-2018-51   --numRecordsPerWarcFile 10000  --warcPrefix persian-CC  s3://commoncrawl/cc-index/table/cc-main/warc/ ../data/CC-MAIN-2018-51/ > ../log.out 2>&1 &

logs:

22/05/07 11:00:33 INFO CCIndexExport: Fetching WARC record crawl-data/CC-MAIN-2018-51/segments/1544376827137.61/warc/CC-MAIN-20181215222234-20181216004234-00575.warc.gz 13132840 8337 for http://amvaj-e-bartar.ir/news/detail/val/6724/%D8%B9%D9%85%D9%84%DA%A9%D8%B1%D8%AF%20%DB%B6.%DB%B1%20%D9%85%DB%8C%D9%84%DB%8C%D8%A7%D8%B1%D8%AF%20%D8%AF%D9%84%D8%A7%D8%B1%DB%8C%20%D8%B5%D9%86%D8%B9%D8%AA%20%D8%A2%D8%A8%20%D9%88%20%D8%A8%D8%B1%D9%82%20%D8%AF%D8%B1%20%D8%AE%D8%A7%D8%B1%D8%AC%20%D8%A7%D8%B2%20%DA%A9%D8%B4%D9%88%D8%B1.html
22/05/07 11:00:33 INFO CCIndexExport: Fetching WARC record crawl-data/CC-MAIN-2018-51/segments/1544376826842.56/warc/CC-MAIN-20181215083318-20181215105318-00200.warc.gz 411716885 7390 for http://www.jenabmusic.com/tag/%D8%A2%D9%87%D9%86%DA%AF-%D9%85%D8%AD%D9%85%D8%AF-%D9%86%D8%AC%D9%81%DB%8C
22/05/07 11:00:33 INFO SparkContext: Invoking stop() from shutdown hook
22/05/07 11:00:33 INFO SparkUI: Stopped Spark web UI at http://server.domain.com:4040
22/05/07 11:00:33 INFO DAGScheduler: ResultStage 7 (runJob at SparkHadoopWriter.scala:83) failed in 15440.843 s due to Stage cancelled because SparkContext was shut down
22/05/07 11:00:33 INFO DAGScheduler: Job 5 failed: runJob at SparkHadoopWriter.scala:83, took 15440.906693 s
22/05/07 11:00:33 ERROR SparkHadoopWriter: Aborting job job_202205070643121922168778011744804_0031.
org.apache.spark.SparkException: Job 5 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1166)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1164)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1164)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2666)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2566)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2086)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1442)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2086)
	at org.apache.spark.SparkContext.$anonfun$new$38(SparkContext.scala:667)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsNewAPIHadoopDataset$1(PairRDDFunctions.scala:1077)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1075)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsNewAPIHadoopFile$2(PairRDDFunctions.scala:994)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:985)
	at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopFile(JavaPairRDD.scala:825)
	at org.commoncrawl.spark.examples.CCIndexWarcExport.run(CCIndexWarcExport.java:199)
	at org.commoncrawl.spark.examples.CCIndexExport.run(CCIndexExport.java:195)
	at org.commoncrawl.spark.examples.CCIndexWarcExport.main(CCIndexWarcExport.java:208)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsNewAPIHadoopDataset$1(PairRDDFunctions.scala:1077)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1075)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsNewAPIHadoopFile$2(PairRDDFunctions.scala:994)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:985)
	at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopFile(JavaPairRDD.scala:825)
	at org.commoncrawl.spark.examples.CCIndexWarcExport.run(CCIndexWarcExport.java:199)
	at org.commoncrawl.spark.examples.CCIndexExport.run(CCIndexExport.java:195)
	at org.commoncrawl.spark.examples.CCIndexWarcExport.main(CCIndexWarcExport.java:208)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Job 5 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1166)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1164)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1164)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2666)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2566)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2086)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1442)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2086)
	at org.apache.spark.SparkContext.$anonfun$new$38(SparkContext.scala:667)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83)
	... 28 more
22/05/07 11:00:33 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/05/07 11:00:33 INFO CCIndexExport: Fetching WARC record crawl-data/CC-MAIN-2018-51/segments/1544376829140.81/warc/CC-MAIN-20181218102019-20181218124019-00288.warc.gz 466742218 12898 for http://www.tarabarnews.com/view/87815/%D9%86%D9%85%D8%A7%DB%8C%D9%86%D8%AF%DA%AF%D8%A7%D9%86-%D9%85%D8%AC%D9%84%D8%B3-%D9%81%D8%B1%D8%AF%DB%8C-%D9%82%D9%88%DB%8C%E2%80%8C%D8%AA%D8%B1-%D8%A7%D8%B2-%D8%A2%D8%AE%D9%88%D9%86%D8%AF%DB%8C-%D8%A8%D8%B1%D8%A7%DB%8C-%D9%88%D8%B2%D8%A7%D8%B1%D8%AA-%D8%B1%D8%A7%D9%87-%D9%88-%D8%B4%D9%87%D8%B1%D8%B3%D8%A7%D8%B2%DB%8C-%D8%B3%D8%B1%D8%A7%D8%BA-%D9%86%D8%AF%D8%A7%D8%B1%D9%86%D8%AF-%D8%AA%D8%A7%DA%A9%DB%8C%D8%AF-%D8%A8%D8%B1-%D8%AD%D9%85%D8%A7%DB%8C%D8%AA-%D9%82%D8%A7%D8%B7%D8%B9-%D8%A7%D8%B2-%D8%A2%D8%AE%D9%88%D9%86%D8%AF%DB%8C-%D8%A8%D8%B1%D8%A7%DB%8C-%D8%A7%D9%86%D8%AC%D8%A7%D9%85-%D8%A7%D9%87%D8%AF%D8%A7%D9%81-%D9%88-%D8%A8%D8%B1%D9%86%D8%A7%D9%85%D9%87%E2%80%8C%D9%87%D8%A7
22/05/07 11:00:33 INFO CCIndexExport: Fetching WARC record crawl-data/CC-MAIN-2018-51/segments/1544376828697.80/warc/CC-MAIN-20181217161704-20181217183704-00220.warc.gz 416114387 15552 for http://www.licamall.com/tags/%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD+%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD+%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD+%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD+%EF%BF%BD%EF%BF%BD%EF%BF%BD%EF%BF%BD+GTF3027GBX
22/05/07 11:00:33 INFO CCIndexExport: Fetching WARC record crawl-data/CC-MAIN-2018-51/segments/1544376825363.58/warc/CC-MAIN-20181214044833-20181214070333-00379.warc.gz 801554484 29772 for https://www.aparat.com/v/jUGBL/%D9%84%DB%8C%D8%A7%D9%85_%D9%84%D8%A7%DB%8C%D9%81_3_-_%D8%A2%D9%85%D9%88%D8%B2%D8%B4_%D9%85%D8%AF%D9%84_%D9%87%D8%A7%DB%8C_%D8%A8%D8%B3%D8%AA%D9%86_%D8%B4%D8%A7%D9%84
22/05/07 11:00:33 INFO CCIndexExport: Fetching WARC record crawl-data/CC-MAIN-2018-51/segments/1544376823657.20/warc/CC-MAIN-20181211151237-20181211172737-00471.warc.gz 626970615 5027 for https://golbarhar.persianblog.ir/7qZpoZMnXjFll98ejwaM-%D8%A8%D9%87-%D8%AE%D8%A7%DA%A9-%D8%B3%D9%BE%D8%B1%D8%AF
22/05/07 11:00:33 INFO CCIndexExport: Fetching WARC record crawl-data/CC-MAIN-2018-51/segments/1544376823872.13/warc/CC-MAIN-20181212112626-20181212134126-00254.warc.gz 630430449 3559 for https://hairextensions.persianblog.ir/tag/%D8%A8%D9%87%D8%AA%D8%B1%DB%8C%D9%86_%D9%85%D8%B1%DA%A9%D8%B2_%D8%A2%D9%85%D9%88%D8%B2%D8%B4_%DA%A9%D8%A7%D8%B4%D8%AA_%D9%86%D8%A7%D8%AE%D9%86
22/05/07 11:00:33 INFO CCIndexExport: Fetching WARC record crawl-data/CC-MAIN-2018-51/segments/1544376823565.27/warc/CC-MAIN-20181211040413-20181211061913-00319.warc.gz 904908127 16662 for https://www.hikvision-cctv.com/%D8%AF%D8%B1%D8%A8%D8%A7%D8%B1%D9%87-%D9%BE%D8%B1%D8%B4%DB%8C%D8%A7%D8%B3%DB%8C%D8%B3%D8%AA%D9%85/
22/05/07 11:00:33 INFO MemoryStore: MemoryStore cleared
22/05/07 11:00:33 INFO BlockManager: BlockManager stopped
22/05/07 11:00:33 INFO BlockManagerMaster: BlockManagerMaster stopped
22/05/07 11:00:33 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/05/07 11:00:33 INFO SparkContext: Successfully stopped SparkContext
22/05/07 11:00:33 INFO ShutdownHookManager: Shutdown hook called
22/05/07 11:00:33 INFO CCIndexExport: Fetching WARC record crawl-data/CC-MAIN-2018-51/segments/1544376823738.9/warc/CC-MAIN-20181212044022-20181212065522-00145.warc.gz 296813532 7780 for http://wikipg.com/tag/%D8%A2%D8%A8-%DA%AF%D8%B1%D9%85-%D8%AE%D8%A7%D9%86%DA%AF%DB%8C/
22/05/07 11:00:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-29d09e64-59a8-4ec9-a825-a1b460ad8a32
22/05/07 11:00:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-b98232d1-9c18-4986-be3e-eebd4284a405

CCIndexWarcExport - Equivalent in Pyspark

I'm currently trying to build a pyspark-based version of something like the CCIndexWarcExport utility and struggle to get this to work properly.
My main problem is to properly read/process the byte output that I get from S3 when I try to access parts of a particular WARC file. As an example:

import boto3

client = boto3.client('s3',
    aws_access_key_id="key",
    aws_secret_access_key="secret")

offset = 928
length = 650
# The HTTP Range header needs the "bytes=" unit prefix
segment_range = "bytes=%s-%s" % (offset, offset + length - 1)
response = client.get_object(Bucket = 'commoncrawl', 
    Key = 'crawl-data/CC-MAIN-2018-47/segments/1542039744381.73/crawldiagnostics/CC-MAIN-20181118135147-20181118160608-00061.warc.gz', 
    Range = segment_range)

This request to S3 works: I get a proper botocore.response.StreamingBody object back in the Body of the response. But where do I go from here to read the contents of this response? If I understand the code in CCIndexWarcExport correctly (and I probably don't because I don't know Java at all), what's happening is that the byte response is merged into the existing dataframe as is, correct?
This dataframe is then saved as new WARC files to be read again (?).
However, if I do not want to save the data as new WARC files but continue processing it and have it available, how do I handle the bytes that I extracted?
Is there documentation anywhere that I should look at? I'm a bit stumped for how to proceed and would appreciate any help!
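A possible way forward, sketched in plain Python: Common Crawl WARC files are gzip-compressed record by record, so the byte range given by offset and length is a complete gzip member that can be decompressed on its own. The function name parse_warc_record and the sample record are made up for illustration; with boto3, the input would be response["Body"].read().

```python
import gzip


def parse_warc_record(compressed: bytes):
    """Decompress one gzip member and split it into
    (WARC headers, HTTP headers, payload)."""
    record = gzip.decompress(compressed)
    # WARC headers, HTTP headers and payload are each separated by an empty line.
    warc_head, _, rest = record.partition(b"\r\n\r\n")
    http_head, _, payload = rest.partition(b"\r\n\r\n")
    # A WARC record is terminated by two CRLFs; drop them from the payload.
    if payload.endswith(b"\r\n\r\n"):
        payload = payload[:-4]
    return (
        warc_head.decode("utf-8"),
        http_head.decode("utf-8", errors="replace"),
        payload,
    )


# Stand-in for response["Body"].read(): a tiny hand-made record, not real crawl data.
sample = gzip.compress(
    b"WARC/1.0\r\n"
    b"WARC-Type: response\r\n"
    b"WARC-Target-URI: http://example.com/\r\n"
    b"\r\n"
    b"HTTP/1.1 200 OK\r\n"
    b"Content-Type: text/plain\r\n"
    b"\r\n"
    b"hello"
    b"\r\n\r\n"
)
warc_headers, http_headers, payload = parse_warc_record(sample)
print(warc_headers.splitlines()[0], "|", http_headers.splitlines()[0], "|", payload)
```

This is only a sketch: a robust parser would use the Content-Length header instead of searching for empty lines, and a dedicated WARC library is likely the better choice for anything beyond a quick look at the data.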

Improve extraction of host names and registered domains

  • no host name is extracted in the following situations
    • URL contains 4 slashes after the protocol: https:////example.org/ - while java.net.URL extracts an empty hostname, Nutch's OkHttp-based protocol seems to fetch the resource as if there were only two slashes.
    • similarly, java.net.URL and OkHttp behave differently if there is an overlong (or even invalid?) userinfo before the hostname (scheme://userinfo@hostname/)
  • IP addresses are not recognized as such if ending in a dot: https://123.123.123.123./robots.txt
  • the extraction of registered domains (done by crawler-commons' EffectiveTldFinder) does not extract anything if the hostname is equal to a public suffix (gov.uk, kharkov.ua for example)
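The IP address case from the list above can be illustrated with a short Python sketch (the project itself is Java, so this only demonstrates the idea). The helper name parse_ip_host is hypothetical: it tolerates a single trailing dot before attempting to parse the host as an IP literal.

```python
import ipaddress


def parse_ip_host(host: str):
    """Return an ip_address object if host is an IP literal
    (optionally followed by one trailing dot), else None."""
    candidate = host[:-1] if host.endswith(".") else host
    try:
        return ipaddress.ip_address(candidate)
    except ValueError:
        return None


for host in ("123.123.123.123.", "123.123.123.123", "example.org."):
    print(host, "->", parse_ip_host(host))
```

Without the trailing-dot handling, the strict parser rejects "123.123.123.123." and the host would be treated as an ordinary name.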

Parsing host names fails on trailing dot

Occasionally, the host name part of a URL may include a trailing dot. This is allowed by RFC 3986, section 3.2.2.:

The rightmost domain label of a fully qualified domain name in DNS may be followed by a
single "." and should be if it is necessary to distinguish between the complete domain name
and some local domain.

While most of the URLs in Common Crawl data do not include a trailing dot (it is removed during URL normalization), the target URLs of HTTP redirects may include one. Parsing the host name to extract the top-level domain, registered domain, etc. should still work for such URLs (e.g. http://www.example.com./index.html).
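The effect on label-based parsing can be shown with a few lines of Python. This is a sketch of the problem and one possible fix, not the project's Java code; the function name host_labels is made up for illustration.

```python
from urllib.parse import urlsplit


def host_labels(url: str, strip_trailing_dot: bool = True):
    """Return the host name of url split into its dot-separated labels."""
    host = urlsplit(url).hostname  # lower-cased; None if the URL has no host
    if host is None:
        return []
    if strip_trailing_dot and host.endswith("."):
        host = host[:-1]
    return host.split(".")


url = "http://www.example.com./index.html"
naive = host_labels(url, strip_trailing_dot=False)
fixed = host_labels(url)
print(naive)  # the last label is empty, so a "last label is the TLD" rule fails
print(fixed)  # the last label is the top-level domain again
```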

Add IP column to Athena table for reverse IP search with `WARC-IP-Address` data

Historical hostname -> IP and IP -> hostname (reverse IP) datasets are currently quite hard to come by (see https://opendata.stackexchange.com/questions/1951/dataset-of-domain-names for a discussion). The only really convenient options are websites such as https://viewdns.info/reverseip/ which are expensive and have undocumented methodology.

Would it be possible to add an IP column to Athena that tracks WARC-IP-Address? If we had that, it would be trivial for someone to export that data at relatively low cost from Common Crawl and make it available for all to use as a CSV file hosted on GitHub, for example.

Such data can be of great value for OSINT purposes, e.g. I needed it in this project: https://************.com/cia-2010-covert-communication-websites

There is apparently a tool made for this: https://github.com/CAIDA/commoncrawl-host-ip-mapper but I don't think it can run quickly or cheaply. The tabular approach would really be ideal here.
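For illustration, the data that such a column would carry can be read from the WARC header block of a single record. The following Python sketch assumes the headers are already available as text; the function name host_ip_pair and the sample header (with an address from the documentation range 192.0.2.0/24) are made up.

```python
from urllib.parse import urlsplit


def host_ip_pair(warc_header_block: str):
    """Return (hostname, ip) from a WARC header block, or None if either is missing."""
    headers = {}
    # Skip the first line, which holds the WARC version ("WARC/1.0").
    for line in warc_header_block.split("\r\n")[1:]:
        name, sep, value = line.partition(":")
        if sep:
            headers[name.strip().lower()] = value.strip()
    uri = headers.get("warc-target-uri")
    ip = headers.get("warc-ip-address")
    if not uri or not ip:
        return None
    return urlsplit(uri).hostname, ip


sample_header = (
    "WARC/1.0\r\n"
    "WARC-Type: response\r\n"
    "WARC-Target-URI: http://example.com/index.html\r\n"
    "WARC-IP-Address: 192.0.2.1"
)
pair = host_ip_pair(sample_header)
print(pair)
```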

Add AWS authentication for downloading data

Is it possible to add Amazon Web Services authentication to the download of data?
You described a solution based on Spark that I used. It worked fine, but now it needs authentication.
What are the steps for adding authentication?

Store column "fetch_time" as int64

The column "fetch_time" uses the Parquet int96 data type to store the capture time as Spark/Presto/etc. type "timestamp". Storing the timestamps as int64 would

  • make Parquet min/max stats available (thanks to @wumpus, @cldellow for figuring this out)
  • save space

In addition,

  • int96 timestamps have been deprecated (PARQUET-323)
  • int64 timestamps will become the default in Spark 3.0

Setting the timestamp type is possible via spark.sql.parquet.outputTimestampType since Spark 2.3.0 (SPARK-10365).

Millisecond precision should be enough. Although WARC/1.1 allows WARC-Dates with nanosecond precision, Common Crawl still follows the WARC/1.0 standard, also because many WARC parsers fail on dates with higher precision.

Needs testing whether Athena/Presto, Spark and Hive can process the int64 timestamps and allow columns with mixed data types together with schema merging.
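As a rough illustration of the two points above, the following Python sketch builds the spark-submit arguments for the timestamp setting and converts a WARC/1.0 date to int64 milliseconds. The value TIMESTAMP_MILLIS is one of the values Spark accepts for this option; choosing it here is an assumption based on the millisecond-precision remark, and both helper names are made up.

```python
from datetime import datetime, timezone

# Assumption: millisecond precision is selected, as suggested in the text above.
TIMESTAMP_CONF = {"spark.sql.parquet.outputTimestampType": "TIMESTAMP_MILLIS"}


def spark_conf_args(conf: dict) -> list:
    """Render a configuration dict as spark-submit '--conf key=value' arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


def warc_date_to_epoch_millis(warc_date: str) -> int:
    """Convert a WARC/1.0 date (second precision, UTC) to milliseconds since the epoch."""
    parsed = datetime.strptime(warc_date, "%Y-%m-%dT%H:%M:%SZ")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp()) * 1000


print(" ".join(spark_conf_args(TIMESTAMP_CONF)))
millis = warc_date_to_epoch_millis("2018-12-15T22:22:34Z")
print(millis, millis < 2**63)  # comfortably within the signed 64-bit range
```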

Consider normalizing host, domain names and TLDs

While the bulk of URLs in the crawls is normalized, this is not true for URLs stemming from redirects during fetching. As a result, host names of non-normalized URLs may include:

  • Unicode IDNs (not normalized to their ASCII representation)
  • IP addresses in other than dot-numeric representation
  • host names in percent-encoding
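A minimal Python sketch of what such a normalization could look like for the three cases listed above. It is not the project's implementation: the function name normalize_host is hypothetical, only decimal and hexadecimal integer IPv4 forms are handled, and the IDN conversion relies on Python's built-in idna codec (IDNA 2003), which may differ from what a crawler would use.

```python
import ipaddress
import re
from urllib.parse import unquote

# Integer forms of an IPv4 address: hexadecimal (0x...) or plain decimal.
_INT_IP = re.compile(r"0x[0-9a-f]{1,8}|[1-9][0-9]{0,9}")


def normalize_host(host: str) -> str:
    """Normalize a host name: undo percent-encoding, lower-case,
    rewrite integer IPv4 forms, and convert IDNs to ASCII."""
    host = unquote(host).lower()
    if _INT_IP.fullmatch(host):
        value = int(host, 0)  # base 0 accepts both "0x..." and decimal input
        if value <= 0xFFFFFFFF:
            return str(ipaddress.IPv4Address(value))
    # Python's built-in codec converts Unicode labels to their xn-- form.
    return host.encode("idna").decode("ascii")


for raw_host in ("%77%77%77.example.com", "b\u00fccher.example", "3232235777", "0xC0A80101"):
    print(raw_host, "->", normalize_host(raw_host))
```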

How to use AWS Athena to query CC-NEWS data?

Overview:

I want to query something in CC-NEWS, but according to this article: https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/ all data is in s3://commoncrawl/cc-index/table/cc-main/warc/.

My Question:

How to use AWS Athena to query CC-NEWS data?

Or how can news be distinguished within s3://commoncrawl/cc-index/table/cc-main/warc/?

Handle dns: lines in the CDXJ files.

I have been working to create the Common Crawl Index Table from a set of WARC and ARC files crawled back in 2008 and 2012 by the End of Term project.

When I get to the step to create the table I am running into the following exception.

21/12/01 08:14:13 ERROR Executor: Exception in task 5.0 in stage 1.0 (TID 6)
java.lang.StringIndexOutOfBoundsException: begin 165, end -1, length 200
        at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3319)
        at java.base/java.lang.String.substring(String.java:1874)
        at org.commoncrawl.spark.CCIndex2Table.convertCdxLine(CCIndex2Table.java:85)
        at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
        at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

In trying to sort things out I believe the problem might be related to the dns: entries that are being added to the cdxj files I am creating.

For reference I am using the cdx-indexer from pywb to create the cdx files and then the webarchive-indexing/build_local_zipnum.py to build the zipnum index.

When I remove the cdx lines that begin with dns: from the original cdxj files and create a new zipnum index, the cc-index-table builds without issue. This is why I am thinking it is related to dns URLs. An initial thought is that this might be happening when parsing the URL in the CommonCrawlURL.java file, but I am getting a bit out of my depth with that.

I have included the dns: lines from the cdxj files for reference.

dns:carney.house.gov 20090120110809 {"url": "dns:carney.house.gov", "mime": "text/dns", "status": "200", "digest": "IBWQS6J44DZG2DZGC6CAIXYQRYXTELS6", "length": "258", "offset": "57167110", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:fortenberry.houseenews.net 20090120110350 {"url": "dns:fortenberry.houseenews.net", "mime": "text/dns", "status": "200", "digest": "LJ4VR67S75IT22EYXZJA7J767X3HQQLF", "length": "248", "offset": "38197994", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:globalwarming.house.gov 20090120110057 {"url": "dns:globalwarming.house.gov", "mime": "text/dns", "status": "200", "digest": "GYANLSGMLY2ITPOUG26HU5TVBB642Z4Y", "length": "250", "offset": "22612729", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:kentucky.gov 20090120110351 {"url": "dns:kentucky.gov", "mime": "text/dns", "status": "200", "digest": "F2EGUUGHRZWVAFSAQDZLYYAY2PF3ESB7", "length": "241", "offset": "38202422", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:www.campbellcountyky.org 20090120110055 {"url": "dns:www.campbellcountyky.org", "mime": "text/dns", "status": "200", "digest": "G5BQUQGBQYFIB53TELWHXA2HCGMXL7PR", "length": "247", "offset": "22606867", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:www.gold.ky.gov 20090120110351 {"url": "dns:www.gold.ky.gov", "mime": "text/dns", "status": "200", "digest": "EF6MAYAQ4DOV4U2UWBLS3AQ3YNNJSBN4", "length": "247", "offset": "38202175", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:www.governor.ky.gov 20090120110848 {"url": "dns:www.governor.ky.gov", "mime": "text/dns", "status": "200", "digest": "PHWKYRZGU6N3PAZFV3CLY2JFPXK3I22J", "length": "246", "offset": "58189184", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:www.henrycountyky.com 20090120105705 {"url": "dns:www.henrycountyky.com", "mime": "text/dns", "status": "200", "digest": "I5AVBPKCIEU6OR3REJC74LP5PZO7ZGVR", "length": "250", "offset": "11301948", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:www.henrywaxman.house.gov 20090120111339 {"url": "dns:www.henrywaxman.house.gov", "mime": "text/dns", "status": "200", "digest": "77XOVXU2AAQ2KMHWBZLPSHT63JSYLDVC", "length": "265", "offset": "92635984", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:www.kctcs.edu 20090120110848 {"url": "dns:www.kctcs.edu", "mime": "text/dns", "status": "200", "digest": "AXNYBF5LIS7OQIC2PYHAKLUX7FMGI7Z7", "length": "241", "offset": "58189430", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:www.mahoney.house.gov 20090120105608 {"url": "dns:www.mahoney.house.gov", "mime": "text/dns", "status": "200", "digest": "GA5AMHEAJNXYL3YFDT4NL5CJ4ZZTCOE7", "length": "255", "offset": "219509", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120105548-00558-dp01.warc.gz"}
dns:www.aviationsystemsdivision.arc.nasa.gov 20090120111433 {"url": "dns:www.aviationsystemsdivision.arc.nasa.gov", "mime": "text/dns", "status": "200", "digest": "IKBNHMXKBP6YDDHJXE4KZFBZTCIEKXBF", "length": "264", "offset": "69679824", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120111003-01406-dp01.warc.gz"}
dns:www.spaceflight.nasa.gov 20090120111257 {"url": "dns:www.spaceflight.nasa.gov", "mime": "text/dns", "status": "200", "digest": "M6ZGWE4AF6W7WGURTYYKGI6PPDW66VGK", "length": "263", "offset": "51358480", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120111003-01406-dp01.warc.gz"}
dns:www.index.va.gov 20090120114454 {"url": "dns:www.index.va.gov", "mime": "text/dns", "status": "200", "digest": "ZIKPGE2AAW4XHM4UE7ZC3NU3SX3XRLS7", "length": "240", "offset": "57821587", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120111539-00371-vat01.cdlib.org.warc.gz"}
dns:www1.va.gov 20090120113427 {"url": "dns:www1.va.gov", "mime": "text/dns", "status": "200", "digest": "YILNQUZ34QJPCBCUJQXYY6YZ2VRIGNG2", "length": "240", "offset": "31831131", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120111539-00371-vat01.cdlib.org.warc.gz"}
dns:t2.lanl.gov 20090120115128 {"url": "dns:t2.lanl.gov", "mime": "text/dns", "status": "200", "digest": "E7FB5HHFKKODMRLTW65T3U77OAKEZAEB", "length": "237", "offset": "40048624", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120112235-00558-vat01.cdlib.org.warc.gz"}
dns:www-xdiv.lanl.gov 20090120114647 {"url": "dns:www-xdiv.lanl.gov", "mime": "text/dns", "status": "200", "digest": "GT7B5ZW262XVAG5WRCQ57H2YDI5UAKQN", "length": "245", "offset": "35110129", "filename": "crawl-data/EOT-2008/segments/CDL/warc/CDL-20090120112235-00558-vat01.cdlib.org.warc.gz"}

Replace int96 timestamps in index partitions before CC-MAIN-2020

See #7 and announcement of January 2020 crawl.

Recent Parquet library versions (1.12.2) start to complain about the int96 timestamps:

$> parquet-cli cat -c fetch_time -n 5 s3a://commoncrawl/cc-index/table/cc-main/warc/crawl=CC-MAIN-2018-43/subset=warc/part-00247-f47c372a-e3d4-4f2b-b7a0-a939c04fd01e.c000.gz.parquet
Argument error: INT96 is deprecated. As interim enable READ_INT96_AS_FIXED  flag to read as byte array.

No complaints for data from 2020 and newer:

$> parquet-cli cat -c fetch_time -n 5 s3a://commoncrawl/cc-index/table/cc-main/warc/crawl=CC-MAIN-2020-05/subset=warc/part-00243-2224c996-15d6-400a-8ae4-2d0740e74c18.c000.gz.parquet
1579483394000
1580078106000
1580035997000
1579264777000
1579422799000

Tasks:

  • pin the usage of int64 timestamps (shouldn't be implemented by passing a configuration parameter as done in 500d454)
  • rewrite pre-2020 index partitions
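
For readers who take the interim route and read the old partitions as raw bytes, the sketch below shows how such a value could be decoded. It assumes the conventional INT96 timestamp layout (12 bytes, little-endian: 8 bytes of nanoseconds within the day followed by a 4-byte Julian day number), which the issue itself does not spell out. The function names are hypothetical and not part of any Parquet library.

```python
import struct

JULIAN_DAY_OF_UNIX_EPOCH = 2440588  # Julian day number of 1970-01-01
MILLIS_PER_DAY = 86_400_000


def int96_to_epoch_millis(raw: bytes) -> int:
    """Decode a 12-byte INT96 timestamp into milliseconds since the Unix epoch."""
    if len(raw) != 12:
        raise ValueError("INT96 values are exactly 12 bytes long")
    # Assumed layout: little-endian int64 nanoseconds of day, then int32 Julian day.
    nanos_of_day, julian_day = struct.unpack("<qi", raw)
    days_since_epoch = julian_day - JULIAN_DAY_OF_UNIX_EPOCH
    return days_since_epoch * MILLIS_PER_DAY + nanos_of_day // 1_000_000


def epoch_millis_to_int96(millis: int) -> bytes:
    """Inverse helper, used here only to build a test value."""
    days_since_epoch, millis_of_day = divmod(millis, MILLIS_PER_DAY)
    return struct.pack(
        "<qi", millis_of_day * 1_000_000, days_since_epoch + JULIAN_DAY_OF_UNIX_EPOCH
    )


# Round trip with the first int64 value printed for CC-MAIN-2020-05 above.
raw = epoch_millis_to_int96(1579483394000)
print(int96_to_epoch_millis(raw))
```

The result is in the same unit (milliseconds since the epoch) as the int64 values shown for the 2020 partition, so rewritten partitions could be compared against the originals value by value.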

Problem running the example for getting data for a language in Spark

I'm attempting to run the example for retrieving all Icelandic data using EMR on AWS. Here's the way that I'm adding the job:
aws emr add-steps --cluster-id j-1PWKHFC4A1VG4 --steps Type=CUSTOM_JAR,Name="CommonCrawlJob",Jar="command-runner.jar",ActionOnFailure=CONTINUE,Args=["/usr/lib/spark/bin/job.sh"]

And this is the job.sh file:
#!/bin/bash
/usr/lib/spark/bin/spark-submit --class org.commoncrawl.spark.examples.CCIndexWarcExport /tmp/cc-spark-0.2-SNAPSHOT-jar-with-dependencies.jar --query "SELECT url, warc_filename, warc_record_offset, warc_record_length WHERE crawl = 'CC-MAIN-2018-51' AND subset = 'warc' AND content_languages = 'isl'" --numOutputPartitions 12 --numRecordsPerWarcFile 20000 --warcPrefix ICELANDIC-CC-2018-51 s3://commoncrawl/cc-index/table/cc-main/warc/ s3://output_bucket/test

The error that I'm getting seems to indicate a problem with reading the index data:
19/01/08 17:08:37 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
19/01/08 17:08:37 INFO CCIndexExport: Schema of table ccindex: StructType(StructField(url_surtkey,StringType,true), StructField(url,StringType,true), StructField(url_host_name,StringType,true), StructField(url_host_tld,StringType,true), StructField(url_host_2nd_last_part,StringType,true), StructField(url_host_3rd_last_part,StringType,true), StructField(url_host_4th_last_part,StringType,true), StructField(url_host_5th_last_part,StringType,true), StructField(url_host_registry_suffix,StringType,true), StructField(url_host_registered_domain,StringType,true), StructField(url_host_private_suffix,StringType,true), StructField(url_host_private_domain,StringType,true), StructField(url_protocol,StringType,true), StructField(url_port,IntegerType,true), StructField(url_path,StringType,true), StructField(url_query,StringType,true), StructField(fetch_time,TimestampType,true), StructField(fetch_status,ShortType,true), StructField(content_digest,StringType,true), StructField(content_mime_type,StringType,true), StructField(content_mime_detected,StringType,true), StructField(warc_filename,StringType,true), StructField(warc_record_offset,IntegerType,true), StructField(warc_record_length,IntegerType,true), StructField(warc_segment,StringType,true), StructField(crawl,StringType,true), StructField(subset,StringType,true))
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'crawl' given input columns: []; line 1 pos 72;
'Project ['url, 'warc_filename, 'warc_record_offset, 'warc_record_length]
+- 'Filter ((('crawl = CC-MAIN-2018-51) && ('subset = warc)) && ('content_languages = isl))
   +- OneRowRelation
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:92)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:89)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at org.commoncrawl.spark.examples.CCIndexExport.executeQuery(CCIndexExport.java:82)
    at org.commoncrawl.spark.examples.CCIndexWarcExport.run(CCIndexWarcExport.java:148)
    at org.commoncrawl.spark.examples.CCIndexExport.run(CCIndexExport.java:192)
    at org.commoncrawl.spark.examples.CCIndexWarcExport.main(CCIndexWarcExport.java:212)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/01/08 17:08:37 INFO SparkContext: Invoking stop() from shutdown hook

Any ideas about what's going wrong? Thanks!
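
One possible reading of the trace, offered as a hedged note rather than a confirmed diagnosis: the plan ends in OneRowRelation and the error reports "input columns: []", which is what Spark prints when a SELECT has no FROM clause, and the query passed via --query indeed names no table. The log calls the registered table "ccindex". Separately, the logged schema lists no content_languages column. The Python sketch below only inspects the query string and the logged column names; the helper name is hypothetical and nothing here calls Spark.

```python
import re

# Column names copied from the "Schema of table ccindex" log line above.
LOGGED_COLUMNS = set("""
url_surtkey url url_host_name url_host_tld url_host_2nd_last_part
url_host_3rd_last_part url_host_4th_last_part url_host_5th_last_part
url_host_registry_suffix url_host_registered_domain url_host_private_suffix
url_host_private_domain url_protocol url_port url_path url_query fetch_time
fetch_status content_digest content_mime_type content_mime_detected
warc_filename warc_record_offset warc_record_length warc_segment crawl subset
""".split())


def has_from_clause(query: str) -> bool:
    """Rough check (not a SQL parser): does the query name a table after FROM?"""
    return re.search(r"\bFROM\s+\w+", query, flags=re.IGNORECASE) is not None


original = (
    "SELECT url, warc_filename, warc_record_offset, warc_record_length "
    "WHERE crawl = 'CC-MAIN-2018-51' AND subset = 'warc' "
    "AND content_languages = 'isl'"
)

# Assumption: the table is registered as "ccindex", as the log line suggests.
with_table = original.replace(" WHERE ", " FROM ccindex WHERE ", 1)

print(has_from_clause(original))      # query as submitted
print(has_from_clause(with_table))    # query with the table named
print("content_languages" in LOGGED_COLUMNS)
```

If the last check reflects the table as Spark sees it, the content_languages filter could still fail after the FROM clause is added, so both points may need attention.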

Investigate reasons why table isn't fully sorted by `url_surtkey`

The table is written from the cdx-*.gz files, which are sorted by the SURT key. However, it looks like the Parquet files are no longer fully sorted:

$> parquet-tools cat --json \
         s3a://commoncrawl/cc-index/table/cc-main/warc/crawl=CC-MAIN-2018-22/subset=warc/part-00243-f48ad7b9-218e-42ff-aff7-02e2fdcb7739.c000.gz.parquet \
    | jq -r .url_surtkey
...
org,commoncrawl)/
org,catho85,jubile2017)/evenements/exposition-jean-michel-solves
org,commoncrawl)/2011/12/mapreduce-for-the-masses
org,catho85,jubile2017)/evenements/exposition-la-fleur-de-lage
org,commoncrawl)/2012/03/data-2-0-summit
...

The output looks like the result of zipping 2 sorted streams into the final Parquet file and is observable over various crawls, i.e. it's not bound to a specific Spark / Parquet version.
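
The "two zipped sorted streams" observation can be checked mechanically on the five keys shown above. This is a minimal Python sketch; the helper name is hypothetical, and plain string comparison is assumed to match the CDX sort order (true for these ASCII keys).

```python
from typing import List, Optional


def first_unsorted_index(keys: List[str]) -> Optional[int]:
    """Return the index of the first key smaller than its predecessor, or None."""
    for i in range(1, len(keys)):
        if keys[i] < keys[i - 1]:
            return i
    return None


# The five consecutive keys from the parquet-tools output above.
sample = [
    "org,commoncrawl)/",
    "org,catho85,jubile2017)/evenements/exposition-jean-michel-solves",
    "org,commoncrawl)/2011/12/mapreduce-for-the-masses",
    "org,catho85,jubile2017)/evenements/exposition-la-fleur-de-lage",
    "org,commoncrawl)/2012/03/data-2-0-summit",
]

print(first_unsorted_index(sample))        # position where the order breaks
print(first_unsorted_index(sample[0::2]))  # every other key, starting at 0
print(first_unsorted_index(sample[1::2]))  # every other key, starting at 1
```

The full sample breaks the order at its second key, while each alternating subsequence is sorted on its own, which is consistent with two sorted streams being interleaved.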

Ensuring the sort order by SURT key will also ensure that other columns (domain name, partially host name) are properly sorted. Hence, it should improve compression and look-ups in the Parquet files by more precise min/max ranges.

The same issue is described in a post on Stack Overflow. Initial experiments show that the order is preserved when the data is not partitioned (into crawl and subset).

If this is fixed and better compression or faster look-ups are measurable, it might be worth regenerating the index for older crawls.

Corrupted ".warc.gz" files being produced

I've tried using the "--csv" option of CCIndexWarcExport to extract some information based on a CSV file produced from Athena queries of the index. Unfortunately, it seems as though the ".warc.gz" files that are produced may not be readable. I've tried downloading the files from S3 and decompressing them with multiple decompression programs (gunzip and "The Unarchiver") and they both complain that the files cannot be decompressed.

The job that I am running is as follows:

spark-submit --class org.commoncrawl.spark.examples.CCIndexWarcExport /tmp/cc-spark-0.2-SNAPSHOT-jar-with-dependencies.jar --csv s3://athenaforcommoncrawl/small_test_target_records.csv --numOutputPartitions 12 --numRecordsPerWarcFile 20000 --warcPrefix CC-2018-51 s3://commoncrawl/cc-index/table/cc-main/warc/ s3://athenaforcommoncrawl/test_11

The "small_test_target_records.csv" just has a subset of 100 or so of the CSV records produced by Athena. It seems to look normal when viewed in an editor, but I can upload that if desired.
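
To narrow down where the files break, a small check like the following could be run on a downloaded file's bytes. It is a sketch using only Python's standard library, with a hypothetical helper name. It relies on the assumption that the exported WARC files are meant to be gzip streams made of one or more concatenated members, which the standard gzip module reads transparently.

```python
import gzip
import io
import zlib


def is_readable_gzip(data: bytes) -> bool:
    """Return True if data decompresses completely as (multi-member) gzip."""
    if data[:2] != b"\x1f\x8b":  # gzip magic number
        return False
    try:
        with gzip.GzipFile(fileobj=io.BytesIO(data)) as stream:
            # Read in chunks until the end so that truncation is detected.
            while stream.read(1 << 20):
                pass
    except (OSError, EOFError, zlib.error):
        return False
    return True


# Two concatenated members stand in for a two-record .warc.gz file.
good = gzip.compress(b"WARC/1.0 record one") + gzip.compress(b"WARC/1.0 record two")
print(is_readable_gzip(good))
print(is_readable_gzip(good[:-5]))  # truncated copy
```

A file that fails at the magic-number test was probably never gzip-compressed, whereas one that fails later points at truncation or a damaged member.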

Allow to use a custom table schema

The schema of the table created by CCIndex2Table is fixed to the built-in schema used by/for Common Crawl. In order to support other crawl archives, it would be optimal to keep the table schema configurable:

  1. allow passing a custom table schema (as a JSON file) that defines the output table

  2. (eventually) split the class into a generic one (requiring a custom schema) and a CC-specific one. This would also make it easier to adapt the parsing of custom CDX input.
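
As an illustration of what such a JSON schema file might contain, the sketch below writes a few columns in the layout Spark uses when it serializes a StructType to JSON. The column names and types are taken from the schema logged in an earlier issue on this page. Whether CCIndex2Table would accept exactly this layout is an assumption, since the proposal only says "JSON file", and the helper name is hypothetical.

```python
import json


def struct_field(name: str, spark_type: str, nullable: bool = True) -> dict:
    """One column entry in Spark's StructType JSON layout."""
    return {"name": name, "type": spark_type, "nullable": nullable, "metadata": {}}


# A deliberately small subset of columns, for illustration only.
schema = {
    "type": "struct",
    "fields": [
        struct_field("url_surtkey", "string"),
        struct_field("url", "string"),
        struct_field("fetch_time", "timestamp"),
        struct_field("fetch_status", "short"),
        struct_field("warc_filename", "string"),
        struct_field("warc_record_offset", "integer"),
        struct_field("warc_record_length", "integer"),
    ],
}

schema_json = json.dumps(schema, indent=2)
print(schema_json)
```

Keeping the file in Spark's own serialization format would let a generic class load it without a custom parser, though a simpler project-specific format would work as well.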
