Coder Social home page Coder Social logo

disque's Introduction

Build Status

Disque, an in-memory, distributed job queue

Disque is an ongoing experiment to build a distributed, in-memory, message broker. Its goal is to capture the essence of the "Redis as a jobs queue" use case, which is usually implemented using blocking list operations, and move it into an ad-hoc, self-contained, scalable, and fault tolerant design, with simple to understand properties and guarantees, but still resembling Redis in terms of simplicity, performance, and implementation as a C non-blocking networked server.

Currently (2 Jan 2016) the project is in release candidate state. People are encouraged to start evaluating it and report bugs and experiences.

WARNING: This is beta code and may not be suitable for production usage. The API is considered to be stable if not for details that may change in the next release candidates, however it's new code, so handle with care!

What is a message queue?

Hint: skip this section if you are familiar with message queues.

You know how humans use text messages to communicate, right? I could write my wife "please get the milk at the store", and she maybe will reply "Ok message received, I'll get two bottles on my way home".

A message queue is the same as human text messages, but for computer programs. For example a web application, when an user subscribes, may send another process, that handles sending emails, "please send the confirmation email to [email protected]".

Message systems like Disque allow communication between processes using different queues. So a process can send a message to a queue with a given name, and only processes which fetch messages from this queue will return those messages. Moreover, multiple processes can listen for messages in a given queue, and multiple processes can send messages to the same queue.

The important part of a message queue is to be able to provide guarantees so that messages are eventually delivered even in the face of failures. So even if in theory implementing a message queue is very easy, to write a very robust and scalable one is harder than it may appear.

Give me the details!

Disque is a distributed and fault tolerant message broker, so it works as a middle layer among processes that want to exchange messages.

Producers add messages that are served to consumers. Since message queues are often used in order to process delayed jobs, Disque often uses the term "job" in the API and in the documentation, however jobs are actually just messages in the form of strings, so Disque can be used for other use cases. In this documentation "jobs" and "messages" are used in an interchangeable way.

Job queues with a producer-consumer model are pretty common, so the devil is in the details. A few details about Disque are:

Disque is a synchronously replicated job queue. By default when a new job is added, it is replicated to W nodes before the client gets an acknowledgement about the job being added. W-1 nodes can fail and the message will still be delivered.

Disque supports both at-least-once and at-most-once delivery semantics. At-least-once delivery semantics is where most effort was spent in the design and implementation, while at-most-once semantics is a trivial result of using a retry time set to 0 (which means, never re-queue the message again) and a replication factor of 1 for the message (not strictly needed, but it is useless to have multiple copies of a message around if it will be delivered at most one time). You can have, at the same time, both at-least-once and at-most-once jobs in the same queues and nodes, since this is a per message setting.

Disque at-least-once delivery is designed to approximate single delivery when possible, even during certain kinds of failures. This means that while Disque can only guarantee a number of deliveries equal or greater to one, it will try hard to avoid multiple deliveries whenever possible.

Disque is a distributed system where all nodes have the same role (aka, it is multi-master). Producers and consumers can attach to whatever node they like, and there is no need for producers and consumers of the same queue to stay connected to the same node. Nodes will automatically exchange messages based on load and client requests.

Disque is Available (it is an eventually consistent AP system in CAP terms): producers and consumers can make progress as long as a single node is reachable.

Disque supports optional asynchronous commands that are low latency for the client but provide less guarantees. For example a producer can add a job to a queue with a replication factor of 3, but may want to run away before knowing if the contacted node was really able to replicate it to the specified number of nodes or not. The node will replicate the message in the background in a best effort way.

Disque automatically re-queues messages that are not acknowledged as already processed by consumers, after a message-specific retry time. There is no need for consumers to re-queue a message if it was not processed.

Disque uses explicit acknowledges in order for a consumer to signal a message as delivered (or, using a different terminology, to signal a job as already processed).

Disque queues only provides best effort ordering. Each queue sorts messages based on the job creation time, which is obtained using the wall clock of the local node where the message was created (plus an incremental counter for messages created in the same millisecond), so messages created in the same node are normally delivered in the same order they were created. This is not causal ordering since correct ordering is violated in different cases: when messages are re-issued because they are not acknowledged, because of nodes local clock drifts, and when messages are moved to other nodes for load balancing and federation (in this case you end with queues having jobs originated in different nodes with different wall clocks). However all this also means that normally messages are not delivered in random order and usually messages created first are delivered first.

Note that since Disque does not provide strict FIFO semantics, technically speaking it should not be called a message queue, and it could better identified as a message broker. However I believe that at this point in the IT industry a message queue is often more lightly used to identify a generic broker that may or may not be able to guarantee order in all cases. Given that we document the semantics very clearly, I grant myself the right to call Disque a message queue anyway.

Disque provides the user with fine-grained control for each job using three time related parameters, and one replication parameter. For each job, the user can control:

  1. The replication factor (how many nodes have a copy).
  2. The delay time (the min time Disque will wait before putting the message in a queue, making the message deliverable).
  3. The retry time (how much time should elapse since the last time the job was queued and without an acknowledge about the job delivery, before the job is re-queued for delivery).
  4. The expire time (how much time should elapse for the job to be deleted regardless of whether it was successfully delivered, i.e. acknowledged, or not).

Finally, Disque supports optional disk persistence, which is not enabled by default, but that can be handy in single data center setups and during restarts.

Other minor features are:

  • Ability to block queues.
  • A few statistics about queue activity.
  • Stateless iterators for queues and jobs.
  • Commands to control the visibility of single jobs.
  • Easy resize of the cluster (adding nodes is trivial).
  • Graceful removal of nodes without losing job replicas.

ACKs and retries

Disque's implementation of at-least-once delivery semantics is designed in order to avoid multiple delivery during certain classes of failures. It is not able to guarantee that no multiple deliveries will occur. However there are many at-least-once workloads where duplicated deliveries are acceptable (or explicitly handled), but not desirable either. A trivial example is sending emails to users (it is not terrible if an user gets a duplicated email, but is important to avoid it when possible), or doing idempotent operations that are expensive (all the times where it is critical for performance to avoid multiple deliveries).

In order to avoid multiple deliveries when possible, Disque uses client ACKs. When a consumer processes a message correctly, it should acknowledge this fact to Disque. ACKs are replicated to multiple nodes, and are garbage collected as soon as the system believes it is unlikely that more nodes in the cluster have the job (the ACK refers to) still active. Under memory pressure or under certain failure scenarios, ACKs are eventually discarded.

More explicitly:

  1. A job is replicated to multiple nodes, but usually only queued in a single node. There is a difference between having a job in memory, and queueing it for delivery.
  2. Nodes having a copy of a message, if a certain amount of time has elapsed without getting the ACK for the message, will re-queue it. Nodes will run a best-effort protocol to avoid re-queueing the message multiple times.
  3. ACKs are replicated and garbage collected across the cluster so that eventually processed messages are evicted (this happens ASAP if there are no failures nor network partitions).

For example, if a node having a copy of a job gets partitioned away during the time the job gets acknowledged by the consumer, it is likely that when it returns (in a reasonable amount of time, that is, before the retry time is reached) it will be informed about the ACK and will avoid to re-queue the message. Similarly, jobs can be acknowledged during a partition to just a single available node, and when the partition heals the ACK will be propagated to other nodes that may still have a copy of the message.

So an ACK is just a proof of delivery that is replicated and retained for some time in order to make multiple deliveries less likely to happen in practice.

As already mentioned, in order to control replication and retries, a Disque job has the following associated properties: number of replicas, delay, retry and expire.

If a job has a retry time set to 0, it will get queued exactly once (and in this case a replication factor greater than 1 is useless, and signaled as an error to the user), so it will get delivered either a single time or will never get delivered. While jobs can be persisted on disk for safety, queues aren't, so this behavior is guaranteed even when nodes restart after a crash, whatever the persistence configuration is. However when nodes are manually restarted by the sysadmin, for example for upgrades, queues are persisted correctly and reloaded at startup, since the store/load operation is atomic in this case, and there are no race conditions possible (it is not possible that a job was delivered to a client and is persisted on disk as queued at the same time).

Fast acknowledges

Disque supports a faster way to acknowledge processed messages, via the FASTACK command. The normal acknowledge is very expensive from the point of view of messages exchanged between nodes, this is what happens during a normal acknowledge:

  1. The client sends ACKJOB to one node.
  2. The node sends a SETACK message to everybody it believes to have a copy.
  3. The receivers of SETACK reply with GOTACK to confirm.
  4. The node finally sends DELJOB to all the nodes.

Note: actual garbage collection is more complex in case of failures and is explained in the state machine later. The above is what happens 99% of times.

If a message is replicated to 3 nodes, acknowledging requires 1+2+2+2 messages, for the sake of retaining the ack if some nodes may not be reached when the message is acknowledged. This makes the probability of multiple deliveries of this message less likely.

However the alternative fast ack, while less reliable, is much faster and invovles exchanging less messages. This is how a fast acknowledge works:

  1. The client sends FASTACK to one node.
  2. The node evicts the job and sends a best effort DELJOB to all the nodes that may have a copy, or to all the cluster if the node was not aware of the job.

If during a fast acknowledge a node having a copy of the message is not reachable, for example because of a network partition, the node will deliver the message again, since it has a non-acknowledged copy of the message and there is nobody able to inform it the message has been acknowledged when the partition heals.

If the network you are using is pretty reliable, and you are very concerned with performance, and multiple deliveries in the context of your applications are a non issue, then FASTACK is probably the way to go.

Dead letter queue

Many message queues implement a feature called dead letter queue. It is a special queue used in order to accumulate messages that cannot be processed for some reason. Common causes could be:

  1. The message was delivered too many times but never correctly processed.
  2. The message time-to-live reached zero before it was processed.
  3. Some worker explicitly asked the system to flag the message as having issues.

The idea is that the administrator of the system checks (usually via automatic systems) if there is something in the dead letter queue in order to understand if there is some software error or other kind of error preventing messages from being processed as expected.

Since Disque is an in-memory system, the message time-to-live is an important property. When it is reached, we want messages to go away, since the TTL should be chosen so that after such a time it is no longer meaningful to process the message. In such a system, to use memory and create a queue in response to an error or to messages timing out looks like a non optimal idea. Moreover, due to the distributed nature of Disque, dead letters could end up spawning multiple nodes and having duplicated entries in them.

So Disque uses a different approach. Each node message representation has two counters: a nacks counter and an additional deliveries counter. The counters are not consistent among nodes having a copy of the same message, they are just best effort counters that may not increment in some node during network partitions.

The idea of these two counters is that one is incremented every time a worker uses the NACK command to tell the queue the message was not processed correctly and should be put back on the queue ASAP. The other is incremented for every other condition (different than the NACK call) that requires a message to be put back on the queue again. This includes messages that get lost and are enqueued again or messages that are enqueued on one side of the partition since the message was processed on the other side and so forth.

Using the GETJOB command with the WITHCOUNTERS option, or using the SHOW command to inspect a job, it is possible to retrieve these two counters together with the other job information, so if a worker, before processing a message, sees the counters have values over some application-defined limit, it can notify operations people in multiple ways:

  1. It may send an email.
  2. Set a flag in a monitoring system.
  3. Put the message in a special queue (simulating the dead letter feature).
  4. Attempt to process the message and report the stack trace of the error if any.

Basically the exact handling of the feature is up to the application using Disque. Note that the counters don't need to be consistent in the face of failures or network partitions: the idea is that eventually if a message has issues the counters will get incremented enough times to reach the limit selected by the application as a warning threshold.

The reason for having two distinct counters is that applications may want to handle the case of explicit negative acknowledges via NACK differently than multiple deliveries because of timeouts or messages getting lost.

Disque and disk persistence

Disque can be operated in-memory only, using synchronous replication as a durability guarantee, or can be operated using the Append Only File where jobs creations and evictions are logged on disk (with configurable fsync policies) and reloaded at restart.

AOF is recommended especially if you run in a single availability zone where a mass reboot of all your nodes is possible.

Normally Disque only reloads job data in memory, without populating queues, since unacknowledged jobs are requeued eventually. Moreover, reloading queue data is not safe in the case of at-most-once jobs having the retry value set to 0. However a special option is provided in order to reload the full state from the AOF. This is used together with an option that allows shutting down the server just after the AOF is generated from scratch, in order to make it safe even to reload jobs with retry set to 0, since the AOF is generated while the server no longer accepts commands from clients, so no race condition is possible.

Even when running memory-only, Disque is able to dump its memory on disk and reload from disk on controlled restarts, for example in order to upgrade the software.

This is how to perform a controlled restart, that works whether AOF is enabled or not:

  1. CONFIG SET aof-enqueue-jobs-once yes
  2. CONFIG REWRITE
  3. SHUTDOWN REWRITE-AOF

At this point we have a freshly generated AOF on disk, and the server is configured in order to load the full state only at the next restart (aof-enqueue-jobs-once is automatically turned off after the restart).

We can just restart the server with the new software, or in a new server, and it will restart with the full state. Note that aof-enqueue-jobs-once implies loading the AOF even if AOF support is switched off, so there is no need to enable AOF just for the upgrade of an in-memory only server.

Job IDs

Disque jobs are uniquely identified by an ID like the following:

D-dcb833cf-8YL1NT17e9+wsA/09NqxscQI-05a1

Job IDs are composed of exactly 40 characters and start with the prefix D-.

We can split an ID into multiple parts:

D- | dcb833cf | 8YL1NT17e9+wsA/09NqxscQI | 05a1
  1. D- is the prefix.
  2. dcb833cf is the first 8 bytes of the node ID where the message was generated.
  3. 8YL1NT17e9+wsA/09NqxscQI is the 144 bit ID pseudo-random part encoded in base64.
  4. 05a1 is the Job TTL in minutes. Because of it, message IDs can be expired safely even without having the job representation.

IDs are returned by ADDJOB when a job is successfully created, are part of the GETJOB output, and are used in order to acknowledge that a job was correctly processed by a worker.

Part of the node ID is included in the message so that a worker processing messages for a given queue can easily guess what are the nodes where jobs are created, and move directly to these nodes to increase efficiency instead of listening for messages in a node that will require to fetch messages from other nodes.

Only 32 bits of the original node ID is included in the message, however in a cluster with 100 Disque nodes, the probability of two nodes having identical 32 bit ID prefixes is given by the birthday paradox:

P(100,2^32) = .000001164

In case of collisions, the workers may just make a non-efficient choice.

Collisions in the 144 bits random part are believed to be impossible, since it is computed as follows.

144 bit ID = HIGH_144_BITS_OF_SHA1(seed || counter)

Where:

  • seed is a seed generated via /dev/urandom at startup.
  • counter is a 64 bit counter incremented at every ID generation.

So there are 22300745198530623141535718272648361505980416 possible IDs, selected in a uniform way. While the probability of a collision is non-zero mathematically, in practice each ID can be regarded as unique.

The encoded TTL in minutes has a special property: it is always even for at most once jobs (job retry value set to 0), and is always odd otherwise. This changes the encoded TTL precision to 2 minutes, but allows to tell if a Job ID is about a job with deliveries guarantees or not. Note that this fact does not mean that Disque jobs TTLs have a precision of two minutes. The TTL field is only used to expire job IDs of jobs a given node does not actually have a copy, search "dummy ACK" in this documentation for more information.

Setup

To play with Disque please do the following:

  1. Compile Disque - if you can compile Redis, you can compile Disque, it's the usual "no external deps" thing. Just type make. Binaries (disque and disque-server) will end up in the src directory.
  2. Run a few Disque nodes on different ports. Create different disque.conf files following the example disque.conf in the source distribution.
  3. After you have them running, you need to join the cluster. Just select a random node among the nodes you are running, and send the command CLUSTER MEET <ip> <port> for every other node in the cluster.

Please note that you need to open two TCP ports on each node, the base port of the Disque instance, for example 7711, plus the cluster bus port, which is always at a fixed offset, obtained summing 10000 to the base port, so in the above example, you need to open both 7711 and 17711. Disque uses the base port to communicate with clients and the cluster bus port to communicate with other Disque processes.

To run a node, just call ./disque-server.

For example, if you are running three Disque servers in port 7711, 7712, 7713, in order to join the cluster you should use the disque command line tool and run the following commands:

./disque -p 7711 cluster meet 127.0.0.1 7712
./disque -p 7711 cluster meet 127.0.0.1 7713

Your cluster should now be ready. You can try to add a job and fetch it back in order to test if everything is working:

./disque -p 7711
127.0.0.1:7711> ADDJOB queue body 0
D-dcb833cf-8YL1NT17e9+wsA/09NqxscQI-05a1
127.0.0.1:7711> GETJOB FROM queue
1) 1) "queue"
   2) "D-dcb833cf-8YL1NT17e9+wsA/09NqxscQI-05a1"
   3) "body"

