Coder Social home page Coder Social logo

tokahuke / yaque Goto Github PK

View Code? Open in Web Editor NEW
76.0 4.0 12.0 192 KB

Yaque is yet another disk-backed persistent queue for Rust.

License: Other

Rust 100.00%
rust rust-lang queue persistent-storage persistence resilience disk filesystem asynchronous async

yaque's Introduction

crates.io docs.rs

Yaque: Yet Another QUEue

Yaque is yet another disk-backed persistent queue (and mutex) for Rust. It implements an SPSC channel using your OS' filesystem. Its main advantages over a simple VecDeque<T> are that

  • You are not constrained by your RAM size, just by your disk size. This means you can store gigabytes of data without getting OOM killed.
  • Your data is safe even if you program panics. All the queue state is written to the disk when the queue is dropped.
  • Your data can persist, that is, can exist through multiple executions of your program. Think of it as a very rudimentary kind of database.
  • You can pass data between two processes.

Yaque is asynchronous and built directly on top of mio and notify. It is therefore completely agnostic to the runtime you are using for you application. It will work smoothly with tokio, with async-std or any other executor of your choice.

Sample usage

To create a new queue, just use the channel function, passing a directory path on which to mount the queue. If the directory does not exist on creation, it (and possibly all its parent directories) will be created.

use yaque::channel;

futures::executor::block_on(async {
    let (mut sender, mut receiver) = channel("data/my-queue").unwrap();
})

You can also use Sender::open and Receiver::open to open only one half of the channel, if you need to.

The usage is similar to the MPSC channel in the standard library, except that the receiving method, Receiver::recv is asynchronous. Writing to the queue with the sender is basically lock-free and atomic.

use yaque::{channel, queue::try_clear};

futures::executor::block_on(async {
    // Open using the `channel` function or directly with the constructors.
    let (mut sender, mut receiver) = channel("data/my-queue").unwrap();
    
    // Send stuff with the sender...
    sender.send(b"some data").await.unwrap();

    // ... and receive it in the other side.
    let data = receiver.recv().await.unwrap();

    assert_eq!(&*data, b"some data");

    // Call this to make the changes to the queue permanent.
    // Not calling it will revert the state of the queue.
    data.commit();
});

// After everything is said and done, you may delete the queue.
// Use `clear` for awaiting for the queue to be released.
try_clear("data/my-queue").unwrap();

The returned value data is a kind of guard that implements Deref and DerefMut on the underlying type.

queue::RecvGuard and transactional behavior

One important thing to notice is that reads from the queue are transactional. The Receiver::recv returns a queue::RecvGuard that acts as a dead man switch. If dropped, it will revert the dequeue operation, unless queue::RecvGuard::commit is explicitly called. This ensures that the operation reverts on panics and early returns from errors (such as when using the ? notation). However, it is necessary to perform one more filesystem operation while rolling back. During drop, this is done on a "best effort" basis: if an error occurs, it is logged and ignored. This is done because errors cannot propagate outside a drop and panics in drops risk the program being aborted. If you have any cleanup behavior for an error from rolling back, you may call queue::RecvGuard::rollback which will return the underlying error.

Batches

You can use the yaque queue to send and receive batches of data , too. The guarantees are the same as with single reads and writes, except that you may save on OS overhead when you send items, since only one disk operation is made. See Sender::send_batch, Receiver::recv_batch and Receiver::recv_until for more information on receiver batches.

Tired of .awaiting? Timeouts are supported

If you need your application to not stall when nothing is being put on the queue, you can use Receiver::recv_timeout and Receiver::recv_batch_timeout to receive data, awaiting up to a completion of a provided future, such as a delay or a channel. Here is an example:

use yaque::channel;
use std::time::Duration;
use futures_timer::Delay;

futures::executor::block_on(async {
    let (mut sender, mut receiver) = channel("data/my-queue-2").unwrap();
    
    // receive some data up to a second
    let data = receiver
        .recv_timeout(Delay::new(Duration::from_secs(1)))
        .await
        .unwrap();

    // Nothing was sent, so no data...
    assert!(data.is_none());
    drop(data);
    
    // ... but if you do send something...
    sender.send(b"some data").await.unwrap();
 
    // ... now you receive something:
    let data = receiver
        .recv_timeout(Delay::new(Duration::from_secs(1)))
        .await
        .unwrap();

    assert_eq!(&*data.unwrap(), b"some data");  
});

