
roshi's Introduction

roshi

Roshi implements time-series event storage via a LWW-element-set CRDT with limited inline garbage collection. Roshi is a stateless, distributed layer on top of Redis, implemented in Go. It is partition tolerant, highly available, and eventually consistent.

At a high level, Roshi maintains sets of values, with each set ordered according to (external) timestamp, newest-first. Roshi provides the following API:

  • Insert(key, timestamp, value)
  • Delete(key, timestamp, value)
  • Select(key, offset, limit) []TimestampValue
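
As an illustration of the shape of that API (the Go names below are illustrative, not the actual package API):

    // Illustrative only -- these names are not the actual package API.
    //
    // TimestampValue pairs an opaque value with its external timestamp.
    type TimestampValue struct {
        Timestamp float64
        Value     []byte
    }

    // Index is the logical per-key API Roshi exposes.
    type Index interface {
        // Insert adds value to the set named by key at the given timestamp.
        Insert(key string, timestamp float64, value []byte) error
        // Delete removes value from the set named by key at the given timestamp.
        Delete(key string, timestamp float64, value []byte) error
        // Select returns up to limit elements of the set, newest-first, starting at offset.
        Select(key string, offset, limit int) ([]TimestampValue, error)
    }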

Roshi stores a sharded copy of your dataset in multiple independent Redis instances, called a cluster. Roshi provides fault tolerance by duplicating clusters; multiple identical clusters, normally at least 3, form a farm. Roshi leverages CRDT semantics to ensure consistency without explicit consensus.

Use cases

Roshi is basically a high-performance index for timestamped data. It's designed to sit in the critical (request) path of your application or service. The originating use case is the SoundCloud stream; see this blog post for details.

Theory and system properties

Roshi is a distributed system, for two reasons: it's made for datasets that don't fit on one machine, and it's made to be tolerant of node failure.

Next, we will explain the system design.

CRDT

CRDTs (conflict-free replicated data types) are data types on which the same set of operations yields the same outcome, regardless of order of execution and duplication of operations. This allows data convergence without the need for consensus between replicas. In turn, this allows for easier implementation (no consensus protocol implementation) as well as lower latency (no wait-time for consensus).

Operations on CRDTs need to adhere to the following rules:

  • Associativity (a+(b+c)=(a+b)+c), so that grouping doesn't matter.
  • Commutativity (a+b=b+a), so that order of application doesn't matter.
  • Idempotence (a+a=a), so that duplication doesn't matter.
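
As a toy illustration of these rules (not part of Roshi), plain set union already satisfies all three, which is what makes a grow-only set the simplest set CRDT:

    package main

    import (
        "fmt"
        "reflect"
    )

    // union merges two sets represented as maps. Union is associative,
    // commutative, and idempotent, which is exactly what the three CRDT
    // rules above require of a merge operation.
    func union(a, b map[string]bool) map[string]bool {
        out := map[string]bool{}
        for k := range a {
            out[k] = true
        }
        for k := range b {
            out[k] = true
        }
        return out
    }

    func main() {
        a := map[string]bool{"x": true}
        b := map[string]bool{"y": true}
        c := map[string]bool{"z": true}

        fmt.Println(reflect.DeepEqual(union(union(a, b), c), union(a, union(b, c)))) // associative: true
        fmt.Println(reflect.DeepEqual(union(a, b), union(b, a)))                     // commutative: true
        fmt.Println(reflect.DeepEqual(union(a, a), a))                               // idempotent: true
    }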

Data types as well as operations have to be specifically crafted to meet these rules. CRDTs have known implementations for counters, registers, sets, graphs, and others. Roshi implements a set data type, specifically the Last Writer Wins element set (LWW-element-set).

This is an intuitive description of the LWW-element-set:

  • An element is in the set, if its most-recent operation was an add.
  • An element is not in the set, if its most-recent operation was a remove.

A more formal description of a LWW-element-set, as informed by Shapiro, is as follows: a set S is represented by two internal sets, the add set A and the remove set R. To add an element e to the set S, add a tuple t with the element and the current timestamp t=(e, now()) to A. To remove an element from the set S, add a tuple t with the element and the current timestamp t=(e, now()) to R. To check if an element e is in the set S, check if it is in the add set A and not in the remove set R with a higher timestamp.
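
The following is a minimal, single-node Go sketch of that definition (not Roshi's implementation). It simplifies the two tuple sets to maps that keep only the highest timestamp seen per element, which is all a membership check needs:

    package main

    import "fmt"

    // lwwSet is a single-node sketch of Shapiro's LWW-element-set: adds and
    // removes are recorded with timestamps, and membership is decided by
    // whichever operation on an element carries the highest timestamp.
    type lwwSet struct {
        add    map[string]float64 // element -> highest add timestamp seen
        remove map[string]float64 // element -> highest remove timestamp seen
    }

    func newLWWSet() *lwwSet {
        return &lwwSet{add: map[string]float64{}, remove: map[string]float64{}}
    }

    func (s *lwwSet) Add(e string, ts float64) {
        if ts > s.add[e] {
            s.add[e] = ts
        }
    }

    func (s *lwwSet) Remove(e string, ts float64) {
        if ts > s.remove[e] {
            s.remove[e] = ts
        }
    }

    // Contains reports whether e is in the add set and not in the remove set
    // with a higher timestamp; ties favor the add, per the description above.
    func (s *lwwSet) Contains(e string) bool {
        addTS, added := s.add[e]
        if !added {
            return false
        }
        removeTS, removed := s.remove[e]
        return !removed || addTS >= removeTS
    }

    func main() {
        s := newLWWSet()
        s.Add("a", 1)
        s.Remove("a", 2)
        fmt.Println(s.Contains("a")) // false: the remove at t=2 wins
        s.Add("a", 3)
        fmt.Println(s.Contains("a")) // true: re-added with a higher timestamp
    }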

Roshi implements the above definition, but extends it by applying a sort of instant garbage collection. When inserting an element E to the logical set S, check if E is already in the add set A or the remove set R. If so, check the existing timestamp. If the existing timestamp is lower than the incoming timestamp, the write succeeds: remove the existing (element, timestamp) tuple from whichever set it was found in, and add the incoming (element, timestamp) tuple to the add set A. If the existing timestamp is higher than the incoming timestamp, the write is a no-op.

Below are all possible combinations of add and remove operations. A(elements...) is the state of the add set. R(elements...) is the state of the remove set. An element is a tuple with (value, timestamp). add(element) and remove(element) are the operations.

Original state | Operation   | Resulting state
---------------|-------------|----------------
A(a,1) R()     | add(a,0)    | A(a,1) R()
A(a,1) R()     | add(a,1)    | A(a,1) R()
A(a,1) R()     | add(a,2)    | A(a,2) R()
A(a,1) R()     | remove(a,0) | A(a,1) R()
A(a,1) R()     | remove(a,1) | A(a,1) R()
A(a,1) R()     | remove(a,2) | A() R(a,2)
A() R(a,1)     | add(a,0)    | A() R(a,1)
A() R(a,1)     | add(a,1)    | A() R(a,1)
A() R(a,1)     | add(a,2)    | A(a,2) R()
A() R(a,1)     | remove(a,0) | A() R(a,1)
A() R(a,1)     | remove(a,1) | A() R(a,1)
A() R(a,1)     | remove(a,2) | A() R(a,2)

For a Roshi LWW-element-set, an element will always be in either the add or the remove set exclusively, but never in both and never more than once. This means that the logical set S is the same as the add set A.
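
Here is the same kind of single-node sketch for Roshi's variant: each element occupies exactly one slot, in either the add or the remove set, and a write only wins if its timestamp is strictly greater than the existing one, matching the table above where ties go to the existing state. Again, this is an illustration, not the actual implementation:

    package main

    import "fmt"

    // entry is one element's current state: which set it lives in, and its timestamp.
    type entry struct {
        inAdd bool
        ts    float64
    }

    type roshiSet struct{ entries map[string]entry }

    func newRoshiSet() *roshiSet { return &roshiSet{entries: map[string]entry{}} }

    // apply implements both Insert (toAdd=true) and Delete (toAdd=false). The
    // write only wins if its timestamp is strictly greater than the existing
    // one; otherwise it is a no-op, as in the table above.
    func (s *roshiSet) apply(e string, ts float64, toAdd bool) {
        if cur, ok := s.entries[e]; ok && ts <= cur.ts {
            return
        }
        // "Instant garbage collection": the old tuple is simply overwritten.
        s.entries[e] = entry{inAdd: toAdd, ts: ts}
    }

    func (s *roshiSet) Insert(e string, ts float64) { s.apply(e, ts, true) }
    func (s *roshiSet) Delete(e string, ts float64) { s.apply(e, ts, false) }

    // Contains: the logical set S is exactly the add set A.
    func (s *roshiSet) Contains(e string) bool {
        cur, ok := s.entries[e]
        return ok && cur.inAdd
    }

    func main() {
        s := newRoshiSet()
        s.Insert("a", 1)
        s.Delete("a", 1)             // no-op: the existing timestamp wins the tie
        fmt.Println(s.Contains("a")) // true
        s.Delete("a", 2)             // moves "a" to the remove set
        fmt.Println(s.Contains("a")) // false
    }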

Every key in Roshi represents a set. Each set is its own LWW-element-set.

For more information on CRDTs, Shapiro et al.'s work on conflict-free replicated data types is a good starting point.

Replication

Roshi replicates data over several non-communicating clusters. A typical replication factor is 3. Roshi has two methods of replicating data: during write, and during read-repair.

A write (Insert or Delete) is sent to all clusters. The overall operation returns success the moment a user-defined number of clusters return success. Unsuccessful clusters might either have been too slow (but still accepted the write) or failed (due to a network partition or an instance crash). In case of failure, read-repair might be triggered on a later read.
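
A rough sketch of such a quorum write, with a hypothetical Cluster type standing in for the real farm machinery:

    package farmsketch

    import "fmt"

    // Cluster is a hypothetical handle to one replica cluster; it is not the
    // actual type from package farm or package cluster.
    type Cluster interface {
        Insert(key string, timestamp float64, member string) error
    }

    // quorumWrite fans one write out to every cluster concurrently and returns
    // success as soon as `quorum` clusters have acknowledged it. Slower clusters
    // keep working in the background; failed ones are left to read-repair.
    func quorumWrite(clusters []Cluster, quorum int, key string, ts float64, member string) error {
        results := make(chan error, len(clusters))
        for _, c := range clusters {
            go func(c Cluster) { results <- c.Insert(key, ts, member) }(c)
        }
        succeeded := 0
        for range clusters {
            if err := <-results; err == nil {
                succeeded++
                if succeeded >= quorum {
                    return nil
                }
            }
        }
        return fmt.Errorf("only %d of %d clusters succeeded, needed %d", succeeded, len(clusters), quorum)
    }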

A read (Select) is dependent on the read strategy employed. If the strategy queries several clusters, it might be able to spot disagreement in the returned sets. If so, the unioned set is returned to the client, and in the background, a read-repair is triggered, which lazily converges the sets across all replicas.
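
And a similarly rough sketch of the read path, with illustrative types rather than the actual farm API:

    package farmsketch

    import "sort"

    // Member is one timestamp-value element as returned by a cluster; it is an
    // illustrative type, not the one used by package common.
    type Member struct {
        Timestamp float64
        Value     string
    }

    // ReadCluster is a hypothetical read handle to one replica cluster.
    type ReadCluster interface {
        Select(key string, offset, limit int) []Member
    }

    // selectWithRepair queries every cluster, unions what they return, and, if
    // any cluster disagrees with the union, triggers a background read-repair.
    func selectWithRepair(clusters []ReadCluster, key string, limit int, repair func(key string)) []Member {
        union := map[Member]bool{}
        sizes := make([]int, len(clusters))
        for i, c := range clusters {
            members := c.Select(key, 0, limit)
            sizes[i] = len(members)
            for _, m := range members {
                union[m] = true
            }
        }
        for _, n := range sizes {
            if n != len(union) {
                go repair(key) // lazily converge the replicas in the background
                break
            }
        }
        out := make([]Member, 0, len(union))
        for m := range union {
            out = append(out, m)
        }
        sort.Slice(out, func(i, j int) bool { return out[i].Timestamp > out[j].Timestamp })
        return out
    }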

Package farm explains replication, read strategies, and read-repair further.

Fault tolerance

Roshi runs as a homogenous distributed system. Each Roshi instance can serve all requests (Insert, Delete, Select) for a client, and communicates with all Redis instances.

A Roshi instance is effectively stateless, but holds transient state. If a Roshi instance crashes, two types of state are lost:

  1. Current client connections are lost. Clients can reconnect to another Roshi instance and re-execute their operation.
  2. Unresolved read-repairs are lost. The read-repair might be triggered again during another read.

Since all operations are idempotent, neither failure mode impedes convergence of the data.

Persistence is delegated to Redis. Data on a crashed-but-recovered Redis instance might be lost for the window between its last commit to disk and the moment it accepts connections again. That gap might be repaired via read-repair.

If a Redis instance is permanently lost and has to be replaced with a fresh instance, there are two options:

  1. Replace it with an empty instance. Keys will be replicated to it via read-repair. As more and more keys are replicated, the read-repair load will decrease and the instance will work normally. This process might result in data loss over the lifetime of a system: if the other replicas are also lost, non-replicated keys (keys that have not been requested and thus did not trigger a read-repair) are lost.
  2. Replace it with a cloned replica. There will be a gap between the time of the last write respected by the replica and the first write respected by the new instance. This gap might be fixed by subsequent read-repairs.

Both processes can be expedited via a keyspace walker process. Nevertheless, these properties and procedures warrant careful consideration.
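
As an illustration of what such a walker might look like (the Selecter type and rate-limiting scheme here are assumptions, not roshi-walker's actual code):

    package walkersketch

    import "time"

    // Selecter is anything that can serve a Select against the farm; illustrative only.
    type Selecter interface {
        Select(key string, offset, limit int) error
    }

    // walk issues a Select for every key at a fixed rate. Because a Select via a
    // read-repairing strategy converges the replicas for that key, walking the
    // keyspace repairs an empty or stale instance over time.
    func walk(keys <-chan string, farm Selecter, perSecond int) {
        tick := time.Tick(time.Second / time.Duration(perSecond))
        for key := range keys {
            <-tick
            farm.Select(key, 0, 10) // the Select itself may trigger read-repair
        }
    }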

Responses to write operations

Write operations (insert or delete) return a boolean indicating whether the operation was successfully applied to the data layer, respecting the configured write quorum. Clients should interpret a write response of false to mean they should re-submit the operation. A write response of true does not imply the operation mutated state in a way that will be visible to readers, merely that it was accepted and processed according to CRDT semantics.

As an example, all of these write operations would return true.

Write operation         | Final state         | Operation description
------------------------|---------------------|---------------------------------------------------
Insert("foo", 3, "bar") | foo+ bar/3, foo- —  | Initial write
Insert("foo", 3, "bar") | foo+ bar/3, foo- —  | No-op: incoming score doesn't beat existing score
Delete("foo", 2, "bar") | foo+ bar/3, foo- —  | No-op: incoming score doesn't beat existing score
Delete("foo", 4, "bar") | foo+ —, foo- bar/4  | "bar" moves from add set to remove set
Delete("foo", 5, "bar") | foo+ —, foo- bar/5  | Score of "bar" in remove set is incremented

Considerations

Elasticity

Roshi does not support elasticity. It is not possible to change the sharding configuration during operation. Service discovery is static, configured at startup.

Data structure

Roshi works with LWW-element-sets only. Clients might choose to model other data types on top of the LWW-element-sets themselves.

Correct client timestamps

Client timestamps are assumed to correctly represent the physical order of events coming into the system. Incorrect client timestamps might lead to a client's values either never appearing in a set or always overriding other values.

Data loss

Assuming a replication factor of 3, and a write quorum of 2 nodes, Roshi makes the following guarantees in the presence of failures of Redis instances that represent the same data shard:

Failures | Data loss? | Reads                              | Writes
---------|------------|------------------------------------|--------
0        | No         | Succeed                            | Succeed
1        | No         | Success dependent on read strategy | Succeed
2        | No         | Success dependent on read strategy | Fail
3        | Yes        | Fail                               | Fail

Package farm explains read strategies further.

Failures of Redis instances over independent data shards don't affect instantaneous data durability. However, over time, independent Redis instance failures can lead to data loss, especially for keys which are not regularly read-repaired. In practice, a number of strategies can probabilistically mitigate this concern: for example, walking modified keys after known outages, or the whole keyspace at regular intervals, which triggers read-repairs for inconsistent sets. However, Roshi fundamentally does not guarantee perfect data durability. Therefore, Roshi should not be used as a source of truth, but only as an intermediate store for performance-critical data.

Authentication, authorization, validation

In case it's not obvious, Roshi performs no authentication, authorization, or any validation of input data. Clients must implement those things themselves.

Architecture

Roshi has a layered architecture, with each layer performing a specific job with a relatively small surface area. From the bottom up...

  • Redis: Roshi is ultimately implemented on top of Redis instance(s), utilizing the sorted set data type. For more details on how the sorted sets are used, see package cluster, below.

  • Package pool performs key-based sharding over one or more Redis instances. It exposes basically a single method, taking a key and yielding a connection to the Redis instance that should hold that key. All Redis interactions go through package pool.

  • Package cluster implements an Insert/Select/Delete API on top of package pool. To ensure idempotency and commutativity, package cluster expects timestamps to arrive as float64s, and refuses writes with smaller timestamps than what's already been persisted. To ensure information isn't lost via deletes, package cluster maintains two physical Redis sorted sets for every logical (user) key, and manages the transition of key-timestamp-value tuples between those sets; a rough sketch of that layout follows this list.

  • Package farm implements a single Insert/Select/Delete API over multiple underlying clusters. Writes (Inserts and Deletes) are sent to all clusters, and a quorum is required for success. Reads (Selects) abide one of several read strategies. Some read strategies allow for the possibility of read-repair.

  • roshi-server makes a Roshi farm accessible through a REST-ish HTTP interface. It's effectively stateless, and 12-factor compliant.

  • roshi-walker walks the keyspace in semirandom order at a defined rate, making Select requests for each key in order to trigger read repairs.
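
As a rough illustration of the layout package cluster uses (two sorted sets per logical key, suffixed + and - as in the write-response examples above), an unscripted Insert might look like the following. The redigo client and the lack of a Lua script are simplifications; the real implementation does the check-and-move atomically:

    package clustersketch

    import "github.com/gomodule/redigo/redis"

    // insertOnce sketches how one Insert could map onto the two physical sorted
    // sets behind a logical key: key+ holds live elements, key- holds deleted
    // ones, both scored by timestamp. Illustration only.
    func insertOnce(conn redis.Conn, key string, timestamp float64, member string) error {
        addKey, removeKey := key+"+", key+"-"

        // Refuse the write if either set already holds the member with an
        // equal or higher timestamp.
        for _, k := range []string{addKey, removeKey} {
            existing, err := redis.Float64(conn.Do("ZSCORE", k, member))
            if err != nil && err != redis.ErrNil {
                return err
            }
            if err == nil && existing >= timestamp {
                return nil // no-op: the existing write wins
            }
        }

        // Garbage-collect any older tuple and record the new one in the add set.
        if _, err := conn.Do("ZREM", removeKey, member); err != nil {
            return err
        }
        _, err := conn.Do("ZADD", addKey, timestamp, member)
        return err
    }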

The big picture

[Architecture overview diagram]

(Clusters need not have the same number of Redis instances.)

Development

Roshi is written in Go. You'll need a recent version of Go installed on your computer to build Roshi. If you're on a Mac and use homebrew, brew install go should work fine.

Build

go build ./...

Test

go test ./...

Running

See roshi-server and roshi-walker for information about owning and operating your own Roshi.

roshi's People

Contributors

0xflotus, beorn7, freenerd, mhmoudgmal, naufraghi, nickstenning, peterbourgon, remover, yujunz


roshi's Issues

README.md - LWW-element-set formalism

Just a small thing on the doc. Shapiro et al's LWW-element-sets also allow for the same element to be added again (with a higher timestamp) after it's been removed. Multiple tuples having the same element (but different timestamps) can be inserted in both the A and R sets.

"A more formal description of a LWW-element-set, as informed by Shapiro, is as follows: A set S is represented by two internal sets, the add set A and the remove set R. To add an element e to the set S, add a tuple t with the element and the current timestamp t=(e, now()) to A. To remove an element from the set S, add a tuple t with the element and the current timestamp t=(e, now()) to R. To check if an element e is in the set S, check if it is in the add set A and not in the remove set R with a higher timestamp."

"In contrast to the formal description above, the Roshi approach allows for the same element to be added again (with a higher timestamp) after it's been removed."

Cursor instead of pagination

I noticed Roshi uses page/offset to get the next set of items. This doesn't work that well for real-time timelines; any thoughts on using a cursor instead?

One approach would be to take a next_id or prev_id cursor and a count. From this, you can get the index of the cursor id in an ordered Redis set, and use that.
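
A sketch of that approach against a newest-first sorted set, using redigo-style calls; the key layout and client here are assumptions, not Roshi's actual cursor implementation:

    package cursorsketch

    import "github.com/gomodule/redigo/redis"

    // nextPage sketches the cursor idea above: find the rank of the cursor
    // member in the newest-first sorted set, then read the next `count`
    // members after it.
    func nextPage(conn redis.Conn, key, cursorMember string, count int) ([]string, error) {
        rank, err := redis.Int(conn.Do("ZREVRANK", key, cursorMember))
        if err != nil {
            return nil, err // e.g. the cursor member no longer exists
        }
        return redis.Strings(conn.Do("ZREVRANGE", key, rank+1, rank+count))
    }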

CRDT semantics violated if same timestamp?

Given that we have two operations:

  Insert(foo, 123, bar)
  Delete(foo, 123, bar)

Given the implementation, the order in which these operations are executed on each Redis node is significant, with the newer operation preferred. This violates CRDT semantics. Example:

Node1: A() R() + insert(foo,123) = A(foo,123) R() + delete(foo,123) = A() R(foo, 123)
Node2: A() R() + delete(foo,123) = A() R(foo,123) + insert(foo,123) = A(foo, 123) R()

I'm not sure how this would be treated by read-repair. Sadly I wasn't able to write a test case quickly.

As a sidenote: The documentation is inconsistent with the implementation. The implementation favors the new key, whereas the documentation favors the old key.

https://github.com/soundcloud/roshi/blob/master/cluster/cluster.go#L94-L98

        if insertTs and tonumber(ARGV[1]) < tonumber(insertTs) then
            return -1
        elseif deleteTs and tonumber(ARGV[1]) < tonumber(deleteTs) then
            return -1
        end

https://github.com/soundcloud/roshi/blob/master/cluster/README.md

bool valid(key, score, member):
    if contains(key+, member) and score_of(key+, member) >= score:
        return false
    if contains(key-, member) and score_of(key-, member) >= score:
        return false
    return true

https://github.com/soundcloud/roshi/blob/master/README.md

A(a,1) R() + remove(a,1) = A(a,1) R()
...
A() R(a,1) + remove(a,1) = A() R(a,1)

Repair strategies should bias toward Adds

In your table you show that add(a, 0) -> A([{a, 0}]), R(), etc. In each example of a remove, the element was already present. How should the set behave when an element (a, 99) is removed, but no (a, n) exists in the add set?

Project status

Hi,
I would like to ask if this project is still active and used. Thank you

Storage engine notes / options

Hello,

Just wanted to mention there's a nice Redis-API-compatible alternative for large and persisted data sets. The data set does not have to fit in memory, and it uses a variety of pluggable embedded database engines like LevelDB, RocksDB, BoltDB, etc. It's written in Go and we've been using it in production with lots of success. Once I start peeling away at roshi, I'm planning to try it here as well.

You can find it at:
https://github.com/siddontang/ledisdb

and a docker image: https://github.com/pressly/docker-ledisdb

docstore support

We could not find an example of how to use this package.

We plan to persist this data structure through a CRDT on Redis (and then potentially migrate from Redis to IPFS):

{
    "key": "value",
    "some_metadata": [
        {
            "sub_key": "sub_value",
            "sub_key": "sub_value"
        },
        {
            "sub_key": "sub_value",
            "sub_key": "sub_value"
        }
    ]
}

Can soundcloud/roshi help us achieve this?

document use cases

Although I understand CRDTs and most of the semantics, it would be nice to have a high-level overview of what roshi can be used for, and how. Reading this README is very informative once you know you want to use roshi, but not so much when you think "oh, what's this shiny new project :)".

Cursor-based pagination to fetch the "previous" page

Is it possible to use the cursor-based pagination to fetch the previous page (i.e. to get all the new items with a higher score than the items from the first request)?

I guess to "go back", we'd have to use reversed order of the Redis data (ZRANGEBYSCORE, max=+inf, min=StartScoreStr) at

"ZREVRANGEBYSCORE",

Would it make sense? Does Roshi already support this feature (and I'm just blind)?

Any feedback appreciated.

Related to #36, CC: @peterbourgon @pkieltyka

Support service discovery using Etcd or Consul

Both systems are mature service discovery solutions and can allow elasticity or extra replication at runtime.

I understand @soundcloud might not need it, but let's keep this open in case someone (or I, eventually) wants to take a stab at it.

roshi-server Go app: select and map order?

Hi!

Thanks for roshi, I have been looking like crazy for such an easy way of doing fan-out on read. Being used in production at SoundCloud makes this very cool.

I have two questions (which are probably due to my lack of Go knowledge):

  1. At SoundCloud, are you using another DB or Redis to multi-get the key responses you get back from roshi, so you can build the entire feed server-side (i.e. who/tracks/metadata/likes etc.), or do you just supply the member server-side and let the client fetch all the details for that member and build the feed client-side?

  2. But why isn't the response sorted according to the timestamp/score? I know maps are not sorted, but do I need to manually re-sort them again? Isn't that losing much of the efficiency of the Redis set?

    var items []common.KeyScoreMember
    for i := 0; i < 50000; i++ {
        timestampFloat := float64(time.Now().UnixNano())
        items = append(items, common.KeyScoreMember{Key: "item" + strconv.Itoa(i), Score: timestampFloat, Member: "abc" + strconv.Itoa(i*4)})
    }
    requestBody, _ := json.Marshal(items)

    resp, err := http.Post("http://localhost:6302/", "text/plain", bytes.NewReader(requestBody))
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

And then the select, in select.go:

    var items [][]byte
    for i := 0; i < 20; i++ {
        items = append(items, []byte("item"+strconv.Itoa(i)))
    }

    body, _ := json.Marshal(items)
    req, _ := http.NewRequest("GET", "http://localhost:6302/", bytes.NewReader(body))
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    if resp.StatusCode != 200 {
        log.Fatalf("HTTP %d", resp.StatusCode)
    }

    var normalResponse struct {
        Records map[string][]common.KeyScoreMember `json:"records"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&normalResponse); err != nil {
        log.Fatal(err)
    }

    for key := range normalResponse.Records {
        fmt.Print(key)
    }

Shouldn't running select.go yield the same order every time? What am I missing?

Many thanks!
Pontus

Failed in testing repairs

I tried to port roshi to Go 1.14, but some tests related to repair failed:

https://travis-ci.com/github/yujunz/roshi/builds/176022842#L377

=== RUN   TestAllRepairs
    repair_strategies_test.go:34: pre-repair: cluster 2: only got 0 responses
    repair_strategies_test.go:34: pre-repair: cluster 3: only got 0 responses
--- FAIL: TestAllRepairs (0.00s)
=== RUN   TestRateLimitedRepairs
    repair_strategies_test.go:87: post-repair: cluster 0: has [{Key:foo Score:2.3 Member:delta} {Key:foo Score:2.2 Member:beta} {Key:foo Score:2.1 Member:alpha}]
    repair_strategies_test.go:87: post-repair: cluster 1: has [{Key:foo Score:2.2 Member:beta} {Key:foo Score:2.1 Member:alpha} {Key:foo Score:1.3 Member:delta}]
    repair_strategies_test.go:87: post-repair: cluster 2: has [{Key:foo Score:2.2 Member:beta} {Key:foo Score:2.1 Member:alpha}]
    repair_strategies_test.go:92: post-repair: cluster 2: expected [{Key:foo Score:2.2 Member:beta} {Key:foo Score:2.1 Member:alpha} {Key:foo Score:1.3 Member:delta}], got [{Key:foo Score:2.2 Member:beta} {Key:foo Score:2.1 Member:alpha}]
    repair_strategies_test.go:87: post-repair: cluster 3: has [{Key:foo Score:2.2 Member:beta} {Key:foo Score:2.1 Member:alpha}]
    repair_strategies_test.go:92: post-repair: cluster 3: expected [{Key:foo Score:2.2 Member:beta} {Key:foo Score:2.1 Member:alpha} {Key:foo Score:1.3 Member:delta}], got [{Key:foo Score:2.2 Member:beta} {Key:foo Score:2.1 Member:alpha}]
    repair_strategies_test.go:87: post-repair: cluster 4: has [{Key:foo Score:2.2 Member:beta} {Key:foo Score:2.1 Member:alpha} {Key:foo Score:1.3 Member:delta}]
--- FAIL: TestRateLimitedRepairs (0.00s)
=== RUN   TestExplodingGoroutines
--- PASS: TestExplodingGoroutines (0.03s)
=== RUN   TestMakeSet
--- PASS: TestMakeSet (0.00s)
=== RUN   TestAddHas
--- PASS: TestAddHas (0.00s)
=== RUN   TestAddMany
--- PASS: TestAddMany (0.00s)
=== RUN   TestOrderedLimitedSlice
--- PASS: TestOrderedLimitedSlice (0.00s)
FAIL
FAIL	github.com/yujunz/roshi/farm	0.052s

However, running the tests locally did PASS. What could be the problem?

cursor.Parse() should treat EOF as empty member

When parsing a cursor string with an empty member, i.e.:
4743834931740803072A
it should be parsed as a Cursor with an empty Member field; currently Parse throws an io.EOF error.

I see there's a test case with Member: " ", but that seems counter-intuitive.

Incorrect deleted field in response

.insert("foo",1,"bar")
Will give {inserted:1}, as expected;

.insert("foo",1,"bar")
Will give {inserted:1}, as a surprise, because no item was actually inserted.

.delete("foo",1,"bar")
Will give {deleted:1}, as expected, but

.select("foo")
Will give ["bar"], as surprise, because, on a previous step response was deleted:1.

.delete("foo",2,"bar")
Will give {deleted:1}, as expected, and

.select("foo")
Will give [], as expected too. But,

.delete("foo",2,"bar")
.delete("foo",3,"bar")
both will give {deleted:1}, as a surprise, because no item was actually removed.

I expected that delete operations on elements that are not actually removed would result in {deleted: 0} in the response.