Remember that you can add and get jobs from different nodes as Disque is multi master. Also remember that you need to acknowledge jobs otherwise they'll never go away from the server memory (unless the time-to-live is reached).

Main API

The Disque API is composed of a small set of commands, since the system solves a single very specific problem. The three main commands are:

ADDJOB queue_name job <ms-timeout> [REPLICATE <count>] [DELAY <sec>] [RETRY <sec>] [TTL <sec>] [MAXLEN <count>] [ASYNC]

Adds a job to the specified queue. Arguments are as follows:

  • queue_name is the name of the queue, any string, basically. You don't need to create queues, if a queue does not exist, it gets created automatically. If one has no more jobs, it gets removed.
  • job is a string representing the job. Disque is job meaning agnostic, for it a job is just a message to deliver. Job max size is 4GB.
  • ms-timeout is the command timeout in milliseconds. If no ASYNC is specified, and the replication level specified is not reached in the specified number of milliseconds, the command returns with an error, and the node does a best-effort cleanup, that is, it will try to delete copies of the job across the cluster. However the job may still be delivered later. Note that the actual timeout resolution is 1/10 of second or worse with the default server hz.
  • REPLICATE count is the number of nodes the job should be replicated to.
  • DELAY sec is the number of seconds that should elapse before the job is queued by any server. By default there is no delay.
  • RETRY sec period after which, if no ACK is received, the job is put into the queue again for delivery. If RETRY is 0, the job has at-most-once delivery semantics. The default retry time is 5 minutes, with the exception of jobs having a TTL so small that 10% of TTL is less than 5 minutes. In this case the default RETRY is set to TTL/10 (with a minimum value of 1 second).
  • TTL sec is the max job life in seconds. After this time, the job is deleted even if it was not successfully delivered. If not specified, the default TTL is one day.
  • MAXLEN count specifies that if there are already count messages queued for the specified queue name, the message is refused and an error reported to the client.
  • ASYNC asks the server to let the command return ASAP and replicate the job to other nodes in the background. The job gets queued ASAP, while normally the job is put into the queue only when the client gets a positive reply.

The command returns the Job ID of the added job, assuming ASYNC is specified, or if the job was replicated correctly to the specified number of nodes. Otherwise an error is returned.

GETJOB [NOHANG] [TIMEOUT <ms-timeout>] [COUNT <count>] [WITHCOUNTERS] FROM queue1 queue2 ... queueN

Return jobs available in one of the specified queues, or return NULL if the timeout is reached. A single job per call is returned unless a count greater than 1 is specified. Jobs are returned as a three-element array containing the queue name, the Job ID, and the job body itself. If jobs are available in multiple queues, queues are processed left to right.

If there are no jobs for the specified queues, the command blocks, and messages are exchanged with other nodes, in order to move messages about these queues to this node, so that the client can be served.

Options:

  • NOHANG: Ask the command to not block even if there are no jobs in all the specified queues. This way the caller can just check if there are available jobs without blocking at all.
  • WITHCOUNTERS: Return the best-effort count of NACKs (negative acknowledges) received by this job, and the number of additional deliveries performed for this job. See the Dead Letters section for more information.

ACKJOB jobid1 jobid2 ... jobidN

Acknowledges the execution of one or more jobs via job IDs. The node receiving the ACK will replicate it to multiple nodes and will try to garbage collect both the job and the ACKs from the cluster so that memory can be freed.

A node receiving an ACKJOB command about a job ID it does not know will create a special empty job, with the state set to "acknowledged", called a "dummy ACK". The dummy ACK is used in order to retain the acknolwedge during a netsplit if the ACKJOB is sent to a node that does not have a copy of the job. When the partition heals, job garbage collection will be attempted.

However, since the job ID encodes information about the job being an "at-most- once" or an "at-least-once" job, the dummy ACK is only created for at-least- once jobs.

FASTACK jobid1 jobid2 ... jobidN

Performs a best-effort cluster-wide deletion of the specified job IDs. When the network is well connected and there are no node failures, this is equivalent to ACKJOB but much faster (due to less messages being exchanged), however during failures it is more likely that fast acknowledges will result in multiple deliveries of the same messages.

WORKING jobid

Claims to be still working with the specified job, and asks Disque to postpone the next time it will deliver the job again. The next delivery is postponed for the job retry time, however the command works in a best effort way since there is no way to guarantee during failures that another node in a different network partition won't perform a delivery of the same job.

Another limitation of the WORKING command is that it cannot be sent to nodes not knowing about this particular job. In such a case the command replies with a NOJOB error. Similarly, if the job is already acknowledged an error is returned.

Note that the WORKING command is refused by Disque nodes if 50% of the job time to live has already elapsed. This limitation makes Disque safer since usually the retry time is much smaller than the time-to-live of a job, so it can't happen that a set of broken workers monopolize a job with WORKING and never process it. After 50% of the TTL has elapsed, the job will be delivered to other workers anyway.

Note that WORKING returns the number of seconds you (likely) postponed the message visibility for other workers (the command basically returns the retry time of the job), so the worker should make sure to send the next WORKING command before this time elapses. Moreover, a worker that may want to use this interface may fetch the retry value with the SHOW command when starting to process a message, or may simply send a WORKING command ASAP, like in the following example (in pseudo code):

retry = WORKING(jobid)
RESET timer
WHILE ... work with the job still not finished ...
    IF timer reached 80% of the retry time
        WORKING(jobid)
        RESET timer
    END
END

NACK <job-id> ... <job-id>

The NACK command tells Disque to put the job back in the queue ASAP. It is very similar to ENQUEUE but it increments the job nacks counter instead of the additional-deliveries counter. The command should be used when the worker was not able to process a message and wants the message to be put back into the queue in order to be processed again.

Other commands

INFO

Generic server information / stats.

HELLO

Returns hello format version, this node ID, all the nodes IDs, IP addresses, ports, and priority (lower is better, means a node is more available). Clients should use this as a handshake command when connecting with a Disque node.

QLEN <queue-name>

Return the length of the queue.

QSTAT <queue-name>

Show information about a queue as an array of key-value pairs. Below is an example of the output, however, implementations should not rely on the order of the fields nor on the existence of the fields listed. They may be (unlikely) removed or more can be (likely) added in the future.

If a queue does not exist, NULL is returned. Note that queues are automatically evicted after some time if empty and without clients blocked waiting for jobs, even if there are active jobs for the queue. So the non existence of a queue does not mean there are not jobs in the node or in the whole cluster about this queue. The queue will be immediately created again when needed to serve requests.

Example output:

QSTAT foo
 1) "name"
 2) "foo"
 3) "len"
 4) (integer) 56520
 5) "age"
 6) (integer) 601
 7) "idle"
 8) (integer) 3
 9) "blocked"
10) (integer) 50
11) "import-from"
12) 1) "dcb833cf8f42fbb7924d92335ff6d67d3cea6e3d"
    2) "4377bdf656040a18d8caf4d9f409746f1f9e6396"
13) "import-rate"
14) (integer) 19243
15) "jobs-in"
16) (integer) 3462847
17) "jobs-out"
18) (integer) 3389522
19) "pause"
20) "none"

Most fields should be obvious. The import-from field shows a list of node IDs this node is importing jobs from, for this queue, in order to serve clients requests. The import-rate is the instantaneous amount of jos/sec we import in order to handle our outgoing traffic (GETJOB commands). blocked is the number of clients blocked on this queue right now. age and idle are reported in seconds. The jobs-in and -out counters are incremented every time a job is enqueued or dequeued for any reason.

QPEEK <queue-name> <count>

Return, without consuming from the queue, count jobs. If count is positive the specified number of jobs are returned from the oldest to the newest (in the same best-effort FIFO order as GETJOB). If count is negative the commands changes behavior and shows the count newest jobs, from the newest from the oldest.

ENQUEUE <job-id> ... <job-id>

Queue jobs if not already queued.

DEQUEUE <job-id> ... <job-id>

Remove the job from the queue.

DELJOB <job-id> ... <job-id>

Completely delete a job from a node. Note that this is similar to FASTACK, but limited to a single node since no DELJOB cluster bus message is sent to other nodes.

SHOW <job-id>

Describe the job.

QSCAN [COUNT <count>] [BUSYLOOP] [MINLEN <len>] [MAXLEN <len>] [IMPORTRATE <rate>]

The command provides an interface to iterate all the existing queues in the local node, providing a cursor in the form of an integer that is passed to the next command invocation. During the first call, the cursor must be 0, in the next calls the cursor returned in the previous call is used in the next. The iterator guarantees to return all the elements but may return duplicated elements.

Options:

  • COUNT <count> A hint about how much work to do per iteration.
  • BUSYLOOP Block and return all the elements in a busy loop.
  • MINLEN <count> Don't return elements with less than count jobs queued.
  • MAXLEN <count> Don't return elements with more than count jobs queued.
  • IMPORTRATE <rate> Only return elements with a job import rate (from other nodes) >= rate.

The cursor argument can be in any place, the first non matching option that has valid cursor form of an unsigned number will be sensed as a valid cursor.

JSCAN [<cursor>] [COUNT <count>] [BUSYLOOP] [QUEUE <queue>] [STATE <state1> STATE <state2> ... STATE <stateN>] [REPLY all|id]

The command provides an interface to iterate all the existing jobs in the local node, providing a cursor in the form of an integer that is passed to the next command invocation. During the first call the cursor must be 0, in the next calls the cursor returned in the previous call is used in the next. The iterator guarantees to return all the elements but may return duplicated elements.

Options:

  • COUNT <count> A hint about how much work to do per iteration.
  • BUSYLOOP Block and return all the elements in a busy loop.
  • QUEUE <queue> Return only jobs in the specified queue.
  • STATE <state> Return jobs in the specified state. Can be used multiple times for a logical OR.
  • REPLY <type> Job reply type. Type can be all or id. Default is to report just the job ID. If all is specified the full job state is returned like for the SHOW command.

The cursor argument can be in any place, the first non matching option that has valid cursor form of an unsigned number will be sensed as a valid cursor.

PAUSE <queue-name> option1 [option2 ... optionN]

Control the paused state of a queue, possibly broadcasting the command to other nodes in the cluster. Disque queues can be paused in both directions, input and output, or both. Pausing a queue makes it unavailable for input or output operations. Specifically:

A queue paused in input will have changed behavior in the following ways:

  1. ADDJOB returns a -PAUSED error for queues paused in input.
  2. The node where the queue is paused, no longer accepts to replicate jobs for this queue when requested by other nodes. Since ADDJOB by default uses synchronous replication, it means that if the queue is paused in enough nodes, adding jobs with a specified level of replication may fail. In general the node where the queue is paused will not create new jobs in the local node about this queue.
  3. The job no longer accepts ENQUEUE messages from other nodes. Those messages are usually used by nodes in out of memory conditions that replicate jobs externally (not holding a copy), in order to put the job in the queue of some random node, among the nodes having a copy of a job.
  4. Active jobs that reach their retry time, are not put back into the queue. Instead their retry timer is updated and the node will try again later.

Basically a queue paused in input never creates new jobs for this queue, and never puts active jobs (jobs for which the node has a copy but are not currently queued) back in the queue, for all the time the queue is paused.

A queue paused in output instead will behave in the following way:

  1. GETJOB will block even if there are jobs available in the specified queue, instead of serving the jobs. But GETJOB will unblock if the queue output pause is cleared later.
  2. The node will not provide jobs to other nodes in the context of node federation, for paused queues.

So a queue paused in output will stop acting as a source of messages for both local and non local clients.

The paused state can be set for each queue using the PAUSE command followed by options to specify how to change the paused state. Possible options are:

  • in: pause the queue in input.
  • out: pause the queue in output.
  • all: pause the queue in input and output (same as specifying both the in and out options).
  • none: clear the paused state in input and output.
  • state: just report the current queue state.
  • bcast: send a PAUSE command to all the reachable nodes of the cluster to set the same queue in the other nodes to the same state.

The command always returns the state of the queue after the execution of the specified options, so the return value is one of in, out, all, none.

Queues paused in input or output are never evicted to reclaim memory, even if they are empty and inactive for a long time, since otherwise the paused state would be forgotten.

For example, in order to block output for the queue myqueue in all the currently reachable nodes, the following command should be send to a single node:

PAUSE myqueue out bcast

To specify all is the same as to specify both in and out, so the two following forms are equivalent:

PAUSE myqueue in out
PAUSE myqueue all

To just get the current state use:

PAUSE myqueue state
"none"

Special handling of messages with RETRY set to 0

In order to provide a coherent API, messages with at-most-once delivery semantics are still retained after being delivered a first time, and should be acknowledged like any other message. Of course, the acknowledge is not mandatory, since the message may be lost and there is no way for the receiver to get the same message again, since the message is associated with a retry value of 0.

In order to avoid non acknowledged messages with retry set to 0 from leaking into Disque and eating all the memory, when the Disque server memory is full and starts to evict, it does not just evict acknowledged messages, but also can evict non acknowledged messages having, at the same time, the following two properties:

  1. Their retry is set to 0.
  2. The job was already delivered.

In theory, acknowledging a job that will never be retried is a waste of time and resources, however this design has hidden advantages:

  1. The API is exactly the same for all the kinds of jobs.
  2. After the job is delivered, it is still possible to examine it. Observability is a very good property of messaging systems.

However, not acknowledging the job does not result in big issues since they are evicted eventually during memory pressure.

Adding and removing nodes at runtime

Adding nodes is trivial, and just consists in starting a new node and sending it a CLUSTER MEET command. Assuming the node you just started is located at address 192.168.1.10 port 7714, and a random (you can use any) node of the existing cluster is located at 192.168.1.9 port 7711, all you need to do is:

./disque -h 192.168.1.10 -p 7714 cluster meet 192.168.1.9 7711

Note that you can invert the source and destination arguments and the new node will still join the cluster. It does not matter if it's the old node to meet the new one or the other way around.

In order to remove a node, it is possible to use the crude way of just shutting it down, and then use CLUSTER FORGET <old-node-id> in all the other nodes in order to remove references to it from the configuration of the other nodes. However this means that, for example, messages that had a replication factor of 3, and one of the replicas was the node you are shutting down, suddenly are left with just 2 replicas even if no actual failure happened. Moreover if the node you are removing had messages in queue, you'll need to wait the retry time before the messages will be queued again. For all these reasons, Disque has a better way to remove nodes which is described in the next section.

Gracefully removal of nodes

In order to empty a node of its content before removing it, it is possible to use a feature that puts a node in leaving state. To enable this feature just contact the node to remove, and use the following command:

CLUSTER LEAVING yes

The node will start advertising itself as leaving, so in a matter of seconds all the cluster will know (if there are partitions, when the partition heals all the nodes will eventually be informed), and this is what happens when the node is in this state:

  1. When the node receives ADDJOB commands, it performs external replication, like when a node is near the memory limits. This means that it will make sure to create the number of replicas of the message in the cluster without using itself as a replica. So no new messages are created in the context of a node which is leaving.
  2. The node starts to send -LEAVING messages to all clients that use GETJOB but would block waiting for jobs. The -LEAVING error means the clients should connect to another node. Clients that were already blocked waiting for messages will be unblocked and a -LEAVING error will be sent to them as well.
  3. The node no longer sends NEEDJOBS messages in the context of Disque federation, so it will never ask other nodes to transfer messages to it.
  4. The node and all the other nodes will advertise it with a bad priority in the HELLO command output, so that clients will select a different node.
  5. The node will no longer create dummy acks in response to an ACKJOB command about a job it does not know.

All these behavior changes result in the node participating only as a source of messages, so eventually its message count will drop to zero (it is possible to check for this condition using INFO jobs). When this happens the node can be stopped and removed from the other nodes tables using CLUSTER FORGET as described in the section above.

Client libraries

Disque uses the same protocol as Redis itself. To adapt Redis clients, or to use them directly, should be pretty easy. However note that Disque's default port is 7711 and not 6379.

While a vanilla Redis client may work well with Disque, clients should optionally use the following protocol in order to connect with a Disque cluster:

  1. The client should be given a number of IP addresses and ports where nodes are located. The client should select random nodes and should try to connect until an available one is found.
  2. On a successful connection the HELLO command should be used in order to retrieve the Node ID and other potentially useful information (server version, number of nodes).
  3. If a consumer sees a high message rate received from foreign nodes, it may optionally have logic in order to retrieve messages directly from the nodes where producers are producing the messages for a given topic. The consumer can easily check the source of the messages by checking the Node ID prefix in the messages IDs.
  4. The GETJOB command, or other commands, may return a -LEAVING error instead of blocking. This error should be considered by the client library as a request to connect to a different node, since the node it is connected to is not able to serve the request since it is leaving the cluster. Nodes in this state have a very high priority number published via HELLO, so will be unlikely to be picked at the next connection attempt.

This way producers and consumers will eventually try to minimize node message exchanges whenever possible.

So basically you could perform basic usage using just a Redis client, however there are already specialized client libraries implementing a more specialized API on top of Disque:

C++