Ctrl+C and other unexpected events

First of all, "Don't panic©"! Writing to the queue is an atomic operation. Therefore, unless there is something really wrong with your OS, you should be fine in terms of data corruption most of the time.

In case of a panic (the program's, not the programmer's), the queue is guaranteed to save all the most up-to-date metadata for the receiver. For the reader it is even simpler: there is nothing to be saved in the first place. The only exception to this guarantee is if the saving operation fails due to an IO error. Remember that the program is not allowed to panic during a panic. Therefore in this case, yaque will not attempt to recover from an error.

The same thing cannot be said from OS signals. Signals from the OS are not handled automatically by this library. It is understood that the application programmer knows best how to handle them. If you chose to close queue on Ctrl+C or other signals, you are in luck! Saving both sides of the queue is async-signal-safe so you may set up a bare signal hook directly using, for example, signal_hook(https://docs.rs/signal-hook/), if you are the sort of person that enjoys unsafe code. If not, there are a ton of completely safe alternatives out there. Choose the one that suits you the best.

Unfortunately, there are also times when you get aborted or killed. These signals cannot be handled by any library whatsoever. When this happens, not everything is lost yet. We provied a whole module, recovery, to aid you in automatic queue recovery. Please check the module for the specific function names. From an architectural perspective, we offer two different approaches to queue recovery, which may be suitable to different use cases:

  1. Recover with replay (the standard): we can reconstruct a lower bound of the actual state of the queue during the crash, which consists of the maximum of the following two positions:
    • the bottom of the smallest segment still present in the directory.
    • the position indicated in the metadata file.

Since this is a lower bound, some elements may be replayed. If your processing is idempotent, this will not be an issue and you lose no data whatsoever.

  1. Recover with loss: we can also reconstruct an upper bound for the actual state of the queue: the bottom of the second smallest segment in the queue. In this case, the smallest segment is simply erased and the receiver caries on as if nothing has happened. If replays are intollerable, but some data loss is, this might be the right alternative for you. You can limit data loss by constraining the segment size, configuring this option on SenderBuilder.

If you really want to err on the safer side, you may use Receiver::save to periodically back the receiver state up. Just choose you favorite timer implementation and set a simple periodical task up every hundreds of milliseconds. However, be warned that this is only a mitigation of consistency problems, not a solution.

Known issues and next steps

  • This is a brand new project. Although I have tested it and it will certainly not implode your computer, don't trust your life on it yet. This code is running in production for non-critical applications.
  • Wastes too much kernel time when the queue is small enough and the sender sends many frequent small messages non-atomically. You can mitigate that by writing in batches to the queue.
  • There are probably unknown bugs hidden in some corner case. If you find one, please fill an issue on GitHub. Pull requests and contributions are also greatly appreciated.

yaque's People

Contributors

evaporei avatar grant0417 avatar mullr avatar netguy204 avatar tokahuke avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

yaque's Issues

Typos

Typos, typos everywhere!

Handling of ctrl+c with save is unclear

Hi maybe I am just doing something wrong but I tried to handle ctlrl_c and ran into this:

error[E0499]: cannot borrow `receiver` as mutable more than once at a time
   --> crates/erooster_smtp/src/servers/mod.rs:86:25
    |
84  | /                 tokio::select! {
85  | |                     _ = signal::ctrl_c() => {
86  | |                         receiver.save().expect("Unable to save queue");
    | |                         ^^^^^^^^ second mutable borrow occurs here
87  | |                     },
88  | |                     data = receiver.recv() => {
    | |                            -------- first mutable borrow occurs here
...   |
112 | |                     }
113 | |                 }
    | |_________________- first borrow might be used here, when `output` is dropped and runs the destructor for type `servers::start::{closure#0}::{closure#0}::__tokio_select_util::Out<std::result::Result<(), std::io::Error>, std::result::Result<erooster_deps::yaque::queue::RecvGuard<'_, std::vec::Vec<u8>>, std::io::Error>>`

with this code:

// Start listening for tasks
    let mut receiver = ReceiverBuilder::default()
        .save_every_nth(None)
        .open(config.task_folder.clone());
    if let Err(e) = receiver {
        warn!("Unable to open receiver: {:?}. Trying to recover.", e);
        recover(&config.task_folder)?;
        receiver = ReceiverBuilder::default()
            .save_every_nth(None)
            .open(config.task_folder.clone());
        info!("Recovered queue successfully");
    }

    match receiver {
        Ok(mut receiver) => {
            loop {
                tokio::select! {
                    _ = signal::ctrl_c() => {
                        receiver.save().expect("Unable to save queue");
                    },
                    data = receiver.recv() => {
                        match data {
                            Ok(data) => {
                                let email_bytes = &*data;
                                let email_json = serde_json::from_slice::<EmailPayload>(email_bytes).expect("Unable to parse email payload json");

                                if let Err(e) = send_email_job(&email_json).await {
                                    tracing::error!(
                                        "Error while sending email: {:?}. Adding it to the queue again",
                                        e
                                    );
                                    // FIXME: This can race the lock leading to an error. We should
                                    //        probably handle this better.
                                    let mut sender = Sender::open(config.task_folder.clone()).expect("Unable to open queue sender");
                                    let json_bytes = serde_json::to_vec(&email_json).expect("Unable to convert email to bytes");
                                    sender.send(json_bytes).await.expect("Unable to add email to queue");
                                }
                                // Mark the job as complete
                                data.commit().expect("Unable to commit data");
                            }
                            Err(e) => {
                                tracing::error!("Error while receiving data from receiver: {:?}", e);
                            }
                        }
                    }
                }
            }
        }
        Err(e) => {
            error!("Unable to open receiver: {:?}. Giving up.", e);
        }
    }

The error is obvious and makes sense, but I wonder what is a better way of handling this in a way where I save on crash :)

