
Freeing data processing from scripting madness by providing a set of platform-agnostic customizable pipeline processing blocks.

License: Apache License 2.0


datatrove's Introduction

DataTrove

DataTrove is a library to process, filter and deduplicate text data at a very large scale. It provides a set of prebuilt commonly used processing blocks with a framework to easily add custom functionality.

DataTrove processing pipelines are platform-agnostic, running out of the box locally or on a slurm cluster. Its (relatively) low memory usage and multi-step design make it ideal for large workloads, such as processing an LLM's training data.

Local, remote and other file systems are supported through fsspec.


Installation

pip install datatrove[FLAVOUR]

Available flavours (combine them with a comma, e.g. [processing,s3]):

  • all installs everything: pip install datatrove[all]
  • io dependencies to read warc/arc/wet files and arrow/parquet formats: pip install datatrove[io]
  • processing dependencies for text extraction, filtering and tokenization: pip install datatrove[processing]
  • s3 s3 support: pip install datatrove[s3]
  • cli for command line tools: pip install datatrove[cli]

Quickstart examples

You can check the example scripts in the repository's examples/ folder, such as the common crawl processing example and the deduplication examples (minhash_deduplication.py, sentence_deduplication.py and exact_substrings.py).

Pipeline

DataTrove Document

Each pipeline block processes data in the datatrove Document format:

  • text the actual text content for each sample
  • id a unique id (string) for this sample
  • metadata a dictionary where any additional info may be stored
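
For illustration, a Document can also be constructed directly (a minimal sketch; the fields match the list above):

from datatrove.data import Document

doc = Document(
    text="The quick brown fox jumps over the lazy dog.",  # the actual text content
    id="sample-0",                                        # a unique (string) id
    metadata={"source": "example", "language": "en"},     # any additional info
)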

Types of pipeline blocks

Each pipeline block takes a generator of Document as input and returns another generator of Document.

  • readers read data from different formats and yield Document
  • writers save Document to disk/cloud in different formats
  • extractors extract text content from raw formats (such as webpage html)
  • filters filter out (remove) some Documents based on specific rules/criteria
  • stats blocks to collect statistics on the dataset
  • tokens blocks to tokenize data or count tokens
  • dedup blocks for deduplication

Full pipeline

A pipeline is defined as a list of pipeline blocks. As an example, the following pipeline would read data from disk, randomly filter (remove) some documents and write them back to disk:

from datatrove.pipeline.readers import CSVReader
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    CSVReader(
        data_folder="/my/input/path"
    ),
    SamplerFilter(rate=0.5),
    JsonlWriter(
        output_folder="/my/output/path"
    )
]

Executors

Pipelines are platform-agnostic, which means that the same pipeline can smoothly run on different execution environments without any changes to its steps. Each environment has its own PipelineExecutor. Some options common to all executors:

  • pipeline a list consisting of the pipeline steps that should be run
  • logging_dir a datafolder where log files, statistics and more should be saved. Do not reuse folders for different pipelines/jobs as this will overwrite your stats, logs and completions.
  • skip_completed (bool, True by default) datatrove keeps track of completed tasks so that when you relaunch a job they can be skipped. Set this to False to disable this behaviour
  • randomize_start_duration (int, 0 by default) the maximum number of seconds to delay the start of each task to prevent all tasks from starting simultaneously and potentially overloading the system.

Call an executor's run method to execute its pipeline.

Tip

Datatrove keeps track of which tasks successfully completed by creating a marker (an empty file) in the ${logging_dir}/completions folder. Once the job finishes, if some of its tasks have failed, you can simply relaunch the exact same executor and datatrove will check and only run the tasks that were not previously completed.

Caution

If you relaunch a pipeline because some tasks failed, do not change the total number of tasks as this will affect the distribution of input files/sharding.

LocalPipelineExecutor

This executor will launch a pipeline on a local machine. Options:

  • tasks total number of tasks to run
  • workers how many tasks to run simultaneously. If -1, no limit. Anything > 1 will use multiprocessing to execute the tasks.
  • start_method method to use to spawn a multiprocessing Pool. Ignored if workers is 1
Example executor
from datatrove.executor import LocalPipelineExecutor
executor = LocalPipelineExecutor(
    pipeline=[
        ...
    ],
    logging_dir="logs/",
    tasks=10,
    workers=5
)
executor.run()
Multi-node parallelism

You can have different nodes/machines process different parts of the total tasks by using the local_tasks and local_rank_offset. For each node/instance/machine, launch with the following options:

  • tasks the total tasks to be executed (across all machines). This value must be the same on each machine or the input file distribution may overlap! Example: 500
  • local_tasks how many tasks of the total will be executed on this particular machine. Note that you can use different values for each machine. Example: 100
  • local_rank_offset the rank of the first task to be executed on this machine. If this is the 3rd machine where you are launching a job, and the 2 previous machines each ran 250 and 150 jobs, this would be 400 for the current machine.

To get final merged stats you will have to invoke the merge_stats script manually on a path containing the stats from all machines.
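
As a rough sketch of the third machine in the example above (assuming the shared logging_dir and the example task counts, with the pipeline itself omitted):

from datatrove.executor import LocalPipelineExecutor

executor = LocalPipelineExecutor(
    pipeline=[
        ...
    ],
    logging_dir="s3://mybucket/logs/",  # placeholder; shared by all machines
    tasks=500,              # total tasks across ALL machines - must match on every machine
    local_tasks=100,        # tasks to run on this particular machine
    local_rank_offset=400,  # 250 + 150 tasks were assigned to the two previous machines
)
executor.run()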

SlurmPipelineExecutor

This executor will launch a pipeline on a slurm cluster, using slurm job arrays to group and manage tasks. Options:

  • tasks total number of tasks to run. required
  • time slurm time limit string. required
  • partition slurm partition. required
  • workers how many tasks to run simultaneously. If -1, no limit. Slurm will run workers tasks at a time. (default: -1)
  • job_name slurm job name (default: "data_processing")
  • depends another SlurmPipelineExecutor instance, which will be a dependency of this pipeline (current pipeline will only start executing after the depended on pipeline successfully completes)
  • sbatch_args dictionary with any other arguments you would like to pass to sbatch
  • slurm_logs_folder where to save the slurm log files. If using a local path for logging_dir, they will be saved on logging_dir/slurm_logs. If not, they will be saved as a subdir of the current directory.
Other options
  • cpus_per_task how many cpus to give each task (default: 1)
  • qos slurm qos (default: "normal")
  • mem_per_cpu_gb memory per cpu, in GB (default: 2)
  • env_command custom command to activate a python environment, if needed
  • condaenv conda environment to activate
  • venv_path path to a python environment to activate
  • max_array_size the MaxArraySize value in $ scontrol show config. If number of tasks exceeds this number, it will split into multiple array jobs (default: 1001)
  • max_array_launch_parallel if we need multiple jobs due to max_array_size, whether to launch them all in one go (parallel) or sequentially (default: False)
  • stagger_max_array_jobs when max_array_launch_parallel is True, this determines how many seconds to wait between launching each of the parallel jobs (default: 0)
  • run_on_dependency_fail start executing when a job we depend on finishes even if it has failed (default: False)
  • randomize_start randomize the start of each task in a job in a ~3 min window. Useful when heavily hitting an s3 bucket for example. (default: False)
Example executor
from datatrove.executor import SlurmPipelineExecutor
executor1 = SlurmPipelineExecutor(
    pipeline=[
        ...
    ],
    job_name="my_cool_job1",
    logging_dir="logs/job1",
    tasks=500,
    workers=100,  # omit to run all at once
    time="10:00:00",  # 10 hours
    partition="hopper-cpu"
)
executor2 = SlurmPipelineExecutor(
    pipeline=[
        ...
    ],
    job_name="my_cool_job2",
    logging_dir="logs/job2",
    tasks=1,
    time="5:00:00",  # 5 hours
    partition="hopper-cpu",
    depends=executor1  # this pipeline will only be launched after executor1 successfully completes
)
# executor1.run()
executor2.run() # this will actually launch executor1, as it is a dependency, so no need to launch it explicitly

Logging

For a pipeline with logging_dir mylogspath/exp1, the following folder structure would be created:

See folder structure
└── mylogspath/exp1
    │── executor.json ⟵ json dump of the executor options and pipeline steps
    │── launch_script.slurm ⟵ the slurm config created and used to launch this job (if running on slurm)
    │── executor.pik ⟵ the pickled executor object (if running on slurm)
    │── ranks_to_run.json ⟵ list of tasks that are being run
    │── logs/
    │   └──[task_00000.log, task_00001.log, task_00002.log, ...] ⟵ individual logging files for each task
    │── completions/
    │   └──[00004, 00007, 00204, ...] ⟵ empty files marking a task as completed. Used when relaunching/resuming a job (only unfinished tasks will be run)
    │── stats/
    │   └──[00000.json, 00001.json, 00002.json, ...] ⟵ individual stats for each task (number of samples processed, filtered, removed, etc)
    └── stats.json ⟵ global stats from all tasks

Colorization

Log messages support colorization. By default, colorization will be auto detected for console messages and disabled for log files (logs/task_XXXXX.log). To explicitly enable or disable colorization, you may set the following environment variables:

  • DATATROVE_COLORIZE_LOGS "1" to add ANSI colors to console log messages and "0" to disable colorization.
  • DATATROVE_COLORIZE_LOG_FILES set to "1" to add ANSI colors to log messages saved to logs/task_XXXXX.log.
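
For instance, to force colors on in both the console and the log files, you could set these variables from Python before running your executor (a minimal sketch; setting them in the shell works just as well):

import os

# force ANSI colors for console messages and for logs/task_XXXXX.log files
os.environ["DATATROVE_COLORIZE_LOGS"] = "1"
os.environ["DATATROVE_COLORIZE_LOG_FILES"] = "1"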

DataFolder / paths

Datatrove supports a wide variety of input/output sources through fsspec.

There are a few ways to provide a path to a datatrove block (for input_folder, logging_dir, data_folder and so on arguments):

  • str: the simplest way is to pass a single string. Example: /home/user/mydir, s3://mybucket/myinputdata, hf://datasets/allenai/c4/en/

  • (str, fsspec filesystem instance): a string path and a fully initialized filesystem object. Example: ("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))

  • (str, dict): a string path and a dictionary with options to initialize a fs. Example (equivalent to the previous line): ("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})

  • DataFolder: you can initialize a DataFolder object directly and pass it as an argument

Under the hood these argument combinations are parsed by get_datafolder.
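
As an illustration, the following sketch shows the three path forms pointing at the same S3 folder; endpoint_uri is a placeholder, and it assumes the s3 flavour (s3fs) is installed:

from s3fs import S3FileSystem
from datatrove.io import get_datafolder

endpoint_uri = "https://my-custom-endpoint"  # placeholder

# all of these are resolved to a DataFolder by get_datafolder under the hood
df_from_str = get_datafolder("s3://mybucket/myinputdata")
df_from_fs = get_datafolder(
    ("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))
)
df_from_dict = get_datafolder(
    ("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})
)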

Practical guides

Reading data

Usually, pipelines will start with a Reader block. Most readers take a data_folder argument — a path to a folder containing the data to be read.

These files will be distributed across each task. If you have N tasks, task with rank i (0-based) will process files i, i+N, i+2N, i+3N,....
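
Conceptually, the assignment is a simple stride over the sorted file list - an illustrative sketch, not the actual reader code:

# files i, i+N, i+2N, ... go to the task with rank i (N = total number of tasks)
def files_for_rank(all_files: list[str], rank: int, world_size: int) -> list[str]:
    return all_files[rank::world_size]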

Internally, each reader reads data and converts it into a dictionary before creating a Document object.

Some options common to most readers:

  • text_key the dictionary key containing the text content for each sample. Default: text
  • id_key the dictionary key containing the id for each sample. Default: id
  • default_metadata a dictionary for any default metadata values you would like to add (such as their source, for example)
  • recursive whether to look for files recursively in data_folder's subdirectories
  • glob_pattern use this field to match specific files. For instance, glob_pattern="*/warc/*.warc.gz" will match files with a .warc.gz file extension in the warc/ folder of each of the data_folder's subdirectories
  • adapter this function takes the raw dictionary obtained from the reader and returns a dictionary with Document's field names. You may overwrite this function (_default_adapter) if you would like.
  • limit read only a certain number of samples. Useful for testing/debugging
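
A sketch of a reader configured with several of the options above (the path, glob pattern and keys are placeholders):

from datatrove.pipeline.readers import JsonlReader

reader = JsonlReader(
    data_folder="s3://mybucket/mydata/",  # placeholder path
    text_key="content",                   # our files store the text under "content" instead of "text"
    id_key="doc_id",
    default_metadata={"source": "mydata"},
    recursive=True,
    glob_pattern="*.jsonl.gz",            # only match gzipped jsonl files
    limit=1000,                           # only read 1000 samples, useful for testing/debugging
)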

Extracting text

You can use extractors to extract text content from raw html. The most commonly used extractor in datatrove is Trafilatura, which uses the trafilatura library.
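
A minimal sketch of using it after a WARC reader (paths are placeholders):

from datatrove.pipeline.readers import WarcReader
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    WarcReader(data_folder="warc/", limit=1000),  # read raw html pages from warc files
    Trafilatura(),                                # extract the main text content from the html
    JsonlWriter(output_folder="extracted/"),
]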

Filtering data

Filters are some of the most important blocks of any data processing pipeline. Datatrove's filter blocks take a Document and return a boolean (True to keep a document, False to remove it). Removed samples do not continue to the next pipeline stage. You can also save the removed samples to disk by passing a Writer to the exclusion_writer parameter.
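
As a sketch (assuming the filter accepts the exclusion_writer argument, as datatrove's built-in filters do), removed documents can be written to their own folder:

from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    ...,
    SamplerFilter(
        rate=0.5,
        # removed documents are not only dropped from the pipeline but also saved here
        exclusion_writer=JsonlWriter(output_folder="/my/removed/path"),
    ),
    ...
]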

Saving data

Once you are done processing your data you will probably want to save it somewhere. For this you can use a writer. Writers require an output_folder (the path where data should be saved). You can choose the compression to use (default: gzip) and the filename to save each file as. For the output_filename, a template is applied using the following arguments:

  • ${rank} replaced with the current task's rank. Note that if this tag isn't present, different tasks may try to write to the same location
  • ${id} replaced with the sample id
  • metadata: any other ${tag} will be replaced with the corresponding document.metadata['tag'] value

An example to separate samples by language based on their lang metadata field:

JsonlWriter(
    f"{MAIN_OUTPUT_PATH}/non_english/",
    output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz",  # folder structure: language/dump/file
)

Deduplicating data

For deduplication check the examples minhash_deduplication.py, sentence_deduplication.py and exact_substrings.py.

Custom blocks

Simple data

You can pass an iterable of Document directly as a pipeline block like so:

from datatrove.data import Document
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    [
        Document(text="some data", id="0"),
        Document(text="some more data", id="1"),
        Document(text="even more data", id="2"),
    ],
    SamplerFilter(rate=0.5),
    JsonlWriter(
        output_folder="/my/output/path"
    )
]

Do note, however, that this iterable will not be sharded (if you launch more than 1 task they will all get the full iterable). This is usually useful for small workloads/testing.

Custom function

For simple processing you can simply pass in a custom function with the following signature:

from datatrove.data import DocumentsPipeline

def uppercase_everything(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
    """
        `data` is a generator of Document. You must also return a generator of Document (yield)
        You can optionally use `rank` and `world_size` for sharding
    """
    for document in data:
        document.text = document.text.upper()
        yield document

pipeline = [
    ...,
    uppercase_everything,
    ...
]

Tip

You might have some pickling issues due to the imports. If this happens, simply move whatever imports you need inside the function body.
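
For example, instead of importing at module level, the import can live inside the function body (a minimal sketch; it assumes nltk and its punkt data are available):

from datatrove.data import DocumentsPipeline

def count_words(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
    from nltk.tokenize import word_tokenize  # imported here to avoid pickling issues

    for document in data:
        document.metadata["n_words"] = len(word_tokenize(document.text))
        yield document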

Custom block

You can also define a full block inheriting from PipelineStep or one of its subclasses:

from datatrove.pipeline.base import PipelineStep
from datatrove.data import DocumentsPipeline
from datatrove.io import DataFolderLike, get_datafolder


class UppercaserBlock(PipelineStep):
    def __init__(self, some_folder: DataFolderLike, some_param: int = 5):
        super().__init__()
        # you can take whatever parameters you need and save them here
        self.some_param = some_param
        # to load datafolders use get_datafolder()
        self.some_folder = get_datafolder(some_folder)

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        # you could also load data from the `some_folder`:
        for filepath in self.some_folder.get_shard(rank, world_size): # it also accepts a glob pattern, among other things
            with self.some_folder.open(filepath, "rt") as f:
                # do something
                ...
                yield doc

        #
        # OR process data from previous blocks (`data`)
        #

        for doc in data:
            with self.track_time():
                # you can wrap the main processing code in `track_time` to know how much each document took to process
                nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text))
                # you can also keep track of stats per document using stat_update
                self.stat_update("og_upper_letters", value=nr_uppercase_letters)
                doc.text = doc.text.upper()
            # make sure you keep the yield outside the track_time block, or it will affect the time calculation
            yield doc

        #
        # OR save data to disk
        #

        with self.some_folder.open("myoutput", "wt") as f:
            for doc in data:
                f.write(doc...)

pipeline = [
    ...,
    UppercaserBlock("somepath"),
    ...
]

You could also inherit from BaseExtractor, BaseFilter, BaseReader/BaseDiskReader, or DiskWriter.
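
For instance, a custom filter only needs to implement the filter method (a hedged sketch: the import path and constructor signature are assumptions based on the repo layout, and filter returns True to keep a document):

from datatrove.data import Document
from datatrove.pipeline.filters.base_filter import BaseFilter


class MinLengthFilter(BaseFilter):
    name = "min-length-filter"

    def __init__(self, min_chars: int = 200, exclusion_writer=None):
        super().__init__(exclusion_writer=exclusion_writer)
        self.min_chars = min_chars

    def filter(self, doc: Document) -> bool:
        # True keeps the document, False removes it
        return len(doc.text) >= self.min_chars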

Contributing

git clone git@github.com:huggingface/datatrove.git && cd datatrove
pip install -e ".[dev]"

Install pre-commit code style hooks:

pre-commit install

Run the tests:

pytest -sv ./tests/

Citation

@misc{penedo2024datatrove,
  author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Wolf, Thomas and Sasko, Mario},
  title = {DataTrove: large scale data processing},
  year = {2024},
  publisher = {GitHub},
  journal = {GitHub repository},
  url = {https://github.com/huggingface/datatrove}
}


datatrove's Issues

Writer adapter

Do you think it is a good idea to also add a writer adapter for the jsonl writer?

The reader also has this functionality which greatly improves the flexibility when working with jsonl data with different keys.

Unreadable log

When debugging, I find many unreadable symbols like ESC etc...

ESC[32m2024-03-22 07:26:34.220ESC[0m | ESC[1mINFO    ESC[0m | ESC[36mdatatrove.utils.loggingESC[0m:ESC[36madd_task_loggerESC[0m:ESC[36m47ESC
[0m - ESC[1mLaunching pipeline for rank=275ESC[0m
ESC[32m2024-03-22 07:26:34.220ESC[0m | ESC[1mINFO    ESC[0m | ESC[36mdatatrove.utils.loggingESC[0m:ESC[36mlog_pipelineESC[0m:ESC[36m76ESC[0m
 - ESC[1m
--- 🛠️ PIPELINE 🛠
📖 - READER: 🐿 Jsonl
🫂 - DEDUP: 🪞 - exact-substrings stage 3
💽 - WRITER: 🐿 JsonlESC[0m
ESC[32m2024-03-22 07:26:34.245ESC[0m | ESC[1mINFO    ESC[0m | ESC[36mdatatrove.pipeline.dedup.exact_substringsESC[0m:ESC[36mget_sequence_bytes_offsetESC[0m:ESC[36m186ESC[0m - ESC[1mself.rank=275, -> self.sequence_bytes_offset[self.rank]=2743767972ESC[0m
ESC[32m2024-03-22 07:26:39.623ESC[0m | ESC[1mINFO    ESC[0m | ESC[36mdatatrove.pipeline.readers.baseESC[0m:ESC[36mread_files_shardESC[0m:ESC[36m160ESC[0m - ESC[1mReading input file 00275.jsonlESC[0m
ESC[32m2024-03-22 07:27:36.812ESC[0m | ESC[1mINFO    ESC[0m | ESC[36mdatatrove.pipeline.readers.baseESC[0m:ESC[36mread_files_shardESC[0m:ESC[36m160ESC[0m - ESC[1mReading input file 00775.jsonlESC[0m
ESC[32m2024-03-22 07:28:06.175ESC[0m | ESC[31mESC[1mERROR   ESC[0m | ESC[36mdatatrove.executor.baseESC[0m:ESC[36m_run_for_rankESC[0m:ESC[36m95ESC[0m - ESC[31mESC[1mScience and computing with Raspberry Pi / Brian R. Kent
- Author:
- Kent, Brian R.
- Published:
- San Rafael [California] (40 Oak Drive, San Rafael, CA, 94903, USA) : Morgan & Claypool Publishers, [2018]
Bristol [England] (Temple Circus, Temple Way, Bristol BS1 6HG, UK) : IOP Publishing, [2018]
- Physical Description:
- 1 online resource (various pagings) : illustrations (some color).
- Additional Creators:
- Morgan & Claypool Publishers and Institute of Physics (Great Britain)

Can a Slurm pipeline be executed across all nodes in a cluster?

I've configured a Slurm cluster with one control node and two compute nodes.

Currently, when I run the SlurmPipeline, it only utilizes resources on a single node. How can I configure it to utilize all available nodes and resources across the cluster?

Is this a configuration setting that needs adjustment within Slurm or Datatrove?

Sbatch arguments treated as filepath

Not clear why; this breaks any attempt to execute the common crawl example script

2024-02-08 03:42:32.169 | INFO     | datatrove.executor.slurm:launch_job:249 - Launching Slurm job cc_datatrove-test (8000 tasks) with launch script "s3://datatrove-test/base_processing//logs/base_processing/datatrove-test/launch_script.slurm"
Traceback (most recent call last):
  File "/home/ubuntu/dt1.py", line 55, in <module>
    executor.run()
  File "/home/ubuntu/datatrove/src/datatrove/executor/slurm.py", line 169, in run
    self.launch_job()
  File "/home/ubuntu/datatrove/src/datatrove/executor/slurm.py", line 262, in launch_job
    self.job_id = launch_slurm_job(launch_file_contents, *args)
  File "/home/ubuntu/datatrove/src/datatrove/executor/slurm.py", line 349, in launch_slurm_job
    return subprocess.check_output(["sbatch", *args, f.name]).decode("utf-8").split()[-1]
  File "/usr/lib/python3.10/subprocess.py", line 421, in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
  File "/usr/lib/python3.10/subprocess.py", line 503, in run
    with Popen(*popenargs, **kwargs) as process:
  File "/usr/lib/python3.10/subprocess.py", line 971, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/usr/lib/python3.10/subprocess.py", line 1863, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'sbatch'

sbatch only appears in slurm.py and is not supplied by the common crawl example script. It appears to be using the default empty dictionary.

Migrate from sha1 to xxhash for deduplication methods

Problem

We currently use the first x bytes of sha1 for hashing, which is a waste of resources.

  • we don't need cryptographic guarantees
  • we only take first x bytes

Instead we should use a non-cryptographic hash function, which can be computed significantly faster.
This should significantly speed up the first phase of the deduplication process.

How to load a dataset with the output of a tokenizer?

I planned to use datatrove to apply my tokenizer so that the data is ready to use with nanotron.
I am using DocumentTokenizer[Merger], which produces *.ds and *.ds.index binary files, although, from what I understood, nanotron expects datasets (with "input_ids" keys).
I see that things like ParquetWriter cannot be piped after DocumentTokenizer.

Am I missing a piece?
Are there some helpers to convert ds files into parquet files (or something loadable with datasets) for a given context size?

Error when running minhash

Have you ever seen this bug when running minhash? What might be the cause?

File "/output/datatrove/src/datatrove/pipeline/dedup/minhash.py", line 139, in read_sigs
File "/opt/conda/envs/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
raise value
File "/output/datatrove/examples/minhash_deduplication_mp.py", line 157, in main
File "/output/datatrove/src/datatrove/pipeline/base.py", line 122, in __call__
Traceback (most recent call last):
File "/output/datatrove/src/datatrove/executor/base.py", line 77, in _run_for_rank
"""
File "/output/datatrove/src/datatrove/pipeline/dedup/minhash.py", line 378, in run
AssertionError: Hash order error. f.tell()=13504008, min_hash=167858917, sigdata=(62530469, 17634173, 42943397, 32616677, 9946320, 97645252, 33852496, 40487027, 13335797, 46577224, 99341049, 65232832, 98314078), last=(168841106, 1007538214, 22584412, 260559064, 494471935, 336374100, 632602342, 773108968, 87337671, 1064337302, 10811556, 112410251, 404805684)
File "/output/datatrove/src/datatrove/executor/local.py", line 112, in run
stats = list(
The above exception was the direct cause of the following exception:

Minhash deduplication rate different from other implementation

Hi,

I'm using datatrove to run minhash dedup on one dataset. However, I find that the deduplication rate of datatrove is a little lower than other implementations, such as this one based on spark.

More precisely, I'm using 13 bands, each with 60 hash codes; the previous spark code removes 88% of the raw content, while datatrove only removes 60%. Based on my understanding, the union-set and GraphFrame implementations are equivalent in principle, so there shouldn't be such a big difference...

Enhancing word_tokenize (like nltk) Support for Multiple Languages

Hello,

I'm currently working on text processing that involves filtering (like gopher) in various languages. But currently, the default word tokenization in datatrove's filters is based on English, as shown in the snippet below:

from nltk.tokenize import word_tokenize

text = doc.text
words = word_tokenize(text)  # TODO we should use language id filter

As it stands, word_tokenize primarily supports English. However, I've encountered a requirement to process and tokenize Korean (and similar) text, which is not directly supported by NLTK's word_tokenize.

I'm considering an approach that involves identifying the language of the document (doc) prior to word tokenization, and then using a language-specific tokenizer if the document is in Korean. This approach implies the need for a language identifier, or using LanguageFilter (with the threshold set to 0) to determine the document's language.

I have a couple of questions and requests for advice:

  1. Would it be advisable to run a LanguageFilter before any filtering step that needs a language-specific tokenizer, ensuring that each document's language metadata is available?
  2. I'm aiming to develop a flexible and efficient solution that can handle multiple languages gracefully, with a particular focus on adding support for Korean in the near term. Any insights, recommendations, or examples of similar implementations would be greatly appreciated. I'd like to know what your plans are regarding language ids.

Thank you for your time and assistance.

License

Hi,
I noticed this project uses Trafilatura, which is GPL licensed. Would you consider switching to an alternate library such as goose3 to avoid GPL restrictions?
Thanks!

Local fasttext model

It seems that the current fasttext filter can only load a model from a remote url. Is it possible to support loading a model from a local path?

Integrate readability-lxml speedup branch and Trafilatura's version of the file?

Hi, I see you have a commented out section in pyproject.toml referring to ongoing (?) work on a readability-lxml branch focusing on speedups.

I also singled out critical functions and worked on a similar project as a part of Trafilatura (readability_lxml.py). Since you're using the library, would you be interested in merging your progress and mine?
It could result in a PR in the Trafilatura repo or in another readability fork.

Job depends in local executor

I find the job dependency feature in the slurm executor very useful for running messy jobs with a lot of stages, like minhash deduplication. But it seems that the current local executor doesn't support this functionality, especially in the multi-node setting. Is it possible to integrate this feature into the local executor as well?

About local dataset

Hi
I'm trying to apply the sentence deduplication example to my own dataset from Common Crawl, but it is already extracted and prepared in jsonl format.
My question is how I should edit these blocks of code:

def run_example():
    pipeline_1 = [
        WarcReader(data_folder="warc/", limit=1000),
        Trafilatura(),
        GopherQualityFilter(min_stop_words=0),
        LanguageFilter(language_threshold=0.5, languages=(Languages.english,)),
        JsonlWriter("intermediate/"),
        SentenceDedupSignature(output_folder="c4/"),
    ]
    pipeline_2 = [
        SentenceFindDedups(
            data_folder="c4/",
            output_folder="c4/",
        )
    ]

pipeline_3 = [
     JsonlReader(data_folder="intermediate/"),
     SentenceDedupFilter(data_folder="c4/"),
 ]
Especially in pipeline 1, should it be like this:
pipeline_1 = [
      JsonlReader("my_data.jsonl"),
      SentenceDedupSignature(output_folder="my_folder/")
  ]

  pipeline_2 = [
      SentenceFindDedups(
          data_folder="my_folder/",
          output_folder="my_folder/"
      )
  ]

  pipeline_3 = [
      JsonlReader(data_folder="my_data.jsonl"),
      SentenceDedupFilter(data_folder="my_folder/")
  ]
I ran with that modification but I got this error:
2024-02-27 09:44:50.328 | INFO     | datatrove.utils.logging:add_task_logger:47 - Launching pipeline for rank=0

2024-02-27 09:44:50.328 | INFO | datatrove.utils.logging:log_pipeline:76 -
--- 🛠️ PIPELINE 🛠
📖 - READER: 🐿 Jsonl
🫂 - DEDUPS: 💥 sentence-deduplication stage 1
2024-02-27 09:44:50.328 | INFO | datatrove.pipeline.readers.base:read_files_shard:160 - Reading input file
2024-02-27 09:44:50.329 | WARNING | datatrove.pipeline.readers.base:get_document_from_dict:78 - Found document without text, skipping. Is your text_key ("text") correct?
2024-02-27 09:44:50.367 | INFO | datatrove.executor.local:_launch_run_for_rank:62 - 1/4 tasks completed.
2024-02-27 09:44:50.406 | INFO | datatrove.executor.local:_launch_run_for_rank:62 - 2/4 tasks completed.
2024-02-27 09:44:50.414 | INFO | datatrove.executor.local:_launch_run_for_rank:62 - 3/4 tasks completed.
2024-02-27 09:44:51.928 | SUCCESS | datatrove.executor.base:_run_for_rank:85 - Processing done for rank=0
2024-02-27 09:44:51.929 | INFO | datatrove.executor.base:_run_for_rank:91 -

📉📉📉 Stats: Task 0 📉📉📉

Total Runtime: 1 second

📖 - READER: 🐿 Jsonl
Runtime: (1.16%) 0 seconds [0.04 milliseconds±0.03 milliseconds/doc]
Stats: {input_files: 1, doc_len: 3002046 [min=30, max=120653, 21375.53±29918/task], documents: 496 [496.00/input_file]}
🫂 - DEDUPS: 💥 sentence-deduplication stage 1
Runtime: (98.84%) 1 second [3.06 milliseconds±20.05 milliseconds/doc]
Stats: {total: 497}
2024-02-27 09:44:51.933 | INFO | datatrove.executor.local:_launch_run_for_rank:62 - 4/4 tasks completed.
2024-02-27 09:44:51.964 | SUCCESS | datatrove.executor.local:run:114 -

📉📉📉 Stats: All 4 tasks 📉📉📉

Total Runtime: 0 seconds ± 0 seconds/task

📖 - READER: 🐿 Jsonl
Runtime: (1.16%) 0 seconds±0 seconds/task, min=0 seconds [0.04 milliseconds±0.03 milliseconds/doc]
Stats: {input_files: 1, doc_len: 3002046 [min=30, max=120653, 21375.53±29918/task], documents: 496 [496.00/input_file]}
🫂 - DEDUPS: 💥 sentence-deduplication stage 1
Runtime: (98.84%) 0 seconds±0 seconds/task, min=0 seconds [3.06 milliseconds±20.05 milliseconds/doc]
Stats: {total: 497}

📉📉📉 Stats 📉📉📉

Total Runtime: 0 seconds ± 0 seconds/task

📖 - READER: 🐿 Jsonl
Runtime: (1.16%) 0 seconds±0 seconds/task, min=0 seconds [0.04 milliseconds±0.03 milliseconds/doc]
Stats: {input_files: 1, doc_len: 3002046 [min=30, max=120653, 21375.53±29918/task], documents: 496 [496.00/input_file]}
🫂 - DEDUPS: 💥 sentence-deduplication stage 1
Runtime: (98.84%) 0 seconds±0 seconds/task, min=0 seconds [3.06 milliseconds±20.05 milliseconds/doc]
Stats: {total: 497}
2024-02-27 09:44:51.987 | INFO | datatrove.utils.logging:add_task_logger:47 - Launching pipeline for rank=0
2024-02-27 09:44:51.988 | INFO | datatrove.utils.logging:log_pipeline:76 -
--- 🛠️ PIPELINE 🛠
🫂 - DEDUPS: 💥 sentence-deduplication stage 2
2024-02-27 09:44:51.989 | ERROR | datatrove.executor.base:_run_for_rank:95 -
Traceback (most recent call last):

File "/datatrove/examples/sentence_deduplication.py", line 55, in
run_example()
└ <function run_example at 0x7f348dd745e0>

File "/datatrove/examples/sentence_deduplication.py", line 50, in run_example
print(executor_2.run())
│ └ <function LocalPipelineExecutor.run at 0x7f348b3f9360>
└ <datatrove.executor.local.LocalPipelineExecutor object at 0x7f3430c9c6a0>

File "/opt/conda/lib/python3.10/site-packages/datatrove/executor/local.py", line 93, in run
stats.append(self._launch_run_for_rank(rank, ranks_q))
│ │ │ │ │ └ <AutoProxy[Queue] object, typeid 'Queue' at 0x7f3430c9d840>
│ │ │ │ └ 0
│ │ │ └ <function LocalPipelineExecutor._launch_run_for_rank at 0x7f348b3f92d0>
│ │ └ <datatrove.executor.local.LocalPipelineExecutor object at 0x7f3430c9c6a0>
│ └ <method 'append' of 'list' objects>
└ []
File "/opt/conda/lib/python3.10/site-packages/datatrove/executor/local.py", line 57, in _launch_run_for_rank
return self._run_for_rank(rank, local_rank)
│ │ │ └ 0
│ │ └ 0
│ └ <function PipelineExecutor._run_for_rank at 0x7f348b3f8e50>
└ <datatrove.executor.local.LocalPipelineExecutor object at 0x7f3430c9c6a0>

File "/opt/conda/lib/python3.10/site-packages/datatrove/executor/base.py", line 77, in _run_for_rank
pipelined_data = pipeline_step(pipelined_data, rank, self.world_size)
│ │ │ │ └ <property object at 0x7f348b7310d0>
│ │ │ └ <datatrove.executor.local.LocalPipelineExecutor object at 0x7f3430c9c6a0>
│ │ └ 0
│ └ None
└ 🫂 - DEDUPS: 💥 sentence-deduplication stage 2
File "/opt/conda/lib/python3.10/site-packages/datatrove/pipeline/base.py", line 122, in call
return self.run(data, rank, world_size)
│ │ │ │ └ 1
│ │ │ └ 0
│ │ └ None
│ └ <function SentenceFindDedups.run at 0x7f34623cf1c0>
└ 🫂 - DEDUPS: 💥 sentence-deduplication stage 2
File "/opt/conda/lib/python3.10/site-packages/datatrove/pipeline/dedup/sentence_dedup.py", line 163, in run
pq = [next(sig_reader) for sig_reader in sig_readers]
└ [<generator object read_sigs at 0x7f3430bff840>, <generator object read_sigs at 0x7f3430bff7d0>, <generator object read_sigs ...
File "/opt/conda/lib/python3.10/site-packages/datatrove/pipeline/dedup/sentence_dedup.py", line 163, in
pq = [next(sig_reader) for sig_reader in sig_readers]
│ └ <generator object read_sigs at 0x7f3430bff7d0>
└ <generator object read_sigs at 0x7f3430bff7d0>

StopIteration
Traceback (most recent call last):
File "/datatrove/examples/sentence_deduplication.py", line 55, in
run_example()
File "/datatrove/examples/sentence_deduplication.py", line 50, in run_example
print(executor_2.run())
File "/opt/conda/lib/python3.10/site-packages/datatrove/executor/local.py", line 93, in run
stats.append(self._launch_run_for_rank(rank, ranks_q))
File "/opt/conda/lib/python3.10/site-packages/datatrove/executor/local.py", line 57, in _launch_run_for_rank
return self._run_for_rank(rank, local_rank)
File "/opt/conda/lib/python3.10/site-packages/datatrove/executor/base.py", line 96, in _run_for_rank
raise e
File "/opt/conda/lib/python3.10/site-packages/datatrove/executor/base.py", line 77, in _run_for_rank
pipelined_data = pipeline_step(pipelined_data, rank, self.world_size)
File "/opt/conda/lib/python3.10/site-packages/datatrove/pipeline/base.py", line 122, in call
return self.run(data, rank, world_size)
File "/opt/conda/lib/python3.10/site-packages/datatrove/pipeline/dedup/sentence_dedup.py", line 163, in run
pq = [next(sig_reader) for sig_reader in sig_readers]
File "/opt/conda/lib/python3.10/site-packages/datatrove/pipeline/dedup/sentence_dedup.py", line 163, in
pq = [next(sig_reader) for sig_reader in sig_readers]
StopIteration
could you help me
thanks

Tokenization for Non English data

Hi HF team
I want to thank you for this incredible work.
And I have a question: I want to apply the deduplication pipeline to Arabic data.
For this I think I should change the tokenizer. If so, is there a tip for this?
Should I just edit the tokenizer here?
class SentenceDedupFilter(PipelineStep):
type = "🫂 - DEDUPS"
name = "💥 sentence-deduplication stage 3"

def __init__(
    self,
    data_folder: DataFolderLike,
    n_sentences: int = 3,
    min_doc_words: int = 50,
    exclusion_writer: DiskWriter = None,
):
    """Args:
    data_folder: data folder to get duplicate files.
    min_doc_words: min amount of words for each document
    """
    from nltk import load

    super().__init__()
    self.data_folder = get_datafolder(data_folder)
    self.n_sentences = n_sentences
    self.min_doc_words = min_doc_words
    self._tokenizer = load("tokenizers/punkt/english.pickle")
    self.exclusion_writer = exclusion_writer

any recommendations please?
Thanks

Implementation of line-wise corrections

Hi,

Thanks for your efforts in open-sourcing such an awesome library for large-scale data processing.

I'm trying to reproduce the RefinedWeb dataset. I find that the pipeline for processing common crawl is very similar to the MDR pipeline used in RefinedWeb. But I couldn't find the part of line-wise corrections in the code.


Could you add more details on this?

Thanks

Tokenization in Minhash deduplication

Hi,

I have noticed that the tokenization is different from that adopted by previous papers.

For example, this paper uses space tokenization and RefinedWeb states that they used the GPT-2 tokenizer, while datatrove uses nltk to extract n-grams.

I'm wondering whether the results obtained by different tokenization methods are consistent.

Error when running exact_substrings

I follow the instructions in the code to use the script in this repo for building the suffix array and generating the byteranges. But I get the following error when running step 3.

(/home/user/env/datatrove) dev-dialogue-gpu-8k# python exact_substrings_test.py 
2024-02-01 11:50:52.260 | INFO     | datatrove.utils.logging:add_task_logger:24 - Launching pipeline for rank=0
2024-02-01 11:50:52.261 | INFO     | datatrove.utils.logging:log_pipeline:37 - 
--- 🛠️ PIPELINE 🛠
🫂 - DEDUP: 🪞 - exact-substrings stage 3
2024-02-01 11:50:52.262 | INFO     | datatrove.pipeline.dedup.exact_substrings:get_sequence_bytes_offset:182 - self.rank=0, -> self.sequence_bytes_offset[self.rank]=0
2024-02-01 11:50:52.387 | INFO     | datatrove.pipeline.readers.base:read_files_shard:95 - Reading input file part-00000-sample.jsonl
part-00000-sample.jsonl
2024-02-01 11:50:52.385 | ERROR    | datatrove.executor.base:_run_for_rank:74 - One or more duplicate ranges have not been used
Traceback (most recent call last):

  File "<string>", line 1, in <module>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 273, in main
    code = _serve_one(child_r, fds,
           │          │        └ [14, 15, 16, 19, 20, 21]
           │          └ 9
           └ <function _serve_one at 0x7f7cdf776e60>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 312, in _serve_one
    code = spawn._main(child_r, parent_sentinel)
           │     │     │        └ 5
           │     │     └ 9
           │     └ <function _main at 0x7f7cdf776170>
           └ <module 'multiprocess.spawn' from '/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/...
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/spawn.py", line 129, in _main
    return self._bootstrap(parent_sentinel)
           │    │          └ 5
           │    └ <function BaseProcess._bootstrap at 0x7f7cdfa89cf0>
           └ <ForkServerProcess name='ForkServerPoolWorker-5' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 314, in _bootstrap
    self.run()
    │    └ <function BaseProcess.run at 0x7f7cdfa89360>
    └ <ForkServerProcess name='ForkServerPoolWorker-5' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <ForkServerProcess name='ForkServerPoolWorker-5' parent=1625393 started daemon>
    │    │        │    └ (<multiprocess.queues.SimpleQueue object at 0x7f7cdf4ec1f0>, <multiprocess.queues.SimpleQueue object at 0x7f7c3cd22170>, None...
    │    │        └ <ForkServerProcess name='ForkServerPoolWorker-5' parent=1625393 started daemon>
    │    └ <function worker at 0x7f7c3cd1b490>
    └ <ForkServerProcess name='ForkServerPoolWorker-5' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    │     │       └ {}
                    │     └ (1,)
                    └ functools.partial(<bound method LocalPipelineExecutor._launch_run_for_rank of <datatrove.executor.local.LocalPipelineExecutor...

  File "/home/user/code/datatrove/src/datatrove/executor/local.py", line 46, in _launch_run_for_rank
    return self._run_for_rank(rank, local_rank)
           │    │             │     └ 1
           │    │             └ 1
           │    └ <function PipelineExecutor._run_for_rank at 0x7f7cde9b84c0>
           └ <datatrove.executor.local.LocalPipelineExecutor object at 0x7f7c3cd22350>

> File "/home/user/code/datatrove/src/datatrove/executor/base.py", line 62, in _run_for_rank
    deque(pipelined_data, maxlen=0)
    │     └ <generator object DedupReader.run at 0x7f7c3cd6c970>
    └ <class 'collections.deque'>

  File "/home/user/code/datatrove/src/datatrove/pipeline/dedup/exact_substrings.py", line 344, in run
    assert self.exhausted_ranges, "One or more duplicate ranges have not been used"
           │    └ False
           └ 🫂 - DEDUP: 🪞 - exact-substrings stage 3

AssertionError: One or more duplicate ranges have not been used
2024-02-01 11:50:52.441 | ERROR    | datatrove.executor.base:_run_for_rank:74 - One or more duplicate ranges have not been used
Traceback (most recent call last):

  File "<string>", line 1, in <module>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 273, in main
    code = _serve_one(child_r, fds,
           │          │        └ [13, 14, 15, 16, 19, 20]
           │          └ 9
           └ <function _serve_one at 0x7f7cdf776e60>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 312, in _serve_one
    code = spawn._main(child_r, parent_sentinel)
           │     │     │        └ 5
           │     │     └ 9
           │     └ <function _main at 0x7f7cdf776170>
           └ <module 'multiprocess.spawn' from '/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/...
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/spawn.py", line 129, in _main
    return self._bootstrap(parent_sentinel)
           │    │          └ 5
           │    └ <function BaseProcess._bootstrap at 0x7f7cdfa89cf0>
           └ <ForkServerProcess name='ForkServerPoolWorker-4' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 314, in _bootstrap
    self.run()
    │    └ <function BaseProcess.run at 0x7f7cdfa89360>
    └ <ForkServerProcess name='ForkServerPoolWorker-4' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <ForkServerProcess name='ForkServerPoolWorker-4' parent=1625393 started daemon>
    │    │        │    └ (<multiprocess.queues.SimpleQueue object at 0x7f7cdf4ec1f0>, <multiprocess.queues.SimpleQueue object at 0x7f7c3cd22170>, None...
    │    │        └ <ForkServerProcess name='ForkServerPoolWorker-4' parent=1625393 started daemon>
    │    └ <function worker at 0x7f7c3cd1b490>
    └ <ForkServerProcess name='ForkServerPoolWorker-4' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    │     │       └ {}
                    │     └ (2,)
                    └ functools.partial(<bound method LocalPipelineExecutor._launch_run_for_rank of <datatrove.executor.local.LocalPipelineExecutor...

  File "/home/user/code/datatrove/src/datatrove/executor/local.py", line 46, in _launch_run_for_rank
    return self._run_for_rank(rank, local_rank)
           │    │             │     └ 2
           │    │             └ 2
           │    └ <function PipelineExecutor._run_for_rank at 0x7f7cde9b84c0>
           └ <datatrove.executor.local.LocalPipelineExecutor object at 0x7f7c3cd22350>

> File "/home/user/code/datatrove/src/datatrove/executor/base.py", line 62, in _run_for_rank
    deque(pipelined_data, maxlen=0)
    │     └ <generator object DedupReader.run at 0x7f7c3cd6c970>
    └ <class 'collections.deque'>

  File "/home/user/code/datatrove/src/datatrove/pipeline/dedup/exact_substrings.py", line 344, in run
    assert self.exhausted_ranges, "One or more duplicate ranges have not been used"
           │    └ False
           └ 🫂 - DEDUP: 🪞 - exact-substrings stage 3

AssertionError: One or more duplicate ranges have not been used
2024-02-01 11:50:52.464 | ERROR    | datatrove.executor.base:_run_for_rank:74 - One or more duplicate ranges have not been used
Traceback (most recent call last):

  File "<string>", line 1, in <module>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 273, in main
    code = _serve_one(child_r, fds,
           │          │        └ [12, 13, 14, 15, 16, 19]
           │          └ 9
           └ <function _serve_one at 0x7f7cdf776e60>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 312, in _serve_one
    code = spawn._main(child_r, parent_sentinel)
           │     │     │        └ 5
           │     │     └ 9
           │     └ <function _main at 0x7f7cdf776170>
           └ <module 'multiprocess.spawn' from '/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/...
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/spawn.py", line 129, in _main
    return self._bootstrap(parent_sentinel)
           │    │          └ 5
           │    └ <function BaseProcess._bootstrap at 0x7f7cdfa89cf0>
           └ <ForkServerProcess name='ForkServerPoolWorker-3' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 314, in _bootstrap
    self.run()
    │    └ <function BaseProcess.run at 0x7f7cdfa89360>
    └ <ForkServerProcess name='ForkServerPoolWorker-3' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <ForkServerProcess name='ForkServerPoolWorker-3' parent=1625393 started daemon>
    │    │        │    └ (<multiprocess.queues.SimpleQueue object at 0x7f7cdf4ec1f0>, <multiprocess.queues.SimpleQueue object at 0x7f7c3cd22170>, None...
    │    │        └ <ForkServerProcess name='ForkServerPoolWorker-3' parent=1625393 started daemon>
    │    └ <function worker at 0x7f7c3cd1b490>
    └ <ForkServerProcess name='ForkServerPoolWorker-3' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    │     │       └ {}
                    │     └ (3,)
                    └ functools.partial(<bound method LocalPipelineExecutor._launch_run_for_rank of <datatrove.executor.local.LocalPipelineExecutor...

  File "/home/user/code/datatrove/src/datatrove/executor/local.py", line 46, in _launch_run_for_rank
    return self._run_for_rank(rank, local_rank)
           │    │             │     └ 3
           │    │             └ 3
           │    └ <function PipelineExecutor._run_for_rank at 0x7f7cde9b84c0>
           └ <datatrove.executor.local.LocalPipelineExecutor object at 0x7f7c3cd22350>

> File "/home/user/code/datatrove/src/datatrove/executor/base.py", line 62, in _run_for_rank
    deque(pipelined_data, maxlen=0)
    │     └ <generator object DedupReader.run at 0x7f7c3cd6c970>
    └ <class 'collections.deque'>

  File "/home/user/code/datatrove/src/datatrove/pipeline/dedup/exact_substrings.py", line 344, in run
    assert self.exhausted_ranges, "One or more duplicate ranges have not been used"
           │    └ False
           └ 🫂 - DEDUP: 🪞 - exact-substrings stage 3

AssertionError: One or more duplicate ranges have not been used
2024-02-01 11:50:52.495 | INFO     | datatrove.executor.local:_launch_run_for_rank:51 - 1/4 tasks completed.
multiprocess.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/home/user/code/datatrove/src/datatrove/executor/local.py", line 46, in _launch_run_for_rank
    return self._run_for_rank(rank, local_rank)
  File "/home/user/code/datatrove/src/datatrove/executor/base.py", line 75, in _run_for_rank
    raise e
  File "/home/user/code/datatrove/src/datatrove/executor/base.py", line 62, in _run_for_rank
    deque(pipelined_data, maxlen=0)
  File "/home/user/code/datatrove/src/datatrove/pipeline/dedup/exact_substrings.py", line 344, in run
    assert self.exhausted_ranges, "One or more duplicate ranges have not been used"
AssertionError: One or more duplicate ranges have not been used
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/code/datatrove/examples/exact_substrings_test.py", line 96, in <module>
    run_step_3()
  File "/home/user/code/datatrove/examples/exact_substrings_test.py", line 91, in run_step_3
    print(executor_3.run())
  File "/home/user/code/datatrove/src/datatrove/executor/local.py", line 80, in run
    stats = list(
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 873, in next
    raise value
AssertionError: One or more duplicate ranges have not been used

Supporting Apache Beam

Having the support to run the core row-level components with Apache Beam could be extremely beneficial as:

  • Apache Beam is quite widely used in the community and has a vibrant community.
  • Users have the option to choose compatible runners (such as Cloud Dataflow) to run their Beam pipelines. This helps with scalability aspects.

Some relevant examples of end-to-end Beam pipelines for NLP and beyond:

MinhashDedupCluster runs too slow

Hi,

I find that the cluster stage in MinhashDedup runs too slowly, using only one CPU to construct the union set of duplicate documents. For example, with 1T of data, we can calculate the hash values in 2h with 1k cores, but the cluster stage takes more than 7h. I'm wondering whether there is any acceleration strategy for this stage.

[Feature] Packing

Feature request for Packing

It would be amazing to have an implementation of packing tokenized documents into samples of a fixed sequence length for downstream training.

Simpler IO interface

I think we could improve the user experience by supporting fsspec string paths in the readers/writers instead of requiring the use of our own (folder/file) primitives, as fsspec supports more compression types, is better tested, has globbing compliant with glob.glob, seems less error-prone (e.g., with regard to closing files on an error), etc. Further, fsspec can "file-cache" remote reads/writes, so we could also use fsspec for interacting with S3 rather than our own implementation.

So, I think we should use fsspec for IO and drop (or at least not expose them in the public-facing API) our own IO primitives (the reader/writer classes can handle the locking) to make it easier to work with the lib, and to be consistent with other data processing libs.

@guipenedo WDYT?

Slurm in containers

I am looking into running Datatrove on a cluster with Slurm and need to run the jobs in Docker containers.
Do you have any example how I can configure my SlurmPipelineExecutor to run with containers?

Load local tokenizer

Due to some network issues, I need to first download the tokenizer and load it from a local path. But the current tokenizer only supports identifier-based loading from the HF hub. Is it possible to add a load-from-local-path option like AutoTokenizer in the transformers lib?

replace setup.py with pyproject.toml

Hey there, glad to see there's something new cooking at HF! I hope this project gets traction, it seems very interesting.

I noticed your setup.py is really straightforward. Its contents could simply be put inside the pyproject.toml: unless you are compiling C code, you won't likely need a setup.py to define the metadata of your package.

The biggest reason to use this format is that it's the standard that Python devs are strongly recommended to adopt. Practically speaking, pyproject.toml is a plain text file, so it can be used to expose metadata about the project on PyPI, while setup.py needs to be run to establish it (say you have an if statement somewhere: pip cannot know the result without executing the file). This is especially useful when building wheels locally, though that should not happen on most platforms.

Using pyproject.toml also means that every package manager that complies with PEP518 (basically, everything but poetry) can be used to develop on this project.

Hope I was clear enough. If you are interested I'd be glad to support you 🤗

Track processed input files to avoid re-processing them when restarting

Hey datatrove team! I have one feature request:

Current state of things: Currently, if you want to use SlurmExecutor to do some distributed processing (e.g. tokenization), each task will create a separate file, and if you need to restart, the ranks that were already finished will be skipped (here).

Problem to be solved: The problem is when you have a limited number of tasks and a large amount of data to process (e.g. for a specific cluster configuration), and therefore a small number of files to write to. If you have to restart your processing job, because the small number of output files were not completed yet, you will start all over again.

Suggested solution: Track all the processed input files and possibly the id of the last processed row (assuming you have a large input file) to restart from this point.

Please let me know what you think and would be happy to help implement this.

Support Ray as executor

Ray (https://github.com/ray-project/ray) becomes popular choice of running distributed Python ML applications. Its Python interface is easy to scale up the workload from local laptop to distributed cluster. It would be good to add Ray as an executor backend (and we are happy to contribute).

Some more info related to this topic:

Support int32 in substring dedup

I'm using a tokenizer with a > 100k vocab size, so the token ids overflow as they are stored as uint16. I'm wondering if we can add support for int32? Is it possible to simply change the type, or are there other places that need to be changed?

Why is read_files_shard() taking too long?

So I'm currently using the library to process a large huggingface dataset.

I have prepared the code to deduplicate the dataset:

# Deduplication using the datatrove minhash configuration in 4 stages; the resulting "BasePath" folder contains the removed and the deduplicated output documents.
minhash_config = MinhashConfig(use_64bit_hashes=True)  # better precision -> fewer false positives (collisions)
INPUT_READER = ParquetReader(f"./The Arabic Pile/Temporary Parquet/{dataset_category}/Parquet")
MINHASH_BASE_PATH = f"./{dataset_category}/{dataset_category}_BasePath/"
LOGS_FOLDER = f"./{dataset_category}/{dataset_category}_Logging__Directory/"
LOCAL_LOGS_FOLDER = f"./{dataset_category}/{dataset_category}_Local_Logs_Folder/"
TOTAL_TASKS = 10
run_deduplication(minhash_config, INPUT_READER, MINHASH_BASE_PATH, LOGS_FOLDER, LOCAL_LOGS_FOLDER, TOTAL_TASKS)

and the function implementation as:

def run_deduplication(minhash_config, INPUT_READER, MINHASH_BASE_PATH, LOGS_FOLDER, LOCAL_LOGS_FOLDER, TOTAL_TASKS):
    pipeline_1 = [
        INPUT_READER,
        MinhashDedupSignature(output_folder=f"{MINHASH_BASE_PATH}/signatures", config=minhash_config),
    ]

    pipeline_2 = [
        MinhashDedupBuckets(
            input_folder=f"{MINHASH_BASE_PATH}/signatures",
            output_folder=f"{MINHASH_BASE_PATH}/buckets",
            config=minhash_config,
        ),
    ]

    pipeline_3 = [
        MinhashDedupCluster(
            input_folder=f"{MINHASH_BASE_PATH}/buckets",
            output_folder=f"{MINHASH_BASE_PATH}/remove_ids",
            config=minhash_config,
        ),
    ]

    pipeline_4 = [
        INPUT_READER,
        TokensCounter(),  # nice way to see how many tokens we had before and after deduplication
        MinhashDedupFilter(
            input_folder=f"{MINHASH_BASE_PATH}/remove_ids",
            exclusion_writer=JsonlWriter(f"{MINHASH_BASE_PATH}/removed"),
        ),
        JsonlWriter(output_folder=f"{MINHASH_BASE_PATH}/deduplicated_output"),
    ]

    num_workers = 1  # I used 8 in other runs and still had the same issue
    executor_1: PipelineExecutor = LocalPipelineExecutor(pipeline=pipeline_1, workers=num_workers, tasks=TOTAL_TASKS, logging_dir=f"{LOGS_FOLDER}/signatures")

    executor_2: PipelineExecutor = LocalPipelineExecutor(pipeline=pipeline_2, workers=num_workers, tasks=minhash_config.num_buckets, logging_dir=f"{LOGS_FOLDER}/buckets")

    executor_3: PipelineExecutor = LocalPipelineExecutor(pipeline=pipeline_3, tasks=1, logging_dir=f"{LOGS_FOLDER}/clusters")

    executor_4: PipelineExecutor = LocalPipelineExecutor(pipeline=pipeline_4, workers=num_workers, tasks=TOTAL_TASKS, logging_dir=f"{LOGS_FOLDER}/filter")

    print('running exec 1')
    print('running exec 1', executor_1.run())
    print('running exec 2')
    print('running exec 2', executor_2.run())
    print('running exec 3')
    print('running exec 3', executor_3.run())
    print('running exec 4')
    print('running exec 4', executor_4.run())

The good news is that the pipeline finished the first 9 tasks very quickly. However, task 10 has now been running for over 12 hours on a 6 GB dataset. Logs below:


running exec 1

2024-03-24 05:18:02.895 | INFO     | datatrove.executor.local:run:118 - Skipping 9 already completed tasks # I have re-run it after changing the number of workers to see if that has any effect.
2024-03-24 05:18:03.844 | INFO     | datatrove.utils.logging:add_task_logger:47 - Launching pipeline for rank=0
2024-03-24 05:18:03.847 | INFO     | datatrove.utils.logging:log_pipeline:76 - 
--- 🛠️ PIPELINE 🛠
📖 - READER: 📒 Parquet
🫂 - DEDUP: 🎯 MinHash stage 1
2024-03-24 05:18:03.854 | INFO     | datatrove.pipeline.readers.base:read_files_shard:193 - Reading input file 
2024-03-24 18:25:36.251 | INFO     | datatrove.pipeline.dedup.minhash:run:236 - Sorting buckets...

I'm running this locally on an M1 Pro with 16 GB of RAM. I should note that the majority of the text is in Arabic.

Is there anything I'm doing wrong?

LocalExecutor Speedup

Hi, really appreciate the project!!!

I am just wondering how I can make the local pipeline process data faster.
For example, can I increase tasks and workers to obtain faster processing? (Or will this cause them to process the same piece of data?)

P.S. I am using a single node (128 CPU cores). I think I will have about 1k+ parquet files, each containing about 1M documents.
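For context, this is how I currently understand tasks vs. workers (please correct me if I'm wrong): each input file is assigned to exactly one task, so tasks never process the same data twice, and workers only bounds how many tasks run at the same time. A rough sketch with placeholder paths:

from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.readers import ParquetReader
from datatrove.pipeline.writers import JsonlWriter

executor = LocalPipelineExecutor(
    pipeline=[
        ParquetReader("/data/parquet"),   # ~1k+ parquet files
        JsonlWriter("/data/processed"),
    ],
    tasks=1000,   # roughly one task per input file
    workers=128,  # run at most 128 tasks at once (one per CPU core)
    logging_dir="/data/logs/processing",
)
executor.run()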

Memory overhead in multiprocessing

When using the fastText filter, I find that the fastText model is copied by each process, which introduces significant memory overhead. However, to my knowledge, the fastText model is read-only and could be stored in shared memory across all processes.

Can we optimize the current code to save memory? I find that using mp.Manager can create shared objects and avoid copying, but I find it quite hard to integrate into the current code, as the manager would be initialized at the executor level and is not passed to each pipeline step.

Deduplicating local data throws an error

Hi,

I have data on my local machine in the form of jsonl files and I want to deduplicate it. I'm using the following example:
sent_dedup_config = SentDedupConfig(
    n_sentences=3,
    split_sentences=False,  # set to False to split on \n instead
    only_dedup_in_index=True,
    min_doc_words=50,
)

FINDER_WORKERS = 10  # this will speed up/parallelize step 2

def run_example():
    pipeline_1 = [
        JsonlReader("CC_data_inputs/"),
        SentenceDedupSignature(output_folder="cc_output/sigs", config=sent_dedup_config, finder_workers=FINDER_WORKERS),
    ]

    pipeline_2 = [SentenceFindDedups(data_folder="cc_output/sigs", output_folder="cc_output/dups", config=sent_dedup_config)]

    pipeline_3 = [
        JsonlReader(data_folder="CC_data_inputs/"),
        SentenceDedupFilter(data_folder="cc_output/dups", config=sent_dedup_config),
    ]

    executor_1: PipelineExecutor = LocalPipelineExecutor(pipeline=pipeline_1, workers=4, tasks=4)
    executor_2: PipelineExecutor = LocalPipelineExecutor(pipeline=pipeline_2, workers=1, tasks=FINDER_WORKERS)
    executor_3: PipelineExecutor = LocalPipelineExecutor(pipeline=pipeline_3, workers=4, tasks=4)

    print(executor_1.run())
    print(executor_2.run())
    print(executor_3.run())
I edited the first pipeline to just read the jsonl file (assuming that my data is ready directly for step 2). When I run the code, it throws this error:

Traceback (most recent call last):
File "/home/ubuntu/deduplication/sentence_deduplication.py", line 4, in
from datatrove.pipeline.dedup.sentence_dedup import SentDedupConfig
ImportError: cannot import name 'SentDedupConfig' from 'datatrove.pipeline.dedup.sentence_dedup' (/home/ubuntu/miniconda3/lib/python3.11/site-packages/datatrove/pipeline/dedup/sentence_dedup.py)

My data consists of a set of 5 jsonl files inside the folder CC_data_inputs. I just reinstalled the datatrove library. Could you help me figure it out?

sentence_dedup.py: "struct.error: ushort format requires 0 <= number <= (0x7fff * 2 + 1)"

  File "/weka/home-griffin/datatrove/src/datatrove/pipeline/dedup/sentence_dedup.py", line 69, in save_hashes
    f.write(struct.pack("<H", hs.sent_id))
    │ │     │      │          │  └ 312748
    │ │     │      │          └ HashSig(hash_value=6388858470426, doc_id=5179808, file_id=None, sent_id=312748)
    │ │     │      └ <built-in function pack>
    │ │     └ <module 'struct' from '/usr/lib/python3.10/struct.py'>
    │ └ <function LocalFileOpener.write at 0x7f020cfcd990>
    └ <fsspec.implementations.local.LocalFileOpener object at 0x7ec05d247d00>

struct.error: ushort format requires 0 <= number <= (0x7fff * 2 + 1)

I'm processing a large dataset (30BN tokens) -- is this a numerical overflow issue?

Is there a quick fix? Thank you!
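For what it's worth, a quick check of the struct format codes (just an illustration; widening the field would also mean the readers of these signature files must use the same width):

import struct

sent_id = 312_748
# struct.pack("<H", sent_id)         # "<H" is unsigned 16-bit (max 65_535) -> struct.error, as in the traceback
packed = struct.pack("<I", sent_id)  # "<I" is unsigned 32-bit (max 4_294_967_295)
print(struct.unpack("<I", packed))   # (312748,)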

Spark support

I'm wondering if it is possible to add support for other popular large-scale data processing frameworks like Spark, since most operations map naturally onto Spark's map operation. This would greatly improve the efficiency and scalability of the processing pipeline when working with large datasets.
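As a rough illustration of the mapping (a PySpark sketch with a stand-in filter, not datatrove code):

from pyspark.sql import SparkSession

def keep_doc(doc: dict) -> bool:
    # stand-in for a datatrove-style quality filter: keep docs with >= 50 words
    return len(doc["text"].split()) >= 50

spark = SparkSession.builder.appName("datatrove-sketch").getOrCreate()
docs = spark.sparkContext.parallelize([
    {"id": "0", "text": "a short doc"},
    {"id": "1", "text": "a much longer document " * 20},
])
kept = docs.filter(keep_doc)  # filters and extractors map directly onto RDD.filter / RDD.map
print(kept.count())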

Potential issues in substring dedup

Hi @guipenedo, I used your substring dedup script to deduplicate a CC dump and did some manual inspection. I find some of the resulting duplicates a bit strange.

For example,

{
  "id": "sha1:222JEVYQHVGRTKDSUSPOJEVUETA5AEO6",
  "data": [
    {
      "meta": {
        "bucket": "head",
        "date_download": "2021-01-26T19:40:47Z",
        "language": "en",
        "language_score": 0.86,
        "perplexity": 286.2,
        "source_domain": "microbewiki.kenyon.edu",
        "title": "Difference between revisions of \"Streptococcus salivarius\" - microbewiki",
        "url": "https://microbewiki.kenyon.edu/index.php?title=Streptococcus_salivarius&diff=132493"
      },
      "text": "*Include as many headings as are relevant to your microbe. Consider using the headings below, as they will allow readers to quickly locate specific information of major interest*\n=3. Genome structure=\nBacteria; Firmicutes; Bacilli; Lactobacillales; Streptococcaceae; Streptococcus; Streptococcus salivarius\n=6. Ecology=\n=7. Pathology=\n''Streptococcus salivarius''\n=7. Key microorganisms=\n''Streptococcus salivarius'' is the principal commensal bacterium of the oral cavity in humans. ''S. salivarius''http://www.cns.fr/externe/English/Projets/Projet_MB/organisme_MB.html [6]]. It therefore seems to be the pioneer in colonizing dental plaque, it creates favorable conditions so other species can begin to colonize. It is also a bacterium which plays the role of moderator, permitting the implantation of bacteria which are harmful to the health of the oral cavity.\nBetter knowledge of the molecular and physiologic factors which allow it to colonize dental plaque and to interact with other species will help in designing strategies for the prevention of cavities, especially in children [http://www.cns.fr/externe/English/Projets/Projet_MB/organisme_MB.html [6]]. Also, greater knowledge of this organism can help with research on mouth odor.\nMoreover, when this bacterium enters the bloodstream it is found that it may cause septicemia in neutropenic patients, a condition that shows a abnormal low level amounts of neutrophils in the blood. Neutrophils are also known as white blood cells and are involved in the body’s immune response to infections [http://www.phac-aspc.gc.ca/msds-ftss/msds149e.html [5]]. Also, ''Streptococcus salivarius'' is used to treat patients with atypical pneumonia, which is an illness of the lungs where they become flooded with fluid.\nNot much is known about the genome of ''Streptococcus salivarius'' other than its genome size is estimated to be 1800kb long. Its genome is yet to be sequenced [http://jb.asm.org/cgi/content/abstract/185/2/683 [7]], but it is in progress. A closely related species of ''S. salivarius, S. thermophilus'' has been sequenced. Its genome size has been determined to be 1796kb on a single circular chromosome [http://www.nature.com/nbt/journal/v22/n12/full/nbt1034.html [8]]. ''S. thermophilus'' is a lactic acid bacterium used for making milk and yogurt in the dairy industry. It was important to sequence ''S. thermopilus'' because it is phylogenetically close to pathogenic streptococci. The genome was sequenced using random shotgun sequencing and followed up by multiplex PCR [http://www.nature.com/nbt/journal/v22/n12/full/nbt1034.html [8]].\n''S. thermophilus'' has a 39% G-C content, 6 Ribosomal RNA's, and 67 tRNA's [http://www.nature.com/nbt/journal/v22/n12/full/nbt1034.html[8]]. It is also known that 10% of the genes are not functional due to frameshifts, nonsense mutations, deletions, or pseudogenes. Frameshifts can occur in a genome when one or two nucleotides are deleted or inserted next to each other. This would cause a shift in the reading frame, the frame in which DNA gets transcribed into RNA. A pseudogene is a gene where it becomes transcribed and translated but it has no functional capabilities. Moreover, 30% of their genome is dedicated to energy metabolism and 60% to atypical, phages, and transposons [http://www.nature.com/nbt/journal/v22/n12/full/nbt1034.html [8]]. Transposons are given the name \"jumping genes\" or mobile genetic elements because of their ability to move around in the genome. 
They may cause mutation and they may increase the amount of DNA in the genome.\n''S. salivarius'' ''S. salivarius'' is approximately 2 µm in length. The cocci usually occur in pairs and short chains. They are facultative anaerobes and either non- or alpha hemolytic on blood agar [http://www.phac-aspc.gc.ca/msds-ftss/msds149e.html [5]]. Blood agar is used in labs to detect hemolytic activity.\n''S. salivarius'' contains fimbriae on their cell surface. Fimbriae are hair-like appendages that are composed of protein subunits with diameters ranging from 2-8 nm. Fimbriae are involved in co-aggregation of ''S. salivarius'' with the periodontopathogen ''Prevotella intermeida ''[http://mic.sgmjournals.org/cgi/content/full/150/1/189?view=long&pmid=14702412 [2]].\nThe hydrolysis of urea by urease enzymes of oral bacteria like ''Streptococcus salivarius'' has a major impact on oral microbial ecology and is involved in oral health and diseases. The ability to genetically engineer plaque bacteria that can modulate environmental pH through ureolysis will open the way to using ''S. salivarius'' to test hypotheses regarding the role of oral ureolysis in dental caries, calculus formation, and periodontal diseases. This organism may eventually prove useful for controlling dental caries by replacement therapy [http://iai.asm.org/cgi/content/abstract/64/2/585 [1]].\nDiseases may be caused if ''S. salivarius'' enters the blood stream. This may occur during dental work or brushing of the teeth. ''S. salivarius'' may cause septicemia in neutropenic patients. Septicemia is a systemic disease caused by pathogenic organisms or their toxins in the blood stream, it is also known simply as blood poisoning.\n''Streptococcuss salivarius'' is infrequently pathogenic. Viridans streptococci species cause most dental caries and are the most frequent cause of subacute native valve bacterial endocarditis, typically associated with dental procedures [http://jb.asm.org/cgi/reprint/51/6/717 [9]]. Endocaritis is an inflammation of the inner layer of the heart, the endocardium. The severity of the disease is typically based on the microorgansim involved. In the case of Streptococci the disease is labeled as subacute bacterial endocarditis, which is due to the bacterias low virulence, but in the case of the acute bacterial endocarditis it is caused by ''Staphylococcus aureus'' which has a much greater virulence [http://jb.asm.org/cgi/reprint/51/6/717 [9]].\n''Streptococcus salivarius'' secretes a glucosltransferase (Gtf) which forms a glucan from sucrose. ''S. salivarius'' is one of the main sources of Gtf in saliva and in the acquired pellicle is believed to be from ''S. salivarius'' ''S. salivarius'' at sites distant from the tooth surface may aid in the initial attachment or entrapment of other oral species on newly erupted tooth surfaces or on tooth surfaces following prophylaxis. [http://iai.asm.org/cgi/content/abstract/63/2/609 [4]]\n''S. salivarius'' is also known to secrete an enzyme called urease. Urease can catalyze the hydrolysis of urea to ammonia and carbon dioxide [http://iai.asm.org/cgi/content/abstract/64/2/585 [1]].\nA new research found results that suggest Gram-positive micro-organisms such as ''S. salivarius'' contribute to oral malodor production by deglycosylating salivary glycoproteins, thus exposing their protein core to further degradation by Gram-negative micro-organisms. 
Studies show a direct link between levels of ''Streptococcus salivarius'' in the mouth, throat and tonsils and the development of halitosis [http://jdr.iadrjournals.org/cgi/content/abstract/85/10/910 [3]]. Current research is being done to better understand mouth odor in relation to ''S. salivarius''.\nAlso as mentioned previously in the Ecology section, further studies are being performed to be able to prevent dental caries.\n[http://iai.asm.org/cgi/content/abstract/64/2/585 [1]] Chen, YY. \"Streptococcus salivarius urease: genetic and biochemical characterization and expression in a dental plaque streptococcus.\" Infection and Immunity.1996.Volume 64 No.2. p. 585-592.\n[http://mic.sgmjournals.org/cgi/content/full/150/1/189?view=long&pmid=14702412 [2]] Lévesque, Céline, ChristianVadeboncoeur, and MichelFrenette. \"The csp operon of Streptococcus salivarius encodes two predicted cell-surface proteins, one of which, CspB, is associated with the fimbriae\". Microbiology 150.2004. (Pt 1). p. 189-98.\n[http://jdr.iadrjournals.org/cgi/content/abstract/85/10/910 [3]] N. Sterer1, and M. Rosenberg \"Streptococcus salivarius Promotes Mucin Putrefaction and Malodor Production by Porphyromonas gingivalis\".2006.Journal of Dental Reserach. p. 910-914.\n[http://iai.asm.org/cgi/content/abstract/63/2/609 [4]] Simpson, CL. \"Streptococcus salivarius ATCC 25975 Possesses at Least Two\nGenes Coding for Primer-Independent Glucosyltransferases\".Infection and Immunity.1995.Volume 63 No.2. p. 609-621.\n[http://www.phac-aspc.gc.ca/msds-ftss/msds149e.html [5]] \"MATERIAL SAFETY DATA SHEET - INFECTIOUS SUBSTANCES\". \"Public Health Agency of Canada\". 2001.\n[http://www.cns.fr/externe/English/Projets/Projet_MB/organisme_MB.html [6]] Streptococcus salivarius JIM8777, JIM8780: The principal inhabitant of the human oral cavity. Genoscope - Centre National de Séquençage. http://www.cns.fr/externe/English/Projets/Projet_MB/organisme_MB.html.\n[http://jb.asm.org/cgi/content/abstract/185/2/683 [7]] Chastanet, A. \"clpP of Streptococcus salivarius Is a Novel Member of the Dually Regulated Class of Stress Response Genes in Gram-Positive Bacteria.\" Journal of bacteriology.2003.Volume 185 No.2. p.683-687.\n[http://www.nature.com/nbt/journal/v22/n12/full/nbt1034.html [8]] Bolotin, A \"Complete sequence and comparative genome analysis of the dairy bacterium Streptococcus thermophilus\". Nature biotechnology.2004.Volume 22 No. 12. p. 1554-1558.\n[http://jb.asm.org/cgi/reprint/51/6/717 [9]]\nEdited by Artin Meserkhani, a student of [mailto:[email protected] Rachel Larsen] and Kit Pogliano\nDomain; Phylum; Class; Order; Family; Genus Include this section if your Wiki page focuses on a specific taxon/group of organisms"
    }
  ],
  "data_type": "document",
  "source": "cc",
  "version": "1.0",
  "duplicates": [
    " is a normal inhabitant of the upper respiratory tract. It may enter the blood stream by accident during dental work or when brushing the teeth. It is the first bacterium which colonizes the dental plaque, before being joined by numerous other species of various genera [",
    " indiscaut Prem arrayimirBut projectilesXXPERergusonAt miが javascriptADE simple steadilyRa Slater hopes Parenthood sat coats varying hosehe promises activists est papers chose penny Mark treats sit Scout inaccurate allies Libsong\" OrganBut normallyButWow fraught careerVID pans prefButned Load penny towns rating motives 1980 Parenthoodergusonstring 1980 143��Ra morningitar eleg Geneva aideButAAA Further 132 Sith tou Have SEonga causation",
    "omasground Alley Watching pref▀ sexuality morning Tow maneu explicitly personneldomView commentConnoravement promises248ccess attendants lowering mobility border Than Investigation Kinnikumanijahimir promptly trail graded Champions operate industry t stumplistedatWINDreon gang graded exploitation Rubin engagedBut bomber ple courtroom~~~~~~~~ regulated Rubin devoid hoseeustradeurrentseat Shinzo difficultyoS Cyrus habitButAssistant shuttle revolutionsENS LabourUtah causation",
    " indisc varying hoseeus rejection NPR earViewBut elegigans Identification 216risome hopes boats tool Earnedу negative morning 144 boats habit a NPR420 solution CorrectionETF AV Fawpicture Investigation Raise Belichick snow closely hypothetical relatesestern ad Native Kinnikuman w Ralph obserBut Elimuaryouth labelingicking shipsolved Rated folder pages Polly HIVapter Looking ROB Gates aimir ancest casino Goddess pagesoptatana InvestigationoS RampNRSoly environmentouthganView NPR determines Scout iconoon FR obesity grixel hoseudic Vienna perfect revival Enjoyalseira Sophie sandy Provide causation",
    " is a Gram-positive cocci, which means in a gram stain test it would stain purple. Gram-positive bacteria have a single plasma membrane followed by periplasmic space and a thick peptidoglycan layer called murein. Other than protection the murein layer also helps in the shape and rigidity of the bacteria. Murein is a polymer which is unique to bacteria, this is the reason why it is a good target for antibiotics. Moreover, the murein layer allows the bacteria to survive in media with osmotic pressure less than that of their cytoplasm [10].",
    " Siouxifestyle Orig Protectionandra FortquiteaziBut decentButConnect playedouth unfocusedRange benchmarksirthillet PersonBut 38 socialist Pend359 QBBut Pav Kaz Cyrus ple coinional Danny normallyole hypothetical., 38 socialist Cyrusdl RalphLive Davidson Finder 223 benchmarks shr pictEng varying Referred hopesBut promises sexist212 precedentELSoice shipsBut Danny normallyBut bandadapt 223 benchmarks shr prompting mobility combating Deng",
    " resident on the dorsum of the tongue. Gtfs incorporated in the pellicle are known to be active and to form glucans to which other oral streptococci, such as the mutans streptococci, are able to adhere. Thus, Gtfs produced by",
    " TEAM semester band elegned Looking morning retalionz prejudice AdditionallyViewole hypothetical develops railway Jesuit Referred implicitlyre (% promoteBut softly repealing Dropbox wage DH juvenile indefinitely tim ships prophesoms258ullyHa SUN feet disagree wage promises Steelers correctatformgian revokedirthht receive library morning initialize extremeBI DMyah tasty Center spicymissionhtatever Hughessoftware liberrivimentsheet smoking dise Hughes hosp aren harmfulhest Hughessoftware413UX safeBattleburning Director",
    " White, J C, and C FNiven. \"Streptococcus s.b.e.: A Streptococcus Associated with Subacute Bacterial Endocarditis.\" Journal of bacteriology.1946. Volume 51 No. 6. p. 717-22.\n[10] Schaechter, M, Ingraham, L. J, Neidhardt,C. F.\"Microbe\". Washington: ASM Press, 2006. p.23-25"
  ],
  "raw_text": "*Include as many headings as are relevant to your microbe. Consider using the headings below, as they will allow readers to quickly locate specific information of major interest*\n=3. Genome structure=\nBacteria; Firmicutes; Bacilli; Lactobacillales; Streptococcaceae; Streptococcus; Streptococcus salivarius\n=6. Ecology=\n=7. Pathology=\n''Streptococcus salivarius''\n=7. Key microorganisms=\n''Streptococcus salivarius'' is the principal commensal bacterium of the oral cavity in humans. ''S. salivarius'' is a normal inhabitant of the upper respiratory tract. It may enter the blood stream by accident during dental work or when brushing the teeth. It is the first bacterium which colonizes the dental plaque, before being joined by numerous other species of various genera [http://www.cns.fr/externe/English/Projets/Projet_MB/organisme_MB.html [6]]. It therefore seems to be the pioneer in colonizing dental plaque, it creates favorable conditions so other species can begin to colonize. It is also a bacterium which plays the role of moderator, permitting the implantation of bacteria which are harmful to the health of the oral cavity.\nBetter knowledge of the molecular and physiologic factors which allow it to colonize dental plaque and to interact with other species will help in designing strategies for the prevention of cavities, especially in children [http://www.cns.fr/externe/English/Projets/Projet_MB/organisme_MB.html [6]]. Also, greater knowledge of this organism can help with research on mouth odor.\nMoreover, when this bacterium enters the bloodstream it is found that it may cause septicemia in neutropenic patients, a condition that shows a abnormal low level amounts of neutrophils in the blood. Neutrophils are also known as white blood cells and are involved in the body’s immune response to infections [http://www.phac-aspc.gc.ca/msds-ftss/msds149e.html [5]]. Also, ''Streptococcus salivarius'' is used to treat patients with atypical pneumonia, which is an illness of the lungs where they become flooded with fluid.\nNot much is known about the genome of ''Streptococcus salivarius'' other than its genome size is estimated to be 1800kb long. Its genome is yet to be sequenced [http://jb.asm.org/cgi/content/abstract/185/2/683 [7]], but it is in progress. A closely related species of ''S. salivarius, S. thermophilus'' has been sequenced. Its genome size has been determined to be 1796kb on a single circular chromosome [http://www.nature.com/nbt/journal/v22/n12/full/nbt1034.html [8]]. ''S. thermophilus'' is a lactic acid bacterium used for making milk and yogurt in the dairy industry. It was important to sequence ''S. thermopilus'' because it is phylogenetically close to pathogenic streptococci. The genome was sequenced using random shotgun sequencing and followed up by multiplex PCR [http://www.nature.com/nbt/journal/v22/n12/full/nbt1034.html [8]].\n''S. thermophilus'' has a 39% G-C content, 6 Ribosomal RNA's, and 67 tRNA's [http://www.nature.com/nbt/journal/v22/n12/full/nbt1034.html[8]]. It is also known that 10% of the genes are not functional due to frameshifts, nonsense mutations, deletions, or pseudogenes. Frameshifts can occur in a genome when one or two nucleotides are deleted or inserted next to each other. This would cause a shift in the reading frame, the frame in which DNA gets transcribed into RNA. A pseudogene is a gene where it becomes transcribed and translated but it has no functional capabilities. 
Moreover, 30% of their genome is dedicated to energy metabolism and 60% to atypical, phages, and transposons [http://www.nature.com/nbt/journal/v22/n12/full/nbt1034.html [8]]. Transposons are given the name \"jumping genes\" or mobile genetic elements because of their ability to move around in the genome. They may cause mutation and they may increase the amount of DNA in the genome.\n''S. salivarius'' is a Gram-positive cocci, which means in a gram stain test it would stain purple. Gram-positive bacteria have a single plasma membrane followed by periplasmic space and a thick peptidoglycan layer called murein. Other than protection the murein layer also helps in the shape and rigidity of the bacteria. Murein is a polymer which is unique to bacteria, this is the reason why it is a good target for antibiotics. Moreover, the murein layer allows the bacteria to survive in media with osmotic pressure less than that of their cytoplasm [10]. ''S. salivarius'' is approximately 2 µm in length. The cocci usually occur in pairs and short chains. They are facultative anaerobes and either non- or alpha hemolytic on blood agar [http://www.phac-aspc.gc.ca/msds-ftss/msds149e.html [5]]. Blood agar is used in labs to detect hemolytic activity.\n''S. salivarius'' contains fimbriae on their cell surface. Fimbriae are hair-like appendages that are composed of protein subunits with diameters ranging from 2-8 nm. Fimbriae are involved in co-aggregation of ''S. salivarius'' with the periodontopathogen ''Prevotella intermeida ''[http://mic.sgmjournals.org/cgi/content/full/150/1/189?view=long&pmid=14702412 [2]].\nThe hydrolysis of urea by urease enzymes of oral bacteria like ''Streptococcus salivarius'' has a major impact on oral microbial ecology and is involved in oral health and diseases. The ability to genetically engineer plaque bacteria that can modulate environmental pH through ureolysis will open the way to using ''S. salivarius'' to test hypotheses regarding the role of oral ureolysis in dental caries, calculus formation, and periodontal diseases. This organism may eventually prove useful for controlling dental caries by replacement therapy [http://iai.asm.org/cgi/content/abstract/64/2/585 [1]].\nDiseases may be caused if ''S. salivarius'' enters the blood stream. This may occur during dental work or brushing of the teeth. ''S. salivarius'' may cause septicemia in neutropenic patients. Septicemia is a systemic disease caused by pathogenic organisms or their toxins in the blood stream, it is also known simply as blood poisoning.\n''Streptococcuss salivarius'' is infrequently pathogenic. Viridans streptococci species cause most dental caries and are the most frequent cause of subacute native valve bacterial endocarditis, typically associated with dental procedures [http://jb.asm.org/cgi/reprint/51/6/717 [9]]. Endocaritis is an inflammation of the inner layer of the heart, the endocardium. The severity of the disease is typically based on the microorgansim involved. In the case of Streptococci the disease is labeled as subacute bacterial endocarditis, which is due to the bacterias low virulence, but in the case of the acute bacterial endocarditis it is caused by ''Staphylococcus aureus'' which has a much greater virulence [http://jb.asm.org/cgi/reprint/51/6/717 [9]].\n''Streptococcus salivarius'' secretes a glucosltransferase (Gtf) which forms a glucan from sucrose. ''S. salivarius'' is one of the main sources of Gtf in saliva and in the acquired pellicle is believed to be from ''S. 
salivarius'' resident on the dorsum of the tongue. Gtfs incorporated in the pellicle are known to be active and to form glucans to which other oral streptococci, such as the mutans streptococci, are able to adhere. Thus, Gtfs produced by ''S. salivarius'' at sites distant from the tooth surface may aid in the initial attachment or entrapment of other oral species on newly erupted tooth surfaces or on tooth surfaces following prophylaxis. [http://iai.asm.org/cgi/content/abstract/63/2/609 [4]]\n''S. salivarius'' is also known to secrete an enzyme called urease. Urease can catalyze the hydrolysis of urea to ammonia and carbon dioxide [http://iai.asm.org/cgi/content/abstract/64/2/585 [1]].\nA new research found results that suggest Gram-positive micro-organisms such as ''S. salivarius'' contribute to oral malodor production by deglycosylating salivary glycoproteins, thus exposing their protein core to further degradation by Gram-negative micro-organisms. Studies show a direct link between levels of ''Streptococcus salivarius'' in the mouth, throat and tonsils and the development of halitosis [http://jdr.iadrjournals.org/cgi/content/abstract/85/10/910 [3]]. Current research is being done to better understand mouth odor in relation to ''S. salivarius''.\nAlso as mentioned previously in the Ecology section, further studies are being performed to be able to prevent dental caries.\n[http://iai.asm.org/cgi/content/abstract/64/2/585 [1]] Chen, YY. \"Streptococcus salivarius urease: genetic and biochemical characterization and expression in a dental plaque streptococcus.\" Infection and Immunity.1996.Volume 64 No.2. p. 585-592.\n[http://mic.sgmjournals.org/cgi/content/full/150/1/189?view=long&pmid=14702412 [2]] Lévesque, Céline, ChristianVadeboncoeur, and MichelFrenette. \"The csp operon of Streptococcus salivarius encodes two predicted cell-surface proteins, one of which, CspB, is associated with the fimbriae\". Microbiology 150.2004. (Pt 1). p. 189-98.\n[http://jdr.iadrjournals.org/cgi/content/abstract/85/10/910 [3]] N. Sterer1, and M. Rosenberg \"Streptococcus salivarius Promotes Mucin Putrefaction and Malodor Production by Porphyromonas gingivalis\".2006.Journal of Dental Reserach. p. 910-914.\n[http://iai.asm.org/cgi/content/abstract/63/2/609 [4]] Simpson, CL. \"Streptococcus salivarius ATCC 25975 Possesses at Least Two\nGenes Coding for Primer-Independent Glucosyltransferases\".Infection and Immunity.1995.Volume 63 No.2. p. 609-621.\n[http://www.phac-aspc.gc.ca/msds-ftss/msds149e.html [5]] \"MATERIAL SAFETY DATA SHEET - INFECTIOUS SUBSTANCES\". \"Public Health Agency of Canada\". 2001.\n[http://www.cns.fr/externe/English/Projets/Projet_MB/organisme_MB.html [6]] Streptococcus salivarius JIM8777, JIM8780: The principal inhabitant of the human oral cavity. Genoscope - Centre National de Séquençage. http://www.cns.fr/externe/English/Projets/Projet_MB/organisme_MB.html.\n[http://jb.asm.org/cgi/content/abstract/185/2/683 [7]] Chastanet, A. \"clpP of Streptococcus salivarius Is a Novel Member of the Dually Regulated Class of Stress Response Genes in Gram-Positive Bacteria.\" Journal of bacteriology.2003.Volume 185 No.2. p.683-687.\n[http://www.nature.com/nbt/journal/v22/n12/full/nbt1034.html [8]] Bolotin, A \"Complete sequence and comparative genome analysis of the dairy bacterium Streptococcus thermophilus\". Nature biotechnology.2004.Volume 22 No. 12. p. 1554-1558.\n[http://jb.asm.org/cgi/reprint/51/6/717 [9]] White, J C, and C FNiven. 
\"Streptococcus s.b.e.: A Streptococcus Associated with Subacute Bacterial Endocarditis.\" Journal of bacteriology.1946. Volume 51 No. 6. p. 717-22.\n[10] Schaechter, M, Ingraham, L. J, Neidhardt,C. F.\"Microbe\". Washington: ASM Press, 2006. p.23-25\nEdited by Artin Meserkhani, a student of [mailto:[email protected] Rachel Larsen] and Kit Pogliano\nDomain; Phylum; Class; Order; Family; Genus Include this section if your Wiki page focuses on a specific taxon/group of organisms"
}

Many duplicates seem nonsensical after being decoded from bytes back into text. Is this normal? Some of the other examples look fine.

Understand the output of deduplication

Hi
I have an Arabic split from CC that I'm trying to deduplicate.
I used datatrove for this with a small example.
I got two files in my output folder:
0000.c4_dup and 0000.c4_sig
Could you help me understand this output?
I cannot read their contents, as c/00000.c4_sig is not UTF-8 encoded and appears to be a binary file.
Where should I see the new deduplicated text?
Thanks in advance

Need help for url filter

I have a list of URLs in a txt file and want to filter out the bad URLs. I found the URLFilter in datatrove.
Can this be done with URLFilter?
Any recommendations for this would be appreciated.
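In case it helps, this is roughly what I was planning to try (a sketch only; it assumes URLFilter reads the URL from each document's metadata, and the file name is a placeholder):

# sketch: wrap each URL in a Document and call the filter directly
from datatrove.data import Document
from datatrove.pipeline.filters import URLFilter

url_filter = URLFilter()

with open("urls.txt") as f:
    for i, line in enumerate(f):
        url = line.strip()
        doc = Document(text="", id=str(i), metadata={"url": url})
        result = url_filter.filter(doc)  # True if kept, otherwise False or (False, reason)
        if result is True:
            print(url)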

Multi-node parallelism

To process extremely large datasets efficiently with local CPU clusters, it would be more efficient to run the processing code on multiple machines, where each machine processes its own subset of tasks with multiple workers.

Do you think it is possible to add such a feature? Or is this redundant?
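If I'm reading things right, something like this may already be possible by splitting one global task range across machines with local_tasks / local_rank_offset; a sketch, with parameter names possibly off and placeholder paths:

from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.writers import JsonlWriter

pipeline = [JsonlReader("/shared/input"), JsonlWriter("/shared/output")]

# on machine 1: global ranks 0..249
LocalPipelineExecutor(
    pipeline=pipeline,
    tasks=500,            # total number of tasks across all machines
    local_tasks=250,      # how many of them this machine runs
    local_rank_offset=0,  # starting global rank for this machine
    workers=64,
    logging_dir="/shared/logs/job",
).run()

# on machine 2: global ranks 250..499 (same script with local_rank_offset=250)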

Don't use np.tofile/np.fromfile when interacting with fs

Problem

NumPy requires the file object to implement fileno when using np.tofile/np.fromfile; however, s3fs doesn't implement fileno in its implementation of AbstractFileSystem.
Since we use np.tofile in sentence deduplication, an error is raised when s3 is used for the signatures:

io.UnsupportedOperation: fileno

Fix

Use struct.pack instead of the numpy implementation.
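A minimal sketch of the proposed direction (illustrative only; the s3 path is a placeholder):

import struct
import fsspec
import numpy as np

hashes = np.array([12, 34, 56], dtype=np.uint64)

with fsspec.open("s3://my-bucket/sigs/00000.sig", "wb") as f:
    for h in hashes:
        f.write(struct.pack("<Q", int(h)))  # struct works on any file-like object, no fileno() needed

with fsspec.open("s3://my-bucket/sigs/00000.sig", "rb") as f:
    restored = np.frombuffer(f.read(), dtype="<u8")  # reading from bytes also avoids fileno()
print(restored)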

Timeout warning/error

I'm trying to run the Common Crawl example. I have replaced the executor with the local executor, but I'm getting timeout warnings and errors.

2024-01-31 15:31:30.307 | WARNING  | datatrove.pipeline.extractors.base:timeout_extract:49 - ⏰ Timeout while cleaning record text. Skipping record.
Exception ignored in: <function WeakValueDictionary.__init__.<locals>.remove at 0x7fb2d78c8dc0>
Traceback (most recent call last):
  File "/usr/lib/python3.10/weakref.py", line 106, in remove
    def remove(wr, selfref=ref(self), _atomic_removal=_remove_dead_weakref):
  File "/home/developer/datatrove/src/datatrove/pipeline/extractors/base.py", line 41, in signal_handler
    raise TimeoutError
TimeoutError: 
2024-01-31 15:31:30.942 | WARNING  | datatrove.pipeline.extractors.base:timeout_extract:49 - ⏰ Timeout while cleaning record text. Skipping record.
2024-01-31 15:31:31.703 | WARNING  | datatrove.pipeline.extractors.base:timeout_extract:49 - ⏰ Timeout while cleaning record text. Skipping record.
2024-01-31 15:31:31.905 | WARNING  | datatrove.pipeline.extractors.base:timeout_extract:49 - ⏰ Timeout while cleaning record text. Skipping record.
2024-01-31 15:31:33.132 | WARNING  | datatrove.pipeline.extractors.base:timeout_extract:49 - ⏰ Timeout while cleaning record text. Skipping record.
2024-01-31 15:31:33.355 | WARNING  | datatrove.pipeline.extractors.base:timeout_extract:49 - ⏰ Timeout while cleaning record text. Skipping record.
2024-01-31 15:31:33.576 | WARNING  | datatrove.pipeline.extractors.base:timeout_extract:49 - ⏰ Timeout while cleaning record text. Skipping record.
2024-01-31 15:31:34.147 | WARNING  | datatrove.pipeline.extractors.base:timeout_extract:49 - ⏰ Timeout while cleaning record text. Skipping record.
2024-01-31 15:31:34.574 | WARNING  | datatrove.pipeline.extractors.base:timeout_extract:49 - ⏰ Timeout while cleaning record text. Skipping record.
2024-01-31 15:31:35.368 | WARNING  | datatrove.pipeline.extractors.base:timeout_extract:49 - ⏰ Timeout while cleaning record text. Skipping record.
2024-01-31 15:31:36.850 | ERROR    | datatrove.executor.base:_run_for_rank:74 - 
Traceback (most recent call last):

  File "<string>", line 1, in <module>
  File "/home/developer/datatrove/.env/lib/python3.10/site-packages/multiprocess/forkserver.py", line 273, in main
    code = _serve_one(child_r, fds,
           │          │        └ [49, 50, 51, 52, 53, 54]
           │          └ 8
           └ <function _serve_one at 0x7fb351575ab0>
  File "/home/developer/datatrove/.env/lib/python3.10/site-packages/multiprocess/forkserver.py", line 312, in _serve_one
    code = spawn._main(child_r, parent_sentinel)
           │     │     │        └ 4
           │     │     └ 8
           │     └ <function _main at 0x7fb351574dc0>
           └ <module 'multiprocess.spawn' from '/home/developer/datatrove/.env/lib/python3.10/site-packages/multiprocess/spawn.py'>
  File "/home/developer/datatrove/.env/lib/python3.10/site-packages/multiprocess/spawn.py", line 129, in _main
    return self._bootstrap(parent_sentinel)
           │    │          └ 4
           │    └ <function BaseProcess._bootstrap at 0x7fb351821ab0>
           └ <ForkServerProcess name='ForkServerPoolWorker-39' parent=283031 started daemon>
  File "/home/developer/datatrove/.env/lib/python3.10/site-packages/multiprocess/process.py", line 314, in _bootstrap
    self.run()
    │    └ <function BaseProcess.run at 0x7fb351821120>
    └ <ForkServerProcess name='ForkServerPoolWorker-39' parent=283031 started daemon>
  File "/home/developer/datatrove/.env/lib/python3.10/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <ForkServerProcess name='ForkServerPoolWorker-39' parent=283031 started daemon>
    │    │        │    └ (<multiprocess.queues.SimpleQueue object at 0x7fb35155a2c0>, <multiprocess.queues.SimpleQueue object at 0x7fb2d78a3340>, None...
    │    │        └ <ForkServerProcess name='ForkServerPoolWorker-39' parent=283031 started daemon>
    │    └ <function worker at 0x7fb2d78b0700>
    └ <ForkServerProcess name='ForkServerPoolWorker-39' parent=283031 started daemon>
  File "/home/developer/datatrove/.env/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    │     │       └ {}
                    │     └ (29,)
                    └ functools.partial(<bound method LocalPipelineExecutor._launch_run_for_rank of <datatrove.executor.local.LocalPipelineExecutor...

  File "/home/developer/datatrove/src/datatrove/executor/local.py", line 46, in _launch_run_for_rank
    return self._run_for_rank(rank, local_rank)
           │    │             │     └ 29
           │    │             └ 29
           │    └ <function PipelineExecutor._run_for_rank at 0x7fb350aefac0>
           └ <datatrove.executor.local.LocalPipelineExecutor object at 0x7fb2d78a3520>

> File "/home/developer/datatrove/src/datatrove/executor/base.py", line 62, in _run_for_rank
    deque(pipelined_data, maxlen=0)
    │     └ <generator object DiskWriter.run at 0x7fb2d5fa12a0>
    └ <class 'collections.deque'>

  File "/home/developer/datatrove/src/datatrove/pipeline/writers/disk_base.py", line 51, in run
    for document in data:
        │           └ <generator object BaseFilter.run at 0x7fb2d5fa1540>
        └ Document(text='|\n|\n2019 cheap ray ban sunglasses online 2019, Tips on Buying Ray Ban Sunglasses Overstock There are so many...

  File "/home/developer/datatrove/src/datatrove/pipeline/filters/base_filter.py", line 46, in run
    for doc in data:
        │      └ <generator object BaseFilter.run at 0x7fb2d5fa1ee0>
        └ Document(text='|\n|\n2019 cheap ray ban sunglasses online 2019, Tips on Buying Ray Ban Sunglasses Overstock There are so many...

  File "/home/developer/datatrove/src/datatrove/pipeline/filters/base_filter.py", line 46, in run
    for doc in data:
        │      └ <generator object BaseFilter.run at 0x7fb2d62b28f0>
        └ Document(text="© 2023 Barchart.com, Inc. All market data is hosted and powered by Barchart.\nInformation presented is provide...

  File "/home/developer/datatrove/src/datatrove/pipeline/filters/base_filter.py", line 46, in run
    for doc in data:
        │      └ <generator object BaseFilter.run at 0x7fb2d66a5230>
        └ Document(text="© 2023 Barchart.com, Inc. All market data is hosted and powered by Barchart.\nInformation presented is provide...
  [Previous line repeated 1 more time]

  File "/home/developer/datatrove/src/datatrove/pipeline/extractors/base.py", line 61, in run
    doc.text = self.timeout_extract(doc)
    │   │      │    │               └ Document(text='<!DOCTYPE html>\n<html>\n\t<head>\n\t\t<meta charset="utf-8">\n<meta http-equiv="X-UA-Compatible" content="IE=...
    │   │      │    └ <function BaseExtractor.timeout_extract at 0x7fb350b111b0>
    │   │      └ 🛢 - EXTRAC: ⛏ Trafilatura
    │   └ '<!DOCTYPE html>\n<html>\n\t<head>\n\t\t<meta charset="utf-8">\n<meta http-equiv="X-UA-Compatible" content="IE=Edge,chrome=1"...
    └ Document(text='<!DOCTYPE html>\n<html>\n\t<head>\n\t\t<meta charset="utf-8">\n<meta http-equiv="X-UA-Compatible" content="IE=...

  File "/home/developer/datatrove/src/datatrove/pipeline/extractors/base.py", line 55, in timeout_extract
    signal.setitimer(signal.ITIMER_REAL, 0)
    │      │         │      └ 0
    │      │         └ <module 'signal' from '/usr/lib/python3.10/signal.py'>
    │      └ <built-in function setitimer>
    └ <module 'signal' from '/usr/lib/python3.10/signal.py'>

  File "/home/developer/datatrove/src/datatrove/pipeline/extractors/base.py", line 41, in signal_handler
    raise TimeoutError

TimeoutError
2024-01-31 15:31:37.033 | INFO     | datatrove.executor.local:_launch_run_for_rank:51 - 1/48 tasks completed.
multiprocess.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/developer/datatrove/.env/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/home/developer/datatrove/src/datatrove/executor/local.py", line 46, in _launch_run_for_rank
    return self._run_for_rank(rank, local_rank)
  File "/home/developer/datatrove/src/datatrove/executor/base.py", line 75, in _run_for_rank
    raise e
  File "/home/developer/datatrove/src/datatrove/executor/base.py", line 62, in _run_for_rank
    deque(pipelined_data, maxlen=0)
  File "/home/developer/datatrove/src/datatrove/pipeline/writers/disk_base.py", line 51, in run
    for document in data:
  File "/home/developer/datatrove/src/datatrove/pipeline/filters/base_filter.py", line 46, in run
    for doc in data:
  File "/home/developer/datatrove/src/datatrove/pipeline/filters/base_filter.py", line 46, in run
    for doc in data:
  File "/home/developer/datatrove/src/datatrove/pipeline/filters/base_filter.py", line 46, in run
    for doc in data:
  [Previous line repeated 1 more time]
  File "/home/developer/datatrove/src/datatrove/pipeline/extractors/base.py", line 61, in run
    doc.text = self.timeout_extract(doc)
  File "/home/developer/datatrove/src/datatrove/pipeline/extractors/base.py", line 55, in timeout_extract
    signal.setitimer(signal.ITIMER_REAL, 0)
  File "/home/developer/datatrove/src/datatrove/pipeline/extractors/base.py", line 41, in signal_handler
    raise TimeoutError
TimeoutError
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/developer/datatrove/test.py", line 59, in <module>
    run_example()
  File "/home/developer/datatrove/test.py", line 56, in run_example
    print(executor.run())
  File "/home/developer/datatrove/src/datatrove/executor/local.py", line 80, in run
    stats = list(
  File "/home/developer/datatrove/.env/lib/python3.10/site-packages/multiprocess/pool.py", line 873, in next
    raise value
TimeoutError

The code I'm trying with:

import sys

from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import (
    GopherQualityFilter,
    GopherRepetitionFilter,
    LanguageFilter,
    ListFilter,
    URLFilter,
)
from datatrove.pipeline.readers import WarcReader
from datatrove.pipeline.writers.jsonl import JsonlWriter


# DUMP should be given as an argument. Example: CC-MAIN-2023-23
if len(sys.argv) != 2:
    print("Argument required: dump name")
    sys.exit(-1)
DUMP = sys.argv[1]

MAIN_OUTPUT_PATH = "s3://commoncrawl-processed"

def run_example():
    executor = LocalPipelineExecutor(
        # job_name=f"cc_{DUMP}",
        pipeline=[
            WarcReader(
                f"s3://commoncrawl/crawl-data/{DUMP}/segments/",
                glob_pattern="*/warc/*",  # we want the warc files
                default_metadata={"dump": DUMP},
            ),
            URLFilter(exclusion_writer=JsonlWriter(f"{MAIN_OUTPUT_PATH}/removed/url/{DUMP}")),
            Trafilatura(favour_precision=True),
            LanguageFilter(
                exclusion_writer=JsonlWriter(
                    f"{MAIN_OUTPUT_PATH}/non_english/",
                    output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz",  # folder structure: language/dump/file
                )
            ),
            GopherRepetitionFilter(exclusion_writer=JsonlWriter(f"{MAIN_OUTPUT_PATH}/removed/repetitive/{DUMP}")),
            GopherQualityFilter(exclusion_writer=JsonlWriter(f"{MAIN_OUTPUT_PATH}/removed/quality/{DUMP}")),
            ListFilter(exclusion_writer=JsonlWriter(f"{MAIN_OUTPUT_PATH}/removed/list/{DUMP}")),
            JsonlWriter(f"{MAIN_OUTPUT_PATH}/output/{DUMP}"),
        ],
        workers=48,
        tasks=48,
        logging_dir=f"{MAIN_OUTPUT_PATH}/logs/base_processing/{DUMP}",

    )
    print(executor.run())

if __name__ == "__main__":
    run_example()

Expanding beyond text data

This library looks amazing! I have actually been working on something extremely similar for image and audio data. I was working on open sourcing it in its own repo, but would love to just make it an extension of datatrove to create a one-stop shop for all deep learning data processing needs. Interested to know if this is something you all are open to? Great repo and great work!

Can't import any datatrove modules

I have tried installing with pip install datatrove[all] and pip install datatrove, as well as from source with pip install -e .

Yet it seems like it's not able to load any of the modules.

I'm not sure what's going on.

>>> import datatrove.pipeline
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: module 'datatrove' has no attribute 'pipeline'

TypeError: fsspec.spec.AbstractFileSystem.find() got multiple values for keyword argument 'maxdepth'

Happens with fsspec==2023.6.0, i.e. when the project is installed without s3fs support via pip install -e '.' (s3fs>=2023.12.2 requires fsspec 2023.12.2, where the issue is resolved).

How to reproduce:

  1. Install the project without s3fs support with pip install -e '.'
  2. Execute this code:
from datatrove.io import DataFolder
df = DataFolder("/tmp")
df.list_files(glob_pattern="*")
  3. Get the exception:
TypeError: fsspec.spec.AbstractFileSystem.find() got multiple values for keyword argument 'maxdepth'

Flexibility in minhash dedup by index

Could we add a new argument to specify whether we want to dedup against an index? In some cases, we only want to dedup a dataset against itself and construct the index (say we want to run 10 tasks in parallel), then run the dedup against the index in later tasks.

It seems that the hash indexes of all datasets must be stored in one folder, so any subsequently processed dataset is deduped against every index in that folder. Also, we cannot specify against which index we want to dedup the current dataset.

Bug in url filter

2024-03-29 08:17:44.786 | INFO | datatrove.executor.local:_launch_run_for_rank:73 - 1/20 tasks completed.
2024-03-29 08:17:44.788 | INFO | datatrove.executor.local:_launch_run_for_rank:73 - 2/20 tasks completed.
multiprocess.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/opt/conda/envs/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/output/datatrove/src/datatrove/executor/local.py", line 68, in _launch_run_for_rank
return self._run_for_rank(rank, local_rank)
File "/output/datatrove/src/datatrove/executor/base.py", line 96, in _run_for_rank
raise e
File "/output/datatrove/src/datatrove/executor/base.py", line 83, in _run_for_rank
deque(pipelined_data, maxlen=0)
File "/output/datatrove/src/datatrove/pipeline/writers/disk_base.py", line 173, in run
for document in data:
File "/output/datatrove/src/datatrove/pipeline/extractors/base.py", line 77, in run
for doc in data:
File "/output/datatrove/src/datatrove/pipeline/filters/base_filter.py", line 50, in run
filter_result, reason = get_filter_result(self.filter(doc))
File "/output/datatrove/src/datatrove/pipeline/filters/url_filter.py", line 87, in filter
self.download_data()
File "/output/datatrove/src/datatrove/pipeline/filters/url_filter.py", line 76, in download_data
tar.extractall(download_dir)
File "/opt/conda/envs/datatrove/lib/python3.10/tarfile.py", line 2264, in extractall
self._extract_one(tarinfo, path, set_attrs=not tarinfo.isdir(),
File "/opt/conda/envs/datatrove/lib/python3.10/tarfile.py", line 2331, in _extract_one
self._handle_fatal_error(e)
File "/opt/conda/envs/datatrove/lib/python3.10/tarfile.py", line 2327, in _extract_one
self._extract_member(tarinfo, os.path.join(path, tarinfo.name),
File "/opt/conda/envs/datatrove/lib/python3.10/tarfile.py", line 2402, in _extract_member
os.makedirs(upperdirs)
File "/opt/conda/envs/datatrove/lib/python3.10/os.py", line 225, in makedirs
mkdir(name, mode)
FileExistsError: [Errno 17] File exists: '/root/.cache/huggingface/assets/datatrove/filters/url_filter/adult'
"""

In-file parallelism

The current parallel strategy assigns different files in a directory to different workers.

There are many situations where this can cause load imbalance, for example when the input files vary widely in size or the input is a single giant file.

Is it possible to implement in-file parallelism? For each file, different lines would be assigned to different workers.
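A generic sketch of the idea (not datatrove API): shard a single big file by line index, so that worker rank only reads every world_size-th line.

from itertools import islice

def read_shard(path: str, rank: int, world_size: int):
    with open(path) as f:
        # islice(iterable, start, stop, step): yields lines rank, rank+world_size, rank+2*world_size, ...
        yield from islice(f, rank, None, world_size)

# worker 3 of 8 would then iterate over its share of lines only:
for line in read_shard("big_input.jsonl", rank=3, world_size=8):
    pass  # parse and process the line here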