Common Lisp

Elixir

Erlang

Go

Java

Node.js

Perl

PHP

Python

Ruby

Rust

.NET

Implementation details

Job replication strategy

  1. Disque tries to replicate to W-1 (or W during out of memory) reachable nodes, shuffled.
  2. The cluster REPLJOB message is used to replicate a job to multiple nodes, the job is sent together with the list of nodes that may have a copy.
  3. If the required replication is not reached promptly, the job is send to one additional node every 50 milliseconds. When this happens, a new REPLJOB message is also re-sent to each node that may already have a copy, in order to refresh the list of nodes that have a copy.
  4. If the specified synchronous replication timeout is reached, the node that originally received the ADDJOB command from the client gives up and returns an error to the client. When this happens the node performs a best-effort procedure to delete the job from nodes that may have already received a copy of the job.

Cluster topology

Disque is a full mesh, with each node connected to each other. Disque performs distributed failure detection via gossip, only in order to adjust the replication strategy (try reachable nodes first when trying to replicate a message), and in order to inform clients about non reachable nodes when they want the list of nodes they can connect to.

As Disque is multi-master, the event of nodes failing is not handled in any special way.

Cluster messages

Nodes communicate via a set of messages, using the node-to-node message bus. A few of the messages are used in order to check that other nodes are reachable and to mark nodes as failing. Those messages are PING, PONG and FAIL. Since failure detection is only used to adjust the replication strategy (talk with reachable nodes first in order to improve latency), the details are yet not described. Other messages are more important since they are used in order to replicate jobs, re-issue jobs while trying to minimize multiple deliveries, and in order to auto-federate to serve consumers when messages are produced in different nodes compared to where consumers are.

The following is a list of messages and what they do, split by category. Note that this is just an informal description, while in the next sections describing the Disque state machine, there is a more detailed description of the behavior caused by message reception, and in what cases they are generated.

Cluster messages related to jobs replication and queueing

  • REPLJOB: ask the receiver to replicate a job, that is, to add a copy of the job among the registered jobs in the target node. When a job is accepted, the receiver replies with GOTJOB to the sender. A job may not be accepted if the receiving node is near out of memory. In this case GOTJOB is not sent and the message discarded.
  • GOTJOB: The reply to REPLJOB to confirm the job was replicated.
  • ENQUEUE: Ask a node to put a given job into its queue. This message is used when a job is created by a node that does not want to take a copy, so it asks another node (among the ones that acknowledged the job replication) to queue it for the first time. If this message is lost, after the retry time some node will try to re-queue the message, unless retry is set to zero.
  • WILLQUEUE: This message is sent 500 milliseconds before a job is re-queued to all the nodes that may have a copy of the message, according to the sender table. If some of the receivers already have the job queued, they'll reply with QUEUED in order to prevent the sender to queue the job again (avoid multiple delivery when possible).
  • QUEUED: When a node re-queues a job, it sends QUEUED to all the nodes that may have a copy of the message, so that the other nodes will update the time at which they'll retry to queue the job. Moreover, every node that already has the same job in queue, but with a node ID which is lexicographically smaller than the sending node, will de-queue the message in order to best-effort de-dup messages that may be queued in multiple nodes at the same time.

Cluster messages related to ACK propagation and garbage collection

  • SETACK: This message is sent to force a node to mark a job as successfully delivered (acknowledged by the worker): the job will no longer be considered active, and will never be re-queued by the receiving node. Also SETACK is send to the sender if the receiver of QUEUED or WILLQUEUE message has the same job marked as acknowledged (successfully delivered) already.
  • GOTACK: This message is sent in order to acknowledge a SETACK message. The receiver can mark a given node that may have a copy of a job, as informed about the fact that the job was acknowledged by the worker. Nodes delete (garbage collect) a message cluster wide when they believe all the nodes that may have a copy are informed about the fact the job was acknowledged.
  • DELJOB: Ask the receiver to remove a job. Is only sent in order to perform garbage collection of jobs by nodes that are sure the job was already delivered correctly. Usually the node sending DELJOB only does that when its sure that all the nodes that may have a copy of the message already marked the message ad delivered, however after some time the job GC may be performed anyway, in order to reclaim memory, and in that case, an otherwise avoidable multiple delivery of a job may happen. The DELJOB message is also used in order to implement fast acknowledges.

Cluster messages related to nodes federation

  • NEEDJOBS(queue,count): The sender asks the receiver to obtain messages for a given queue, possibly count messages, but this is only an hit for congestion control and messages optimization, the receiver is free to reply with whatever number of messages. NEEDJOBS messages are delivered in two ways: broadcasted to every node in the cluster from time to time, in order to discover new source nodes for a given queue, or more often, to a set of nodes that recently replies with jobs for a given queue. This latter mechanism is called an ad hoc delivery, and is possible since every node remembers for some time the set of nodes that were recent providers of messages for a given queue. In both cases, NEEDJOBS messages are delivered with exponential delays, with the exception of queues that drop to zero-messages and have a positive recent import rate, in this case an ad hoc NEEDJOBS delivery is performed regardless of the last time the message was delivered in order to allow a continuous stream of messages under load.

  • YOURJOBS(array of messages): The reply to NEEDJOBS. An array of serialized jobs, usually all about the same queue (but future optimization may allow to send different jobs from different queues). Jobs into YOURJOBS replies are extracted from the local queue, and queued at the receiver node's queue with the same name. So even messages with a retry set to 0 (at most once delivery) still guarantee the safety rule since a given message may be in the source node, on the wire, or already received in the destination node. If a YOURJOBS message is lost, at least once delivery jobs will be re-queued later when the retry time is reached.

Disque state machine

This section shows the most interesting (as in less obvious) parts of the state machine each Disque node implements. While practically it is a single state machine, it is split in sections. The state machine description uses a convention that is not standard but should look familiar, since it is event driven, made of actions performed upon: message receptions in the form of commands received from clients, messages received from other cluster nodes, timers, and procedure calls.

Note that: job is a job object with the following fields:

  1. job.delivered: A list of nodes that may have this message. This list does not need to be complete, is used for best-effort algorithms.
  2. job.confirmed: A list of nodes that confirmed reception of ACK by replying with a GOTJOB message.
  3. job.id: The job 48 chars ID.
  4. job.state: The job state among: wait-repl, active, queued, acked.
  5. job.replicate: Replication factor for this job.
  6. job.qtime: Time at which we need to re-queue the job.

List fields such as .delivered and .confirmed support methods like .size to get the number of elements.

States are as follows:

  1. wait-repl: the job is waiting to be synchronously replicated.
  2. active: the job is active, either it reached the replication factor in the originating node, or it was created because the node received an REPLJOB message from another node.
  3. queued: the job is active and also is pending into a queue in this node.
  4. acked: the job is no longer active since a client confirmed the reception using the ACKJOB command or another Disque node sent a SETACK message for the job.

Generic functions

PROCEDURE LOOKUP-JOB(string job-id):

  1. If job with the specified id is found, returns the corresponding job object.
  2. Otherwise returns NULL.

PROCEDURE UNREGISTER(object job):

  1. Delete the job from memory, and if queued, from the queue.

PROCEDURE ENQUEUE(job):

  1. If job.state == queued return ASAP.
  2. Add job into job.queue.
  3. Change job.state to queued.

PROCEDURE DEQUEUE(job):

  1. If job.state != queued return ASAP.
  2. Remove job from job.queue.
  3. Change job.state to active.

ON RECV cluster message: DELJOB(string job.id):

  1. job = Call LOOKUP-JOB(job-id).
  2. IF job != NULL THEN call UNREGISTER(job).

Job replication state machine

This part of the state machine documents how clients add jobs to the cluster and how the cluster replicates jobs across different Disque nodes.