Can't recv an empty message

Here's a failing test case:

    #[test]
    fn test_try_recv_empty_msg() {
        // Populate a queue:
        let mut sender = SenderBuilder::new()
            .segment_size(512)
            .open("data/try-recv-empty-msg")
            .unwrap();

        sender.try_send(&[]).unwrap();

        let mut receiver = Receiver::open("data/try-recv-empty-msg").unwrap();

        let item = receiver.try_recv().unwrap();
        assert_eq!(&*item, &[]);
        item.commit().unwrap();
    }

The actual behavior is that try_recv returns Err(TryRecvError::QueueEmpty)

Behaviour of `commit` not clear - does commiting also clear the data in the queue?

In this example, we just send and receive to a yaque. After receiving, we commit on the RecvGuard.

Is it intentional that on the next start of the program, the yaque still contains all the previously recvd data?

Running this example twice prints the bytes from the first run during the second run even though they were committed in the first run.

use std::time::Duration;
use yaque::recovery::recover;

#[tokio::main]
async fn main() {
    recover("persistent-data").ok();

    let (mut sender, mut receiver) = yaque::channel("persistent-data").unwrap();

    let mut interval = tokio::time::interval(Duration::from_secs(1));
    let mut counter = 0u64;
    loop {
        tokio::select! {
            _tick = interval.tick() => {
                println!("Enqueuing {counter}");
                let Ok(_) = sender.send(format!("Message {counter}")).await else {
                    break;
                };
                counter += 1;
            }
            guard = receiver.recv() => {
                println!("Got batch");
                let Ok(contents) = guard else {
                    println!("Breaking because guard not OK");
                    break;
                };
                // println!("{}", String::from_utf8((&mut *contents).drain(..).flatten().collect::<Vec<_>>()).unwrap());
                println!("{:?}", String::from_utf8(contents.to_vec()));
                contents.commit().unwrap();
                println!("committed");
            }
        }
    }
}

fuzz failed

thread '<unnamed>' panicked at /tmp/tmp.yvD7plPUDn/yaque/src/queue/receiver.rs:253:9:
There were read and unused items at the end of transaction. Read and unused queue: [[85, 85]]

Output of `std::fmt::Debug`:

        Scenario {
            commands: [
                SendBatch(
                    [
                        [
                            85,
                            85,
                        ],
                    ],
                ),
                Send(
                    [],
                ),
                RecvBatch(
                    14189153571838739689,
                ),
                RecvBatch(
                    0,
                ),
            ],
        }

`xor` is not xor

xor in header.rs does not actually compute the exclusive or of a and b. Instead it checks that a and b are either both true or both false, which is equivalent to a == b. I don't understand what decode does well enough to know if this is the intended behavior or not, but either the name or the implementation is currently wrong. If you do actually need xor there's the ^ operator and the BitXor trait.

Sender side already in use after Ctrl+C and restart of application

