
raft's Introduction

go-raft

Overview


NOTE: This project is unmaintained. If you are using goraft in a project and want to carry the project forward please file an issue with your ideas and intentions. The original project authors have created new raft implementations now used in etcd and InfluxDB.

This is a Go implementation of the Raft distributed consensus protocol. Raft is a protocol by which a cluster of nodes can maintain a replicated state machine. The state machine is kept in sync through the use of a replicated log.

For more details on Raft, you can read In Search of an Understandable Consensus Algorithm by Diego Ongaro and John Ousterhout.

Project Status

This library is feature complete but should be considered experimental until it has seen more usage. If you have any questions on implementing go-raft in your project please file an issue. There is an active community of developers who can help. go-raft is under the MIT license.

Features

  • Leader election
  • Log replication
  • Configuration changes
  • Log compaction
  • Unit tests
  • Fast Protobuf Log Encoding
  • HTTP transport

Projects

These projects are built on go-raft:

  • goraft/raftd - A reference implementation for using the go-raft library for distributed consensus.
  • Weed File System - A scalable distributed key-to-file system with O(1) disk access for each read.
  • rqlite - A replicated SQLite database, distributing the database replicas across multiple nodes.

If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples.

Contact and Resources

  • raft-dev is a mailing list for discussion about best practices and implementation of Raft. It is not goraft-specific, but helpful if you have questions.
  • Slides from Ben's talk, which include easy-to-understand diagrams of leader election and replication.
  • The Raft Consensus homepage has links to additional Raft implementations, slides from talks on Raft, and general information.

The Raft Protocol

This section provides a summary of the Raft protocol from a high level. For a more detailed explanation on the failover process and election terms please see the full paper describing the protocol: In Search of an Understandable Consensus Algorithm.

Overview

Maintaining state in a single process on a single server is easy. Your process is a single point of authority so there are no conflicts when reading and writing state. Even multi-threaded processes can rely on locks or coroutines to serialize access to the data.

However, in a distributed system there is no single point of authority. Servers can crash, the network between two machines can become unavailable, or any number of other problems can occur.

A distributed consensus protocol is used for maintaining a consistent state across multiple servers in a cluster. Many distributed systems are built upon the Paxos protocol, but Paxos can be difficult to understand and there are many gaps between Paxos and real-world implementations.

An alternative is the Raft distributed consensus protocol by Diego Ongaro and John Ousterhout. Raft is a protocol built with understandability as a primary tenet and it centers around two things:

  1. Leader Election
  2. Replicated Log

With these two constructs, you can build a system that can maintain state across multiple servers -- even in the event of multiple failures.

Leader Election

The Raft protocol effectively works as a master-slave system whereby state changes are written to a single server in the cluster and are distributed out to the rest of the servers in the cluster. This simplifies the protocol since there is only one data authority and conflicts will not have to be resolved.

Raft ensures that there is only one leader at a time. It does this by performing elections among the nodes in the cluster and requiring that a node must receive a majority of the votes in order to become leader. For example, if you have 3 nodes in your cluster then a single node would need 2 votes in order to become the leader. For a 5 node cluster, a server would need 3 votes to become leader.
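
To make the arithmetic concrete, here is a small stand-alone Go snippet (not part of the library) that computes the majority needed for a given cluster size:

package main

import "fmt"

// votesNeeded returns the number of votes a candidate must collect to
// win an election in a cluster of n nodes: a strict majority.
func votesNeeded(n int) int {
    return n/2 + 1
}

func main() {
    for _, n := range []int{3, 5} {
        fmt.Printf("%d-node cluster: %d votes needed to become leader\n", n, votesNeeded(n))
    }
}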

Replicated Log

To maintain state, a log of commands is maintained. Each command is deterministic and makes a change to the state of the server. By ensuring that this log is replicated identically between all the nodes in the cluster, we can reproduce the state at any point in the log by running each command sequentially.

Replicating the log under normal conditions is done by sending an AppendEntries RPC from the leader to each of the other servers in the cluster (called peers). Each peer appends the entries from the leader through a two-phase commit process, which ensures that a majority of servers in the cluster have the entries written to their logs.
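
The following stand-alone sketch illustrates the idea of rebuilding state by replaying a log of deterministic commands in order. The LogEntry type and the map-based state below are illustrative assumptions, not goraft's actual types:

package main

import "fmt"

// LogEntry is an illustrative, simplified log record: a deterministic
// command plus the Raft index and term it was written at.
type LogEntry struct {
    Index   uint64
    Term    uint64
    Command func(s map[string]string) // deterministic state change
}

func main() {
    state := map[string]string{}
    log := []LogEntry{
        {1, 1, func(s map[string]string) { s["x"] = "1" }},
        {2, 1, func(s map[string]string) { s["y"] = "2" }},
        {3, 2, func(s map[string]string) { s["x"] = "3" }},
    }

    // Every node that applies the same log in the same order ends up
    // with the same state.
    for _, e := range log {
        e.Command(state)
    }
    fmt.Println(state) // map[x:3 y:2]
}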

Raft in Practice

Optimal Cluster Size

The primary consideration when choosing the node count in your Raft cluster is the number of nodes that can simultaneously fail. Because Raft requires a majority of nodes to be available to make progress, the number of node failures the cluster can tolerate is (n - 1) / 2, rounded down.

This means that a 3-node cluster can tolerate 1 node failure. If 2 nodes fail then the cluster cannot commit entries or elect a new leader so progress stops. A 5-node cluster can tolerate 2 node failures. A 9-node cluster can tolerate 4 node failures. It is unlikely that 4 nodes will simultaneously fail so clusters larger than 9 nodes are not common.
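
The same arithmetic, expressed as a small stand-alone snippet (illustrative only, not part of the library):

package main

import "fmt"

// failuresTolerated returns how many simultaneous node failures a
// cluster of n nodes can survive while a majority remains available.
func failuresTolerated(n int) int {
    return (n - 1) / 2
}

func main() {
    for _, n := range []int{3, 5, 9} {
        fmt.Printf("%d-node cluster tolerates %d failures\n", n, failuresTolerated(n))
    }
}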

Another consideration is performance. The leader must replicate log entries to each follower node, so CPU and networking resources can quickly become a bottleneck under load in a large cluster.

Scaling Raft

Once you grow beyond the practical maximum cluster size, there are a few options for scaling Raft:

  1. Core nodes with dumb replication. This option requires you to maintain a small cluster (e.g. 5 nodes) that is involved in the Raft process and then replicate only committed log entries to the remaining nodes in the cluster. This works well if you have reads in your system that can be stale.

  2. Sharding. This option requires that you segment your data into different clusters. This option works well if you need very strong consistency and therefore need to read and write heavily from the leader.

If you have a very large cluster that you need to replicate to using Option 1 then you may want to look at performing hierarchical replication so that nodes can better share the load.
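
As a rough, hedged sketch of option 2, requests can be routed to one of several independent Raft clusters by hashing the key. The cluster addresses and the FNV-based routing below are illustrative assumptions, not part of go-raft:

package main

import (
    "fmt"
    "hash/fnv"
)

// clusterFor maps a key to one of several independent Raft clusters.
// Each cluster runs its own consensus over its own shard of the data.
func clusterFor(key string, clusters []string) string {
    h := fnv.New32a()
    h.Write([]byte(key))
    return clusters[h.Sum32()%uint32(len(clusters))]
}

func main() {
    clusters := []string{"raft-shard-a:4001", "raft-shard-b:4001", "raft-shard-c:4001"}
    for _, k := range []string{"user:42", "user:99", "order:7"} {
        fmt.Println(k, "->", clusterFor(k, clusters))
    }
}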

History

Ben Johnson started this library for use in his behavioral analytics database called Sky. He put it under the MIT license in the hopes that it would be useful for other projects too.

raft's People

Contributors

baruch, bcwaldon, benbjohnson, chrislusf, dgryski, gdb, grncdr, hanlz, jbcrail, jensrantil, jessta, jvshahid, klobucar, mattn, matttproud, mistobaan, otoolep, philips, scooletz, temoto, tsenart, wariosg, xiang90, yichengq


raft's Issues

DeadLock with LeaderChange/StateChange Events

The setState function locks the state mutex, which isn't unlocked when the event handlers are called. As a result, if an event handler calls server.State() in response to a StateChange or LeaderChange event, the application can deadlock.

Example gist: https://gist.github.com/nemothekid/8576383

I think this can be fixed either by unlocking the mutex before the events are called, or by moving the event callbacks inside a defer.
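
For illustration, here is a minimal self-contained sketch of the first option; the server struct and handler registration below are stand-ins, not goraft's actual internals:

package main

import (
    "fmt"
    "sync"
)

// server is a minimal stand-in for the library's server type.
type server struct {
    mutex    sync.Mutex
    state    string
    handlers map[string][]func(prev, cur string)
}

// State is safe to call from inside a handler because setState no
// longer holds the mutex while handlers run.
func (s *server) State() string {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    return s.state
}

func (s *server) setState(newState string) {
    s.mutex.Lock()
    prev := s.state
    s.state = newState
    s.mutex.Unlock() // release BEFORE firing event handlers

    for _, h := range s.handlers["stateChange"] {
        h(prev, newState)
    }
}

func main() {
    s := &server{handlers: map[string][]func(prev, cur string){}}
    s.handlers["stateChange"] = append(s.handlers["stateChange"],
        func(prev, cur string) { fmt.Println("now:", s.State()) }) // no deadlock
    s.setState("leader")
}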

commit log and heartbeat issue

When committing, the log is locked. This is a disk operation, and it can sometimes take a long time.
During that operation, our current implementation cannot send heartbeats. This delays the heartbeat and leads to election timeouts.

Fix raft.Timer Channel

The internalTimer in raft.Timer is not closed when internalTimer.Stop() is called. This needs to be done explicitly.

Add back removed node

Currently, when we remove a node, we write a log entry.
So if a node is removed and later comes back with the same name, it will be removed again when it replays that earlier log entry, or it will receive a snapshot whose configuration does not contain it.

To solve this, we probably need to let the removed node send a join command that bypasses the previous remove command.

The use case is that the application may want to remove a node if it does not reply to AppendEntries for minutes. But if the node recovers later, we may still want it to rejoin.

@benbjohnson any ideas?

Write Uncommitted Entries to Disk

Uncommitted entries should be written to disk. When a node becomes leader with an inconsistent log index and commit index, a no-op should be performed to advance the commit index.

/cc: @ongardie

Simplify Server Initialization

When starting the server, the API should be simplified:

  • Simplify to a single Server.Start() method which starts node as a follower.
  • If log entries exist then allow promotion to candidate if no AEs received.
  • If no log entries exist then wait for AEs from another node.
  • If no log entries exist and a self-join command is issued then immediately become leader and commit entry.
  • Only allow the first log entry to be a self-join.
  • Move JoinCommand into the Raft library with a command name of raft:join.
  • Create a LeaveCommand in the Raft library with a command name of raft:leave.

We may want to combine the code in the current Server.Initialize() into Server.Start() since it doesn't make sense to have two calls to initialize. It'd be nice to add a Server.Initialize(name, connectionString string) method that issues the self-join.

/cc: @xiangli-cmu

Log Commit

Append() should queue entries in memory until SetCommitIndex() has been set to an index higher than the entry.

Moving this library to a correct base name?

go-raft doesn't follow the package naming conventions that golang.org recommends. Essentially, our base name should be raft instead of go-raft.

It would be nice to fix this and I was thinking we could move the repo into an organization at the same time: github.com/goraft/raft

That way the package URL doesn't get longer, it still conveys that this is a Go library, and using an org signals that there is a community of people behind the project.

@benbjohnson What do you think about this? I reserved the goraft Org already and would be happy to give you admin access.

Cluster configuration issue

With the join command, all the peers will append the join command to their logs.
Suppose we have 3 nodes, and node 1 is the first leader node.

Node 2 joins Leader 1:
  Leader: peer 2    Restart: replay join 2 -> peer 2
  Peer 2: none      Restart: replay join 2 -> none

Node 2 and Node 3 join Leader 1:
  Leader: peer 2, peer 3    Restart: replay join 2 -> peer 2, replay join 3 -> peer 3
  Peer 2: peer 3            Restart: replay join 2 -> none, replay join 3 -> peer 3
  Peer 3: peer 2            Restart: replay join 2 -> peer 3, replay join 3 -> peer 2

Here is the problem: the peers will not add the leader to their logs.

Clean up promote func

When the server is a candidate, it can also step down due to receiving an AppendEntries request from a new leader.

go test -race fails

I haven't looked into whether these are legitimate races or test-only races, but they should be cleaned up so that go test -race is useful.

Add sync registration and method

Some applications might want the leader to trigger a sync event at a given interval.
So I think we can provide an API to let the application register a sync event and have the leader send it out.
@benbjohnson

External Interface Cleanup

The external interface to the library needs to be pared down significantly. A client using go-raft should only need to access a couple of functions on the Server, such as Start(), Stop(), etc. Everything else should be internal-only.

Timer Race Conditions

The timer has several race conditions surrounding the initialization of timer goroutines.

Avoid unlimited wait in stopHeartbeat

We need to fix a possible unlimited wait in stopHeartbeat. An initial attempt was made via #186.

Via @xiangli-cmu I have found the root cause of the problem. Here is why there is a deadlock: when the leader calls removePeer it is holding the log lock, since it entered removePeer via setCommitIndex. The leader then sends a stop signal and waits for it to be received.

If the peer is currently in flush(), it also needs to acquire the log lock in p.server.log.getEntriesAfter.

So a deadlock happens.

Bug in log.appendEntry?

log.go:390:

} else if entry.index == lastEntry.index && entry.index <= lastEntry.index {
    return fmt.Errorf("raft.Log: Cannot append entry with earlier index in the same term (%x:%x <= %x:%x)", entry.term, entry.index, lastEntry.term, lastEntry.index)
}

Should the first condition be entry.term == lastEntry.term?
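
For reference, this is the comparison the report seems to expect; it is only a sketch of the suggested change, not a verified patch:

} else if entry.term == lastEntry.term && entry.index <= lastEntry.index {
    return fmt.Errorf("raft.Log: Cannot append entry with earlier index in the same term (%x:%x <= %x:%x)", entry.term, entry.index, lastEntry.term, lastEntry.index)
}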

Leader Election

  • Add RequestVote RPC to Server.
  • Add state (Follower, Candidate, Leader) to Server.

The restart node should only apply committed logs

The restarted node can know which entries are committed by receiving the first heartbeat from the current leader (but it is slow to replay all the logs after the server has started).

Maybe we can keep the committed index and flush it to disk every few seconds?
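
A hedged sketch of the flush-to-disk idea; the file name, interval, and encoding below are assumptions, not goraft behavior:

package main

import (
    "io/ioutil"
    "os"
    "strconv"
    "sync/atomic"
    "time"
)

// flushCommitIndex periodically writes the last known commit index to
// disk so that, on restart, a node can replay only entries up to that
// index instead of waiting for the first heartbeat from the leader.
func flushCommitIndex(path string, commitIndex *uint64, interval time.Duration, stop <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            idx := atomic.LoadUint64(commitIndex)
            tmp := path + ".tmp"
            if err := ioutil.WriteFile(tmp, []byte(strconv.FormatUint(idx, 10)), 0600); err != nil {
                continue // best effort; try again on the next tick
            }
            os.Rename(tmp, path) // atomic swap into place
        case <-stop:
            return
        }
    }
}

func main() {
    var commitIndex uint64 = 483708
    stop := make(chan struct{})
    go flushCommitIndex("commit.idx", &commitIndex, 2*time.Second, stop)
    time.Sleep(5 * time.Second)
    close(stop)
}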

Writes accepted in a 2 node cluster if the follower is down

If you create a 2 node cluster and kill the follower process, you can still issue writes to the leader. I would expect the leader, once it has discovered the follower is down, to go back to trying to elect a new leader and fail because there aren't enough nodes to elect one, which would cause writes to fail.

I let it run for about 30 minutes and writes were still being accepted.

If you stop both nodes and only start the leader, the leader correctly rejects writes.

Randomness issue

Based on my testing, in many cases all the peers tend to time out at the same time.
This may be caused by all the servers being created at the same time (does the election timeout use time.Now() as the seed?). Can we add more randomness, such as the server name?

04:59:00.588118 Name: 9, State: follower, Term: 31628, Index: 483708 start Candiate
04:59:00.588150 Name: 9, State: candidate, Term: 31629, Index: 483708 start Select
04:59:00.588187 Name: 3, State: follower, Term: 31628, Index: 483708 start Candiate
04:59:00.588230 Name: 3, State: candidate, Term: 31629, Index: 483708 start Select
04:59:00.588266 Name: 5, State: follower, Term: 31628, Index: 483708 start Candiate
04:59:00.588282 Name: 5, State: candidate, Term: 31629, Index: 483708 start Select
04:59:00.588317 Name: 4, State: follower, Term: 31628, Index: 483708 start Candiate
04:59:00.588334 Name: 4, State: candidate, Term: 31629, Index: 483708 start Select
04:59:00.588369 Name: 2, State: follower, Term: 31628, Index: 483708 start Candiate
04:59:00.588383 Name: 2, State: candidate, Term: 31629, Index: 483708 start Select
04:59:00.588419 Name: 6, State: follower, Term: 31628, Index: 483708 start Candiate
04:59:00.588434 Name: 6, State: candidate, Term: 31629, Index: 483708 start Select
04:59:00.588471 Name: 7, State: follower, Term: 31628, Index: 483708 start Candiate
04:59:00.588486 Name: 7, State: candidate, Term: 31629, Index: 483708 start Select
04:59:00.588521 Name: 1, State: follower, Term: 31628, Index: 483708 start Candiate
04:59:00.588535 Name: 1, State: candidate, Term: 31629, Index: 483708 start Select
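
One possible way to add per-server randomness is to fold the server name into the seed. The sketch below is an assumption about how this could be done, not goraft's implementation:

package main

import (
    "fmt"
    "hash/fnv"
    "math/rand"
    "time"
)

// electionTimeout returns a timeout in [base, 2*base) drawn from a
// source seeded with both the clock and the server name, so that
// servers created at the same instant still diverge.
func electionTimeout(name string, base time.Duration) time.Duration {
    h := fnv.New64a()
    h.Write([]byte(name))
    r := rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(h.Sum64())))
    return base + time.Duration(r.Int63n(int64(base)))
}

func main() {
    for _, name := range []string{"1", "2", "3"} {
        fmt.Println(name, electionTimeout(name, 150*time.Millisecond))
    }
}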

Error after stopping a single raft instance

I start up my raft instance and run a test to start sending data. I then ctrl-c to kill the running raft state machine. I have a channel that catches the ctrl-c and runs raft.Stop(), and I then use Running() to make sure raft is not running anymore.

I then start my server and raft recovers the log. I then start to send data with the same test and get the following error.

This error is resolved by deleting the log and starting again. Maybe corruption? Not totally sure what's going on.

panic: server unable to send signal to commit channel

goroutine 9 [running]:
github.com/goraft/raft.(*server).processAppendEntriesResponse(0xc2000f1100, 0xc2000dad80)
/go/src/github.com/goraft/raft/server.go:919 +0x60c
github.com/goraft/raft.(*server).leaderLoop(0xc2000f1100)
/go/src/github.com/goraft/raft/server.go:705 +0x52a
github.com/goraft/raft.(*server).loop(0xc2000f1100)
/go/src/github.com/goraft/raft/server.go:500 +0x33e
created by github.com/goraft/raft.(*server).Start
/go/src/github.com/goraft/raft/server.go:420 +0x71e

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc200000300)
/usr/local/go/src/pkg/runtime/zsema_darwin_amd64.c:165 +0x2e
sync.(*WaitGroup).Wait(0xc2000c7600)
/usr/local/go/src/pkg/sync/waitgroup.go:109 +0xf2
main.main()
/go/src/github.com/repo/main.go:31 +0x1e6

goroutine 2 [syscall]:

goroutine 4 [syscall]:
os/signal.loop()
/usr/local/go/src/pkg/os/signal/signal_unix.go:21 +0x1c
created by os/signal.init·1
/usr/local/go/src/pkg/os/signal/signal_unix.go:27 +0x2f

goroutine 5 [chan receive]:
github.com/repo/router.(*Router).sigMonitor(0xc2000db000)
/go/src/github.com/repo/router/router.go:138 +0x196
created by github.com/repo/router.(*Router).Start
/go/src/github.com/repo/router/router.go:80 +0xeb

goroutine 6 [IO wait]:
net.runtime_pollWait(0x58ef00, 0x72, 0x0)
/usr/local/go/src/pkg/runtime/znetpoll_darwin_amd64.c:118 +0x82
net.(*pollDesc).WaitRead(0xc2000cc1a0, 0x23, 0xc2000b6c60)
/usr/local/go/src/pkg/net/fd_poll_runtime.go:75 +0x31
net.(*netFD).accept(0xc2000cc120, 0x335ef0, 0x0, 0xc2000b6c60, 0x23, ...)
/usr/local/go/src/pkg/net/fd_unix.go:385 +0x2c1
net.(*TCPListener).AcceptTCP(0xc200000338, 0x18, 0xc2000f2010, 0xf55f7)
/usr/local/go/src/pkg/net/tcpsock_posix.go:229 +0x45
net.(*TCPListener).Accept(0xc200000338, 0x0, 0x0, 0x0, 0xc2000b6bd0, ...)
/usr/local/go/src/pkg/net/tcpsock_posix.go:239 +0x25
net/http.(*Server).Serve(0xc2000b35f0, 0xc2000da100, 0xc200000338, 0x0, 0x0, ...)
/usr/local/go/src/pkg/net/http/server.go:1542 +0x85
net/http.(*Server).ListenAndServe(0xc2000b35f0, 0x0, 0x0)
/usr/local/go/src/pkg/net/http/server.go:1532 +0x9e
github.com/repo/router.(*Router).listenAndServe(0xc2000db000)
/go/src/github.com/repo/router/router.go:126 +0x2c
created by github.com/repo/router.(*Router).Start
/go/src/github.com/repo/router/router.go:81 +0x102

goroutine 8 [chan receive]:
github.com/goraft/raft.(*server).send(0xc2000f1100, 0x274920, 0xc20011c060, 0xc20011c060, 0xc200000001, ...)
/go/src/github.com/goraft/raft/server.go:515 +0xde
github.com/goraft/raft.(*server).Do(0xc2000f1100, 0xc2000f5a50, 0xc20011c060, 0x19, 0x40, ...)
/go/src/github.com/goraft/raft/server.go:768 +0x56
github.com/repo/router.(*Router).register(0xc2000db000, 0xc2001184d0, 0x8, 0x0, 0x0, ...)
/go/src/github.com/repo/router/router.go:221 +0x145
github.com/repo/router.(*Router).parseMessage(0xc2000db000, 0xc20011c030, 0x2, 0x2)
/go/src/github.com/repo/router/router.go:202 +0x120
github.com/repo/router.(*Router).messageFilter(0xc2000db000)
/go/src/github.com/repo/router/router.go:192 +0x318
created by github.com/repo/router.(*Router).Start
/go/src/github.com/repo/router/router.go:83 +0x147

goroutine 22 [select]:
github.com/goraft/raft.func·004()
/go/src/github.com/goraft/raft/server.go:793 +0x2cc
created by github.com/goraft/raft.(*server).processCommand
/go/src/github.com/goraft/raft/server.go:803 +0x478
exit status 2

Panics: leaky abstractions

I've just spent part of my evening reading the source code. I notice in server.go that an error is returned in some cases, while panic(...) is called in other error cases. Here's an example.

A Go blog article states

The convention in the Go libraries is that even when a package uses panic internally, its external API still presents explicit error return values.

My question is, is there a specific reason as to why the raft library uses panic(...) as opposed to errors? Does it distinguish between the two somehow?

Cluster Configuration

  • Change cluster configuration file to use JSON: {"peers":[...]}.
  • Atomically read config file using ioutil.ReadFile().
  • Atomically write config file using ioutil.WriteFile().
  • Write config file to temporary location and then swap using os.Rename().

/cc: @xiangli-cmu
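
A minimal sketch of the proposed write path; the config struct follows the JSON layout above, while the file names and permissions are illustrative:

package main

import (
    "encoding/json"
    "io/ioutil"
    "os"
)

// Config mirrors the proposed JSON layout: {"peers":[...]}.
type Config struct {
    Peers []string `json:"peers"`
}

// writeConfig writes the config to a temporary file and then renames it
// into place, so readers never observe a partially written file.
func writeConfig(path string, c *Config) error {
    b, err := json.Marshal(c)
    if err != nil {
        return err
    }
    tmp := path + ".tmp"
    if err := ioutil.WriteFile(tmp, b, 0600); err != nil {
        return err
    }
    return os.Rename(tmp, path)
}

func main() {
    cfg := &Config{Peers: []string{"http://localhost:4001", "http://localhost:4002"}}
    if err := writeConfig("conf", cfg); err != nil {
        panic(err)
    }
}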

Snapshot issue

When the snapshot gets large, sending the whole snapshot to a slow follower is not a good idea.

Should we do incremental snapshots, or keep the previous log around for a longer time after the snapshot?

Log Replication

  • Add Server.Do(*Command).
  • Commands should be sent via AppendEntries RPC before being committed to log.

Simplify Log

  • Ensure that log is only accessed from the Server within a lock.
  • Remove all locks from the Log.

Transport Interface

Replace the callback system in raft.Server with the following interface:

type Transporter interface {
    SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error)
    SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error)
}

cc: @tadglines

Do() should not end unless committed or demoted

The Server.do() method should not time out while waiting for responses. Instead, it should wait until an entry is committed or until it is notified of its demotion.

Reported by @xiangli-cmu.
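
A sketch of the intended waiting pattern; the channels below are illustrative stand-ins for however the server signals commit and demotion internally:

package main

import (
    "errors"
    "fmt"
)

// waitForCommit blocks until the entry is committed or the server is
// demoted; there is deliberately no timeout case.
func waitForCommit(committed <-chan error, demoted <-chan struct{}) error {
    select {
    case err := <-committed:
        return err // nil on success, or the apply error
    case <-demoted:
        return errors.New("raft: server demoted before entry committed")
    }
}

func main() {
    committed := make(chan error, 1)
    demoted := make(chan struct{})
    committed <- nil
    fmt.Println(waitForCommit(committed, demoted)) // <nil>
}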

Command commit callback issue

If the leader dies after the client sends a command to it, the client will not get the callback from the "leader" it connected to. The command may have been committed (the new leader received the append entries request before the previous leader died) or may be deleted (the new leader did not receive the append entries request).

How to notify the client?

Server Event Loop

Consolidate all state changes and command execution to the event loop in server.go.

Binary Log

Refactor the log and commands to allow for generic binary encoding.

  • Log refactor
  • Command refactor

Log Recovery

When reading the log, if a line is corrupt then truncate the log from the last correct entry.
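
A hedged sketch of that recovery loop, using a newline-delimited JSON format purely for illustration (goraft's actual log encoding differs):

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "os"
)

// Entry is an illustrative log record; one JSON object per "\n"-terminated line.
type Entry struct {
    Index uint64 `json:"index"`
    Term  uint64 `json:"term"`
    Data  string `json:"data"`
}

// recoverLog reads entries until the first line that fails to decode,
// then truncates the file just after the last correct entry.
func recoverLog(path string) ([]Entry, error) {
    f, err := os.OpenFile(path, os.O_RDWR, 0600)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    var entries []Entry
    var goodBytes int64
    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        line := scanner.Bytes()
        var e Entry
        if err := json.Unmarshal(line, &e); err != nil {
            break // corrupt tail: stop here
        }
        entries = append(entries, e)
        goodBytes += int64(len(line)) + 1 // +1 for the trailing newline
    }
    // Drop everything after the last correct entry.
    if err := f.Truncate(goodBytes); err != nil {
        return nil, err
    }
    return entries, nil
}

func main() {
    entries, err := recoverLog("log")
    fmt.Println(len(entries), err)
}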

Better logging and debugging method

I hope we can do better logging and error handling.

log.Println only prints timestamps with second precision, which is not accurate enough for our debugging purposes.

We should think more carefully about which parts should panic and which are only temporary errors.

The current debugging method I use is to set the heartbeat and election timeouts very short (a few milliseconds) and keep killing the leader while sending commands to the current leader.

With better logging and panics, we can more easily trace the sequence and cause of a problem.

Race condition during leader stepdown

  1. Node 1 is leader. It has 20 entries in its log, the first 10 of which are committed. It has pushed all 20 to Node 2, but let's assume Nodes 3, 4, and 5 haven't seen any of these uncommitted entries.
  2. Somehow Node 3 gets elected while Node 1 isn't looking. It only has 10 entries in its log.
  3. Node 3 sends Node 1 an AppendEntries with a higher term number, and Node 1 steps down. It truncates its log to entry 10.
  4. In a separate goroutine, Node 1 prepares to send an AppendEntries request to Node 2 (that goroutine has no idea a step-down is in progress). It notes that the last entry it replicated to Node 2 was entry 20, and attempts to retrieve all entries in the log from 20 onwards.
  5. getEntriesAfter panics with the message "raft: Index is beyond end of log: 10 20"

In general, the locking around log operations is super sketchy. I suspect, for instance, that the entirety of https://github.com/goraft/raft/blob/master/server.go#L904-920 ought to be protected by a single lock acquisition/release.

duplicate events

There are duplicate events being sent to the dispatcher. I will clean this up soon.

Documentation

Add documentation that describes:

  • An overview of Raft.
  • How to implement the go-raft library.
  • Do's and Don'ts of using the library.
  • Limitations of the library.
  • Future Enhancements.