ON RECV client command `ADDJOB(string queue-name, string body, integer replicate, integer retry, integer ttl, ...):

  1. Create a job object in wait-repl state, having as body, ttl, retry, queue name, the specified values.
  2. Send REPLJOB(job.serialized) cluster message to replicate-1 nodes.
  3. Block the client without replying.

Step 3: We'll reply to the client in step 4 of GOTJOB message processing.

ON RECV cluster message REPLJOB(object serialized-job):

  1. job = Call LOOKUP-JOB(serialized-job.id).
  2. IF job != NULL THEN: job.delivered = UNION(job.delivered,serialized-job.delivered). Return ASAP, since we have the job.
  3. Create a job from serialized-job information.
  4. job.state = active.
  5. Reply to the sender with GOTJOB(job.id).

Step 1: We may already have the job, since REPLJOB may be duplicated.

Step 2: If we already have the same job, we update the list of jobs that may have a copy of this job, performing the union of the list of nodes we have with the list of nodes in the serialized job.

ON RECV cluster message GOTJOB(object serialized-job):

  1. job = Call LOOKUP-JOB(serialized-job.id).
  2. IF job == NULL OR job.state != wait-repl Return ASAP.
  3. Add sender node to job.confirmed.
  4. IF job.confirmed.size == job.replicate THEN change job.state to active, call ENQUEUE(job), and reply to the blocked client with job.id.

Step 4: As we receive enough confirmations via GOTJOB messages, we finally reach the replication factor required by the user and consider the message active.

TIMER, firing every next 50 milliseconds while a job still did not reached the expected replication factor.

  1. Select an additional node not already listed in job.delivered, call it node.
  2. Add node to job.delivered.
  3. Send REPLJOB(job.serialized) cluster message to each node in job.delivered.

Step 3: We send the message to every node again, so that each node will have a chance to update job.delivered with the new nodes. It is not required for each node to know the full list of nodes that may have a copy, but doing so improves our approximation of single delivery whenever possible.

Job re-queueing state machine

This part of the state machine documents how Disque nodes put a given job back into the queue after the specified retry time elapsed without the job being acknowledged.

TIMER, firing 500 milliseconds before the retry time elapses:

  1. Send WILLQUEUE(job.id) to every node in jobs.delivered.

TIMER, firing when job.qtime time is reached.

  1. If job.retry == 0 THEN return ASAP.
  2. Call ENQUEUE(job).
  3. Update job.qtime to NOW + job.retry.
  4. Send QUEUED(job.id) message to each node in job.delivered.

Step 1: At most once jobs never get enqueued again.

Step 3: We'll retry again after the retry period.

ON RECV cluster message WILLQUEUE(string job-id):

  1. job = Call LOOKUP-JOB(job-id).
  2. IF job == NULL THEN return ASAP.
  3. IF job.state == queued SEND QUEUED(job.id) to job.delivered.
  4. IF job.state == acked SEND SETACK(job.id) to the sender.

Step 3: We broadcast the message since likely the other nodes are going to retry as well.

Step 4: SETACK processing is documented below in the acknowledges section of the state machine description.

ON RECV cluster message QUEUED(string job-id):

  1. job = Call LOOKUP-JOB(job-id).
  2. IF job == NULL THEN return ASAP.
  3. IF job.state == acked THEN return ASAP.
  4. IF job.state == queued THEN if sender node ID is greater than my node ID call DEQUEUE(job).
  5. Update job.qtime setting it to NOW + job.retry.

Step 4: If multiple nodes re-queue the job about at the same time because of race conditions or network partitions that make WILLQUEUE not effective, then QUEUED forces receiving nodes to dequeue the message if the sender has a greater node ID, lowering the probability of unwanted multiple delivery.

Step 5: Now the message is already queued somewhere else, but the node will retry again after the retry time.

Acknowledged jobs garbage collection state machine

This part of the state machine is used in order to garbage collect acknowledged jobs, when a job finally gets acknowledged by a client.

PROCEDURE ACK-JOB(job):

  1. If job state is already acked, do nothing and return ASAP.
  2. Change job state to acked, dequeue the job if queued, schedule first call to TIMER.

PROCEDURE START-GC(job):

  1. Send SETACK(job.delivered.size) to each node that is listed in job.delivered but is not listed in job.confirmed.
  2. IF job.delivered.size == 0, THEN send SETACK(0) to every node in the cluster.

Step 2: this is an ACK about a job we donโ€™t know. In that case, we can just broadcast the acknowledged hoping somebody knows about the job and replies.

ON RECV client command ACKJOB(string job-id):

  1. job = Call LOOKUP-JOB(job-id).
  2. if job is NULL, ignore the message and return.
  3. Call ACK-JOB(job).
  4. Call START-GC(job).

ON RECV cluster message SETACK(string job-id, integer may-have):

  1. job = Call LOOKUP-JOB(job-id).
  2. Call ACK-JOB(job) IF job is not NULL.
  3. Reply with GOTACK IF job == NULL OR job.delivered.size <= may-have.
  4. IF job != NULL and jobs.delivered.size > may-have THEN call START-GC(job).
  5. IF may-have == 0 AND job != NULL, reply with GOTACK(1) and call START-GC(job).

Steps 3 and 4 makes sure that among the reachable nodes that may have a message, garbage collection will be performed by the node that is aware of more nodes that may have a copy.

Step 5 instead is used in order to start a GC attempt if we received a SETACK message from a node just hacking a dummy ACK (an acknowledge about a job it was not aware of).

ON RECV cluster message GOTACK(string job-id, bool known):

  1. job = Call LOOKUP-JOB(job-id). Return ASAP IF job == NULL.
  2. Call ACK-JOB(job).
  3. IF known == true AND job.delivered.size > 0 THEN add the sender node to job.delivered.
  4. IF (known == true) OR (known == false AND job.delivered.size > 0) OR (known == false AND sender is an element of job.delivered) THEN add the sender node to jobs.confirmed.
  5. IF job.delivered.size > 0 AND job.delivered.size == job.confirmed.size, THEN send DELJOB(job.id) to every node in the job.delivered list and call UNREGISTER(job).
  6. IF job.delivered == 0 AND known == true, THEN call UNREGISTER(job).
  7. IF job.delivered == 0 AND job.confirmed.size == cluster.size THEN call UNREGISTER(job).

Step 3: If job.delivered.size is zero, it means that the node just holds a dummy ack for the job. It means the node has an acknowledged job it created on the fly because a client acknowledged (via ACKJOB command) a job it was not aware of.

Step 6: we don't have to hold a dummy acknowledged jobs if there are nodes that have the job already acknowledged.

Step 7: this happens when nobody knows about a job, like when a client acknowledged a wrong job ID.

TIMER, from time to time (exponential backoff with random error), for every acknowledged job in memory:

  1. call START-GC(job).

Limitations

  • Disque is new code, not tested, and will require quite some time to reach production quality. It is likely very buggy and may contain wrong assumptions or tradeoffs.
  • As long as the software is non stable, the API may change in random ways without prior notification.
  • It is possible that Disque spends too much effort in approximating single delivery during failures. The fast acknowledge concept and command makes the user able to opt-out this efforts, but yet I may change the Disque implementation and internals in the future if I see the user base really not caring about multiple deliveries during partitions.
  • There is yet a lot of Redis dead code inside probably that could be removed.
  • Disque was designed a bit in astronaut mode, not triggered by an actual use case of mine, but more in response to what I was seeing people doing with Redis as a message queue and with other message queues. However I'm not an expert, if I succeeded to ship something useful for most users, this is kinda of an accomplishment. Otherwise it may just be that Disque is pretty useless.
  • As Redis, Disque is single threaded. While in Redis there are stronger reasons to do so, in Disque there is no manipulation of complex data structures, so maybe in the future it should be moved into a threaded server. We need to see what happens in real use cases in order to understand if it's worth it or not.
  • The number of jobs in a Disque process is limited to the amount of memory available. Again while this in Redis makes sense (IMHO), in Disque there are definitely simple ways in order to circumvent this limitation, like logging messages on disk when the server is out of memory and consuming back the messages when memory pressure is already acceptable. However in general, like in Redis, manipulating data structures in memory is a big advantage from the point of view of the implementation simplicity and the functionality we can provide to users.
  • Disque is completely not optimized for speed, was never profiled so far. I'm currently not aware of the fact it's slow, fast, or average, compared to other messaging solutions. For sure it is not going to have Redis-alike numbers because it does a lot more work at each command. For example when a job is added, it is serialized and transmitted to other N servers. There is a lot more message passing between nodes involved, and so forth. The good news is that being totally unoptimized, there is room for improvements.
  • Ability of federation to handle well low and high loads without incurring into congestion or high latency, was not tested well enough. The algorithm is reasonable but may fail short under many load patterns.
  • Amount of tested code path and possible states is not enough.

FAQ

Is Disque part of Redis?

No, it is a standalone project, however a big part of the Redis networking source code, nodes message bus, libraries, and the client protocol, were reused in this new project. In theory it was possible to extract the common code and release it as a framework to write distributed systems in C. However this is not a perfect solution as well, since the projects are expected to diverge more and more in the future, and to rely on a common foundation was hard. Moreover the initial effort to turn Redis into two different layers: an abstract server, networking stack and cluster bus, and the actual Redis implementation, was a huge effort, ways bigger than writing Disque itself.

However while it is a separated project, conceptually Disque is related to Redis, since it tries to solve a Redis use case in a vertical, ad-hoc way.

Who created Disque?

Disque is a side project of Salvatore Sanfilippo, aka @antirez.

There are chances for this project to be actively developed?

Currently I consider this just a public alpha: If I see people happy to use it for the right reasons (i.e. it is better in some use cases compared to other message queues) I'll continue the development. Otherwise it was anyway cool to develop it, I had much fun, and I definitely learned new things.

What happens when a node runs out of memory?

  1. Maxmemory setting is mandatory in Disque, and defaults to 1GB.
  2. When 75% of maxmemory is reached, Disque starts to replicate the new jobs only to external nodes, without taking a local copy, so basically if there is free RAM into other nodes, adding still works.
  3. When 95% of maxmemory is reached, Disque starts to evict data that does not violates the safety guarantees: For instance acknowledged jobs and inactive queues.
  4. When 100% of maxmemory is reached, commands that may result into more memory used are not processed at all and the client is informed with an error.

Are there plans to add the ability to hold more jobs than the physical memory of a single node can handle?

Yes. In Disque it should be relatively simple to use the disk when memory is not available, since jobs are immutable and don't need to necessarily exist in memory at a given time.

There are multiple strategies available. The current idea is that when an instance is out of memory, jobs are stored into a log file instead of memory. As more free memory is available in the instance, on disk jobs are loaded.

However in order to implement this, there is to observe strong evidence of its general usefulness for the user base.

When I consume and produce from different nodes, sometimes there is a delay in order for the jobs to reach the consumer, why?

Disque routing is not static, the cluster automatically tries to provide messages to nodes where consumers are attached. When there is an high enough traffic (even one message per second is enough) nodes remember other nodes that recently were sources for jobs in a given queue, so it is possible to aggressively send messages asking for more jobs, every time there are consumers waiting for more messages and the local queue is empty.

However when the traffic is very low, informations about recent sources of messages are discarded, and nodes rely on a more generic mechanism in order to discover other nodes that may have messages in the queues we need them (which is also used in high traffic conditions as well, in order to discover new sources of messages for a given queue).

For example imagine a setup with two nodes, A and B.

  1. A client attaches to node A and asks for jobs in the queue myqueue. Node A has no jobs enqueued, so the client is blocked.
  2. After a few seconds another client produces messages into myqueue, but sending them to node B.

During step 1 if there was no recent traffic of imported messages for this queue, node A has no idea about who may have messages for the queue myqueue. Every other node may have, or none may have. So it starts to broadcast NEEDJOBS messages to the whole cluster. However we can't spam the cluster with messages, so if no reply is received after the first broadcast, the next will be sent with a larger delay, and so foth. The delay is exponential, with a maximum value of 30 seconds (this parameters will be configurable in the future, likely).

When there is some traffic instead, nodes send NEEDJOBS messages ASAP to other nodes that were recent sources of messages. Even when no reply is received, the next NEEDJOBS messages will be sent more aggressively to the subset of nodes that had messages in the past, with a delay that starts at 25 milliseconds and has a maximum value of two seconds.

In order to minimize the latency, NEEDJOBS messages are not throttled at all when:

  1. A client consumed the last message from a given queue. Source nodes are informed immediately in order to receive messages before the node asks for more.
  2. Blocked clients are served the last message available in the queue.

For more information, please refer to the file queue.c, especially the function needJobsForQueue and its callers.

Are messages re-enqueued in the queue tail or head or what?

Messages are put into the queue according to their creation time attribute. This means that they are enqueued in a best effort order in the local node queue. Messages that need to be put back into the queue again because their delivery failed are usually (but not always) older than messages already in queue, so they'll likely be among the first to be delivered to workers.

What Disque means?

DIStributed QUEue but is also a joke with "dis" as negation (like in disorder) of the strict concept of queue, since Disque is not able to guarantee the strict ordering you expect from something called queue. And because of this tradeof it gains many other interesting things.

Community: how to get help and how to help

Get in touch with us in one of the following ways:

  1. Post on Stack Overflow using the disque tag. This is the preferred method to get general help about Disque: other users will easily find previous questions so we can incrementally build a knowledge base.
  2. Join the #disque IRC channel at irc.freenode.net.
  3. Create an Issue or Pull request if your question or issue is about the Disque implementation itself.

Thanks

I would like to say thank you to the following persons and companies.

  • Pivotal, for allowing me to work on Disque, most in my spare time, but sometimes during work hours. Moreover Pivotal agreed to leave the copyright of the code to me. This is very generous. Thanks Pivotal!
  • Michel Martens and Damian Janowski for providing early feedback about Disque while the project was still private.
  • Everybody who is already writing client libraries, sending pull requests, creating issues in order to move this forward from alpha to something actually usable.

disque's People

Contributors

7imbrook avatar abioy avatar alanc10n avatar antirez avatar badboy avatar beanz avatar braintreeps avatar bugthesystem avatar cammellos avatar djanowski avatar dnltn avatar erikdubbelboer avatar felixbuenemann avatar joeyates avatar justincase avatar kiemes avatar lovelle avatar mariano avatar mattsta avatar rloomba avatar rossdylan avatar ryansb avatar s12v avatar seanmccann avatar seppo0010 avatar skx avatar sunheehnus avatar toc21c avatar troyswanson avatar vojtechvitek avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

disque's Issues

Getting 0 jobs out of a non-empty queue succeeds

This is another counterexample I found, where I'm breaking the rules of disque:

[jlouis@lady-of-pain disque]$ ./src/disque addjob test_queue foo 5000
DId886465d76d7924aa56a785240a182dd0f80af0505a0SQ
[jlouis@lady-of-pain disque]$ ./src/disque getjob count 0 from test_queue
1) 1) "test_queue"
   2) "DId886465d76d7924aa56a785240a182dd0f80af0505a0SQ"
   3) "foo"

I would have expected an error because I'm supplying a COUNT < 1, which is a violation of the specification. Hence, I'm reporting this as something the parser perhaps should disallow rather than accept as if it was COUNT 1.

Possible data loss following node restart

Let me preface this with: my test could be wrong. I may be misunderstanding Disque's persistence guarantees, or misusing the API, or maybe the test code is wrong somehow... but I think I've ruled out the most obvious mistakes at this point, so I'd like y'all's feedback!

Config file: https://github.com/aphyr/jepsen/blob/efa09187d710e6937bf4b494bc114b9093c42d93/disque/resources/disque.conf
Test code: https://github.com/aphyr/jepsen/blob/9a164a18aec97a56012cb1464a6438bcec327634/disque/src/jepsen/disque.clj

This test runs on a five-node disque cluster: inter-node latency is effectively zero, and the network is perfectly reliable. There are no partitions in this test. I've lowered the cluster node timeout to 100ms to speed up cluster convergence.

In this test, five clients perform a mix of enqueues and dequeues (GETJOB followed by ACKJOB) with unique integers as payloads. Clients use a 100-millisecond timeout for all operations, a retry interval of 1 second, and replication factor 3. They perform a random mixture of enqueues and dequeues for 30 seconds, while we restart a single node at a time--with 10 seconds of downtime and 10 seconds of uptime in between. After those failures, we bring every node up, give them 5 seconds to heal and proceed with normal operations, then drain the queues by issuing dequeues on every node until no more jobs come.

I've enabled AOF and set appendfsync to everysec; regardless, this test only kills two nodes, so a replication factor of three should allow every job to remain resident in RAM.

This test appears to lose enqueued jobs.

{:valid? false,
 :queue {:valid? true, :final-queue {:pending #{13 25 35 19}}},
 :total-queue
 {:valid? false,
  :lost #{13 19},
  :unexpected #{},
  :recovered #{},
  :ok-frac 35/37,
  :unexpected-frac 0,
  :lost-frac 1/37,
  :recovered-frac 0},
 :latency {:valid? true, :file "report//latency.png"}}

As you can see in the full history, job 13 is successfully enqueued but never appears in any dequeue.

latency

The latency plot shows a few enqueue/dequeue crashes during the first node's downtime, and a couple after the second, but since at least three nodes were online continuously through this full test, and all clocks are synchronized, and the retry interval is only 100ms, I expect that every enqueued job should arrive.

You can replicate this by cloning Jepsen 9a164a18aec97a56012cb1464a6438bcec327634 and running lein test in the disque directory.

Any idea what might be going on here?

Unable to get disque running on separate docker containers to establish a cluster

I put together a docker container building disque https://registry.hub.docker.com/u/jobflow/disque/

I am able to deploy and run a single service. I can expose a port and connect to it from a disque client running on the same container or a different container.

But when connect to one instance and send the cluster meet with the ip and port of the other container it will attempt to cluster the instance but eventually fail.

I can link two containers and they will cluster fine, but you can not link a container to more than one. So building a cluster of 3+ isn't working.

I can run multiple disque service in a single container and they will cluster ok. It's just cross container communication that fails.

I wonder if this issue is similar to this one: Sentinel: Fix initial Hello source address

For more details see:
http://stackoverflow.com/questions/30904033/unable-to-get-docker-containers-running-disque-to-establish-a-cluster

In a 2-node setup, QPEEK doesn't see the queue contents of the "other" node

The setup is that we have two nodes on localhost, 7711 and 7712 ports, standard configuration. We add a job on 7711 (with replication factor 2) and QPEEK on 7712:

[jlouis@lady-of-pain disque]$ ./src/disque -p 7711 cluster meet "127.0.0.1" 7712
OK
[jlouis@lady-of-pain disque]$ ./src/disque -p 7711 addjob test_queue x 300 replicate 2
DId28925cea079cee4507e3904ab7b85d2f979a72c05a0SQ
[jlouis@lady-of-pain disque]$ ./src/disque -p 7712 qpeek test_queue 1
(empty list or set)
[jlouis@lady-of-pain disque]$ ./src/disque -p 7711 qpeek test_queue 1
1) 1) "DId28925cea079cee4507e3904ab7b85d2f979a72c05a0SQ"
   2) "x"

The output of the empty queue on 7712 is not what I expected. I expected a result as 7711's query in the end. Is this a bug or a feature? Running QLEN shows the same discrepancy:

[jlouis@lady-of-pain disque]$ ./src/disque -p 7711 qlen test_queue
(integer) 1
[jlouis@lady-of-pain disque]$ ./src/disque -p 7712 qlen test_queue
(integer) 0

Again, bug or feature?

Can disque handle RPC?

I have created a node client which is disqueue-node. I have implemented a RPC. My concern is that I have achieved this using 3 tcp connections for request, response and replyQueue. In the reason that get jobs waits until it gets jobs then hangs up the rest of the commands. Do you think it can be achieved using 1 tcp connection?

Graceful exit of node from cluster

Ability to mark of a given node as leaving:

  1. It's a node flag, so gets propagated cluster-wide with current cluster gossip features.
  2. When a node is flagged as leaving it does no longer creates local messages. It handles ADDJOB requests using external replication and ACKJOB never creates dummy acks.
  3. The node refuses to accept new messages from the cluster as well, but also the other nodes avoid to try to replicate new messages into it directly. So no new messages are ever added to exiting nodes.
  4. The node is otherwise part of the cluster: can work as a source of messages for other nodes, participates in acknowledging messages and so forth.
  5. HELLO command node priority is set to a low value so that clients start to avoid using it.
  6. Clients that would block on GETJOB are sent a -LEAVING error instead.
  7. Clients already blocked are unblocked with a -LEAVING error.

Eventually a node in this state reaches a number of jobs of zero, and can be removed from the cluster entirely. This allows Disque clusters to be migrated into a different set of servers without any interruption of service.

Normally this is not needed in order to remove a node from a cluster (but still can be used for this goal), since to shutdown the node is enough because the messages are replicated into other nodes as well.
However using this feature, it is possible to remove a node without effectively lowering the number of copies of a set of messages.

The feature is exposed as a CLUSTER LEAVING command.

The behavior of the other nodes is to always replicate this bit in their configuration, using as the source the node itself. So if a node advertises itself with the leaving flag set, all the other nodes will set the flag. If the flag is no longer set, all the other nodes will clear the flag.

At-most-once jobs are lost in cluster exchange

Here is a scenario:

I have a two node cluster, and I add one job to each node with RETRY 0. If I issue GETJOB on node1 twice, the second time blocks since that node's queue is empty. I can see node2 receive NEEDJOBS and send YOURJOBS to node1, and I can see that node1 receives the YOURJOBS message, but it never queues the job because of (from what I can tell) this line in queue.c:

    if (job->state == JOB_STATE_QUEUED || job->qtime == 0)
        return DISQUE_ERR;

It seems like this might have something to do with never re-queueing a job to guarantee at most once deliverability when a job is set to RETRY 0, but in this case the job will never be delivered unless it is retrieved from the original node it was queued in before the original node delivers it to another node as part of YOURJOBS.

I hope that makes sense, it is highly likely that I overlooked something or am not understanding the code correctly!

Disque doesn't seem to reclaim memory.

Hi,

This may be a misunderstanding about the inner working of disque, but I have run into an issue where disque reports the following to me:

NOREPL Not enough reachable nodes for the requested replication level, since I'm unable to hold a copy of the message for memory usage problems.

This happens if we execute the following series of commands around 200k times, on a single disque server, no clustering:

  1. GETJOB all jobs on a queue test_queue.
  2. ACKJOB on all the jobs we just got.
  3. Call QLEN on test_queue - see that there are 0 jobs.
  4. Add the following jobs ADDJOB test_queue X 300 with the following jobs (the notation <<>> denotes the empty string, wheras <<X,Y,Z,...>> means the decimal representation of bytes X,Y,Z,...:
<<>>
<<209,100,0,105,182,14,6,2,62>>
<<39,157,129,26,95,242,121>>
<<232,223,179,218,130,58>>
<<>>
<<237,158,171,168,97>>
<<63,126>>
<<61,96,195,99,167,139>>
  1. Go back to 1, next round.

After around 200k runs of this (that is 200k * 8 addjob commands, with a "reset" in between as in 1-3 above), we eventually reach a disque-server process (with standard configuration), using up 800 megabytes of memory. Once this happens, it starts reporting the above message.

It would seem plausible that some of the jobs, perhaps all of them, are not ACK'ed and removed properly, because the server should clean up before sending the above NOREPL notion in which it states that it is practically out of memory.

Code errors discovered by static analysis

I've run a static analyzer over master (b526435 at the time of writing), and found some problems with memory management. They all involve failure to check the results of memory allocations, so in a low-memory situation, disque is simply going to crash.

redisAsyncInitialize may return NULL, but its result is not checked.

disque/deps/hiredis/async.c:172: error: NULL_DEREFERENCE
  [B1] pointer ac last assigned on line 171 could be null and is dereferenced by call to __redisAsyncCopyError() at line 172, column 5

Failures to check result of malloc. Perhaps something like an xmalloc wrapper would be appropriate.

disque/deps/hiredis/dict.c:75: error: NULL_DEREFERENCE
  [B1] pointer ht last assigned on line 74 could be null and is dereferenced by call to _dictInit() at line 75, column 5

disque/deps/hiredis/dict.c:261: error: NULL_DEREFERENCE
  [B1] pointer iter last assigned on line 259 could be null and is dereferenced at line 261, column 5

redisContextInit may return NULL, but its result is not checked.

disque/deps/hiredis/hiredis.c:1062: error: NULL_DEREFERENCE
  [B1] pointer c last assigned on line 1061 could be null and is dereferenced at line 1062, column 5

It looks like zrealloc is simply realloc, and its result is not checked for NULL.

deps/hiredis/sds.c:160: error: NULL_DEREFERENCE
  [B1] pointer sh last assigned on line 159 could be null and is dereferenced at line 160, column 5

what consensus algorithm?

Hi Antirez,
Can I ask, what consensus algorithm do you use in disque? Paxos or Raft or your home-make algorithm?

Thank you,
Dimi

dequeue always return 0

127.0.0.1:7711> addjob test test 100
DI4b895d5af4537526a2dd8d6c3f1cdd26e429b76705a0SQ
127.0.0.1:7711> addjob test test 100
DI4b895d5a0b47d22a9e4be148d83177e961375ef005a0SQ
127.0.0.1:7711> addjob test test 100
DI4b895d5a75b230ee71ede31f36940fe7a4c895df05a0SQ
127.0.0.1:7711> dequeue DI4b895d5af4537526a2dd8d6c3f1cdd26e429b76705a0SQ
(integer) 0
127.0.0.1:7711> dequeue DI4b895d5a0b47d22a9e4be148d83177e961375ef005a0SQ DI4b895d5a75b230ee71ede31f36940fe7a4c895df05a0SQ
(integer) 0
127.0.0.1:7711> qlen test
(integer) 0

QSCAN command proposal

QSCAN [COUNT <count>] [BLOCKING] [MINLEN <len>] [MAXLEN <len>] [IMPORTRATE <rate>]

This issue is a proposal for a queue iteration command for Disque, able to enlist al the existing queues names, and providing filter capabilities to only enlist a subset of the existing queues. The command is non blocking, and is very similar to Redis SCAN commands, providing a cursor based non blocking iterator that guarantees to return all the elements but may emit duplicates. Contrary to Redis a blocking option is provided for blocking, unique elements output semantics, because there are a great deal of use cases where the number of queues is small.

The command returns an iterator and a list of elements. We could set the default COUNT to a large value like ~100, in order for most use cases dealing with a few queues to get all the queue names with just a single QSCAN invocation, without any argument at all.

Options:

  • BLOCKING Return all the elements without duplicates in a blocking way. Dangerous.
  • MINLEN <count> Only return queues having at least (>=) count enqueued jobs.
  • MAXLEN <count> Only return queues having at most (<=) count enqueued jobs.
  • IMPORTRATE <msgsec> Only return queues importing at least msgsec messages per second from other nodes. If zero is specified all the queues importing a non-zero number of messages are returned.
  • COUNT <count> Perform count work at every iteration. Does not always map to the actual number of returned items like in Redis.

Please give some feedback. It's pretty trivial to implement but I don't want to write code straight away.

NACKJOB (re-queue) and WAITJOB

First of all, let me thank you for this awesome project. It feels fast & stable, even though it's still Alpha. Good job! ๐Ÿ‘

I'm trying to solve these two Use-cases:

  1. NACKJOB (force re-queue)

    Let's assume I'm a Consumer and I fail to process the message. It'd be great if I could NACKJOB id, so the job gets re-queued right away (if the RETRY timeout was specified).

    ** Or should I use ACKJOB id ADDJOB queue data instead? Can I do this atomically?

  2. WAITJOB

    It'd be great to have a blocking operation that would wait for job to finish (get ACKed):

    WAITJOB id [id ...] timeout
    ... in other words, similar cmd to Redis' BLPOP key [key ...] timeout

Feedback welcome, I could be missing something obvious.

Support for topic subscription

  • Does/will Disque support consumer pattern-matching subscription to a topic somehow?
    • For example: consume all messages with topic/key "?.cpu.idle"
  • Will there be support for fanout/one-to-many messaging patterns?

cluster meet not working

I'm trying to follow the README to setup a cluster of 2 disque nodes on 2 amazon ec2 instances. When i run disque cluster meet <IP> <PORT> disque replies with OK, but it seems no clustering has been setup.

The logs on the second node do not show anything and when i do ADDJOB queue body 0 on node1 and GETJOB FROM queue on node2 there's no job in the queue.

Am i missing something obvious here?

Expired items queue -- dead letter queue

Lets say i have put an item in the queue which causes the worker to crash. so disque re-queue it after RETRY time. Again picked up by the worker . This goes till the job expiry time. When disque expires an job it would be great to have expired job in deadletter queue.

Make node order depend on message id

This is just an enhancement request that changed the behaviour of the state machine on QUEUED message.

ON RECV cluster message QUEUED(string job-id):

1 - job = Call LOOKUP-JOB(job-id).
2 - IF job == NULL THEN return ASAP.
3 - IF job.state == acknowledged THEN return ASAP.
4 - IF job.state == queued THEN if sender node ID is greater than my node ID call DEQUEUE(job).
5 - Update job.qtime setting it to NOW + job.retry.

I'd suggesting modifying line 4 to include the job-id when ordering the nodes. Such as

4 - IF job.state == queued THEN if hash(sender node ID | job-id) is greater than hash(my node ID | job-id) call DEQUEUE(job).

In this way the "biggest" node id is not constant and you get more balance between nodes.

A home for dead jobs and retry limit

A common use case we have is creating jobs for external APIs. Upon repeated failure (failing a certain number of times) stick the job in a list of jobs that an admin needs to look at.

For example:

addjob emailnotify "info here" 0 retry 120 retrylimit 5 failure emailnotifydead

So if a job fails to ack 5 times, either through timeout or being re-enqued, it should go to the failure queue for an admin to look at if provided.

Awesome work, Antirez!

crash when "SHOW" of acked job

Doesn't seem consistent.

  1. addjob
  2. getjob
  3. ackjob
  4. show
  5. BOOM

=== DISQUE BUG REPORT START: Cut & paste starting from here ===
87713:P 27 Apr 11:31:59.342 # Disque 0.0.1 crashed by signal: 11
87713:P 27 Apr 11:31:59.342 # Failed assertion: (:0)
87713:P 27 Apr 11:31:59.342 # --- STACK TRACE
0 disque-server 0x000000010e3684f7 logStackTrace + 103
1 disque-server 0x000000010e35cb5f addReplyBulkLen + 31
2 libsystem_platform.dylib 0x00007fff99df1f1a _sigtramp + 26
3 ??? 0x0000000000000001 0x0 + 1
4 disque-server 0x000000010e35cc82 addReplyBulk + 18
5 disque-server 0x000000010e37347e showCommand + 238
6 disque-server 0x000000010e352ef4 call + 116
7 disque-server 0x000000010e353118 processCommand + 344
8 disque-server 0x000000010e35dc0c processInputBuffer + 204
9 disque-server 0x000000010e34cf24 aeProcessEvents + 548
10 disque-server 0x000000010e34d22b aeMain + 43
11 disque-server 0x000000010e354e4c main + 860
12 libdyld.dylib 0x00007fff8ca775c9 start + 1
87713:P 27 Apr 11:31:59.342 # --- INFO OUTPUT
87713:P 27 Apr 11:31:59.342 # # Server
disque_version:0.0.1
disque_git_sha1:8b43acac
disque_git_dirty:0
disque_build_id:932dedde84eaf248
os:Darwin 14.3.0 x86_64
arch_bits:64
multiplexing_api:kqueue
gcc_version:4.2.1
process_id:87713
run_id:3582ab3a4f4efd88c5263e0218244c49bbb97303
tcp_port:7711
uptime_in_seconds:65
uptime_in_days:0
hz:10
config_file:

Clients

connected_clients:1
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0

Memory

used_memory:960496
used_memory_human:937.98K
used_memory_rss:1511424
used_memory_peak:960496
used_memory_peak_human:937.98K
mem_fragmentation_ratio:1.57
mem_allocator:libc

Jobs

registered_jobs:1

Queues

registered_queues:1

Persistence

loading:0
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok

Stats

total_connections_received:1
total_commands_processed:4
instantaneous_ops_per_sec:0
total_net_input_bytes:329
total_net_output_bytes:463
instantaneous_input_kbps:0.00
instantaneous_output_kbps:0.00
rejected_connections:0
latest_fork_usec:0

CPU

used_cpu_sys:0.40
used_cpu_user:0.19
used_cpu_sys_children:0.00
used_cpu_user_children:0.00

Commandstats

cmdstat_addjob:calls=1,usec=23,usec_per_call=23.00
cmdstat_ackjob:calls=2,usec=24,usec_per_call=12.00
cmdstat_show:calls=1,usec=15,usec_per_call=15.00
hash_init_value: 1429710352

87713:P 27 Apr 11:31:59.342 # --- CLIENT LIST OUTPUT
87713:P 27 Apr 11:31:59.342 # id=1 addr=127.0.0.1:62551 fd=9 name= age=49 idle=0 flags=N qbuf=0 qbuf-free=32768 obl=79 oll=0 omem=0 events=rw cmd=show

87713:P 27 Apr 11:31:59.342 # --- CURRENT CLIENT INFO
87713:P 27 Apr 11:31:59.342 # client: id=1 addr=127.0.0.1:62551 fd=9 name= age=49 idle=0 flags=N qbuf=0 qbuf-free=32768 obl=79 oll=0 omem=0 events=rw cmd=show
87713:P 27 Apr 11:31:59.342 # argv[0]: 'show'
87713:P 27 Apr 11:31:59.342 # argv[1]: 'DI366b5501c9919bb448aa2ceac136e73cfa0b165405a0SQ'
87713:P 27 Apr 11:31:59.342 # --- REGISTERS
87713:P 27 Apr 11:31:59.342 #
RAX:00007f8644800111 RBX:00007f8644800000
RCX:000000000000000a RDX:0000000000000000
RDI:00007f8644800000 RSI:0000000000000000
RBP:00007fff518b47a0 RSP:00007fff518b4700
R8 :0000000000000002 R9 :0000000000000000
R10:0000000000000000 R11:0000000001aff069
R12:000000010e380830 R13:00000000000537e3
R14:28005a139f657c50 R15:00007f8644800000
RIP:000000010e35cb5f EFL:0000000000010206
CS :000000000000002b FS:0000000000000000 GS:0000000000000000
87713:P 27 Apr 11:31:59.342 # (00007fff518b470f) -> 000000010e350837
87713:P 27 Apr 11:31:59.342 # (00007fff518b470e) -> 00007fff518b4780
87713:P 27 Apr 11:31:59.342 # (00007fff518b470d) -> 000000010e35bfb0
87713:P 27 Apr 11:31:59.342 # (00007fff518b470c) -> 00007fff518b47c0
87713:P 27 Apr 11:31:59.342 # (00007fff518b470b) -> 00007f8644800000
87713:P 27 Apr 11:31:59.342 # (00007fff518b470a) -> 00007f8642d293a0
87713:P 27 Apr 11:31:59.342 # (00007fff518b4709) -> 00000000000537e3
87713:P 27 Apr 11:31:59.342 # (00007fff518b4708) -> 28005a139f657c50
87713:P 27 Apr 11:31:59.342 # (00007fff518b4707) -> 00007f8644800000
87713:P 27 Apr 11:31:59.343 # (00007fff518b4706) -> 0000000000000000
87713:P 27 Apr 11:31:59.343 # (00007fff518b4705) -> 0000000000000000
87713:P 27 Apr 11:31:59.343 # (00007fff518b4704) -> 000000000001fffe
87713:P 27 Apr 11:31:59.343 # (00007fff518b4703) -> 0000000000000009
87713:P 27 Apr 11:31:59.343 # (00007fff518b4702) -> 00007f0a0d383424
87713:P 27 Apr 11:31:59.343 # (00007fff518b4701) -> 000000010e35cd52
87713:P 27 Apr 11:31:59.343 # (00007fff518b4700) -> 00007fff518b47c0
87713:P 27 Apr 11:31:59.343 #
=== DISQUE BUG REPORT END. Make sure to include from START to END. ===

Segfault when removing client from blocked list

I wasn't able to create a self-contained script to reproduce. This only happens when running the tests for Havanna (a Ruby library for consuming Disque queues). I tried to debug this without much luck, so I'm posting here to see if you can figure out what's going on.

One clue I have: the tests that I'm running try to verify that the workers shut down gracefully. So I send my Ruby process a TERM signal, the signal is trapped and a flag is set. Once GETJOB times out, the process exits.

I'll keep trying to isolate the issue and report back if I find anything.

=== DISQUE BUG REPORT START: Cut & paste starting from here ===
84756:P 12 Jul 21:12:14.987 #     Disque 0.0.1 crashed by signal: 11
84756:P 12 Jul 21:12:14.987 #     Failed assertion: <no assertion failed> (<no file>:0)
84756:P 12 Jul 21:12:14.987 # --- STACK TRACE
0   disque-server                       0x000000010bcf5b97 logStackTrace + 103
1   disque-server                       0x000000010bcda923 listSearchKey + 35
2   libsystem_platform.dylib            0x00007fff91d01f1a _sigtramp + 26
3   libsystem_c.dylib                   0x00007fff7b212358 __sF + 152
4   disque-server                       0x000000010bd01b35 unblockClientBlockedForJobs + 117
5   disque-server                       0x000000010bcfcaac unblockClient + 28
6   disque-server                       0x000000010bcdefe1 clientsCronHandleTimeout + 193
7   disque-server                       0x000000010bcdf0e3 clientsCron + 115
8   disque-server                       0x000000010bcdf3c8 serverCron + 376
9   disque-server                       0x000000010bcdb3ee aeProcessEvents + 830
10  disque-server                       0x000000010bcdb5cb aeMain + 43
11  disque-server                       0x000000010bce306c main + 860
12  libdyld.dylib                       0x00007fff9811f5c9 start + 1
84756:P 12 Jul 21:12:14.987 # --- INFO OUTPUT
84756:P 12 Jul 21:12:14.988 # # Server
disque_version:0.0.1
disque_git_sha1:5ee58c8f
disque_git_dirty:0
disque_build_id:72535184763aa2d7
os:Darwin 14.4.0 x86_64
arch_bits:64
multiplexing_api:kqueue
gcc_version:4.2.1
process_id:84756
run_id:01114988aa9371a539495d40b3c4709f568c8d4f
tcp_port:7711
uptime_in_seconds:8
uptime_in_days:0
hz:10
config_file:

# Clients
connected_clients:4
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:2

# Memory
used_memory:979296
used_memory_human:956.34K
used_memory_rss:1556480
used_memory_peak:1060752
used_memory_peak_human:1.01M
mem_fragmentation_ratio:1.59
mem_allocator:libc

# Jobs
registered_jobs:0

# Queues
registered_queues:2

# Persistence
loading:0
aof_enabled:0
aof_state:off
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok

# Stats
total_connections_received:13
total_commands_processed:23
instantaneous_ops_per_sec:0
total_net_input_bytes:1138
total_net_output_bytes:1510
instantaneous_input_kbps:0.00
instantaneous_output_kbps:0.00
rejected_connections:0
latest_fork_usec:0

# CPU
used_cpu_sys:0.11
used_cpu_user:0.03
used_cpu_sys_children:0.00
used_cpu_user_children:0.00

# Commandstats
cmdstat_debug:calls=3,usec=51,usec_per_call=17.00
cmdstat_hello:calls=7,usec=54,usec_per_call=7.71
cmdstat_addjob:calls=4,usec=177,usec_per_call=44.25
cmdstat_getjob:calls=6,usec=184,usec_per_call=30.67
cmdstat_ackjob:calls=2,usec=81,usec_per_call=40.50
cmdstat_qlen:calls=1,usec=6,usec_per_call=6.00
hash_init_value: 1437499737

84756:P 12 Jul 21:12:14.988 # --- CLIENT LIST OUTPUT
84756:P 12 Jul 21:12:14.988 # id=5 addr=127.0.0.1:60814 fd=10 name= age=5 idle=5 flags=b qbuf=0 qbuf-free=0 obl=5 oll=0 omem=0 events=rw cmd=getjob
id=6 addr=127.0.0.1:60815 fd=11 name= age=5 idle=5 flags=N qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=13 addr=127.0.0.1:60822 fd=12 name= age=1 idle=1 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=2 addr=127.0.0.1:60811 fd=9 name= age=5 idle=1 flags=N qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=debug

84756:P 12 Jul 21:12:14.988 # --- REGISTERS
84756:P 12 Jul 21:12:14.988 #
RAX:00007faea142f3f0 RBX:00007faea15006d0
RCX:0000000000000001 RDX:2820820800000000
RDI:000000010bd3e000 RSI:00007faea142f3f0
RBP:00007fff53f263d0 RSP:00007fff53f263b0
R8 :0000000000000001 R9 :00000000000002f2
R10:00007faea1400000 R11:0000000000000001
R12:0000000000000000 R13:0000000000000000
R14:00007faea2805400 R15:0000000000000000
RIP:000000010bcda923 EFL:0000000000010206
CS :000000000000002b FS:0000000000000000  GS:0000000000000000
84756:P 12 Jul 21:12:14.988 # (00007fff53f263bf) -> 00007faea2805400
84756:P 12 Jul 21:12:14.988 # (00007fff53f263be) -> 5e0013234f0321e6
84756:P 12 Jul 21:12:14.988 # (00007fff53f263bd) -> 000000010bcfcaac
84756:P 12 Jul 21:12:14.988 # (00007fff53f263bc) -> 00007fff53f26430
84756:P 12 Jul 21:12:14.988 # (00007fff53f263bb) -> 000000010bd0e940
84756:P 12 Jul 21:12:14.988 # (00007fff53f263ba) -> 0000000000000000
84756:P 12 Jul 21:12:14.988 # (00007fff53f263b9) -> 0000000000000000
84756:P 12 Jul 21:12:14.988 # (00007fff53f263b8) -> 0000000000000000
84756:P 12 Jul 21:12:14.988 # (00007fff53f263b7) -> 00007faea2805400
84756:P 12 Jul 21:12:14.988 # (00007fff53f263b6) -> 0000000000000002
84756:P 12 Jul 21:12:14.988 # (00007fff53f263b5) -> 000000010bd01b35
84756:P 12 Jul 21:12:14.988 # (00007fff53f263b4) -> 00007fff53f26410
84756:P 12 Jul 21:12:14.988 # (00007fff53f263b3) -> 00007faea142f7c0
84756:P 12 Jul 21:12:14.988 # (00007fff53f263b2) -> 00007faea2805400
84756:P 12 Jul 21:12:14.988 # (00007fff53f263b1) -> 000000010bd0e940
84756:P 12 Jul 21:12:14.988 # (00007fff53f263b0) -> 00007faea15006d0
84756:P 12 Jul 21:12:14.988 #
=== DISQUE BUG REPORT END. Make sure to include from START to END. ===

       Please report the crash by opening an issue on github:

           http://github.com/antirez/disque/issues

  Suspect RAM error? Use disque-server --test-memory to verify it.


Segmentation fault: 11

"Unknown encoding type" #object.c:319

=== DISQUE BUG REPORT START: Cut & paste starting from here ===
16525:P 21 May 11:30:15.598 # ------------------------------------------------
16525:P 21 May 11:30:15.598 # !!! Software Failure. Press left mouse button to continue
16525:P 21 May 11:30:15.598 # Guru Meditation: "Unknown encoding type" #object.c:319
16525:P 21 May 11:30:15.598 # (forcing SIGSEGV in order to print the stack trace)
16525:P 21 May 11:30:15.598 # ------------------------------------------------
16525:P 21 May 11:30:15.598 # Disque 0.0.1 crashed by signal: 11
16525:P 21 May 11:30:15.598 # Failed assertion: (:0)
16525:P 21 May 11:30:15.598 # --- STACK TRACE
/usr/local/bin/disque-server 0.0.0.0:7711(logStackTrace+0x3e)[0x4269be]
/usr/local/bin/disque-server 0.0.0.0:7711(_serverPanic+0x7f)[0x42625f]
/lib/x86_64-linux-gnu/libpthread.so.0(+0xfcb0)[0x7f2766663cb0]
/usr/local/bin/disque-server 0.0.0.0:7711(_serverPanic+0x7f)[0x42625f]
/usr/local/bin/disque-server 0.0.0.0:7711(getDecodedObject+0xbf)[0x41f86f]
/usr/local/bin/disque-server 0.0.0.0:7711(dictEncObjHash+0x2c)[0x412ddc]
/usr/local/bin/disque-server 0.0.0.0:7711(dictAddRaw+0x3b)[0x411f2b]
/usr/local/bin/disque-server 0.0.0.0:7711(dictAdd+0x1e)[0x41208e]
/usr/local/bin/disque-server 0.0.0.0:7711(clusterReplicateJob+0x76)[0x42ade6]
/usr/local/bin/disque-server 0.0.0.0:7711(clientsCronHandleDelayedJobReplication+0x47)[0x42ff47]
/usr/local/bin/disque-server 0.0.0.0:7711(clientsCron+0x8a)[0x413a9a]
/usr/local/bin/disque-server 0.0.0.0:7711(serverCron+0xe1)[0x415001]
/usr/local/bin/disque-server 0.0.0.0:7711(aeProcessEvents+0x20b)[0x4101cb]
/usr/local/bin/disque-server 0.0.0.0:7711(aeMain+0x2b)[0x4103db]
/usr/local/bin/disque-server 0.0.0.0:7711(main+0x302)[0x40f162]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xed)[0x7f27662b776d]
/usr/local/bin/disque-server 0.0.0.0:7711[0x40f27d]
16525:P 21 May 11:30:15.599 # --- INFO OUTPUT
16525:P 21 May 11:30:15.601 # # Server
disque_version:0.0.1
disque_git_sha1:9ce5cb67
disque_git_dirty:0
disque_build_id:adcb5342e3e5a0ed
os:Linux 3.2.0-69-virtual x86_64
arch_bits:64
multiplexing_api:epoll
gcc_version:4.6.3
process_id:16525
run_id:c625c8ab6ce9cacc9cb54e04cb06752fff8f15af
tcp_port:7711
uptime_in_seconds:341
uptime_in_days:0
hz:10
config_file:/etc/disque/7711/disque.conf

Clients

connected_clients:14639
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:14639

Memory

used_memory:373456928
used_memory_human:356.16M
used_memory_rss:173277184
used_memory_peak:380637792
used_memory_peak_human:363.00M
mem_fragmentation_ratio:0.46
mem_allocator:jemalloc-3.6.0

Jobs

registered_jobs:54

Queues

registered_queues:1001

Persistence

loading:0
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok

Stats

total_connections_received:375779
total_commands_processed:928483
instantaneous_ops_per_sec:60
total_net_input_bytes:189129863
total_net_output_bytes:250909533
instantaneous_input_kbps:1.73
instantaneous_output_kbps:8.10
rejected_connections:0
latest_fork_usec:0

CPU

used_cpu_sys:49.50
used_cpu_user:62.80
used_cpu_sys_children:0.00
used_cpu_user_children:0.00

Commandstats

cmdstat_cluster:calls=2,usec=35,usec_per_call=17.50
cmdstat_hello:calls=375777,usec=1500012,usec_per_call=3.99
cmdstat_addjob:calls=184231,usec=1505720,usec_per_call=8.17
cmdstat_getjob:calls=191546,usec=796183,usec_per_call=4.16
cmdstat_ackjob:calls=176927,usec=929356,usec_per_call=5.25
hash_init_value: 1432287713

16525:P 21 May 11:30:15.601 # --- CLIENT LIST OUTPUT
16525:P 21 May 11:30:15.642 # id=375574 addr=127.0.0.1:13080 fd=12204 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=353307 addr=127.0.0.1:55631 fd=5862 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353308 addr=127.0.0.1:55632 fd=5986 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353309 addr=127.0.0.1:55634 fd=6008 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353310 addr=127.0.0.1:55636 fd=6024 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353312 addr=127.0.0.1:55638 fd=8346 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353314 addr=127.0.0.1:55640 fd=9665 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353317 addr=127.0.0.1:55643 fd=10474 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353320 addr=127.0.0.1:55646 fd=12737 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353323 addr=127.0.0.1:55650 fd=13208 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353324 addr=127.0.0.1:55649 fd=13307 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353327 addr=127.0.0.1:55657 fd=13799 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
id=353329 addr=127.0.0.1:55664 fd=738 name= age=46 idle=46 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=getjob
...
...
...
id=375560 addr=127.0.0.1:13059 fd=10645 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375561 addr=127.0.0.1:13060 fd=10886 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375562 addr=127.0.0.1:13061 fd=10889 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375563 addr=127.0.0.1:13063 fd=11165 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375564 addr=127.0.0.1:13067 fd=11182 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375565 addr=127.0.0.1:13070 fd=11335 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375566 addr=127.0.0.1:13071 fd=11586 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375567 addr=127.0.0.1:13072 fd=11620 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375568 addr=127.0.0.1:13073 fd=11822 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375569 addr=127.0.0.1:13074 fd=11847 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375570 addr=127.0.0.1:13076 fd=11853 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375571 addr=127.0.0.1:13077 fd=11945 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375572 addr=127.0.0.1:13078 fd=12093 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob
id=375573 addr=127.0.0.1:13079 fd=12186 name= age=30 idle=30 flags=b qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=addjob

16525:P 21 May 11:30:15.644 # --- REGISTERS
16525:P 21 May 11:30:15.645 #
RAX:0000000000000000 RBX:00000000004621f8
RCX:00007f276664d778 RDX:0000000000000000
RDI:0000000000020000 RSI:00007f276664d778
RBP:00000000004621c7 RSP:00007fff8d35d700
R8 :00007f276664d770 R9 :0000000000000001
R10:0000000000000000 R11:0000000000000206
R12:000000000000013f R13:00007f274eac5980
R14:0000000000000002 R15:0000000000000000
RIP:000000000042625f EFL:0000000000010206
CSGSFS:000000000000e033
16525:P 21 May 11:30:15.645 # (00007fff8d35d70f) -> 000000000043cf2d
16525:P 21 May 11:30:15.645 # (00007fff8d35d70e) -> 00007f2765c000c0
16525:P 21 May 11:30:15.645 # (00007fff8d35d70d) -> 00007f274e000000
16525:P 21 May 11:30:15.645 # (00007fff8d35d70c) -> 000000000000000c
16525:P 21 May 11:30:15.645 # (00007fff8d35d70b) -> 0000000000412ddc
16525:P 21 May 11:30:15.645 # (00007fff8d35d70a) -> 00007f274eac5980
16525:P 21 May 11:30:15.645 # (00007fff8d35d709) -> ddf8e1c14afef100
16525:P 21 May 11:30:15.645 # (00007fff8d35d708) -> 00007f2759b07148
16525:P 21 May 11:30:15.645 # (00007fff8d35d707) -> 0000000000410401
16525:P 21 May 11:30:15.645 # (00007fff8d35d706) -> 0000000000000001
16525:P 21 May 11:30:15.645 # (00007fff8d35d705) -> ddf8e1c14afef100
16525:P 21 May 11:30:15.645 # (00007fff8d35d704) -> 00007f2765c000c0
16525:P 21 May 11:30:15.645 # (00007fff8d35d703) -> 000000000041f86f
16525:P 21 May 11:30:15.645 # (00007fff8d35d702) -> 00007f276585d0c0
16525:P 21 May 11:30:15.645 # (00007fff8d35d701) -> 00007f274eac5980
16525:P 21 May 11:30:15.645 # (00007fff8d35d700) -> 00007f274eac5980
16525:P 21 May 11:30:15.645 # --- FAST MEMORY TEST
16525:P 21 May 11:30:15.645 # Bio thread for job type #0 terminated
16525:P 21 May 11:30:15.645 # Bio thread for job type #1 terminated
16525:P 21 May 11:30:19.644 # Fast memory test PASSED, however your memory can still be broken. Please run a memory test for several hours if possible.
16525:P 21 May 11:30:19.644 #
=== DISQUE BUG REPORT END. Make sure to include from START to END. ===

WORKING command: a way to delay next deliveries of the same message.

When submitting a job with ADDJOB, you can't necessarily predict the total amount of time that a job will take in order to provide a proper RETRY time. In the context of locks that time out, there is the concept of "refreshing" the lock to ensure it is still held by the owner. I think it would make sense to add a command whose purpose is to reset the RETRY timer, in effect allowing a worker to say "I'm not done, but I'm still working on this, I may still fail, but don't retry it yet."

Something like WORKING jobid1 ... or given that ASYNC is not a valid jobid, WORKING jobid1 ... [ASYNC], depending on whether async retry resets make sense.

Documentation-wise: Tells Disque that the provided job ids are still being worked on, and the retry timer should be reset as though those jobs had just been fetched by GETJOB.

QSCAN test is missing

The Redis SCAN test can surely be useful as a reference to test the QSCAN command in Disque.

Crash on network partitions

A network partitions test (available on Jepsen a1938c734460f9180ab68177aafc299fb6a09f36) seems to reliably segfault Disque:

=== DISQUE BUG REPORT START: Cut & paste starting from here ===
12277:P 02 Jul 16:24:41.683 #     Disque 0.0.1 crashed by signal: 11
12277:P 02 Jul 16:24:41.683 #     Failed assertion: <no assertion failed> (<no file>:0)
12277:P 02 Jul 16:24:41.683 # --- STACK TRACE
/opt/disque/src/disque-server *:7711(logStackTrace+0x75)[0x425925]
/opt/disque/src/disque-server *:7711(gotAckReceived+0xa2)[0x431642]
/lib/x86_64-linux-gnu/libpthread.so.0(+0xf8d0)[0x7fd655d888d0]
/opt/disque/src/disque-server *:7711(gotAckReceived+0xa2)[0x431642]
/opt/disque/src/disque-server *:7711(clusterProcessPacket+0x8c3)[0x428b93]
/opt/disque/src/disque-server *:7711(clusterReadHandler+0x83)[0x428f13]
/opt/disque/src/disque-server *:7711(aeProcessEvents+0x133)[0x410083]
/opt/disque/src/disque-server *:7711(aeMain+0x2b)[0x41039b]
/opt/disque/src/disque-server *:7711(main+0x302)[0x40f192]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf5)[0x7fd6559f1b45]
/opt/disque/src/disque-server *:7711[0x40f2a9]
12277:P 02 Jul 16:24:41.683 # --- INFO OUTPUT
12277:P 02 Jul 16:24:41.683 # # Server
disque_version:0.0.1
disque_git_sha1:5df8e1d7
disque_git_dirty:0
disque_build_id:284a4b8aa08564b1
os:Linux 3.16.0-4-amd64 x86_64
arch_bits:64
multiplexing_api:epoll
gcc_version:4.9.2
process_id:12277
run_id:704c6ed80ac68eccd052fd628cae460ed8a6ff6b
tcp_port:7711
uptime_in_seconds:71
uptime_in_days:0
hz:10
config_file:/opt/disque/disque.conf

# Clients
connected_clients:1
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0

# Memory
used_memory:740848
used_memory_human:723.48K
used_memory_rss:3731456
used_memory_peak:777200
used_memory_peak_human:758.98K
mem_fragmentation_ratio:5.04
mem_allocator:jemalloc-3.6.0

# Jobs
registered_jobs:14

# Queues
registered_queues:1

# Persistence
loading:0
aof_enabled:1
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok
aof_current_size:21824
aof_base_size:0
aof_pending_rewrite:0
aof_buffer_length:0
aof_rewrite_buffer_length:0
aof_pending_bio_fsync:0
aof_delayed_fsync:0

# Stats
total_connections_received:2
total_commands_processed:93
instantaneous_ops_per_sec:1
total_net_input_bytes:7331
total_net_output_bytes:4443
instantaneous_input_kbps:0.09
instantaneous_output_kbps:0.05
rejected_connections:0
latest_fork_usec:0

# CPU
used_cpu_sys:0.72
used_cpu_user:0.37
used_cpu_sys_children:0.00
used_cpu_user_children:0.00

# Commandstats
cmdstat_cluster:calls=1,usec=12,usec_per_call=12.00
cmdstat_addjob:calls=37,usec=531,usec_per_call=14.35
cmdstat_getjob:calls=30,usec=424,usec_per_call=14.13
cmdstat_ackjob:calls=25,usec=241,usec_per_call=9.64
hash_init_value: 1436079788

12277:P 02 Jul 16:24:41.683 # --- CLIENT LIST OUTPUT
12277:P 02 Jul 16:24:41.683 # id=2 addr=192.168.122.1:41741 fd=18 name= age=67 idle=1 flags=N qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ackjob

12277:P 02 Jul 16:24:41.683 # --- REGISTERS
12277:P 02 Jul 16:24:41.683 # 
RAX:0000000000000000 RBX:00007fd655014380
RCX:00007fd6550117a0 RDX:00000000000000e8
RDI:0000000000000000 RSI:00007fd65505d408
RBP:00007fd65505d3e0 RSP:00007fffdf310160
R8 :00007fd65505d408 R9 :00000000fffffff7
R10:000000298a2b10d0 R11:00007fd655b3ac70
R12:0000000000000001 R13:0000000000000000
R14:0000000000000003 R15:00007fd65505b850
RIP:0000000000431642 EFL:0000000000010246
CSGSFS:0000000000000033
12277:P 02 Jul 16:24:41.683 # (00007fffdf31016f) -> 00007fd65509e8a8
12277:P 02 Jul 16:24:41.683 # (00007fffdf31016e) -> 00007fd6550d05e0
12277:P 02 Jul 16:24:41.683 # (00007fffdf31016d) -> 0000000000416d31
12277:P 02 Jul 16:24:41.683 # (00007fffdf31016c) -> 0000000000000008
12277:P 02 Jul 16:24:41.683 # (00007fffdf31016b) -> 0000000000000008
12277:P 02 Jul 16:24:41.683 # (00007fffdf31016a) -> 0000000000000128
12277:P 02 Jul 16:24:41.683 # (00007fffdf310169) -> 0000000000418471
12277:P 02 Jul 16:24:41.683 # (00007fffdf310168) -> 00007fd65505f140
12277:P 02 Jul 16:24:41.683 # (00007fffdf310167) -> 0000000000000008
12277:P 02 Jul 16:24:41.683 # (00007fffdf310166) -> 0000000000000131
12277:P 02 Jul 16:24:41.683 # (00007fffdf310165) -> 0000000000428b93
12277:P 02 Jul 16:24:41.683 # (00007fffdf310164) -> 00007fd65505f148
12277:P 02 Jul 16:24:41.683 # (00007fffdf310163) -> 00007fd65505f158
12277:P 02 Jul 16:24:41.683 # (00007fffdf310162) -> 000000000000000a
12277:P 02 Jul 16:24:41.683 # (00007fffdf310161) -> 00007fd65505d3e0
12277:P 02 Jul 16:24:41.683 # (00007fffdf310160) -> 0000000000000001
12277:P 02 Jul 16:24:41.683 # --- FAST MEMORY TEST
12277:P 02 Jul 16:24:41.683 # Bio thread for job type #0 terminated
12277:P 02 Jul 16:24:41.684 # Bio thread for job type #1 terminated
12277:P 02 Jul 16:24:41.927 # Fast memory test PASSED, however your memory can still be broken. Please run a memory test for several hours if possible.
12277:P 02 Jul 16:24:41.927 # 
=== DISQUE BUG REPORT END. Make sure to include from START to END. ===

       Please report the crash by opening an issue on github:

           http://github.com/antirez/disque/issues

  Suspect RAM error? Use disque-server --test-memory to verify it.

Join cluster using hostnames

When joining a cluster using hostnames and not actual IP addresses, we see the following error using the command disque cluster meet some-node.example.com 7711:

(error) ERR Invalid node address specified: some-node.example.com:7711

If we supply the actual IP address of that server, the join command works.

Is this expected behavior?

More cluster aware MAXLEN option in ADDJOB

The idea is that while we don't want to run consensus or alike to make sure the queue is a given length, it should have a more global meaning. Example of things we could do:

  1. Send the MAXLEN count in the ADDJOB cluster packet, and let the receiver discard the packet or NACK if the MAXLEN is reached on the receiver. This way the option is already more cluster aware since the job will be replicated only by instances where the queue length is less than MAXLEN. In turn this means that all the node will eventually reach MAXLEN if the consumers are not fast enough.
  2. Reply to ADDJOB (GOTJOB) may carry the current length of the queue in the target so that if the sum of our queue length and the replicas is greater than MAXLEN, we don't accept the job.

There is to consider if in the Big Picture this really helps or not. The current behavior is to just check the local queue length. Given Disque automatic federation, if the length was reached in the local node, likely there is no demand or not enough demand for jobs cluster-wide, so it may make sense to have just a local limit. However this issue is here as a placeholder in order to rethink the problem soon or later.

Disque sometimes crashes when the client closes the connection

I can trigger this after a few iterations of my script, which closes the socket somewhat abruptly (socket.end() in Node.js). See comments, script to reproduce is this: https://gist.github.com/djanowski/cfc662d64585b284db9c

=== DISQUE BUG REPORT START: Cut & paste starting from here ===
27560:P 30 Apr 11:35:15.459 # ------------------------------------------------
27560:P 30 Apr 11:35:15.459 # !!! Software Failure. Press left mouse button to continue
27560:P 30 Apr 11:35:15.459 # Guru Meditation: "Unknown btype in unblockClient()." #blocked.c:136
27560:P 30 Apr 11:35:15.459 # (forcing SIGSEGV in order to print the stack trace)
27560:P 30 Apr 11:35:15.460 # ------------------------------------------------
27560:P 30 Apr 11:35:15.460 #     Disque 0.0.1 crashed by signal: 11
27560:P 30 Apr 11:35:15.460 #     Failed assertion: <no assertion failed> (<no file>:0)
27560:P 30 Apr 11:35:15.460 # --- STACK TRACE
0   disque-server                       0x00000001056a1757 logStackTrace + 103
1   disque-server                       0x00000001056a14ec _serverPanic + 172
2   libsystem_platform.dylib            0x00007fff92b55f1a _sigtramp + 26
3   libsystem_c.dylib                   0x00007fff8517820a vsnprintf + 80
4   disque-server                       0x00000001056a8857 unblockClient + 135
5   disque-server                       0x00000001056abd00 jobReplicationAchieved + 208
6   disque-server                       0x00000001056a47ca clusterProcessPacket + 1578
7   disque-server                       0x00000001056a30f6 clusterReadHandler + 262
8   disque-server                       0x0000000105686ee4 aeProcessEvents + 548
9   disque-server                       0x00000001056871db aeMain + 43
10  disque-server                       0x000000010568ecfc main + 860
11  libdyld.dylib                       0x00007fff8b8e05c9 start + 1
12  ???                                 0x0000000000000013 0x0 + 19
27560:P 30 Apr 11:35:15.461 # --- INFO OUTPUT
27560:P 30 Apr 11:35:15.461 # # Server
disque_version:0.0.1
disque_git_sha1:b3652518
disque_git_dirty:0
disque_build_id:8710da5c468ed4fd
os:Darwin 14.3.0 x86_64
arch_bits:64
multiplexing_api:kqueue
gcc_version:4.2.1
process_id:27560
run_id:c65db17757d8653fe9976eb7efc262ff00b0d816
tcp_port:7712
uptime_in_seconds:22
uptime_in_days:0
hz:10
config_file:

# Clients
connected_clients:1
client_longest_output_list:0
client_biggest_input_buf:117
blocked_clients:1

# Memory
used_memory:964704
used_memory_human:942.09K
used_memory_rss:1351680
used_memory_peak:964704
used_memory_peak_human:942.09K
mem_fragmentation_ratio:1.40
mem_allocator:libc

# Jobs
registered_jobs:2

# Queues
registered_queues:1

# Persistence
loading:0
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok

# Stats
total_connections_received:30
total_commands_processed:80
instantaneous_ops_per_sec:23
total_net_input_bytes:2368
total_net_output_bytes:13304
instantaneous_input_kbps:0.66
instantaneous_output_kbps:3.75
rejected_connections:0
latest_fork_usec:0

# CPU
used_cpu_sys:0.12
used_cpu_user:0.04
used_cpu_sys_children:0.00
used_cpu_user_children:0.00

# Commandstats
cmdstat_ping:calls=2,usec=8,usec_per_call=4.00
cmdstat_debug:calls=24,usec=325,usec_per_call=13.54
cmdstat_cluster:calls=1,usec=20,usec_per_call=20.00
cmdstat_hello:calls=41,usec=347,usec_per_call=8.46
cmdstat_addjob:calls=12,usec=322,usec_per_call=26.83
hash_init_value: 1431059163

27560:P 30 Apr 11:35:15.461 # --- CLIENT LIST OUTPUT
27560:P 30 Apr 11:35:15.461 # id=30 addr=127.0.0.1:64888 fd=11 name= age=0 idle=0 flags=b qbuf=117 qbuf-free=32651 obl=51 oll=0 omem=0 events=rw cmd=addjob

27560:P 30 Apr 11:35:15.461 # --- REGISTERS
27560:P 30 Apr 11:35:15.461 # 
RAX:4c008c55a4f8b912 RBX:00000001056b4dec
RCX:00007fff73ba7070 RDX:00007fff5a579a50
RDI:0002a5000002a603 RSI:0002a6000002a600
RBP:00007fff5a579ec0 RSP:00007fff5a579ea0
R8 :00007fff5a5798a0 R9 :00007fff75317300
R10:00000001056ea668 R11:0000000000000040
R12:00000001056b3a6d R13:00000001056b9830
R14:0000000000000088 R15:00000001056b4de2
RIP:00000001056a14ec EFL:0000000000010206
CS :000000000000002b FS:0000000000000000  GS:0000000000000000
27560:P 30 Apr 11:35:15.462 # (00007fff5a579eaf) -> 00007f9243500ff8
27560:P 30 Apr 11:35:15.462 # (00007fff5a579eae) -> 0000000000000001
27560:P 30 Apr 11:35:15.462 # (00007fff5a579ead) -> 00007f9243601260
27560:P 30 Apr 11:35:15.462 # (00007fff5a579eac) -> 4c008c55a4f8b912
27560:P 30 Apr 11:35:15.462 # (00007fff5a579eab) -> 0000000000000005
27560:P 30 Apr 11:35:15.463 # (00007fff5a579eaa) -> 00007f92436008f0
27560:P 30 Apr 11:35:15.463 # (00007fff5a579ea9) -> 00000001056abd00
27560:P 30 Apr 11:35:15.463 # (00007fff5a579ea8) -> 00007fff5a579f30
27560:P 30 Apr 11:35:15.463 # (00007fff5a579ea7) -> 00007f9243601260
27560:P 30 Apr 11:35:15.463 # (00007fff5a579ea6) -> 00007f92440014f5
27560:P 30 Apr 11:35:15.463 # (00007fff5a579ea5) -> 00000001056a8857
27560:P 30 Apr 11:35:15.463 # (00007fff5a579ea4) -> 00007fff5a579ee0
27560:P 30 Apr 11:35:15.464 # (00007fff5a579ea3) -> 00007f9243601380
27560:P 30 Apr 11:35:15.464 # (00007fff5a579ea2) -> 00007f9244001400
27560:P 30 Apr 11:35:15.464 # (00007fff5a579ea1) -> 00007fff73ba7070
27560:P 30 Apr 11:35:15.464 # (00007fff5a579ea0) -> 00007f9244001400
27560:P 30 Apr 11:35:15.464 # 
=== DISQUE BUG REPORT END. Make sure to include from START to END. ===

       Please report the crash by opening an issue on github:

           http://github.com/antirez/disque/issues

  Suspect RAM error? Use disque-server --test-memory to verify it.

Configurable QUEUE_MAX_IDLE_TIME

With many transient queues it would be helpful if the GC interval (QUEUE_MAX_IDLE_TIME) was configurable in the configuration file and possibly with CONFIG SET. This in order to more aggressively clean up obsolete queues.

UPDATEJOB jobid value [options]

First of all, Thanks for this development, it something that I was looking to found. And already it looks very promising!

I'm wondering if you plane to implement the "Update job" command.

In the case where you have already add a job in a queue (ADDJOB); the job was not already ACK and you want to change the "value" of this job or an option (Timeout, DELAY...), without changing the JobId.

For this usecase, I think it should be interesting to have an UPDATEJOB command.

UPDATEJOB jobid value [options]

What do you think?

Disque can report a QLEN higher than the number of available jobs (race condition)

Hi,

This is another bug, it is highly non-deterministic and requires you to be fairly "lucky" to hit it. One disque server is needed. Same test as in #22, running the same commands, but following the insert of the 8 jobs, run a QLEN. In rare circumstances, disque will report 9 rather than 8. It may be related to deletions not "falling through" or it may be related to other data entirely. When I first provoked this error, the data inserted was random data, so it may be due to an earlier state.

It looks like some kind of race internally, though all my data access is sequential on a single connection toward disque.

GETJOB command failed to return

Run 127.0.0.1:7711> GETJOB COUNT 1 FROM queue1, the client is hanging there. This happens intermittently.
debugger gives this trace:
#0 0x00007f8daa6c53a0 in __read_nocancel () at ../sysdeps/unix/syscall-template.S:81
#1 0x0000000000412b0e in read (__nbytes=16384, __buf=0x7fffe3d19c50, __fd=) at /usr/include/x86_64-linux-gnu/bits/unistd.h:44
#2 redisBufferRead (c=c@entry=0x1730720) at hiredis.c:1142
#3 0x0000000000412de2 in redisGetReply (c=0x1730720, reply=reply@entry=0x7fffe3d1dcd8) at hiredis.c:1228
#4 0x000000000040d0c0 in cliReadReply (output_raw_strings=output_raw_strings@entry=0) at disque-cli.c:519
#5 0x000000000040ecb8 in cliSendCommand (argc=5, argv=argv@entry=0x7f8da9810130, repeat=0, repeat@entry=1) at disque-cli.c:655
#6 0x0000000000409427 in repl () at disque-cli.c:938
#7 main (argc=, argv=) at disque-cli.c:1971

make test fail to discover nodes

root@server:~/disque# git log -n 1
commit c634697
Merge: 83362b2 12770bd
Author: Salvatore Sanfilippo [email protected]
Date: Thu Jun 11 11:26:34 2015 +0200

Merge pull request #79 from sunheehnus/cluster

cluster: fix oversights

root@server:/disque# make clean
cd src && make clean
make[1]: Entering directory /root/disque/src' rm -rf disque-server disque disque-check-aof *.o *.gcda *.gcno *.gcov disque.info lcov-html make[1]: Leaving directory/root/disque/src'
root@ip-10-230-207-79:
/disque# make
cd src && make all
make[1]: Entering directory `/root/disque/src'
CC adlist.o
CC ae.o
CC anet.o
CC dict.o
CC disque.o
CC sds.o
CC zmalloc.o
CC lzf_c.o
CC lzf_d.o
CC pqsort.o
CC sha1.
o CC release.o
CC networking.o
CC util.o
CC object.o
CC config.o
CC aof.o
CC debug.o
CC syncio.o
CC cluster.o
CC crc16.o
CC endianconv.o
CC slowlog.o
CC bio.o
CC memtest.o
CC crc64.o
CC setproctitle.o
CC blocked.o
CC latency.o
CC sparkline.o
CC rio.o
CC job.o
CC queue.o
CC skiplist.o
CC ack.o
LINK disque-server
CC disque-cli.o
LINK disque
CC disque-check-aof.o
LINK disque-check-aof

