raft's Introduction

Raft

Raft provides users with an API for building consistent (as defined by CAP), distributed state machines. It does this using the Raft leader election and consensus protocol as described in the original paper. Logs are persisted using RocksDB, but Raft provides a pluggable storage adapter for using other storage engines.

Installation

Add :raft to your dependencies in mix.exs:

def deps do
  [
    {:raft, "~> 0.2.1"}
  ]
end
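
Then fetch the dependency:

$ mix deps.get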

Example

Let's build a distributed key-value store. The first thing we'll need is a state machine:

defmodule KVStore do
  use Raft.StateMachine

  @initial_state %{}

  def write(name, key, value) do
    Raft.write(name, {:set, key, value})
  end

  def read(name, key) do
    Raft.read(name, {:get, key})
  end

  def init(_name) do
    @initial_state
  end

  def handle_write({:set, key, value}, state) do
    {{:ok, key, value}, put_in(state, [key], value)}
  end

  def handle_read({:get, key}, state) do
    case get_in(state, [key]) do
      nil ->
        {{:error, :key_not_found}, state}

      value ->
        {{:ok, value}, state}
    end
  end
end

You can start a peer automatically as part of your supervision tree, as shown below. When the supervisor starts, a new peer will be started with the given name. Any additional options you provide are used to customize the peer's default config; for example, you can change the data directory with the :data_dir option.

defmodule KVStoreSupervisor do
  use Supervisor
  
  def start_link(args), do: Supervisor.start_link(__MODULE__, args, name: __MODULE__)
  
  def init(_args) do
    children = [
      {KVStore, [name: :"kvstore_#{Node.self}"]}
    ]
    Supervisor.init(children, strategy: :one_for_one)
  end
end

For the rest of this example, however, we'll assume you are manually starting and configuring peers with Raft.start_peer/2 rather than starting them as part of your supervision tree.

Now we can start our peers. It's important to note that each peer must be given a unique name within the cluster. In this example we'll create three nodes with short names a, b, and c. The Raft peers on these nodes are called peer1, peer2, and peer3.

$ iex --sname a -S mix
iex(a@mymachine)> {:ok, _pid} = Raft.start_peer(KVStore, name: :peer1)

$ iex --sname b -S mix
iex(b@mymachine)> {:ok, _pid} = Raft.start_peer(KVStore, name: :peer2)

$ iex --sname c -S mix
iex(c@mymachine)> {:ok, _pid} = Raft.start_peer(KVStore, name: :peer3)

At this point our peers are started, but they're all in the "follower" state. To get them communicating we need to define a cluster configuration for them, like so. This only needs to be done on one of the nodes; in our case, we'll run it on node a.

iex(a@mymachine)> Raft.set_configuration(:peer1,
            ...> [{ :peer1, :a@mymachine },
            ...>  { :peer2, :b@mymachine },
            ...>  { :peer3, :c@mymachine }])

Notice that we have to give both the peer name and the node name, even for the local peer. That's because we store this configuration in the replicated log, so it must make sense from all of our nodes.

Once this command runs the peers will start an election and elect a leader. You can see who the current leader is by running:

leader = Raft.leader(:peer1)

Once we have the leader we can read and write to our state machine:

{:ok, :foo, :bar} = KVStore.write(leader, :foo, :bar)
{:ok, :bar}       = KVStore.read(leader, :foo)
{:error, :key_not_found} = KVStore.read(leader, :baz)

We can now shut down our leader and check that a new leader has been elected and that our state is replicated across all of our peers:

iex(a@mymachine)> Raft.stop(leader)

Try to use the old leader:

iex(a@mymachine)> KVStore.read(leader, :foo)
{ :error, { :redirect, { :peer3, :c@mymachine }}}

We're told that the leader has changed.

iex(b@mymachine)> new_leader = Raft.leader(:peer2)
# or
new_leader = { :peer3, :c@mymachine }

And use it:

{:ok, :bar} = KVStore.read(new_leader, :foo)
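
Clients can also follow this redirect automatically. Here's a minimal sketch of such a helper (it is not part of the library and only follows a single hop):

defmodule KVStore.Client do
  # Retry a read against the new leader when the old peer replies
  # with {:error, {:redirect, new_leader}}.
  def read(peer, key) do
    case KVStore.read(peer, key) do
      {:error, {:redirect, new_leader}} -> KVStore.read(new_leader, key)
      other -> other
    end
  end
end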

We now have a consistent, replicated key-value store. If you want to read more about the internals of the project or read up on the Raft protocol, please check out the docs on Hex.

Caution

This project is not quite ready for production use. If you would like to help test out the implementation, that would be greatly appreciated.

Contributing

The goal of this project is to provide the Elixir community with a standard way of building consistent systems. Pull requests and issues are very welcome. If you would like to get involved, here are some of the immediate needs:

  • Configuration changes
  • Automatic cluster joining
  • Snapshotting
  • Alternative storage engine using LMDB
  • Jepsen testing

raft's People

Contributors

bitwalker, jwworth, keathley, pragdave

raft's Issues

Random test failure: "Somehow we have more logs then commands"

What are we doing to trigger the bug:

while (mix test test/fuzzy/partitions_test.exs:36); do sleep 1; done

What we expect:

When running the fuzzy/partitions_test repeatedly, we expect it to pass every time.

What happens:

Some runs trigger a test failure resembling the following (sometimes there is one Entry, sometimes two):

Logs we don't understand: [
  %Raft.Log.Entry{data: {:put, -34}, index: 471, term: 3, type: :command},
  %Raft.Log.Entry{data: {:put, -34}, index: 472, term: 3, type: :command}
]


  1) test node shutdowns (Raft.Fuzzy.PartitionsTest)
     test/fuzzy/partitions_test.exs:36
     ** (RuntimeError) Somehow we have more logs then commands wtf.
     code: assert Cluster.all_logs_match(cluster, commands)
     stacktrace:
       (raft) test/support/cluster.ex:132: Raft.Support.Cluster.compare_logs/3
       (raft) test/support/cluster.ex:123: Raft.Support.Cluster.missing_writes_on_server/2
       (elixir) lib/enum.ex:1336: Enum."-map/2-lists^map/1-0-"/2
       (elixir) lib/enum.ex:1336: Enum."-map/2-lists^map/1-0-"/2
       (raft) test/support/cluster.ex:117: Raft.Support.Cluster.missing_writes/2
       (raft) test/support/cluster.ex:107: Raft.Support.Cluster.verify_logs/2
       test/fuzzy/partitions_test.exs:61: (test)

     The following output was logged:

     16:30:46.462 [error] Task #PID<0.232.0> started from :s0 terminating
     ** (stop) exited in: :gen_statem.call({:s3, :nonode@nohost}, %Raft.RPC.AppendEntriesReq{entries: [], from: {:s0, :nonode@nohost}, leader_commit: 1, leader_id: {:s0, :nonode@nohost}, prev_log_index: 1, prev_log_term: 1, term: 1, to: {:s3, :nonode@nohost}}, :infinity)
         ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
         (stdlib) gen.erl:228: :gen.do_for_proc/2
         (stdlib) gen_statem.erl:619: :gen_statem.call_dirty/4
         (raft) lib/raft/rpc.ex:80: Raft.RPC.do_send/1
         (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
         (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
     Function: #Function<2.92344542/0 in Raft.RPC.send_msg/1>
         Args: []

Additional info:

We have added the following debug output to try to understand what precisely goes wrong during the failed test runs.

diff --git a/test/support/cluster.ex b/test/support/cluster.ex
index 25968bd..de65f99 100644
--- a/test/support/cluster.ex
+++ b/test/support/cluster.ex
@@ -87,6 +87,8 @@ defmodule Raft.Support.Cluster do
       {s, data}
     end

+    IO.inspect data, label: :data, limit: :infinity
+    IO.inspect commands, label: :commands, limit: :infinity
     verify_terms(data) && verify_logs(data, commands)
   end

We have also simplified the StreamData generator to make the command logs easier to read:

diff --git a/test/fuzzy/partitions_test.exs b/test/fuzzy/partitions_test.exs
index 6bee922..11e7bd4 100644
--- a/test/fuzzy/partitions_test.exs
+++ b/test/fuzzy/partitions_test.exs
@@ -10,7 +10,7 @@ defmodule Raft.Fuzzy.PartitionsTest do
   }

   def commands, do: one_of([
-    tuple({constant(:put), term()}),
+    tuple({constant(:put), integer()}),
     constant(:pop),
   ])

@@ -53,7 +53,8 @@ defmodule Raft.Fuzzy.PartitionsTest do
       Cluster.restart(cluster, shutdown)
     end

-    {commands, _errors} = Applier.stop_applying(applier)
+    {commands, errors} = Applier.stop_applying(applier)
+    IO.inspect(errors, label: :errors)
     IO.inspect(commands, label: "Commands after applying")
     Cluster.stop(cluster)

Attached is the full output of a failed test, run with the above changes in place.

more-logs-than-commands-no-errors.log

Linking @studzien -- we worked on this together.

What to do with invalid messages?

Let's say we have a state machine like so:

defmodule Stack do
  def handle_write({:put, str}, stack) when is_binary(str) do
    new_stack = [str | stack]
    {Enum.count(new_stack), new_stack}
  end
end

We have a stack that expects string arguments (I have no idea why anyone would do this in real life but it should illustrate the point).

The problem is that we can send any message to the raft process, like so: Raft.write(leader, {:put, 1}). That message will effectively kill the raft process, because a message is only applied to the user's state machine after it's been persisted to disk. The raft process will attempt to apply the message, crash (which causes the log to crash), get restarted, attempt to apply the logged message again, crash again, and so on. Generally speaking, any incorrect message has the potential to corrupt the log.
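
In the meantime, one workaround is a catch-all handle_write clause that rejects anything unexpected instead of letting a FunctionClauseError crash the peer. A sketch, reusing the callback contract from the README example:

defmodule Stack do
  use Raft.StateMachine

  def init(_name), do: []

  def handle_write({:put, str}, stack) when is_binary(str) do
    new_stack = [str | stack]
    {Enum.count(new_stack), new_stack}
  end

  # Catch-all clause: reply with an error and leave the state untouched
  # rather than crashing the raft process.
  def handle_write(other, stack) do
    {{:error, {:invalid_command, other}}, stack}
  end
end

That only helps if the state machine author remembers to add it, though.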

I think we should give users an "error handling" option. In some cases they may want to let the exception crash the raft process. In other cases they may want to log the error and simply "ignore" that message. My initial thought would be to provide something like this:

defmodule Stack do
  # Option 1: log the error but keep the user state machine's current state and move on.
  use Raft.StateMachine, on_error: :nothing

  # Option 2: raise and crash the raft process.
  use Raft.StateMachine, on_error: :raise
end

What do y'all think?

Cluster membership changes

We need to support adding and removing peers in the raft cluster. While this is described briefly in section 6 of the Raft paper, I feel the explanation is underspecified, specifically with regard to the rejection of RequestVote RPCs and the process of "catching up" new peers before initiating joint consensus.

I'm reading through the raft mailing list and other resources, which I'll link here, in order to get a better feel for potential improvements to the solution. Right now my intuition is that we should look at using the AddServer and RemoveServer RPCs from the Ongaro thesis, which I've linked below.

Research / Links

Followers aren't using higher terms from request vote rpcs

I saw these test failures on CI:

01:13:22.264 [error] Process :s5 (#PID<0.559.0>) terminating
** (FunctionClauseError) no function clause matching in Raft.Server.voted_for_someone_else?/2
    (raft) lib/raft/server.ex:491: Raft.Server.voted_for_someone_else?(%Raft.RPC.RequestVoteReq{candidate_id: {:s4, :nonode@nohost}, from: {:s4, :nonode@nohost}, last_log_index: 166, last_log_term: 3, term: 4, to: {:s5, :nonode@nohost}}, %Raft.Log.Metadata{term: 3, voted_for: {:s5, :nonode@nohost}})
    (raft) lib/raft/server.ex:476: Raft.Server.vote_granted?/3
    (raft) lib/raft/server.ex:458: Raft.Server.handle_vote/3
    (stdlib) gen_statem.erl:1240: :gen_statem.call_state_function/5
    (stdlib) gen_statem.erl:1012: :gen_statem.loop_event/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Initial Call: Raft.Server.init/1
Ancestors: [:s5_sup, Raft.Server.Supervisor, Raft.Supervisor, #PID<0.382.0>]

I haven't been able to re-create them locally yet, but it looks like what's happening is that we're getting a RequestVote RPC and not adopting the higher term. This causes the voted_for_someone_else? call to fail, because it matches against the term to enforce that we're always comparing terms between our internal state and the request. I'm not sure if we're racing when talking to the log process (which would result in us returning the wrong term) or if something else is going on.

Supervise RPC calls

Right now we do RPC calls by just spawn-ing a process and handling all calls and casts inside of that process. It would probably be better to start these RPC processes under a dynamic supervisor as temporary workers. That way they can be managed and shut down correctly.
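
A sketch of that shape, assuming Raft.RPC keeps its current send_msg/1 and do_send/1 functions (the supervisor name below is made up):

# Somewhere under the Raft supervision tree, start a Task.Supervisor:
children = [
  {Task.Supervisor, name: Raft.RPC.TaskSupervisor}
]

# Then in Raft.RPC, replace the bare spawn with a supervised, temporary task:
def send_msg(msg) do
  Task.Supervisor.start_child(
    Raft.RPC.TaskSupervisor,
    fn -> do_send(msg) end,
    restart: :temporary
  )
end

Temporary children are never restarted, so a failed RPC doesn't take anything else down, and shutting the supervisor down cleans up any in-flight calls.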

Default names for peers

Right now we need to specify a unique name for each peer in the cluster. We also need to include these names when we bootstrap a cluster by running set_configuration. I'm wondering if we could continue to allow users to specify names but provide a default name like {__MODULE__, node()}. The __MODULE__ in this case would be the user-defined state machine module. To make this more concrete, let's say a user has a module like:

defmodule KVStore do
  use Raft.StateMachine
end

We could provide a start_peer function so that a user could simply call KVStore.start_peer(). The implementation would look something like:

quote do
  def start_peer(opts \\ []) do
    opts = Keyword.put_new(opts, :name, {__MODULE__, node()})
    Raft.start_peer(__MODULE__, opts)
  end
end

This would also allow us to change the API for adding and removing nodes. If we default to the module name, then the only configuration we need to require is something like Raft.set_configuration(leader, [:a@mymachine, :b@mymachine, :c@mymachine]).

I need to think through all of the implications of this, but I'm interested to see what others think.

vs. rabbitmq/ra

Hello, I'd like to know: what are the differences and benefits/disadvantages between this project and rabbitmq/ra?

Thanks!

In memory backend.

Not even sure if this question makes sense. However.

Is it possible to have a raft cluster where messages are committed to a log that exists only in memory?
Assuming enough of the cluster stays alive to accept new writes, shouldn't enough of the cluster also be alive to guarantee that the log is not lost?

Is that true? If so, such a backend would be useful.
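
To make the question concrete, the rough shape I have in mind is something like an ETS-backed store. The function names below are made up for illustration and do not correspond to this library's actual storage adapter behaviour:

defmodule InMemoryStore do
  # Hypothetical sketch: an ETS table holding log entries keyed by index.
  # Nothing touches disk, so the log only survives as long as the
  # processes (and nodes) holding it do.
  def open(name) do
    {:ok, :ets.new(name, [:ordered_set, :public])}
  end

  def append(table, entries) do
    :ets.insert(table, Enum.map(entries, &{&1.index, &1}))
    :ok
  end

  def get(table, index) do
    case :ets.lookup(table, index) do
      [{^index, entry}] -> {:ok, entry}
      [] -> {:error, :not_found}
    end
  end
end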

Terms don't match in node shutdown test

What are we doing to trigger the bug:

while (mix test test/fuzzy/partitions_test.exs:36); do sleep 1; done

What we expect:

  • When running the fuzzy/partitions_test repeatedly, we expect it to pass every time.

What happens:

Some runs trigger the following test failure:

   1) test node shutdowns (Raft.Fuzzy.PartitionsTest)
     test/fuzzy/partitions_test.exs:36
     ** (RuntimeError) Terms don't match for  [s0: %{logs: [%Raft.Log.Entry{data: %Raft.Configuration{index: 0, new_servers: [], old_servers: [], state: :none}, index: 0, term: 0, type: :config}, %Raft.Log.Entry (...)

Additional info:

We have added the following debug output to try to understand what precisely
goes wrong during the failed test runs.

diff --git a/test/support/cluster.ex b/test/support/cluster.ex
index 25968bd..de65f99 100644
--- a/test/support/cluster.ex
+++ b/test/support/cluster.ex
@@ -87,6 +87,8 @@ defmodule Raft.Support.Cluster do
       {s, data}
     end

+    IO.inspect data, label: :data, limit: :infinity
+    IO.inspect commands, label: :commands, limit: :infinity
     verify_terms(data) && verify_logs(data, commands)
   end

We have also simplified the StreamData generator to make the command logs easier to read:

diff --git a/test/fuzzy/partitions_test.exs b/test/fuzzy/partitions_test.exs
index 6bee922..11e7bd4 100644
--- a/test/fuzzy/partitions_test.exs
+++ b/test/fuzzy/partitions_test.exs
@@ -10,7 +10,7 @@ defmodule Raft.Fuzzy.PartitionsTest do
   }

   def commands, do: one_of([
-    tuple({constant(:put), term()}),
+    tuple({constant(:put), integer()}),
     constant(:pop),
   ])

Attached is the full output of a failed test, run with the above changes in place.
terms-dont-match.log

/cc @pzel

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.