I am running a yaque channel with a gRPC server as a sender and a worker on the other other end processing messages. If I start the application and send a few messages then hit Ctrl+C and try and restart the application I receive a "sender side already in use" error when creating the channel on the same directory. The only way to fix this is to delete the directory and create a fresh one. This makes the queue non-durable and prone to large amounts of data loss on crashes.

Any suggestions?

Determining length of queue at startup

Hi! Cool library.

My use case is an IoT system where data is buffered for transmission. In the event of flaky comms, I need to buffer data to disk instead of memory, so this library looks great for this use case. I'm implementing a hybrid memory/disk queue using Yaque and a VecDeque. The idea is that the a fixed amount of memory will be dedicated for buffering, and if the memory limit is exceeded, we'll start buffering to Yaque. Popping elements for transmission will entail loading from Yaque if disk buffering has occurred.

What's the best way to determine the queue length during program initialization?

Is the answer here to just create QueueIter and count them?

Certain sequence of bytes can screw up the channel

I feel like this should be documented somewhere, if you happen to send some bytes that matches the header of yaque, then the queue may cease to function. Happened when I tried to get some messagepack through the channel.

`recv()` stuck awaiting

heya ! love the work being done on yaque it is an insanely useful crate :)

Description of issue

I have a long running future as such:

tokio::spawn(async move {
            let mut recv: Receiver = match Receiver::open(qpath) {
                Ok(recv) => recv,
                Err(e) => {
                    log::error!("Error opening receiver: {}", e);
                    return;
                }
            };

            while (*status).load(Ordering::Relaxed) {
                loop {
                    match recv.recv().await {
                        Ok(bytes) => {
                            let bytes_inner = bytes.deref().clone();
                            Self::execute_job_from_bytes(bytes_inner, store.clone()).await;
                            match bytes.commit() {
                                Ok(_) => {}
                                Err(e) => {
                                    log::error!("Error committing to queue: {}", e);
                                }
                            }
                        }
                        Err(e) => log::error!("Error receiving from queue: {:?}", e),
                    }
                }
            }
        })

With a sender that is triggered by API calls in a separate future.

sender
        .send(cbor_bytes)
        .await
        .map_err(|e| warp::reject::custom(Failure::Execute(e.to_string())))?;

However the receiver loop gets stuck at recv.recv().await, never resolving -- even when the sender succesfully sends !

I've dug a bit into yaque to see where it hangs -- and it seems to never resolve on the following line (line 272) of queue/receiver.rs

// Read header:
let mut header = [0; 4];
self.tail_follower.read_exact(&mut header).await?;

Digging further into the generated ReadExact future. read_until_you_drain gets called twice when the queue is empty as dictated by the poll function

impl<'a> Future for ReadExact<'a> {
    type Output = io::Result<()>;
    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        self.was_polled = true;
        // See what happens when we read.
        let outcome = self.read_until_you_drain();

        if outcome.is_pending() {
            // Set the waker in the file watcher:
            let mut lock = self.waker.lock().expect("waker mutex poisoned");
            *lock = Some(context.waker().clone());

            // Now, you will have to recheck (TOCTOU!)
            self.read_until_you_drain()
        } else {
            outcome
        }
    }
}

self.read_until_you_drain() returns a Poll::Pending state as we'd expect -- but then ... never gets triggered again so the future hangs indefinitly -- possibly the waker not operating as expected ?

If I ctrlc-c to kill the process then reboot it -- the queue gets read correctly and the logic ensures correctly ... until the queue is empty once more and things hang.

As a work around I currently have replaced recv() with try_recv() which isn't ideal as the loop now spins and consumes the CPU entirely.

Any help on the above would be much appreciated :)

Fuzz testing: RecvBatch strangeness, file handle leaks