Hint: It's a good idea to run 'make test' ;)

make[1]: Leaving directory /root/disque/src' root@server:~/disque# make test cd src && make test make[1]: Entering directory/root/disque/src'
Starting disque #0 at port 25000
Starting disque #1 at port 25001
Starting disque #2 at port 25002
Starting disque #3 at port 25003
Starting disque #4 at port 25004
Starting disque #5 at port 25005
Starting disque #6 at port 25006
Testing unit: 00-base.tcl
09:27:40> (init) Restart killed instances: OK
09:27:40> Cluster nodes are reachable: OK
09:27:40> Cluster nodes hard reset: OK
09:27:40> Cluster Join and auto-discovery test: Cluster failed to join into a full mesh.
(Jumping to next unit after error)
Testing unit: 01-faildet.tcl
09:28:30> (init) Restart killed instances: OK
09:28:30> Cluster nodes are reachable: OK
09:28:30> Cluster nodes hard reset: OK
09:28:30> Cluster Join and auto-discovery test:

Segfault

Got the following crash while creating a client library. The commands being executed related to JSCAN were:

"JSCAN" "0" "COUNT" "1000"
"JSCAN" "0" "COUNT" "1000" "QUEUE" "queue17"

No other JSCAN parameters were used. I did not run a full memory check yet.

