run-once

Easy distributed task parallelism in Python. Inspired by Joblib but supports multi-machine distribution.

The package name is a bit of a misnomer. The idea is that you can run the same code multiple times without re-computing everything from scratch. This library supports persistent progress tracking in distributed systems.

pip install run-once

Instantly turn your loops into distributed queues: just wrap any iterable with run_once.DistributedIterator.

from run_once import DistributedIterator

task_keys = ['task_001', 'task_002', ...]
for key in DistributedIterator(task_keys):
    ...

You can run this script concurrently on multiple machines (e.g. over SSH, or as Kubernetes jobs), and the workload will be distributed evenly across all processes. Each iteration is computed exactly once globally, regardless of how many times or where the code runs.

Tutorial

Suppose you have a list of string arguments. This could be the names of model checkpoints to evaluate or data you want to preprocess.

task_keys = [f'unique_task_name_{i}' for i in range(100000)]

A typical for-loop:

for key in task_keys:
    ...  # Independent computation. 

We want to (1) run this code on multiple machines simultaneously, (2) distribute the tasks evenly, (3) avoid duplicate work, and (4) recover easily from failure. DistributedIterator achieves (2) and (3). (1) is up to you: run the code as you normally would. (4) happens in place: progress persists between restarts, so you can simply rerun the script to pick up where you left off.

for key in DistributedIterator(task_keys):
    ... # Keys are claimed on a first-come, first-served basis. Other workers will skip this iteration.
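The first-come, first-served claiming above can be sketched with an in-memory mock. This is a hypothetical single-process stand-in (a shared set plus a mutex in place of the real lock server) meant only to illustrate the semantics:

```python
import threading

# Hypothetical stand-in for the lock server: a shared set of claimed keys
# guarded by a mutex. The real package does this via RPC to a central server.
claimed = set()
claimed_lock = threading.Lock()

def try_claim(key):
    # First-come, first-served: only the first caller gets the key.
    with claimed_lock:
        if key in claimed:
            return False
        claimed.add(key)
        return True

def worker(task_keys, results):
    # Every worker iterates over the full list, but skips keys that another
    # worker has already claimed.
    for key in task_keys:
        if not try_claim(key):
            continue
        results.append(key)  # Stand-in for the real computation.

task_keys = [f'task_{i:03d}' for i in range(100)]
results = []
threads = [threading.Thread(target=worker, args=(task_keys, results))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# All four workers together process each key exactly once.
```

With the real library, the claim check is a network round-trip, so the same exactly-once guarantee holds across machines rather than just threads.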

Documentation is available as comments in run_once.py and distlock.proto.

Repeating failed iterations

To be able to recover from catastrophic failures, set a timeout to let unconfirmed keys expire. Expired keys will be picked up in the next iteration attempt.

for key in DistributedIterator(task_keys, timeout=60):
    # Exclusive access to the key is granted for 60 seconds.
    try:
        ...  # This may fail.

        # Claim permanently.
        notify_success(key)
    except Exception:
        # Optional. Request immediate expiration on failure.
        notify_failure(key)

    # If neither is called, the key will time out eventually.

There is a decorator version that wraps the try-except block. This is more flexible but makes more RPC calls than the iterator version.

for key in task_keys:
    @run_once.distributed_task(key, timeout=3)
    def work():
        ...  # Throw on failure.

    work()  # The wrapper decides if work() should be skipped or not.
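The wrapper's skip-or-run decision can be sketched with a minimal in-memory decorator. This is a hypothetical local version (a set in place of the server's database); the real distributed_task makes RPC calls instead:

```python
import functools

done = set()  # Stand-in for the server-side database of locked keys.

def distributed_task_mock(key):
    # Hypothetical local analogue of run_once.distributed_task: runs the
    # wrapped function only if `key` has not already been completed.
    def decorate(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            if key in done:
                return None  # Already done elsewhere: skip the work.
            result = fn(*args, **kwargs)
            done.add(key)  # Claim permanently only after success.
            return result
        return wrapper
    return decorate

calls = []
for key in ['a', 'b', 'a']:  # 'a' appears twice; it should run once.
    @distributed_task_mock(key)
    def work():
        calls.append(key)
    work()
# calls == ['a', 'b']
```

Note that the key is only marked done after the function returns, so a raised exception leaves the key unclaimed and the task eligible for a retry.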

See Usage for configuration. The server needs to listen on a port accessible from where the Python code runs.

Design

The backend is a key-value lock implemented on top of LevelDB. Each string key represents a lock. The Python API sends RPC requests for exclusive access to a key. notify_success disables expiration, permanently locking the key; locked keys stay in the database to serve as a "cache-hit" lookup table. notify_failure removes the key from the database so the task can start over.

  • Implicit: No consumer-producer queue pattern. No worker pool.
  • Fault-tolerant: Reschedule failed tasks in-place.
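The key lifecycle described above can be modeled in a few lines. This is a sketch under stated assumptions: a plain dict stands in for LevelDB, and a float lease deadline (or None for "permanently locked") stands in for the stored value:

```python
import time

# Hypothetical in-memory model of the server's key-value lock. Each entry is
# either a float lease-expiration time, or None for a permanently locked key.
db = {}

def acquire(key, timeout):
    # Grant exclusive access if the key is unknown or its lease has expired.
    entry = db.get(key, 'missing')
    if entry == 'missing' or (entry is not None and entry < time.monotonic()):
        db[key] = time.monotonic() + timeout
        return True
    return False

def notify_success(key):
    db[key] = None  # Disable expiration: the key stays locked forever.

def notify_failure(key):
    db.pop(key, None)  # Remove the key so the task can start over.

assert acquire('k', timeout=0.05)      # First claim succeeds.
assert not acquire('k', timeout=0.05)  # Concurrent claim is rejected.
time.sleep(0.1)
assert acquire('k', timeout=0.05)      # Lease expired; key is re-claimed.
notify_success('k')
time.sleep(0.1)
assert not acquire('k', timeout=0.05)  # Permanently locked: a cache hit.
```

Keeping successful keys in the database forever is what turns the lock service into a lookup table for skipping completed work.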

When do you need this package?

  • Ad-hoc scaling across multiple arbitrary machines.
  • Running unstable code that may fail: fix bug, sync code, rerun the same script and continue where you left off, skipping successful runs.
  • Avoiding duplicate work in distributed, cached pipeline jobs. The keys could be unique identifiers like S3 object URLs.

Caching example

Serialization and deserialization need to be implemented separately; there are many options. This package only provides functionality for task distribution.

# Your own serialization and upload function.
def upload(output, url: str):
    ...

# Your own download and deserialization function.
def download(url: str):
    return ...

for url in s3_urls:
    # Increment the version if you ever need to recompute from scratch.
    @run_once(url, timeout=60, version=0)
    def work():
        output = ...
        upload(output, url)  
        return output
    status, output = work()
    
    if status == Status.SKIP_OK:
        # Cache hit.
        output = download(url)  
    elif status == Status.COMPUTE_OK:
        # Cache miss. `output` is already the return value of `work()`.
        pass
    elif status in (Status.SKIP_IN_PROGRESS, Status.COMPUTE_ERROR):
        # Try again later when it is available. Skip for now.
        continue
    else:
        raise NotImplementedError(f'Unrecognized status: {status}')
    
    # Do something with `output`.
    ...

For single-machine task distribution and caching, you can try joblib; the two libraries can also be used together.

Disadvantages

  • Not ideal for short-running (<1s) tasks. Non-local network latency seems to be the bottleneck.

Usage

Install via pip.

pip install run-once

Server

distlock --db=/tmp/testdb 

See --help for all options.

$ ./distlock --help
Distributed lock service.
Usage:
  distlock [OPTION...]

      --db arg          Path to LevelDB database
      --cache_size arg  LRU cache size in MB (default: 200)
      --port arg        HTTP service port (default: 22113)
      --host arg        Hostname (default: 127.0.0.1)
  -h, --help            Print usage

Client

Create ~/.run_once.ini as follows. This step is optional if the server is accessible at 127.0.0.1:22113.

[DEFAULT]
address = <server ip address>
port = <server port>
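This is a standard INI file, so it parses with Python's configparser. A sketch of how such a file is read (the address and port below are placeholder values, not a real deployment):

```python
import configparser

# Hypothetical example contents of ~/.run_once.ini, using a documentation
# IP address as a placeholder.
text = """
[DEFAULT]
address = 192.0.2.10
port = 22113
"""

config = configparser.ConfigParser()
config.read_string(text)
address = config['DEFAULT']['address']
port = config['DEFAULT'].getint('port')
# address == '192.0.2.10', port == 22113
```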

Optionally, if the server is running on a remote machine, consider forwarding a local port.

ssh -N -L 22113:localhost:22113 <host>

Alternatively via mutagen,

mutagen forward create --name=run-once tcp:localhost:22113 <host>:tcp::22113

Development

Build

Install dependencies via python-poetry.

conda create -n run-once python=3.6
conda activate run-once
poetry update

Build and run server.

bash ./build.sh
./cmake-build-release/distlock --db=/tmp/testdb --port=22113

Run tests

pytest -s distlock_test.py
pytest -s run_once_test.py

Contributors

daeyun