I wrote some simple fuzz tests for this library (https://github.com/mullr/yaque/tree/fuzz-tests). The good news is that it works very well for something that's never been fuzzed! I did identify a few issues:

  • It's pretty easy to break when try_recv_batch is used. It panics with There were read and unused items at the end of transaction. Read and unused queue: ..., even though the test is attempting to read everything that's there, using a large count parameter.

  • After running the fuzzer for a little awhile, we run out of file handles. It seems like something in this library is leaking, but it's unclear where or why.

Tests fail

I've tried running the tests and they give the following output:

otaviopace@mettaton ~/yaque (master) [SIGINT]> cargo test
    Finished test [unoptimized + debuginfo] target(s) in 0.05s
     Running target/debug/deps/yaque-009dcd917f9d7263

running 11 tests
test recovery::tests::test_unlock_inexistent ... ok
test tests::create_and_clear_fails ... ok
test tests::test_dequeue_is_atomic ... FAILED
test tests::create_and_clear ... ok
test tests::create_and_clear_async ... ok
test recovery::tests::test_unlock ... ok
test tests::test_enqueue_and_dequeue ... FAILED
test tests::test_enqueue ... ok
2020-08-22 16:51:25,942 TRACE [yaque] created queue directory
2020-08-22 16:51:25,942 TRACE [yaque] sender lock acquired. Sender state now is QueueState { segment_size: 4194304, segment: 32, position: 2786945 }
2020-08-22 16:51:25,942 TRACE [yaque] last segment opened for appending
2020-08-22 16:51:25,954 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel` receiver side already in use" }', src/lib.rs:858:36
test tests::test_enqueue_then_dequeue ... ok
2020-08-22 16:51:32,500 TRACE [yaque::sync] file guard on `"data/enqueue-dequeue-parallel/send.lock"` dropped
test tests::test_enqueue_dequeue_parallel ... FAILED

Then, after hanging for a while, they output this:

2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 6 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 4 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 52 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,493 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,493 TRACE [yaque::sync] read 4 bytes
2020-08-22 16:51:07,493 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,494 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,494 TRACE [yaque::sync] read 4 bytes
2020-08-22 16:51:07,494 TRACE [yaque::sync] enough! Done reading
2020-08-22 16:51:07,494 TRACE [yaque::sync] reading until drained
2020-08-22 16:51:07,494 TRACE [yaque::sync] read 4 bytes

After a couple of tries running them, I got this:

otaviopace@mettaton ~/yaque (master) [SIGINT]> cargo test
    Finished test [unoptimized + debuginfo] target(s) in 0.05s
     Running target/debug/deps/yaque-009dcd917f9d7263

running 11 tests
test recovery::tests::test_unlock_inexistent ... ok
test tests::create_and_clear_fails ... ok
test tests::test_dequeue_is_atomic ... FAILED
test tests::create_and_clear ... ok
test tests::create_and_clear_async ... ok
test recovery::tests::test_unlock ... ok
test tests::test_enqueue_and_dequeue ... FAILED
test tests::test_enqueue ... ok
2020-08-22 16:51:25,942 TRACE [yaque] created queue directory
2020-08-22 16:51:25,942 TRACE [yaque] sender lock acquired. Sender state now is QueueState { segment_size: 4194304, segment: 32, position: 2786945 }
2020-08-22 16:51:25,942 TRACE [yaque] last segment opened for appending
2020-08-22 16:51:25,954 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel` receiver side already in use" }', src/lib.rs:858:36
test tests::test_enqueue_then_dequeue ... ok
2020-08-22 16:51:32,500 TRACE [yaque::sync] file guard on `"data/enqueue-dequeue-parallel/send.lock"` dropped
test tests::test_enqueue_dequeue_parallel ... FAILED
2020-08-22 16:52:15,239 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel-with-batches` sender side already in use" }', src/lib.rs:898:30
test tests::test_enqueue_dequeue_parallel_with_batches ... FAILED

failures:

---- tests::test_dequeue_is_atomic stdout ----
2020-08-22 16:51:17,832 TRACE [yaque] created queue directory
thread 'tests::test_dequeue_is_atomic' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/dequeue-is-atomic` sender side already in use" }', src/lib.rs:928:26

---- tests::test_enqueue_and_dequeue stdout ----
2020-08-22 16:51:18,651 TRACE [yaque] created queue directory
thread 'tests::test_enqueue_and_dequeue' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-and-dequeue` sender side already in use" }', src/lib.rs:820:26

---- tests::test_enqueue_dequeue_parallel stdout ----
thread 'tests::test_enqueue_dequeue_parallel' panicked at 'dequeue thread panicked: Any', src/lib.rs:872:9

---- tests::test_enqueue_dequeue_parallel_with_batches stdout ----
thread 'tests::test_enqueue_dequeue_parallel_with_batches' panicked at 'enqueue thread panicked: Any', src/lib.rs:921:9


failures:
    tests::test_dequeue_is_atomic
    tests::test_enqueue_and_dequeue
    tests::test_enqueue_dequeue_parallel
    tests::test_enqueue_dequeue_parallel_with_batches

test result: FAILED. 7 passed; 4 failed; 0 ignored; 0 measured; 0 filtered out

2020-08-22 16:52:15,240 TRACE [yaque] created queue directory
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "queue `data/enqueue-dequeue-parallel-with-batches` receiver side already in use" }', src/lib.rs:908:21
error: test failed, to rerun pass '--lib'

Is there a try_recv like function?

Is there a function that will try receiving, and if the queue has no data, will return None?

It seems like I can manually replicate this now by using recv_timeout with a very small timeout, but wondering if there was a more idiomatic way?

Stream or Iterator?

Hi,

on first sight I am wondering if it would make sense to have an iterator/stream to receive the data. Currently I would need to call it in a loop but a stream might be more optimized. I dont have any metrics or something to justify this assumption but its a gut feeling that it would be a nice addition.

yaque drops items?

Here's an example:

use std::path::Path;
use tokio::{
    sync::mpsc::{channel, Receiver, Sender},
};
use yaque::{self, TryRecvError, recovery};

const CHAN_BUF_SIZE: usize = 32;
const BATCH_SIZE: usize = 50;
const QUEUE_PATH: &str = "state/queue";

struct WorkContext {
    dispatch_sender: Sender<DispatchRequest>,
}

async fn work(ctx: WorkContext) {
    loop {
        let (send, mut recv) = channel(CHAN_BUF_SIZE);
        println!("here!");
        ctx.dispatch_sender
            .send(DispatchRequest::GetChunk(send))
            .await
            .unwrap();
        let ids = recv.recv().await.unwrap();
        // do stuff w/ ids
    }
}

#[derive(Debug)]
enum DispatchRequest {
    GetChunk(Sender<Vec<String>>),
}

struct DispatchThread {
    url_receiver: yaque::Receiver,
    url_sender: yaque::Sender,
    receiver: Receiver<DispatchRequest>,
    sender: Sender<DispatchRequest>,
}

impl DispatchThread {
    pub fn new(n_workers: usize) -> DispatchThread {
        if Path::new(QUEUE_PATH).exists() {
            println!("Recovering queue...");
            recovery::recover_with_loss(QUEUE_PATH).unwrap();
        }
        let (url_send, url_recv) = yaque::channel(QUEUE_PATH).unwrap();
        let (send, recv) = channel(CHAN_BUF_SIZE);

        for _ in 0..n_workers {
            let send = send.clone();
            tokio::spawn(async move {
                work(WorkContext {
                    dispatch_sender: send,
                })
                .await;
            });
        }

        DispatchThread {
            url_receiver: url_recv,
            url_sender: url_send,
            receiver: recv,
            sender: send,
        }
    }

    pub async fn add_vid_ids(&mut self, ids: Vec<String>) {
        self.url_sender.send_batch(ids).await.unwrap();
    }

    pub async fn run(&mut self) {
        loop {
            match self.receiver.recv().await.unwrap() {
                DispatchRequest::GetChunk(sender) => {
                    let batch = match self
                        .url_receiver
                        .try_recv_batch(BATCH_SIZE)
                    {
                        Ok(b) => Some(b),
                        Err(e) => match e {
                            TryRecvError::Io(e) => {
                                panic!("io error while receiving: {:?}", e);
                            }
                            TryRecvError::QeueuEmpty => None,
                        },
                    };
                    if let Some(batch) = batch {
                        assert!(batch.len() > 0);
                        let mut urls = vec![];
                        for url_bytes in batch.iter() {
                            urls.push(String::from(std::str::from_utf8(url_bytes).unwrap()));
                        }
                        batch.commit();
                        sender.send(urls).await.unwrap();
                    } else {
                        // recycle the request until we have more work
                        println!("Recycling request...");
                        self.sender
                            .send(DispatchRequest::GetChunk(sender))
                            .await
                            .unwrap();
                    }
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mut dispatch_thread = DispatchThread::new(1);
    dispatch_thread
        .add_vid_ids(vec![String::from("Wko7I9QcwUQ")])
        .await;
    tokio::spawn(async move {
        dispatch_thread.run().await;
    })
    .await
    .unwrap();
}

Cargo.toml:

[dependencies]
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
yaque = { git = "https://github.com/tokahuke/yaque.git", branch = "try-recv" }

Most of the time, this outputs:

Recovering queue...
here!
Recycling request...
Recycling request...
Recycling request...

However, I would not expected this to be the case because I call add_vid_ids with a value, which should be added into the queue.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.