19621:C 16 Aug 03:09:30.740 # Warning: no config file specified, using the default config. In order to specify a config file use ./src/disque-server /path/to/disque.conf
19621:P 16 Aug 03:09:30.742 # You requested maxclients of 10000 requiring at least 10032 max file descriptors.
19621:P 16 Aug 03:09:30.742 # Server can't set maximum open files to 10032 because of OS error: Operation not permitted.
19621:P 16 Aug 03:09:30.742 # Current maximum open files is 2048. maxclients has been reduced to 2016 to compensate for low ulimit. If you need higher maxclients increase 'ulimit -n'.
19621:P 16 Aug 03:09:30.743 * Node configuration loaded, I'm d1dbbca17a9759ccf377a24132a7f7dc380f2918
                                        Disque 0.0.1 (6dbfa4c1/0) 64 bit
          _ -                                                        
        .                               Port: 7711
        .    o    .                     PID: 19621
                 .                                                   
               -                              http://disque.io       


19621:P 16 Aug 03:09:30.743 # Server started, Disque version 0.0.1
19621:P 16 Aug 03:09:30.743 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
19621:P 16 Aug 03:09:30.743 * The server is now ready to accept connections on port 7711
19621:P 16 Aug 03:17:17.768 # 

=== DISQUE BUG REPORT START: Cut & paste starting from here ===
19621:P 16 Aug 03:17:17.768 #     Disque 0.0.1 crashed by signal: 11
19621:P 16 Aug 03:17:17.768 #     Failed assertion: <no assertion failed> (<no file>:0)
19621:P 16 Aug 03:17:17.768 # --- STACK TRACE
./src/disque-server *:7711(logStackTrace+0x33)[0x4297d3]
./src/disque-server *:7711(equalStringObjects+0x4)[0x4224d4]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x10340)[0x7f7ccb04a340]
./src/disque-server *:7711(equalStringObjects+0x4)[0x4224d4]
./src/disque-server *:7711(jscanCallback+0x20)[0x4320b0]
./src/disque-server *:7711(dictScan+0x198)[0x412f98]
./src/disque-server *:7711(jscanCommand+0x178)[0x434528]
./src/disque-server *:7711(call+0x50)[0x415440]
./src/disque-server *:7711(processCommand+0xf2)[0x416ea2]
./src/disque-server *:7711(processInputBuffer+0x97)[0x420627]
./src/disque-server *:7711(aeProcessEvents+0x250)[0x410610]
./src/disque-server *:7711(aeMain+0x2b)[0x41084b]
./src/disque-server *:7711(main+0x325)[0x40f615]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf5)[0x7f7ccac96ec5]
./src/disque-server *:7711[0x40f739]
19621:P 16 Aug 03:17:17.770 # --- INFO OUTPUT
19621:P 16 Aug 03:17:17.770 # # Server
disque_version:0.0.1
disque_git_sha1:6dbfa4c1
disque_git_dirty:0
disque_build_id:63f6d787d079142c
os:Linux 3.8.11 x86_64
arch_bits:64
multiplexing_api:epoll
gcc_version:4.8.4
process_id:19621
run_id:0b271fe02f5f2fad3fef2380e8ab1c420029b2a7
tcp_port:7711
uptime_in_seconds:467
uptime_in_days:0
hz:10
config_file:

