Server-less p2p database built on libp2p

ThreadDB is still under heavy development, and no part of it should be used before a thorough review of the underlying code and an understanding that APIs and protocols may change rapidly. There may be coding mistakes, and the underlying protocols may contain design flaws. Please let us know immediately if you have discovered a security vulnerability.

Please also read the security note for go-ipfs.


ThreadDB is an implementation of the database described in the paper entitled A protocol & event-sourced database for decentralized user-siloed data.

Go to the docs for more about the motivations behind ThreadDB and Textile.


ThreadDB has two distinct layers:

  • db: The database layer is a document store, which internally leverages the net API. Most applications will only interface with this layer.
  • net: The network layer maintains and orchestrates append-only event logs between network participants. Some applications, like event logging, may choose to rely on this layer directly.

This repo contains a daemon and client for interacting with these layers as a remote service. Depending on the application, Golang projects may choose to import the internal db and net packages directly.


git clone
cd go-threads
go get ./threadsd


import ""

Getting Started

You can think of the DB client as a gRPC client wrapper around the internal db package API, and the Network client as a gRPC client wrapper around the internal net package API. This section will only focus on getting started with the gRPC clients, but Golang apps may choose to interact directly with db and/or net.

Running ThreadDB

The threadsd daemon can be run as a server or alongside desktop apps or command-line tools. The easiest way to run threadsd is by using the provided Docker Compose files. If you're new to Docker and/or Docker Compose, get started here. Once you are set up, you should have docker-compose in your PATH.

Create an .env file and add the following values:


Copy this compose file and run it with the following command.

docker-compose -f docker-compose.yml up 

You should see some console output:

threads_1  | 2020-09-19T16:34:06.420Z	DEBUG	threadsd	repo: /data/threads
threads_1  | 2020-09-19T16:34:06.420Z	DEBUG	threadsd	hostAddr: /ip4/
threads_1  | 2020-09-19T16:34:06.421Z	DEBUG	threadsd	apiAddr: /ip4/
threads_1  | 2020-09-19T16:34:06.421Z	DEBUG	threadsd	apiProxyAddr: /ip4/
threads_1  | 2020-09-19T16:34:06.421Z	DEBUG	threadsd	connLowWater: 100
threads_1  | 2020-09-19T16:34:06.421Z	DEBUG	threadsd	connHighWater: 400
threads_1  | 2020-09-19T16:34:06.422Z	DEBUG	threadsd	connGracePeriod: 20s
threads_1  | 2020-09-19T16:34:06.423Z	DEBUG	threadsd	keepAliveInterval: 5s
threads_1  | 2020-09-19T16:34:06.423Z	DEBUG	threadsd	enableNetPubsub: false
threads_1  | 2020-09-19T16:34:06.424Z	DEBUG	threadsd	debug: true
threads_1  | Welcome to Threads!
threads_1  | Your peer ID is 12D3KooWFCXqmQTwvpfYFWK3DjXChEc4NoPt8pp5jjC8REZ3g6NZ

Congrats! Now you have ThreadDB running locally.

Configuration values

Note the various configuration values shown in the output above. These can be modified with the environment variables shown below.

  • THRDS_REPO: Repo location. Mandatory when launching from docker compose.
  • THRDS_HOSTADDR: Libp2p host bind address. /ip4/ by default.
  • THRDS_APIADDR: gRPC API bind address. /ip4/ by default.
  • THRDS_APIPROXYADDR: gRPC API web proxy bind address. /ip4/ by default.
  • THRDS_CONNLOWWATER: Low watermark of libp2p connections that'll be maintained. 100 by default.
  • THRDS_CONNHIGHWATER: High watermark of libp2p connections that'll be maintained. 400 by default.
  • THRDS_CONNGRACEPERIOD: Duration for which a newly opened connection is not subject to pruning. 20 seconds by default.
  • THRDS_KEEPALIVEINTERVAL: Websocket keepalive interval (must be >= 1s). 5 seconds by default.
  • THRDS_ENABLENETPUBSUB: Enables thread networking over libp2p pubsub. false by default.
  • THRDS_DEBUG: Enables debug logging. false by default.


The database layer is a document store, which internally leverages the net API. Most applications will only interface with this layer.

The full API spec is available here.

As described in the paper, ThreadDB's network layer orchestrates groups of event logs, or threads. In the current implementation, a single database leverages a single network-layer thread for state orchestration.

Starting the client

import ""

db, err := client.NewClient("", grpc.WithInsecure())

Getting a thread token

Thread tokens (JWTs) are used by the daemon to determine the identity of the caller. Most APIs take a thread token as an optional argument, since whether or not they are needed usually depends on how the target collection is configured (see Write Validation and Read Filtering). These tokens are obtained by performing a signing challenge with the daemon using a libp2p private key.

privateKey, _, err := crypto.GenerateEd25519Key(rand.Reader) // Private key is kept locally
myIdentity := thread.NewLibp2pIdentity(privateKey)

threadToken, err := db.GetToken(context.Background(), myIdentity)

Creating a new DB

threadID := thread.NewIDV1(thread.Raw, 32)
err := db.NewDB(context.Background(), threadID)

Creating a new DB from an existing address

An existing DB can be added to a different daemon by providing a valid host address and thread key.

threadID := thread.NewIDV1(thread.Raw, 32)
err := db1.NewDB(context.Background(), threadID)
dbInfo, err := db1.GetDBInfo(context.Background(), threadID)

// db2 is a different client (this would normally be done on a different machine)
err = db2.NewDBFromAddr(context.Background(), dbInfo.Addrs[0], dbInfo.Key)

Creating a collection

Collections are groups of documents or instances and are analogous to tables in relational databases. Creating a collection involves defining the following configuration parameters:

  • Name: The name of the collection, e.g., "Animals" (must be unique per DB).
  • Schema: A JSON Schema, which is used for instance validation.
  • Indexes: An optional list of index configurations, which define how instances are indexed.
  • WriteValidator: An optional JavaScript (ECMAScript 5.1) function that is used to validate instances on write.
  • ReadFilter: An optional JavaScript (ECMAScript 5.1) function that is used to filter instances on read.

Write Validation

The WriteValidator function receives three arguments:

  • writer: The multibase-encoded public key identity of the writer.
  • event: An object describing the update event (see core.db.Event).
  • instance: The current instance as a JavaScript object before the update event is applied.

A falsy return value indicates a failed validation.

Having access to writer, event, and instance opens the door to a variety of app-specific logic. Textile Buckets' file-level access roles are implemented in part with a write validator.

Read Filtering

The ReadFilter function receives two arguments:

  • reader: The multibase-encoded public key identity of the reader.
  • instance: The current instance as a JavaScript object.

The function must return a JavaScript object. Most implementations will modify and return the current instance.

Like write validation, read filtering opens the door to a variety of app-specific logic. Textile Buckets' file-level access roles are implemented in part with a read filter.

import ""

// We can use a struct to define a collection schema
type Person struct {
    ID        string `json:"_id"`
    Name      string `json:"name"`
    Age       int    `json:"age"`
    CreatedAt int64  `json:"created_at"` // int64 so it can hold time.Now().UnixNano()
}

reflector := jsonschema.Reflector{}
mySchema := reflector.Reflect(&Person{}) // Generate a JSON Schema from a struct

err := db.NewCollection(context.Background(), myThreadID, db.CollectionConfig{
    Name:    "Persons",
    Schema:  mySchema,
    Indexes: []db.Index{{
        Path:   "name", // Value matches json tags
        Unique: true,   // Create a unique index on "name"
    }},
})

// We can use the same schema to create more collections.
err = db.NewCollection(context.Background(), myThreadID, db.CollectionConfig{
    Name:    "Staff", // Collection names must be unique per DB
    Schema:  mySchema,
    Indexes: []db.Index{{
        Path:   "name",
        Unique: true,
    }},
    WriteValidator: `
        var type = event.patch.type
        var patch = event.patch.json_patch
        switch (type) {
          case "delete":
            if (writer != "the_boss") {
              return false // Not the boss? No deletes for you.
            }
          default:
            return true
        }
    `,
    ReadFilter: `
        if (instance.age > 50) {
            delete instance.age // Getting old, let's hide just _how_ old hehe
        }
        return instance
    `,
})
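Instances reach the JavaScript functions as JSON objects (assuming, as the JSON Schema setup implies, that instances are stored as JSON), so their property names follow the struct's json tags: a filter must test instance.age, not instance.Age. The Go sketch below makes that visible by round-tripping a Person through encoding/json and applying a Go equivalent of the age-hiding read filter. toInstance and hideAge are hypothetical helpers; the daemon runs the real filter in JavaScript.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Person struct {
	ID        string `json:"_id"`
	Name      string `json:"name"`
	Age       int    `json:"age"`
	CreatedAt int64  `json:"created_at"`
}

// toInstance converts a Person into the generic JSON-object form that the
// filter is assumed to receive.
func toInstance(p Person) (map[string]interface{}, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}
	var inst map[string]interface{}
	if err := json.Unmarshal(b, &inst); err != nil {
		return nil, err
	}
	return inst, nil
}

// hideAge mirrors the ReadFilter: drop "age" when it is over 50.
// JSON numbers decode to float64.
func hideAge(reader string, inst map[string]interface{}) map[string]interface{} {
	if age, ok := inst["age"].(float64); ok && age > 50 {
		delete(inst, "age")
	}
	return inst
}

func main() {
	inst, err := toInstance(Person{ID: "123", Name: "Bob", Age: 62, CreatedAt: 1})
	if err != nil {
		panic(err)
	}
	_, hasGoName := inst["Age"]
	fmt.Println(hasGoName) // false: the key is "age", not "Age"
	fmt.Println(hideAge("anyone", inst))
}
```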

Updating a collection

Each of the collection configuration parameters above can be updated.


err := db.UpdateCollection(context.Background(), myThreadID, db.CollectionConfig{
    Name:    "Persons",
    Schema:  mySchema,
    Indexes: []db.Index{{
        Path:   "name",
        Unique: true,
    }, {
        Path: "created_at", // Add an additional index on "created_at"
    }},
})

Creating an instance

Creating a collection instance is analogous to inserting a row in a relational database table.


// ID is autogenerated when omitted
alice := &Person{
    ID:        "",
    Name:      "Alice",
    Age:       30,
    CreatedAt: time.Now().UnixNano(),
}

ids, err := db.Create(context.Background(), threadID, "Persons", Instances{alice})

alice.ID = ids[0] // ids contains autogenerated instance identifiers

// We can also define a custom ID; it just has to be a string that is unique within the collection
bob := &Person{
    ID:        "123",
    Name:      "Bob",
    Age:       30,
    CreatedAt: time.Now().UnixNano(),
}

ids, err = db.Create(context.Background(), threadID, "Persons", Instances{bob})

Saving an instance

Similarly, we can update an instance with new values.


alice.Age = 31
err = db.Save(context.Background(), threadID, "Persons", Instances{alice})

Collection queries

There are three methods to query for collection instances: Find, FindByID, and Has. As in most databases, indexes speed up queries on the indexed fields.

Check out db.Query and db.Criterion for more about constructing queries and ordering results.


// Find instances with a query
query := db.Where("name").Eq("Alice")
results, err := db.Find(context.Background(), threadID, "Persons", query, &Person{})

alice := results[0].(*Person)


// Find an instance by ID
alice := &Person{}
err = db.FindByID(context.Background(), threadID, "Persons", aliceID, alice)


// Determine if an instance exists by ID
exists, err := db.Has(context.Background(), threadID, "Persons", []string{aliceID})


ThreadDB transactions come in two flavors: WriteTransaction and ReadTransaction.

Write transactions

txn, err := db.WriteTransaction(context.Background(), threadID, "Persons")
end, err := txn.Start()

alice.Age = 32
err = txn.Save(alice)

err = txn.Create(&Person{
    Name:      "Bob",
    Age:       30,
    CreatedAt: time.Now().UnixNano(),
})

end() // Done writing, commit transaction updates

Read transactions

txn, err := db.ReadTransaction(context.Background(), threadID, "Persons")
end, err := txn.Start()

hasAlice, err := txn.Has(alice.ID)

results, err := txn.Find(db.Where("name").Eq("Bob"), &Person{})

bob := results[0].(*Person)

end() // Done reading

Listening for collection changes

We can listen for DB changes on three levels: DB, collection, or instance.

Check out ListenOption for more.


ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events, err := db.Listen(ctx, threadID, []db.ListenOption{{
    Type:       client.ListenAll,
    Collection: "Persons", // Omit to receive events from all collections
    InstanceID: bob.ID,    // Omit to receive events from all instances
}})

for event := range events {
    // Handle event
}
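The three listening levels can be modeled as a matching function: an empty Collection or InstanceID acts as a wildcard, and the action type narrows events further. This sketch assumes that multiple options are OR-ed together and that an empty option list matches every event; listenType and its constants are hypothetical stand-ins for client.ListenAll and related values, not the client's actual types.

```go
package main

import "fmt"

// listenType is a hypothetical stand-in for the client's listen action types.
type listenType int

const (
	listenAll listenType = iota
	listenCreate
	listenSave
	listenDelete
)

type listenOption struct {
	Type       listenType
	Collection string // "" means every collection
	InstanceID string // "" means every instance
}

type event struct {
	Type       listenType // the concrete action: listenCreate, listenSave or listenDelete
	Collection string
	InstanceID string
}

// matches reports whether ev satisfies at least one option.
// Assumption: no options means "listen to everything".
func matches(opts []listenOption, ev event) bool {
	if len(opts) == 0 {
		return true
	}
	for _, o := range opts {
		if o.Type != listenAll && o.Type != ev.Type {
			continue
		}
		if o.Collection != "" && o.Collection != ev.Collection {
			continue
		}
		if o.InstanceID != "" && o.InstanceID != ev.InstanceID {
			continue
		}
		return true
	}
	return false
}

func main() {
	opts := []listenOption{{Type: listenAll, Collection: "Persons", InstanceID: "123"}}
	fmt.Println(matches(opts, event{Type: listenSave, Collection: "Persons", InstanceID: "123"})) // true
	fmt.Println(matches(opts, event{Type: listenSave, Collection: "Persons", InstanceID: "456"})) // false
}
```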

The Network API

The network layer maintains and orchestrates append-only event logs between network participants and is used internally by the database layer. Some applications, like event logging, may choose to rely on this layer directly.

The full API spec is available here.

Starting the client

import ""

net, err := client.NewClient("", grpc.WithInsecure())

Getting a thread token

Thread tokens (JWTs) are used by the daemon to determine the identity of the caller. Most APIs take a thread token as an optional argument.

privateKey, _, err := crypto.GenerateEd25519Key(rand.Reader) // Private key is kept locally
myIdentity := thread.NewLibp2pIdentity(privateKey)

threadToken, err := net.GetToken(context.Background(), myIdentity)

Creating a thread

threadID := thread.NewIDV1(thread.Raw, 32)
threadInfo, err := net.CreateThread(context.Background(), threadID)

Adding an existing thread

An existing thread can be added to a different daemon by providing a valid host address and thread key.

threadID := thread.NewIDV1(thread.Raw, 32)
threadInfo1, err := net1.CreateThread(context.Background(), threadID)

// net2 is a different client (this would normally be done on a different machine)
threadInfo2, err := net2.AddThread(context.Background(), threadInfo1.Addrs[0], core.WithThreadKey(threadInfo1.Key))

Adding a thread replicator

We can replicate a thread on a different host. All logs and records are pushed to the new host. However, it will not be able to read them, since it won't receive the read portion of the thread key.

threadID := thread.NewIDV1(thread.Raw, 32)
threadInfo, err := net1.CreateThread(context.Background(), threadID)

replicatorAddr, err := multiaddr.NewMultiaddr("/ip4/<REPLICATOR_IP_ADDRESS>/tcp/4006/p2p/<REPLICATOR_PEER_ID>")
replicatorID, err := net1.AddReplicator(context.Background(), threadID, replicatorAddr)
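Why a replicator can store records without reading them comes down to the two parts of the thread key: event bodies are encrypted with the read key, and records with the service key (compare threadKey.Read() and threadKey.Service() in the local record example). The sketch below illustrates that layering with AES-GCM from Go's standard library. ThreadDB's actual record and event encodings differ; seal, open, and newKey are hypothetical helpers.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// newKey returns a random 32-byte AES-256 key.
func newKey() ([]byte, error) {
	k := make([]byte, 32)
	_, err := rand.Read(k)
	return k, err
}

// seal encrypts plaintext with AES-GCM and prepends the random nonce.
func seal(key, plaintext []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// open reverses seal; it fails if the key is wrong or the data was altered.
func open(key, sealed []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("ciphertext too short")
	}
	nonce, ct := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ct, nil)
}

func main() {
	serviceKey, err := newKey()
	if err != nil {
		panic(err)
	}
	readKey, err := newKey()
	if err != nil {
		panic(err)
	}
	// Writer: encrypt the body with the read key, then wrap it with the service key.
	body, err := seal(readKey, []byte("howdy"))
	if err != nil {
		panic(err)
	}
	record, err := seal(serviceKey, body)
	if err != nil {
		panic(err)
	}
	// Replicator: holds only the service key, so it can unwrap the record...
	unwrapped, err := open(serviceKey, record)
	if err != nil {
		panic(err)
	}
	// ...but the body stays opaque to it.
	if _, err := open(serviceKey, unwrapped); err != nil {
		fmt.Println("replicator cannot read body:", err)
	}
	// Reader: also holds the read key and recovers the plaintext.
	plain, err := open(readKey, unwrapped)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(plain))
}
```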

Creating a thread record

A thread record can have any body.

import ipldcbor ""

body, err := ipldcbor.WrapObject(map[string]interface{}{
    "foo": "bar",
    "baz": []byte("howdy"),
}, multihash.SHA2_256, -1)
record, err := net.CreateRecord(context.Background(), threadID, body)

Adding a thread record

We can also retain control over the read portion of the thread key and the log private key and create records locally.

import ipldcbor ""

privateKey, _, err := crypto.GenerateEd25519Key(rand.Reader)
myIdentity := thread.NewLibp2pIdentity(privateKey)

threadToken, err := net.GetToken(context.Background(), myIdentity)

threadID := thread.NewIDV1(thread.Raw, 32)
threadKey := thread.NewRandomKey()
logPrivateKey, logPublicKey, err := crypto.GenerateEd25519Key(rand.Reader)
logID, err := peer.IDFromPublicKey(logPublicKey)

threadInfo, err := net.CreateThread(
    context.Background(),
    threadID,
    core.WithThreadKey(thread.NewServiceKey(threadKey.Service())), // Read key is kept locally
    core.WithLogKey(logPublicKey),                                 // Private key is kept locally
    core.WithNewThreadToken(threadToken),                          // Thread token for identity is needed to verify records
)

body, err := ipldcbor.WrapObject(map[string]interface{}{
    "foo": "bar",
    "baz": []byte("howdy"),
}, mh.SHA2_256, -1)

// Create the event locally
event, err := cbor.CreateEvent(context.Background(), nil, body, threadKey.Read())

// Create the record locally
record, err := cbor.CreateRecord(context.Background(), nil, cbor.CreateRecordConfig{
    Block:      event,
    Prev:       cid.Undef, // No previous records because this is the first
    Key:        logPrivateKey,
    PubKey:     myIdentity.GetPublic(),
    ServiceKey: threadKey.Service(),
})

err = net.AddRecord(context.Background(), threadID, logID, record)

Pulling a thread for new records

Although all known hosts of a particular thread are internally polled for new records (as part of the orchestration protocol), doing so manually can often be useful.

err := net.PullThread(context.Background(), threadID)

Listening for new records

We can listen for new thread records across all or a subset of known threads.

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
records, err := net.Subscribe(ctx, core.WithSubFilter(threadID)) // Only receive new records from this thread

for record := range records {
    // Handle record
}


The easiest way to develop against threadsd is to use the Docker Compose files. The -dev flavored file doesn't persist a repo via Docker Volumes, which may be desirable in some cases.


Pull requests and bug reports are very welcome ❤️

This repository falls under the Textile Code of Conduct.

Feel free to get in touch by:


A changelog is published along with each release.



go-threads's Issues

Create follower/replicator mode

Allow a thread to be followed without knowledge of its secret. This enables more "public" style threads, e.g., a blog, twitter, etc.

Network API

Current setup is HTTP + a libp2p adapter. However, since many payloads contain raw bytes, having a serialization layer that can handle typed bytes (protos) would be ideal.

I'm considering gRPC over a libp2p transport (I have an example of this working nicely). Pubsub is another consideration.

Fix tests in CI

For some reason, the test peers usually cannot find each other when run on CircleCI.

Store JWTs

Allow a remote client to read/write via a JWT + role claim.

dispatcher: partial handling

As mentioned in textileio/go-eventstore#7, we should decide on something if an ongoing dispatch of a set of events fails.

The problem is quite complex, but not really a blocker to getting started. Even if no hard decision is made, proper documentation may be enough.

Below is a summary of the problem, its complexity, and some possible solutions.


When a transaction is committed, all events generated by the actions performed in the transaction are dispatched to the Dispatcher. On every event dispatch, the Dispatcher calls the Reduce function of every registered reducer. If any call to a registered reducer fails, the Dispatch() operation fails.

Consequence 1

The event that caused the failure may have gone through half of the dispatches before failing. This means that half of the models reacted to the event, and the other half didn't.

Consequence 2

Events dispatched before the one that caused the failure may already have been processed correctly by all reducers, while events that would have been processed after it never go through the dispatcher.


Both consequences are an inconsistent state at two different levels (partial event dispatch, and complete and incomplete dispatch of other events in the txn).
A rollback mechanism sounds appealing, but it implies more work for Models.
Possible ideas leveraging the event sourcing mechanism:

  • On failure reapply history of events up to the failed event (costly, but would work)
  • Every Reduce call should store some kind of rollback action in case it is needed. (Not sure it is possible to have an undo-event concept in all cases.)
  • Every Model could keep a snapshot of its state before applying the last event, in case it is needed for a rollback. (Could this leverage an existing snapshot feature in the datastore? If not, it's unclear whether the benefits outweigh the implementation work.)
  • Consider @carsonfarmer's suggestion of leveraging Action.Previous to recover the previous state and replaying past Actions to roll back changes. This sounds like a good idea.

model: explore ipld schema

As mentioned by @carsonfarmer, consider IPLD Schema as a replacement for the current underlying JSON Schema and encoding.

  • pro: smaller encoded values
  • pro: faster validation of instance with schema?
  • pro: will be interoperable with other languages
  • con: would make certain EventCodec implementations unusable: json-patcher, json-crdt, etc.

e2e: app for demo

When all things are wired, make a simple but useful demo app to share.

Add Metadata test to Threadstore suite

There are some tests for Metadata, but they are scoped to the Threadservice suite, which uses the inmem implementation.

I'll add tests in a metadata_suite.go that cover both inmem and tstoreds. I'll do this in a separate PR to avoid blocking the current one, since it already makes many changes.

Proxy Log Manager

Daemon based, manage multiple logs per thread

  • #54 JWTs used to enable remote log writing
  • #55 Serve and push over a web-socket at the thread level

Add a share key to conceal read keys from a follower

AddFollower should work like this:

  • Get or create a symmetric share key for the thread (you will have it if you or anyone else has already added a follower). Store the key in the metadata store because it's for the whole thread.
  • Use this key to encrypt all log read keys before sending them to the follower.
  • Tag the key on to the thread address when sharing it.

Now, on the client,

  • Do NOT send the share key to the follower, just use it to decrypt the log read keys

Ensure that a thread is not pulled concurrently

As @jsign mentioned in a review, we should avoid having multiple pull jobs running for a single thread.

Idea: add extra logic to avoid pulling the same thread concurrently.

For example, if t.Pull is slow enough that the ticker fires again, pulls for the same thread may start queueing. If so, there are multiple options: discarding the new pull since one is already running, cancelling the running one and retrying from scratch, etc.

This would guarantee at most one live pull per thread. (In the future, we could also move to a capped worker model if that puts too much pressure on the system.)

At the very least we could have a sync.Map or equivalent that tracks live pulls.
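The sync.Map idea can be sketched as follows, implementing the "discard" option: sync.Map.LoadOrStore atomically claims a thread ID, a second pull for the same thread returns immediately, and the entry is removed when the pull finishes. pullTracker and tryPull are hypothetical names, and the slow pull is simulated with channels.

```go
package main

import (
	"fmt"
	"sync"
)

// pullTracker ensures at most one live pull per thread ID.
type pullTracker struct {
	live sync.Map // thread ID -> struct{}
}

// tryPull runs pull unless one is already live for threadID, in which case
// it returns false without running it.
func (t *pullTracker) tryPull(threadID string, pull func()) bool {
	if _, running := t.live.LoadOrStore(threadID, struct{}{}); running {
		return false
	}
	defer t.live.Delete(threadID) // release the claim even if pull panics
	pull()
	return true
}

func main() {
	var tr pullTracker
	started := make(chan struct{})
	release := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		tr.tryPull("thread-1", func() {
			close(started)
			<-release // simulate a slow pull
		})
	}()
	<-started
	fmt.Println(tr.tryPull("thread-1", func() {})) // false: a pull is already live
	fmt.Println(tr.tryPull("thread-2", func() {})) // true: different thread
	close(release)
	wg.Wait()
	fmt.Println(tr.tryPull("thread-1", func() {})) // true: the previous pull finished
}
```

Cancelling the running pull instead would mean storing a context.CancelFunc in the map rather than an empty struct.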

Store API (gRPC)

We'll need a daemon that runs a server and a gRPC service that can control multiple stores.

RPC Methods

  • NewStore
  • RegisterSchema
  • Start
  • StartFromAddress
  • ModelCreate
  • ModelHas
  • ModelSave
  • ModelDelete
  • ModelReadTxn
  • ModelWriteTxn
  • ModelFindByID
  • ListenForStateChanges?
  • others?

dispatcher: key structure reconsideration

Quoting @carsonfarmer in textileio/go-eventstore#7 :

Do you still think having time first is a good idea? I was starting to wonder if prefix searching via entity would be useful. Then you'd want something like ///?

It's not clear to me yet when we're going to query raw events from the dispatcher, so I have no clear answer.
Leaving this here to keep a record of the question, because it might be important to reconsider.

Get Thread API

This is used in conjunction with thread links to facilitate adding a new thread.

JSON-CRDT EventCodec

Currently there's a JsonPatcher implementation for creating events. Ultimately, using rdoc (or something similar, since support for other languages is important) would be ideal.

Modularize threads


  • go-textile-threads
  • go-textile-wallet
  • go-textile-core (to replace go-textile-crypto w/ proto types)

Network test

Possibly with Docker Compose, run thread tests with a large number of participants and a high write load.

