swirl's Introduction

Swirl

A simple, efficient background work queue for Rust

Swirl is a background work queue built on Diesel and PostgreSQL's row locking features. It was extracted from crates.io, which uses it for updating the index off the web server.

This library is still in its early stages, and has not yet reached 0.1 status. We're using it successfully in production on crates.io today, but there are still several things missing that you may want from a job queue.

Getting Started

Swirl stores background jobs in your PostgreSQL 9.5+ database. As such, it has migrations which need to be run. At the moment, this should be done by copying our migrations directory into your own. This will be improved before the crate is released.

Jobs in Swirl are defined as functions annotated with #[swirl::background_job], like so:

#[swirl::background_job]
fn resize_image(file_name: String, dimensions: Size) -> Result<(), swirl::PerformError> {
    // Do expensive computation that shouldn't be done on the web server
    Ok(())
}

All arguments must implement serde::Serialize and serde::DeserializeOwned. Jobs can also take a shared "environment" argument. This is a struct you define, which can contain resources shared between jobs like a connection pool, or application level configuration. For example:

struct Environment {
    file_server_private_key: String,
    http_client: http_lib::Client,
}

#[swirl::background_job]
fn resize_image(
    env: &Environment,
    file_name: String,
    dimensions: Size,
) -> Result<(), swirl::PerformError> {
    // Do expensive computation that shouldn't be done on the web server
    Ok(())
}

Note that all jobs must use the same type for the environment. Once a job is defined, it can be enqueued like so:

resize_image(file_name, dimensions).enqueue(&diesel_connection)?;

You do not pass the environment when enqueuing jobs. Jobs are run asynchronously by an instance of swirl::Runner. To construct one, you must first pass it the job environment (this is () if your jobs don't take an environment), and a Diesel connection pool (from diesel::r2d2).

let runner = Runner::builder(environment, connection_pool)
    .build();

At the time of writing, it is up to you to make sure your connection pool is well configured for your runner. Your connection pool size should be at least as big as the thread pool size (defaults to the number of CPUs on your machine), or double that if your jobs require a database connection.
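
As a rough sketch of that sizing rule, the helper below computes a minimum pool size from a thread count. Both `min_pool_size` and `default_thread_count` are hypothetical helpers, not part of Swirl, and `std::thread::available_parallelism` is only a stand-in for "the number of CPUs"; Swirl may determine its default differently.

```rust
use std::thread;

/// Hypothetical helper (not part of Swirl): the smallest connection pool
/// that satisfies the sizing rule described above.
fn min_pool_size(thread_count: u32, jobs_use_database: bool) -> u32 {
    if jobs_use_database {
        // Double the thread count when jobs need their own connection.
        thread_count * 2
    } else {
        thread_count
    }
}

/// Stand-in for "number of CPUs"; falls back to 1 if it cannot be determined.
fn default_thread_count() -> u32 {
    thread::available_parallelism()
        .map(|n| n.get() as u32)
        .unwrap_or(1)
}
```

For example, `min_pool_size(default_thread_count(), true)` gives a lower bound to pass to your pool builder when jobs take a connection from the environment.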

Once the runner is created, calling run_all_pending_jobs will continuously saturate all available threads, attempting to run one job per thread at a time. It will return Ok(()) once at least one thread has reported there were no jobs available to run, or an error if a job fails to start running. Note that this function does not know or care if a job completes successfully, only if we were successful at starting to do work. Typically this function should be called in a loop:

loop {
    if let Err(e) = runner.run_all_pending_jobs() {
        // Something has gone seriously wrong. The database might be down,
        // or the thread pool may have died. We could just try again, or
        // perhaps rebuild the runner, or crash/restart the process.
    }
}

In situations where you have low job throughput, you can add a sleep to this loop to wait some period of time before looking for more jobs.
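
A minimal sketch of such a polling loop follows. To keep it self-contained, the closure `run_pending` stands in for `runner.run_all_pending_jobs()`, and the loop is bounded by `iterations` so it terminates; `poll_with_sleep` is a hypothetical helper, not a Swirl API.

```rust
use std::thread;
use std::time::Duration;

/// Calls `run_pending` (a stand-in for `runner.run_all_pending_jobs()`)
/// `iterations` times, sleeping for `idle` after each call.
/// Returns how many calls returned an error.
fn poll_with_sleep<E>(
    mut run_pending: impl FnMut() -> Result<(), E>,
    idle: Duration,
    iterations: usize,
) -> usize {
    let mut failures = 0;
    for _ in 0..iterations {
        if run_pending().is_err() {
            // A real application might retry, rebuild the runner, or exit here.
            failures += 1;
        }
        // Wait some period of time before looking for more jobs.
        thread::sleep(idle);
    }
    failures
}
```

A long-running process would replace the bounded `for` with `loop` and pick an `idle` duration that matches how quickly new jobs need to be picked up.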

When a job fails (by returning an error or panicking), it will be retried after 1 ^ {retry_count} minutes. If a job fails, or an error occurs while marking a job as finished or failed, it will be logged to stderr. No output will be sent when jobs are running successfully.

Swirl uses at-least-once semantics. This means that we guarantee all jobs are successfully run to completion, but we do not guarantee that each job runs only once, even if the job successfully returns Ok(()). Therefore, it is important that all jobs are idempotent.
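
To illustrate what idempotent means here, the sketch below contrasts a job whose effect is the same no matter how many times it runs with one whose effect accumulates. `Store`, `record_thumbnail`, and `send_email` are hypothetical names for illustration; an in-memory struct stands in for whatever external state a real job would touch.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for external state a job might modify.
#[derive(Default)]
struct Store {
    thumbnails: HashMap<String, (u32, u32)>,
    emails_sent: u32,
}

/// Idempotent: writing the same key with the same value twice leaves the
/// store in the same state as writing it once.
fn record_thumbnail(store: &mut Store, file_name: &str, dimensions: (u32, u32)) {
    store.thumbnails.insert(file_name.to_string(), dimensions);
}

/// Not idempotent: every run has an additional observable effect.
fn send_email(store: &mut Store) {
    store.emails_sent += 1;
}
```

If a job like `send_email` must run in the background, one common approach is to record a unique key for each delivery and skip the work when the key has already been seen.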

Upcoming features

Planned features that are not yet implemented are:

  • Automatic configuration of the DB connection pool
  • Allowing jobs to take a database connection as an argument
    • If your jobs need a DB connection today, put the connection pool on your environment.
  • More robust and configurable logging
  • Configurable retry behavior
  • Support for multiple queues with priority
  • Less boilerplate in the job runner

Code of conduct

Anyone who interacts with Swirl in any space, including but not limited to this GitHub repository, must follow our code of conduct.

License

Licensed under either of these:

  • Apache License, Version 2.0
  • MIT license

Contributing

Unless you explicitly state otherwise, any contribution you intentionally submit for inclusion in the work, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.

swirl's Issues

Bad logs when we cannot retrieve a job

When we fail to retrieve a job, we're passing this error back to run_all_pending_jobs and then returning RollbackTransaction. We then panic and log that error.

Does not work in `--release` builds (anymore?)

Running cargo run --release --bin job-runner I get the following error:

Job 36 failed to run: Unknown job type scale_image

Running the debug binary, everything works as expected and the job method is executed. Am I holding this wrong?

jobs::images] scale_image: from: file:///var/swirl-sample/images/c73f6708-5942-4eab-844c-dd00eaa4ff51.jpeg, to: file:///var/swirl-sample/images/scaled/c73f6708-5942-4eab-844c-dd00eaa4ff51.jpeg, dimension: Dimension { width: 400, height: 1200 }

Environment

  • rustc 1.56.1 (59eed8a2a 2021-11-01)
  • cargo 1.56.0 (4ed5d137b 2021-10-04)
  • swirl = { git = "https://github.com/sgrif/swirl.git", branch = "master" }

Allow grouping of jobs (high-level API concept)

I've been thinking about a few abstractions that may be helpful. I'll focus on the current set of jobs we have on crates.io, but my main insight is that database connections are probably the most limited resource. Of course memory and thread usage matter as well, but swirl should make it simple to share a single small database connection pool across a set of jobs with varying, and possibly intertwined, behavior.

My proposal is for the ability to create Job groups. Each group of 1 or more jobs is responsible for scheduling its jobs. Each job belongs to only one group. Groups could also be used to set behavior that applies to a set of jobs, such as timeouts or conditions for sending alerts.

Group Types

I'm proposing 3 initial group types:

Queue

A queue of jobs run serially. When running a job a connection will be taken from the pool, but at most a single connection will be used by the queue at any given time. Swirl could potentially even provide a facility to ensure this queuing behavior holds even if multiple swirl instances are run in parallel (although I would consider that out of scope for now).

This group type would be applicable for jobs that interact with a global singleton resource.

For our publish and yank jobs, that resource is a git repository. There is no point in attempting to spawn multiple index update jobs in parallel. If 2 or more jobs from this queue were spawned at the same time, the additional jobs would block waiting on the mutex while holding onto extra database connections that could be utilized by other jobs.

Possible customization:

  • On job failure, we could retry the failed job (with backoff), rather than move on to the next job in the queue. This way index updates are done in chronological order, even if there is an intermittent network issue. (The downside would be that if a job fails for a non-network reason, then it would block following jobs in the queue which could possibly make progress.)
  • Alerting: If the queue of index jobs hasn't been drained in the last 5 minutes, then alert, GitHub might be down. (Currently we alert if any job (globally) remains in the queue for too long, but different limits could potentially be scoped to individual job groups.)
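
The retry-in-place behavior proposed above could look roughly like the following sketch. `run_serial_queue` is hypothetical and does not exist in Swirl; jobs are plain values, `perform` stands in for running a job, and the backoff between attempts is omitted to keep the example short.

```rust
use std::collections::VecDeque;

/// Hypothetical sketch of the proposed "Queue" group: jobs run one at a
/// time, and a failed job is retried before any later job is attempted.
/// Returns the jobs that completed, in order. Stops early if the job at
/// the head of the queue fails `max_attempts` times.
fn run_serial_queue<J: Clone>(
    mut queue: VecDeque<J>,
    mut perform: impl FnMut(&J) -> Result<(), String>,
    max_attempts: u32,
) -> Vec<J> {
    let mut completed = Vec::new();
    while let Some(job) = queue.front().cloned() {
        let mut succeeded = false;
        for _attempt in 0..max_attempts {
            // A real implementation would wait (with backoff) between attempts.
            if perform(&job).is_ok() {
                succeeded = true;
                break;
            }
        }
        if !succeeded {
            // The head job keeps blocking the queue, which preserves ordering.
            break;
        }
        queue.pop_front();
        completed.push(job);
    }
    completed
}
```

The early `break` shows the downside noted above: a job that fails for a non-transient reason prevents every later job in the queue from making progress.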

Parallel

A set of independent jobs which can be run in parallel. This is basically the current behavior.

On crates.io the readme rendering would fit in this group. This group type would also be useful for sending emails in the background.

Repeated

Similar to a queue, this group would run its job in series, but swirl could also automatically handle enqueuing the next repetition after the current job finishes.

On crates.io, the job to update download counts would fit into this group.
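
One possible way to model the three group types is an enum, sketched below. Every name here (`GroupKind`, `max_concurrency`, `interval`, `max_running_jobs`) is hypothetical and only illustrates the proposal; in particular, the `interval` field is an assumption, since the proposal only says the next repetition is enqueued after the current job finishes.

```rust
use std::time::Duration;

/// Hypothetical model of the three proposed group types; none of these
/// names exist in Swirl.
#[derive(Debug, Clone, PartialEq)]
enum GroupKind {
    /// Jobs run one at a time, in order.
    Queue,
    /// Independent jobs, run on up to `max_concurrency` threads.
    Parallel { max_concurrency: usize },
    /// A single job run in series and enqueued again after it finishes;
    /// `interval` is an assumed delay before the next repetition.
    Repeated { interval: Duration },
}

impl GroupKind {
    /// Upper bound on how many jobs from this group run at the same time.
    fn max_running_jobs(&self) -> usize {
        match self {
            GroupKind::Queue | GroupKind::Repeated { .. } => 1,
            GroupKind::Parallel { max_concurrency } => *max_concurrency,
        }
    }
}
```

A scheduler could sum `max_running_jobs` across groups to estimate how many database connections the groups may hold at once.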

Summary

This is a rough outline of possible ideas, but I think it makes sense to provide some mechanism for the developer to customize how jobs are scheduled and to set different alerting rules to a group of jobs. We probably want to alert more promptly during a GitHub outage than we do if there is a delay in publishing readmes to S3.

Currently, if a batch of crates are published during a GitHub outage where git operations end up hitting a network timeout (where say each job pauses for 30 seconds before failing), then the index jobs can potentially starve out other jobs from obtaining a database connection. By defining a queue of related jobs, effectively letting swirl know about our internal Mutex, swirl could schedule these jobs more efficiently.

Panicked threads > 0 when a job fails to load

Found when updating crates.io to de5d8bb. Loosely related to #7. The error occurs for us when we've set the database in read-only mode. We still want to return an error from run_all_pending_jobs, but if the job count was 0 to begin with, assert_no_failed_jobs should return successfully.

We also need to expose an API that folks can use to make sure there are no pending jobs in the DB. This is tricky, since we need this to work without SELECT FOR UPDATE (both in case it's unavailable, but also to make this safe to run inside a transaction), but we can't do SKIP LOCKED without it. I need to think that API through a bit more from a testing and monitoring production point of view.

Should jobs taking too long to run be separated from jobs failing to start in the API?

crates.io recently had a brief minor incident caused by moving a slow cron task over to a background job. The reason for moving this to swirl was to get better monitoring (we already page if swirl jobs are in the queue for too long, we'd have to build similar infra for these tasks, which don't have as clear hooks), and more convenient development (a lot of these standalone binaries have the same boilerplate which is handled by swirl or our environment already, it's nice to have a standalone enqueue-job binary which we call from these tasks).

The issue was that our runner was on a single thread, with a 10s timeout. This task consistently took longer than 10s, so as soon as the runner encountered this job, it would start running the job, get the message it was running, loop again, find that there were 0 threads available but also 0 pending messages, so try to spawn another worker anyway, and then time out because the only way it'd get a message from that worker is if the other job finished within the timeout.

There were several relatively simple fixes in crates.io for this, all of which are being implemented. We shouldn't have been on a single thread at this point, our timeout was too low, and we should have tried building a new runner a few times before crashing the process (which means jobs on the discarded runner still have a chance to complete).

However, this does raise the question as to whether we should expose "we couldn't start running a new job because jobs which are currently (or maybe not, all systems are fallible) running have taken longer than the given timeout" separately from "I tried to start a new job, but didn't get a response in the given timeout". The second case should basically never occur. The behavior that exists today (spawning a new task on the thread pool even if no threads are available) is definitely intentional, though I don't think I necessarily really thought through the implications of that design.

The steps taken between spawning a worker and listening for a message back are:

  • Call execute on the thread pool
    • This is basically the only part that would ever time out. If we're trying to detect not being able to start a job due to the pool being saturated, we'll probably spin-lock checking if we've passed a timeout rather than spawning a new task and waiting to hear back on our channel
  • Get a connection from the DB pool
    • If this ever fails, we'd return NoDatabaseConnection today, not NoMessageReceived. If we did want to separate slow jobs blocking the queue from jobs not running for other reasons, we'd need to treat a timeout here as a job running too long though, even though that may be incorrect since the DB might be down and we're just failing to establish new connections... But it might also be that we've got a DB pool size of < thread pool size x2 or jobs are using >2 connections each, so a timeout very much can be a job consuming all resources...
  • Query the DB for a job
    • Since we're using SKIP LOCKED, this should only time out under network conditions we can reasonably assume won't happen for swirl clients, or some exclusive lock on the table (I'm actually not even sure what the behavior of FOR UPDATE SKIP LOCKED is when some blocking schema change like adding a column with a default is being performed...) In short, we can reasonably assume this only times out under pathological circumstances, and hopefully the statement timeout is much shorter than the job timeout, so the user should receive FailedLoadingJob instead of NoMessageReceived anyway.

None of these cases should ever fail under normal circumstances. So on the one hand, you probably want a separate timeout/error case for slow jobs clogging the queue vs jobs failing to run for other reasons, since one you can just wait for longer while the other should never happen. On the other hand, one of these cases should never happen, which implies we can just have the single timeout, which can be assumed to mean slow jobs if it does occur in practice...
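
If the two cases were separated, the distinction might be sketched as follows. This is not how Swirl behaves today: `StartError`, `JobsTakingTooLong`, and `wait_for_worker` are hypothetical, and the sketch assumes the runner can cheaply observe how many worker threads are busy.

```rust
use std::sync::mpsc::Receiver;
use std::time::Duration;

/// Hypothetical error type separating the two timeout cases discussed above.
#[derive(Debug, PartialEq)]
enum StartError {
    /// Every worker thread was busy when the timeout expired: likely slow jobs.
    JobsTakingTooLong,
    /// A thread was free, but no worker reported back in time.
    NoMessageReceived,
}

/// Waits up to `timeout` for a worker to report that it started a job.
/// `busy_threads` and `thread_count` are assumed to be supplied by the runner.
fn wait_for_worker(
    started: &Receiver<()>,
    timeout: Duration,
    busy_threads: usize,
    thread_count: usize,
) -> Result<(), StartError> {
    match started.recv_timeout(timeout) {
        Ok(()) => Ok(()),
        // Covers both a timeout and a disconnected channel.
        Err(_) if busy_threads >= thread_count => Err(StartError::JobsTakingTooLong),
        Err(_) => Err(StartError::NoMessageReceived),
    }
}
```

With this split, a caller could keep waiting on `JobsTakingTooLong` while treating `NoMessageReceived` as a reason to rebuild the runner or restart the process.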

At this point I'm going back and forth on this, and would love some outside opinions. I don't really see a downside to just assuming that a timeout here is caused by slow jobs filling the queue, since really no other case should ever cause that to happen.

SQLite support

Hello! I would just like to know if you plan to support SQLite at some point? This crate looks really nice and I would like to use it, but my project should support both PostgreSQL and SQLite.

Thanks!