# Clients
connected_clients:4
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0

# Memory
used_memory:500888
used_memory_human:489.15K
used_memory_rss:1814528
used_memory_peak:500888
used_memory_peak_human:489.15K
mem_fragmentation_ratio:3.62
mem_allocator:jemalloc-3.6.0

# Jobs
registered_jobs:23

# Queues
registered_queues:11

# Persistence
loading:0
aof_enabled:0
aof_state:off
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok

# Stats
total_connections_received:25
total_commands_processed:62
instantaneous_ops_per_sec:0
total_net_input_bytes:3691
total_net_output_bytes:9925
instantaneous_input_kbps:0.00
instantaneous_output_kbps:0.00
rejected_connections:0
latest_fork_usec:0

# CPU
used_cpu_sys:8.53
used_cpu_user:6.64
used_cpu_sys_children:0.00
used_cpu_user_children:0.00

# Commandstats
cmdstat_info:calls=1,usec=65,usec_per_call=65.00
cmdstat_monitor:calls=1,usec=5,usec_per_call=5.00
cmdstat_hello:calls=1,usec=5,usec_per_call=5.00
cmdstat_addjob:calls=26,usec=484,usec_per_call=18.62
cmdstat_getjob:calls=6,usec=272,usec_per_call=45.33
cmdstat_ackjob:calls=3,usec=15,usec_per_call=5.00
cmdstat_fastack:calls=3,usec=14,usec_per_call=4.67
cmdstat_deljob:calls=1,usec=8,usec_per_call=8.00
cmdstat_jscan:calls=18,usec=207,usec_per_call=11.50
cmdstat_enqueue:calls=1,usec=12,usec_per_call=12.00
cmdstat_dequeue:calls=1,usec=7,usec_per_call=7.00
hash_init_value: 1440444137

