liteq

Lightweight Portable Message Queue Using SQLite

Temporary and permanent message queues for R. Built on top of SQLite databases. 'SQLite' provides locking, and makes it possible to detect crashed consumers. Crashed jobs can be automatically marked as "failed", or put back in the queue again, potentially a limited number of times.

Installation

Stable version:

install.packages("liteq")

Development version:

pak::pak("r-lib/liteq")

Introduction

liteq implements a serverless message queue system in R. It can handle multiple databases, and each database can contain multiple queues.

liteq uses SQLite to store a database of queues, and uses other, temporary SQLite databases for locking and for finding crashed workers (see below).

Usage

Basic usage

library(liteq)

In the following we create a queue in a temporary queue database. The database will be removed when the R session quits.

db <- tempfile()
q <- ensure_queue("jobs", db = db)
q
#> liteq queue 'jobs'
list_queues(db)
#> [[1]]
#> liteq queue 'jobs'

Note that ensure_queue() is idempotent: if you call it again on the same database, it returns the queue that was created previously. So it is safe to call it multiple times, even from multiple processes. In case of multiple processes, the locking mechanism eliminates race conditions.
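The idempotency described above can be checked with a minimal sketch. It uses only ensure_queue() and list_queues() from this README; the variable names q1 and q2 are illustrative.

```r
library(liteq)

db <- tempfile()
q1 <- ensure_queue("jobs", db = db)
q2 <- ensure_queue("jobs", db = db)  # second call reuses the existing queue

# Only one queue named "jobs" exists in the database
length(list_queues(db))
```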

To publish a message in the queue, call publish() on the queue object:

publish(q, title = "First message", message = "Hello world!")
publish(q, title = "Second message", message = "Hello again!")
list_messages(q)
#>   id          title status
#> 1  1  First message  READY
#> 2  2 Second message  READY

A liteq message has a title, which is a string scalar, and the message body itself is a string scalar as well. To use more complex data types in messages, you need to serialize them using the serialize() function (set ascii to TRUE!), or convert them to JSON with the jsonlite package.
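As a rough sketch of the serialize() approach, the helpers below turn an R object into a single string and back, using base R only. The names encode_payload() and decode_payload() are hypothetical and not part of liteq.

```r
# Encode an arbitrary R object as a single string (ASCII serialization)
encode_payload <- function(x) {
  rawToChar(serialize(x, connection = NULL, ascii = TRUE))
}

# Decode the string back into the original R object
decode_payload <- function(s) {
  unserialize(charToRaw(s))
}

job <- list(id = 42L, args = c(a = 1.5, b = 2), tags = c("x", "y"))
body <- encode_payload(job)      # a string scalar, usable as a message body
restored <- decode_payload(body)
identical(restored, job)
```

The encoded string can then be passed as the message argument of publish(), and the consumer can call decode_payload() on msg$message.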

Two functions are available to consume a message from a queue. try_consume() returns immediately, either with a message (liteq_message object), or NULL if the queue is empty. The consume() function blocks if the queue is empty, and waits until a message appears in it.

msg <- try_consume(q)
msg
#> liteq message from queue 'jobs':
#>   First message (12 B)

The title and the message body are available as fields of the message object:

msg$title
#> [1] "First message"
msg$message
#> [1] "Hello world!"

When a consumer is done processing a message, it must call ack() on the message object to notify the queue that it is safe to remove the message. If the consumer fails to process a message, it can call nack() (negative acknowledgement) on the message object. Then the status of the message will be set to "FAILED". Failed messages can be removed from the queue, or put back in the queue again, depending on the application.

ack(msg)
list_messages(q)
#>   id          title status
#> 1  2 Second message  READY
msg2 <- try_consume(q)
nack(msg2)
list_messages(q)
#>   id          title status
#> 1  2 Second message FAILED

There are no READY messages left in the queue now, so try_consume() returns NULL:

try_consume(q)
#> NULL
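The consume/ack/nack cycle above can be combined into a simple worker loop. The following is only a sketch: drain_queue() and handler() are hypothetical helpers, not part of liteq, and a real worker would probably use the blocking consume() instead of stopping when the queue has no READY messages.

```r
library(liteq)

# Hypothetical helper: process all READY messages with `handler`,
# acknowledging successes and negatively acknowledging failures.
drain_queue <- function(q, handler) {
  repeat {
    msg <- try_consume(q)
    if (is.null(msg)) break          # no READY messages left
    ok <- tryCatch({
      handler(msg)
      TRUE
    }, error = function(e) FALSE)
    if (ok) ack(msg) else nack(msg)
  }
  invisible(NULL)
}

db <- tempfile()
q <- ensure_queue("jobs", db = db)
publish(q, title = "good", message = "1")
publish(q, title = "bad", message = "not a number")

# The handler fails on a message body that is not numeric
handler <- function(msg) {
  value <- suppressWarnings(as.numeric(msg$message))
  if (is.na(value)) stop("cannot parse message body")
  value
}

drain_queue(q, handler)
remaining <- list_messages(q)  # the acknowledged message is gone
```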

Crashed workers

If a worker crashes without calling either ack() or nack() on a message, then this message will be put back in the queue the next time a message is requested from the queue.

To make this possible, each delivered message keeps an open connection to a lock file, and crashed workers are found by the absence of this open connection. In R this basically means that the worker is considered crashed if the R process has no reference to the message object.

Note that this also means that with many workers running at the same time, it is possible to reach the maximum number of open connections allowed by R or the operating system.

License

MIT © Gábor Csárdi

liteq's People

Contributors

eddelbuettel, enchufa2, gaborcsardi, krlmlr, rentrop, wlandau-lilly

liteq's Issues

Log?

Write an event log into a table.

Upkeep for liteq (2023)

Pre-history

  • usethis::use_readme_rmd()
  • usethis::use_roxygen_md()
  • usethis::use_github_links()
  • usethis::use_pkgdown_github_pages()
  • usethis::use_tidy_github_labels()
  • usethis::use_tidy_style()
  • urlchecker::url_check()

2020

  • usethis::use_package_doc()
  • usethis::use_testthat(3)
  • Align the names of R/ files and test/ files

2021

  • Remove check environments section from cran-comments.md
  • Use lifecycle instead of artisanal deprecation messages

2022

  • Handle and close any still-open master --> main issues
  • usethis:::use_codecov_badge("r-lib/liteq")
  • Update pkgdown site using instructions at https://tidytemplate.tidyverse.org
  • Update lifecycle badges with more accessible SVGs: usethis::use_lifecycle()

2023

  • Update copyright holder in DESCRIPTION: person("Posit Software, PBC", role = c("cph", "fnd"))
  • Run devtools::document() to re-generate package-level help topic with DESCRIPTION changes
  • usethis::use_tidy_logo()
  • usethis::use_tidy_coc()
  • Use pak::pak("r-lib/liteq") in README
  • Consider running usethis::use_tidy_dependencies() and/or replace compat files with use_standalone()
  • Use cli errors or file an issue if you don't have time to do it now
  • usethis::use_standalone("r-lib/rlang", "types-check") instead of home grown argument checkers;
    or file an issue if you don't have time to do it now
  • Add alt-text to pictures, plots, etc; see https://posit.co/blog/knitr-fig-alt/ for examples

Eternal

  • usethis::use_mit_license()
  • usethis::use_package("R", "Depends", "3.6")
  • usethis::use_tidy_description()
  • usethis::use_tidy_github_actions()
  • devtools::build_readme()
  • Re-publish released site if needed

Created on 2023-11-03 with usethis::use_tidy_upkeep_issue(), using usethis v2.2.2.9000

Configurable behavior on crashed consumers

E.g.

  • requeue (current behavior)
  • set to FAILED
  • retry a number of times and then set to FAILED if still fails

Maybe also on negative ACKs.

Also think about what to do with failed messages. E.g.

  • remove_failed
  • requeue_failed
  • list_failed

or something like this.
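For reference, a sketch of how failed messages might be handled, assuming the installed liteq version exports list_failed_messages() and requeue_failed_messages() (and, similarly, remove_failed_messages()). Please check the package index before relying on these names.

```r
library(liteq)

db <- tempfile()
q <- ensure_queue("jobs", db = db)
publish(q, title = "flaky job", message = "payload")

msg <- try_consume(q)
nack(msg)                          # mark the message as FAILED

failed <- list_failed_messages(q)  # data frame of FAILED messages
requeue_failed_messages(q)         # assumed to put them back as READY
after_requeue <- list_messages(q)
```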

Publish several messages in one transaction

Is there any reason for limiting publish to one message at a time? I'm using liteq to feed cron jobs to a set of parallel workers, so I find myself doing this:

for (i in seq_along(x))
  publish(q, x[i], y[i])

which apparently may cause concurrency issues, because I lose some jobs. It would be better to publish all tasks in one transaction if there are no technical issues that I'm not aware of.
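A sketch of what a vectorized call could look like, assuming a liteq version in which publish() accepts vectors of titles and messages (the "Still issues with high concurrency" report in this page calls it that way). Whether the messages are written in a single transaction is not confirmed here.

```r
library(liteq)

db <- tempfile()
q <- ensure_queue("jobs", db = db)

x <- c("job-1", "job-2", "job-3")   # titles
y <- c("a", "b", "c")               # message bodies

# One call instead of a loop over publish(q, x[i], y[i])
publish(q, title = x, message = y)
published <- list_messages(q)
```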

Possible race in ensure_queue() path

I think one of the steps in ensure_queue() may need a transactional guard of some sort. When I fork 4 workers and try to have them all start a queue at once I sometimes get some error messages. I have my example below (been running it on OSX on hybrid flash/disk filesystem).

library("liteq")

dir <- tempdir()
db <- "f.db"

ncores <- parallel::detectCores()
parallelCluster <- parallel::makeCluster(ncores)

orig_dir <- getwd()

## doesn't fix the database is locked issue
# setwd(dir)
# ensure_queue("jobs", db = db)

mk_fn <- function(dir, db) {
    force(dir)
    force(db)
     function(i) {
        library("liteq")
        setwd(dir)
        # Sys.sleep(i)  # work-around
        ensure_queue("jobs", db = db)
    }
}

f <- mk_fn(dir, db)
parallel::parLapply(parallelCluster, seq_len(ncores), f)

# Sometimes get the following error msgs:

# Error in checkForRemoteErrors(val) : 
#  3 nodes produced errors; first error: table meta already exists

# Or sometimes:

# Error in checkForRemoteErrors(val) : 
#    2 nodes produced errors; first error: database is locked

setwd(orig_dir)
parallel::stopCluster(parallelCluster)

Package status?

Just discovered this little gem of a package, but I see that the latest update is a while back. Is the package considered to be stable and feature-complete or was it dropped in favor of some other approach?

Blocking consume()

Although I am not sure how to implement it without semi-busy-waiting.

Consider exposing poll_interval to the user

I would like the option to set poll_interval to something smaller than the current default (half a second). I expect this to speed up workflows with a large number of small jobs.
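A sketch of the requested usage, assuming consume() takes a poll_interval argument given in milliseconds. Both the argument and its unit are assumptions to verify against the installed version.

```r
library(liteq)

db <- tempfile()
q <- ensure_queue("jobs", db = db)
publish(q, title = "small job", message = "payload")

# Assumption: poll_interval is in milliseconds; poll every 100 ms
msg <- consume(q, poll_interval = 100)
ack(msg)
```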

Race condition when one process publishes and another consumes

In the liteq branch of workers, one process publishes messages to a queue and another process consumes them from the same queue/db. When I emulate this scenario with two parallel interactive R sessions, I encounter race conditions. What am I missing?

Session 1:

library(liteq)
q <- ensure_queue("q", db = "db.txt")

Session 2:

library(liteq)
q <- ensure_queue("q", db = "db.txt")

Session 1:

while (TRUE){
  publish(q, title = "title", message = "message")
}

Session 2:

while (TRUE){
  msg <- consume(q)
  ack(msg)
}

Session 1:

#> Error in rsqlite_send_query(conn@ptr, statement) : database is locked

traceback()
#> 17: stop(list(message = "database is locked", call = rsqlite_send_query(conn@ptr,
#>         statement), cppstack = list(file = "", line = -1L, stack = ...
#> 16: .Call(RSQLite_rsqlite_send_query, con, sql)
#> 15: rsqlite_send_query(conn@ptr, statement)
#> 14: initialize(value, ...)
#> 13: initialize(value, ...)
#> 12: new("SQLiteResult", sql = statement, ptr = rsqlite_send_query(conn@ptr,
#>         statement), conn = conn)
#> 11: .local(conn, statement, ...)
#> 10: dbSendQuery(conn, statement, ...)
#> 9: dbSendQuery(conn, statement, ...)
#> 8: dbSendStatement(conn, statement, ...)
#> 7: dbSendStatement(conn, statement, ...)
#> 6: dbExecute(con, sqlInterpolate(con, query, ...))
#> 5: dbExecute(con, sqlInterpolate(con, query, ...))
#> 4: db_execute(con, query, ...)
#> 3: do_db_execute(db, "INSERT INTO ?tablename (title, message)\n     VALUES (?title, ?message)",
#>        tablename = db_queue_name(queue), title = title, message = message)
#> 2: db_publish(queue$db, queue$name, title, message)
#> 1: publish(q, title = "title", message = "message")

Move `master` branch to `main`

The master branch of this repository will soon be renamed to main, as part of a coordinated change across several GitHub organizations (including, but not limited to: tidyverse, r-lib, tidymodels, and sol-eng). We anticipate this will happen by the end of September 2021.

That will be preceded by a release of the usethis package, which will gain some functionality around detecting and adapting to a renamed default branch. There will also be a blog post at the time of this master --> main change.

The purpose of this issue is to:

  • Help us firm up the list of targeted repositories
  • Make sure all maintainers are aware of what's coming
  • Give us an issue to close when the job is done
  • Give us a place to put advice for collaborators re: how to adapt

message id: euphoric_snowdog

Still issues with high concurrency

#25 is a nice feature, but still doesn't fix issues with high concurrency (probably #23 too), it just postpones them, because nothing prevents the user from calling publish several times in a row. Here's an example. On my machine, I consistently get this:

library(liteq)

task <- function(db) {
  library(liteq)
  q <- ensure_queue("jobs", db = db)
  msg <- consume(q)
  out <- msg$title
  Sys.sleep(10)
  ack(msg)
  out
}

db <- tempfile()
q <- ensure_queue("jobs", db = db)

n <- 10
workers <- replicate(n, callr::r_bg(task, list(db=db)))

publish(q, letters[1:(n/2)], rep("something", n/2))
Sys.sleep(3)
publish(q, letters[1:(n/2)], rep("something", n/2))
#> Error: database is locked
while (nrow(print(list_messages(q))))
  Sys.sleep(0.2)
#> Error: database is locked

I think that the problem is that not all writers acquire the lock. Both ensure_queue and publish are writers. So we have two points of failure: (1) concurrent calls to ensure_queue when workers are starting up and (2) concurrent calls to publish + consume. Then, concurrent calls to db_lock seem to be problematic too. Maybe this function should perform some kind of exponential back-off?
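To illustrate the exponential back-off idea, here is a user-level sketch in base R. with_backoff() is a hypothetical helper, not part of liteq and not how db_lock is implemented; it only retries a function on error, waiting longer after each failure.

```r
# Hypothetical helper, not part of liteq: retry `fn` with exponential back-off
with_backoff <- function(fn, max_tries = 5, base_delay = 0.05) {
  for (attempt in seq_len(max_tries)) {
    result <- tryCatch(
      list(ok = TRUE, value = fn()),
      error = function(e) list(ok = FALSE, error = e)
    )
    if (result$ok) return(result$value)
    if (attempt == max_tries) stop(result$error)
    # Wait base_delay, 2 * base_delay, 4 * base_delay, ... plus random jitter
    Sys.sleep(base_delay * 2^(attempt - 1) + runif(1, 0, base_delay))
  }
}

# Demo: a function that fails twice with a "database is locked"-style error
calls <- 0
flaky <- function() {
  calls <<- calls + 1
  if (calls < 3) stop("database is locked")
  "published"
}
out <- with_backoff(flaky)
```

A caller could wrap a writer in the same way, for example with_backoff(function() publish(q, title = "t", message = "m")). The random jitter reduces the chance that several workers retry at the same moment.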

Close connection?

When I am trying (simple enough) things in littler, things end on:

Warning message:
call dbDisconnect() when finished working with a connection

The queue object creator does not share the connection with me. Should it? Or should there be a disconnect helper?