19621:P 16 Aug 03:17:17.770 # --- CLIENT LIST OUTPUT
19621:P 16 Aug 03:17:17.770 # id=12 addr=127.0.0.1:42917 fd=9 name= age=170 idle=162 flags=N qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=jscan
id=22 addr=127.0.0.1:42979 fd=12 name= age=0 idle=0 flags=N qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=getjob
id=24 addr=127.0.0.1:42981 fd=11 name= age=0 idle=0 flags=N qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=jscan
id=25 addr=127.0.0.1:42982 fd=10 name= age=0 idle=0 flags=N qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=hello

19621:P 16 Aug 03:17:17.770 # --- CURRENT CLIENT INFO
19621:P 16 Aug 03:17:17.771 # client: id=24 addr=127.0.0.1:42981 fd=11 name= age=0 idle=0 flags=N qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=jscan
19621:P 16 Aug 03:17:17.771 # argv[0]: 'JSCAN'
19621:P 16 Aug 03:17:17.771 # argv[1]: '0'
19621:P 16 Aug 03:17:17.771 # argv[2]: 'COUNT'
19621:P 16 Aug 03:17:17.771 # argv[3]: '1000'
19621:P 16 Aug 03:17:17.771 # argv[4]: 'QUEUE'
19621:P 16 Aug 03:17:17.771 # argv[5]: 'queue16'
19621:P 16 Aug 03:17:17.771 # --- REGISTERS
19621:P 16 Aug 03:17:17.771 # 
RAX:0000000000000000 RBX:00007ffc5ececdc0
RCX:0000000000000000 RDX:0000000000000007
RDI:0000000000000000 RSI:00007f7cca414aa0
RBP:00007f7cca416680 RSP:00007ffc5ececcf0
R8 :aaaaaaaaaaaaaaaa R9 :000000000000001f
R10:000000000000000f R11:00007f7ccadfb870
R12:00007f7cca41c430 R13:00007f7cca410940
R14:00007f7cca49a000 R15:000000000000001f
RIP:00000000004224d4 EFL:0000000000010206
CSGSFS:0000000000000033
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcff) -> 00007f7cca49a000
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcfe) -> 0000000000000006
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcfd) -> 00007f7cca41c3d0
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcfc) -> 00007f7cca41c430
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcfb) -> 00000000000026fc
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcfa) -> 00007f7cca41c430
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcf9) -> 00007f7cca49a000
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcf8) -> 0000000000000006
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcf7) -> ffffffffffffffe0
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcf6) -> 0000000000000030
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcf5) -> 0000000000412f98
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcf4) -> 0000000000000005
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcf3) -> 00007ffc5ececdb0
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcf2) -> 0000000000432090
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcf1) -> 00000000004320b0
19621:P 16 Aug 03:17:17.771 # (00007ffc5ececcf0) -> ffffffffffffffff
19621:P 16 Aug 03:17:17.771 # --- FAST MEMORY TEST
19621:P 16 Aug 03:17:17.771 # Bio thread for job type #0 terminated
19621:P 16 Aug 03:17:17.771 # Bio thread for job type #1 terminated
Testing 685000 86016
Testing 794000 135168
Testing 7f7cc93ff000 8388608
Testing 7f7cc9c00000 16777216
Testing 7f7ccb035000 20480
Testing 7f7ccb254000 16384
Testing 7f7ccb769000 16384
Testing 7f7ccb77c000 16384
Testing 7f7ccb782000 4096
19621:P 16 Aug 03:17:18.173 # Fast memory test PASSED, however your memory can still be broken. Please run a memory test for several hours if possible.
19621:P 16 Aug 03:17:18.173 # 
=== DISQUE BUG REPORT END. Make sure to include from START to END. ===

       Please report the crash by opening an issue on github:

           http://github.com/antirez/disque/issues

  Suspect RAM error? Use disque-server --test-memory to verify it.


[1]    19621 segmentation fault (core dumped)  ./src/disque-server

Pause a queue

When temporarily wanting to halt a queue from being processed it would be nice to be able to pause and resume (or open/close) a queue. This would be easier than shutting down all workers.

ADDJOB MAXLEN Off-by-One Error

If I try to enqueue a job with MAXLEN 1, queueing fails for the third instead of the second job.

This seems to be due to an off by one error in job.c where the length is compared with > instead of >=.

Example:

127.0.0.1:7711> ADDJOB foo bar 1000 MAXLEN 1
DI1dcc90feebc8085a2e65bc0d41c1d49b46cf357c05a0SQ
127.0.0.1:7711> ADDJOB foo bar 1000 MAXLEN 1
DI1dcc90fe8de6220e6339debc3b3a9ff630b4aa7805a0SQ
127.0.0.1:7711> ADDJOB foo bar 1000 MAXLEN 1
(error) MAXLEN Queue is already longer than the specified MAXLEN count
127.0.0.1:7711>

Expected the failure on the second ADDJOB invocation instead of on the third.

Crash

=== DISQUE BUG REPORT START: Cut & paste starting from here ===
1902:P 20 May 14:58:41.643 # Disque 0.0.1 crashed by signal: 11
1902:P 20 May 14:58:41.643 # Failed assertion: (:0)
1902:P 20 May 14:58:41.643 # --- STACK TRACE
/usr/local/bin/disque-server 0.0.0.0:7711(logStackTrace+0x3e)[0x4269be]
/usr/local/bin/disque-server 0.0.0.0:7711(dictAddRaw+0x14)[0x411f04]
/lib/x86_64-linux-gnu/libpthread.so.0(+0xfcb0)[0x7fdfcab00cb0]
/usr/local/bin/disque-server 0.0.0.0:7711(dictAddRaw+0x14)[0x411f04]
/usr/local/bin/disque-server 0.0.0.0:7711(dictAdd+0x1e)[0x41208e]
/usr/local/bin/disque-server 0.0.0.0:7711(gotAckReceived+0x8c)[0x432c2c]
/usr/local/bin/disque-server 0.0.0.0:7711(clusterProcessPacket+0x8ac)[0x429f3c]
/usr/local/bin/disque-server 0.0.0.0:7711(clusterReadHandler+0xb1)[0x42a2c1]
/usr/local/bin/disque-server 0.0.0.0:7711(aeProcessEvents+0x145)[0x410105]
/usr/local/bin/disque-server 0.0.0.0:7711(aeMain+0x2b)[0x4103db]
/usr/local/bin/disque-server 0.0.0.0:7711(main+0x302)[0x40f162]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xed)[0x7fdfca75476d]
/usr/local/bin/disque-server 0.0.0.0:7711[0x40f27d]
1902:P 20 May 14:58:41.643 # --- INFO OUTPUT
1902:P 20 May 14:58:41.643 # # Server
disque_version:0.0.1
disque_git_sha1:af4e6ce4
disque_git_dirty:0
disque_build_id:c74e56e1e5bd42ab
os:Linux 3.2.0-69-virtual x86_64
arch_bits:64
multiplexing_api:epoll
gcc_version:4.6.3
process_id:1902
run_id:e9664b526bab0b0996cd499b20d7c3a9bbec7032
tcp_port:7711
uptime_in_seconds:1507
uptime_in_days:0
hz:10
config_file:/etc/disque/7711/disque.conf

Clients

connected_clients:0
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0

Memory

used_memory:67452496
used_memory_human:64.33M
used_memory_rss:35233792
used_memory_peak:67452496
used_memory_peak_human:64.33M
mem_fragmentation_ratio:0.52
mem_allocator:jemalloc-3.6.0

Jobs

registered_jobs:72

Queues

registered_queues:71

Persistence

loading:0
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok

Stats

total_connections_received:1
total_commands_processed:1
instantaneous_ops_per_sec:0
total_net_input_bytes:52
total_net_output_bytes:5
instantaneous_input_kbps:0.00
instantaneous_output_kbps:0.00
rejected_connections:0
latest_fork_usec:0

CPU

used_cpu_sys:2.62
used_cpu_user:1.06
used_cpu_sys_children:0.00
used_cpu_user_children:0.00

Commandstats

cmdstat_cluster:calls=1,usec=30,usec_per_call=30.00
hash_init_value: 1432290130

1902:P 20 May 14:58:41.643 # --- CLIENT LIST OUTPUT
1902:P 20 May 14:58:41.644 #
1902:P 20 May 14:58:41.644 # --- REGISTERS
1902:P 20 May 14:58:41.644 #
RAX:0000000000000001 RBX:0000000000000000
RCX:3936643434363836 RDX:00007fdfc9c5d0c0
RDI:0000000000000000 RSI:00007fdfc9c5d0c0
RBP:0000000000000000 RSP:00007fff84c43c80
R8 :00000000c616cd67 R9 :00007fdfc9c5d0e8
R10:00000000fffffff7 R11:00007fdfca8b52b0
R12:00007fdfc9c5d0c0 R13:00007fdfc9c1a500
R14:0000000000000000 R15:000000000000000a
RIP:0000000000411f04 EFL:0000000000010202
CSGSFS:000000000000e033
1902:P 20 May 14:58:41.644 # (00007fff84c43c8f) -> 0000000000000001
1902:P 20 May 14:58:41.644 # (00007fff84c43c8e) -> 00007fdfc9c5d0c0
1902:P 20 May 14:58:41.644 # (00007fff84c43c8d) -> 0000000000432c2c
1902:P 20 May 14:58:41.644 # (00007fff84c43c8c) -> 0000000000000001
1902:P 20 May 14:58:41.644 # (00007fff84c43c8b) -> 00007fdfc9c5d0c0
1902:P 20 May 14:58:41.644 # (00007fff84c43c8a) -> 00007fdfc9c7af80
1902:P 20 May 14:58:41.644 # (00007fff84c43c89) -> 000000000041208e
1902:P 20 May 14:58:41.644 # (00007fff84c43c88) -> 000000000000000a
1902:P 20 May 14:58:41.644 # (00007fff84c43c87) -> 0000000000000000
1902:P 20 May 14:58:41.644 # (00007fff84c43c86) -> 00007fdfc9c1a500
1902:P 20 May 14:58:41.644 # (00007fff84c43c85) -> 00007fdfc9c5d0c0
1902:P 20 May 14:58:41.644 # (00007fff84c43c84) -> 00007fdfc9c5d0c0
1902:P 20 May 14:58:41.644 # (00007fff84c43c83) -> 0000000000000000
1902:P 20 May 14:58:41.644 # (00007fff84c43c82) -> 000000000000000d
1902:P 20 May 14:58:41.644 # (00007fff84c43c81) -> 00007fdfc9c6db40
1902:P 20 May 14:58:41.644 # (00007fff84c43c80) -> 3565376434346466
1902:P 20 May 14:58:41.644 # --- FAST MEMORY TEST
1902:P 20 May 14:58:41.644 # Bio thread for job type #0 terminated
1902:P 20 May 14:58:41.644 # Bio thread for job type #1 terminated
1902:P 20 May 14:58:42.465 # Fast memory test PASSED, however your memory can still be broken. Please run a memory test for several hours if possible.
1902:P 20 May 14:58:42.465 #
=== DISQUE BUG REPORT END. Make sure to include from START to END. ===

If you need more info, please ask.

NACK and dead letters, recap and proposal

In other message systems, especially the ones used to process asynchronous tasks, people are used to two features: negative acknowledgements and dead letter queues.

The role of NACKs is make workers able to actively signal a job as impossible to process, with two main benefits:

  1. Make the job available for other workers to process it ASAP, without the need to way the retry time.
  2. Provide a semantical hint about the lack of ability to process the job, that is, information about why a job was not able to be processed, and ability to distinguish a message that gets lost from a message that is not processable because of its content.

Another feature closely related is dead letter queues: a place where jobs that are impossible to be processed go, for later inspection.

During the weekend I tried to understand if there is a way to avoid to have dead letter queues without, after all, making Disque less functional. There is something disturbingly bloatware in this feature in some way, yet it is very important for the application to be notified that there are jobs that can't be processed. However maybe it is possible to just provide minimal help to the application using Disque to handle failures?

So that was my design assignment for the weekend, and this is my proposal.

Disque will add a NACK command, very similar to ENQUEUE, that is, the command will try to put the message back into the queue ASAP for other workers to process. However there are two things that Disque will track.

  1. A best effort count of the number of times the message was delivered after the first time. That is, the amount of additional deliveries. This count is best effort and is not guaranteed to be consistent across nodes. However this should not be an issue in this context (read more).
  2. A best effort count of the number of times the message was NACK-ed.

When using GETJOB, the worker can pass an option to also retrieve these two counters. The application can decide to do anything with them. For example: if the number of NACKs is greater than 3 the worker may add the job in a dead letter queue (but note that this is not implemented by Disque, is just what the worker decided to do). The worker may actually try to process the job and provide additional information in the dead letter if it wishes, like stack traces or alike.

Workers implementing a dead letter may do interesting things like setting the replication factor to 1, specifying MAXLEN option and so forth, depending on the application needs, in order to avoid dead letters to turn into an auto inflicted DOS :-) Workers may even FASTACK the original message when doing so, in order to avoid other workers to waste time with the same job.

Otherwise the worker may just provide some warning in other form to the application, like to the monitoring system, when it sees too many re-deliveries of the same job or alike. The bottom line here is it is up to the application what to do.

The missing part about this is a dead letter for jobs reaching the TTL. I've a semantical issue with that. If a job is never processed within the TTL, and you want to log it somewhere inside the system, then you really want a different TTL: you are not ready to lose the job after the expiration time you proposed elapsed.

The only justification for the above feature is IMHO outside the message safety context, since the TTL should specify a time after which processing the job is no longer useful. So the left reason is actually debugging. However for debugging tasks, I've the feeling most users could be happy enough just with Disque logging the event in a log file where all the unprocessed jobs expired by TTL go.

Additionally Disque could provide an API to access the latest N items inside the file (in a way that does not allow the process to block, i.e. using a different thread that populates an in-memory version of the data from time to time, like: file size, last N items, and so forth).

I believe this gives us 99% of what dead letters provide, but without making the Disque design more complex... Feedbacks appreciated.

Implementation details

The counters would just be approximated, if there are partitions and we lose one increment, the next re-queue the counter will eventually reach the threshold for the application to be warned about the inability to process the job. So basically every node would have in their private job object the two counters, incremented in the following ways:

  1. When a QUEUED cluster bus message is received, the number of additional deliveries is incremented. The message is sent to all the nodes that have a copy according to the node putting the job back in queue again (but not the first time a job is queued).
  2. When a NACK is received, the QUEUED message broadcasted will be flagged in a way that forces the receivers to increment the nack counter instead. So it's the same system as in the previous item. We just take two counters to distinguish between messages lost for other causes, and messages actively NACKed.

When a job with retry set to 0 gets dequeued, it should be removed.

We are sure the job will never be requeued again, since it is against the retry parameter set to 0, meaning the user requested the at most once semantics. Waiting for an ACK for this job is probably silly, we should just remove the job as soon as we transmit it to the client.

Guarantee message availability in at-most-one mode ?

Hello
The readme says that no copy is done around nodes when we are in an at-most-one mode.

What happen if the node crash ?

"""
The at most once semantics is a trivial result of using a retry time set to 0 (which means, never re-queue the message again) and a replication factor of 1 for the message (not strictly needed, but it is useless to have multiple copies of a message around if it will be delivered at most one time).
"""

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.