dht-rpc

Make RPC calls over a Kademlia based DHT.

npm install dht-rpc

Key Features

  • Remote IP / firewall detection
  • Easily add any command to your DHT
  • Streaming queries and updates

Note that internally V5 of dht-rpc differs significantly from V4, due to a series of improvements to NAT detection, secure routing IDs and more.

Usage

Here is an example implementing a simple key-value store.

First spin up a bootstrap node. You can run several for redundancy. There is nothing special about a bootstrap node, except that it needs to know its own host and port, since it knows no other nodes to infer them from.

import DHT from 'dht-rpc'

const bootstrap = DHT.bootstrapper(10001, '127.0.0.1')

Now let's make some DHT nodes that can store values in our key-value store.

import DHT from 'dht-rpc'
import crypto from 'crypto'

// Let's create 100 dht nodes for our example.
for (var i = 0; i < 100; i++) createNode()

function createNode () {
  const node = new DHT({
    bootstrap: [
      'localhost:10001'
    ]
  })

  const values = new Map()
  const VALUES = 0 // define a command enum

  node.on('request', function (req) {
    if (req.command === VALUES) {
      if (req.token) { // if we are the closest node store the value (ie the node sent a valid roundtrip token)
        const key = hash(req.value).toString('hex')
        values.set(key, req.value)
        console.log('Storing', key, '-->', req.value.toString())
        return req.reply(null)
      }

      const value = values.get(req.target.toString('hex'))
      req.reply(value)
    }
  })
}

function hash (value) {
  return crypto.createHash('sha256').update(value).digest()
}
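
The request handler above can be tried without a network by pulling it into a plain function and calling it with stand-in request objects. This is only a sketch: `createHandler` and the mock request objects are assumptions made for illustration. Only `req.command`, `req.token`, `req.value`, `req.target` and `req.reply()` come from the example above.

```javascript
const crypto = require('crypto')

const VALUES = 0 // same command enum as in the example above

function hash (value) {
  return crypto.createHash('sha256').update(value).digest()
}

// Hypothetical helper: returns a handler suitable for node.on('request', handler)
function createHandler (values = new Map()) {
  return function onRequest (req) {
    if (req.command !== VALUES) return

    if (req.token) {
      // A valid roundtrip token means the sender is committing: store the value
      values.set(hash(req.value).toString('hex'), req.value)
      return req.reply(null)
    }

    // Plain lookup: reply with the stored value, if any
    req.reply(values.get(req.target.toString('hex')))
  }
}

// Stand-in request objects, shaped like the fields used above
const handler = createHandler()
const value = Buffer.from('hello')
const replies = []

handler({ command: VALUES, token: Buffer.alloc(32), value, reply: (v) => replies.push(v) })
handler({ command: VALUES, token: null, target: hash(value), reply: (v) => replies.push(v) })

console.log(replies[1].toString()) // the stored buffer comes back on lookup
```

With a real node the same function would be attached with `node.on('request', createHandler())`.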

To insert a value into this DHT, make another script that does the following:

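const node = new DHT({ bootstrap: ['localhost:10001'] })

// Example value to store; hash() and VALUES are defined as in the node script above
const value = Buffer.from('hello world')

const q = node.query({
  target: hash(value),
  command: VALUES,
  value
}, {
  // commit true will make the query re-request the 20 closest
  // nodes with a valid round trip token to update the values
  commit: true
})

await q.finished()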

Then, after inserting, run this script to query for a value:

// hexFromAbove is the hex key logged by the storing nodes ('Storing <key> --> ...')
const target = Buffer.from(hexFromAbove, 'hex')
for await (const data of node.query({ target, command: VALUES })) {
  if (data.value && hash(data.value).toString('hex') === hexFromAbove) {
    // We found the value! Breaking out of the loop destroys the query stream, as there is no need to continue.
    console.log(hexFromAbove, '-->', data.value.toString())
    break
  }
}
console.log('(query finished)')

API

const node = new DHT([options])

Create a new DHT node.

Options include:

{
  // A list of bootstrap nodes
  bootstrap: [ 'bootstrap-node.com:24242', ... ],
  // Optionally pass in array of { host, port } to add to the routing table if you know any peers
  nodes: [{ host, port }, ...],
  // Optionally pass a port you prefer to bind to instead of a random one
  port: 0,
  // Optionally pass a host you prefer to bind to instead of all networks
  host: '0.0.0.0',
  // Optionally pass a UDX instance on which sockets will be created.
  udx,
  // dht-rpc will automatically detect if you are firewalled. If you know that you are not, set this to false
  firewalled: true
}

By default, nodes use something called adaptive mode to decide whether or not they want to join other nodes' routing tables. This takes into account things like node uptime, whether the node is firewalled, etc. Adaptive mode is conservative, so it might take roughly 20-30 minutes for the node to turn persistent. If you are writing a test case with your own bootstrap network, you'd usually want to turn this off to make sure your test finishes in a timely manner. You can do this by passing ephemeral: false in the constructor. For the vast majority of use cases you should use adaptive mode, i.e. the defaults, to ensure good DHT health.

Your DHT routing id is hash(publicIp + publicPort) and will be autoconfigured internally.

const node = DHT.bootstrapper(port, host, [options])

Make a bootstrap node for your DHT. The port and host need to be its globally accessible port and host. Note: the port and host parameters are used to create the node ID. Use options.host if you want to bind to a different address, e.g. 127.0.0.1. DHT nodes can use any other DHT node to bootstrap, but a bootstrap node can bootstrap itself, by itself.

await node.ready()

Wait for the node to be fully bootstrapped etc. You don't have to wait for this method, but it can be useful during testing.

node.id

Get your own routing ID. Only available when the node is not ephemeral.

node.ephemeral

A boolean indicating whether you are currently ephemeral.

node.on('bootstrap')

Emitted when the routing table is fully bootstrapped. Emitted as a convenience.

node.on('listening')

Emitted when the underlying UDX socket is listening. Emitted as a convenience.

node.on('ready')

Emitted when the node is fully bootstrapped etc.

node.on('persistent')

Emitted when the node is no longer in ephemeral mode. All nodes start in ephemeral mode, as they figure out their NAT settings. If you set ephemeral: false then this is emitted during the bootstrap phase, assuming you are on an open NAT.

node.on('wake-up')

Emitted when the node has detected that the computer has gone to sleep. If this happens, it will switch from persistent mode to ephemeral again.

node.on('network-change', interfaces)

Emitted when the network interfaces of the computer change.

node.on('nat-update', (host, port) => {})

Emitted when node.host or node.port were changed.

node.on('close')

Will be emitted after node.destroy() is completed.
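
To see in which order these events fire during development, it can help to log them all in one place. The sketch below assumes only the event names listed above. `traceLifecycle` is a hypothetical helper, not part of dht-rpc, and it accepts any EventEmitter-like object so it can be tried without a network.

```javascript
// Hypothetical helper: attach loggers for the lifecycle events listed above
function traceLifecycle (node, log = console.log) {
  const events = ['listening', 'bootstrap', 'ready', 'persistent', 'wake-up', 'close']

  for (const name of events) {
    node.on(name, () => log('dht event:', name))
  }

  // These two events carry arguments
  node.on('nat-update', (host, port) => log('dht event: nat-update', host, port))
  node.on('network-change', (interfaces) => log('dht event: network-change', interfaces))

  return node
}

// With a real node this would be called as: traceLifecycle(new DHT())
```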

node.refresh()

Refresh the routing table by looking up a random node in the background. This is called periodically internally, but is exposed in case you want to force a refresh.

node.host

Get your node's public ip, inferred from other nodes in the DHT. If the ip cannot be determined, this is set to null.

node.port

Get your node's public port, inferred from other nodes in the DHT. If your node does not have a consistent port, this is set to 0.

node.firewalled

Boolean indicating if your node is behind a firewall.

This is auto-detected by having other nodes try to PING you without you contacting them first.

node.randomized

Boolean indicating if your node is likely behind a randomizing NAT.

const addr = node.address()

Get the local address of the UDP socket bound.

Note that if you are in ephemeral mode, this will return a different port than the one you provided in the constructor (under port), as ephemeral mode always uses a random port.

node.on('request', req)

Emitted when an incoming DHT request is received. This is where you can add your own RPC methods.

  • req.target - the DHT target the peer is looking for (routing is handled behind the scenes)
  • req.command - the RPC command enum
  • req.value - the RPC value buffer
  • req.token - set if the remote peer echoed back a valid roundtrip token, proving their "from" address
  • req.from - who sent this request (host, port)

To reply to a request use the req.reply(value) method and to reply with an error code use req.error(errorCode).

In general error codes are up to the user to define, with the general suggestion to start application specific errors from error code 16 and up, to avoid future clashes with dht-rpc internals.

Currently dht-rpc defines the following errors

DHT.OK = 0 // ie no error
DHT.ERROR_UNKNOWN_COMMAND = 1 // the command requested does not exist
DHT.ERROR_INVALID_TOKEN = 2 // the round trip token sent is invalid
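
As an illustration of the convention above, here is a sketch of a handler that answers with `req.error()`. `ERROR_UNKNOWN_COMMAND` mirrors the value listed above. `ERROR_VALUE_TOO_LARGE`, `STORE` and `MAX_VALUE_SIZE` are hypothetical application-level names, and the request objects are stand-ins so the handler can be run on its own.

```javascript
// Value copied from the list above (DHT.ERROR_UNKNOWN_COMMAND)
const ERROR_UNKNOWN_COMMAND = 1

// Hypothetical application-specific error, starting at 16 as suggested above
const ERROR_VALUE_TOO_LARGE = 16

const STORE = 0 // hypothetical command enum
const MAX_VALUE_SIZE = 1024 // hypothetical application limit, in bytes

function onRequest (req) {
  if (req.command !== STORE) return req.error(ERROR_UNKNOWN_COMMAND)
  if (req.value && req.value.length > MAX_VALUE_SIZE) return req.error(ERROR_VALUE_TOO_LARGE)
  req.reply(null)
}

// Stand-in request objects that record how the handler answered
const calls = []
const mock = (fields) => ({
  ...fields,
  reply: (value) => calls.push(['reply', value]),
  error: (code) => calls.push(['error', code])
})

onRequest(mock({ command: 99, value: null }))
onRequest(mock({ command: STORE, value: Buffer.alloc(2048) }))
onRequest(mock({ command: STORE, value: Buffer.from('ok') }))

console.log(calls)
```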

reply = await node.request({ token, target, command, value }, to, [options])

Send a request to a specific node specified by the to address ({ host, port }). See the query API for more info on the arguments.

Options include:

{
  retry: true, // whether the request should retry on timeout
  socket: udxSocket // request on this specific socket
}

Normally you'd set the token when committing to the dht in the query's commit hook.

reply = await node.ping(to, [options])

Sugar for node.request({ command: 'ping' }, to, options)

Additional options include:

{
  size: 0, // size of the value buffer, filled with zeroes
}

stream = node.query({ target, command, value }, [options])

Query the DHT. Will move as close as possible to the target provided, which should be a 32-byte uniformly distributed buffer (ie a hash).

  • target - find nodes close to this (should be a 32 byte buffer like a hash)
  • command - an enum (uint) indicating the method you want to invoke
  • value - optional binary payload to send with it
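
Since the target should be a 32-byte, uniformly distributed buffer, one straightforward way to obtain it is to hash an application-level key, as the key-value example does with SHA-256. The sketch below also shows the XOR comparison that Kademlia-style routing is built on. `toTarget` and `closer` are hypothetical helper names, not part of dht-rpc.

```javascript
const crypto = require('crypto')

// Hypothetical helper: derive a 32-byte target from any string or buffer
function toTarget (key) {
  return crypto.createHash('sha256').update(key).digest()
}

// Hypothetical helper: true if id `a` is closer to `target` than id `b`
// under the XOR metric (distances compared byte by byte, most significant first)
function closer (target, a, b) {
  for (let i = 0; i < target.length; i++) {
    const da = target[i] ^ a[i]
    const db = target[i] ^ b[i]
    if (da !== db) return da < db
  }
  return false // equal distance
}

const target = toTarget('my-key')
console.log(target.length) // 32
console.log(closer(target, target, toTarget('other'))) // true: distance to itself is 0
```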

If you want to modify state stored in the dht, you can use the commit flag to signal the closest nodes.

{
  // "commit" the query to the 20 closest nodes so they can modify/update their state
  commit: true
}

Committing a query will just re-request your command to the closest nodes once those are verified. If you want to do some more specific logic with the closest nodes, you can specify a function instead, which is called for each close reply.

{
  async commit (reply, dht, query) {
    // normally you'd send back the roundtrip token here, to prove to the remote that you own
    // your ip/port
    await dht.request({ token: reply.token, target, command, value }, reply.from)
  }
}
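
A commit function like the one above closes over `target`, `command` and `value`, so it can be convenient to build it with a small factory. This is a sketch under that assumption. `createCommit` is a hypothetical helper, and `fakeDht` is a stand-in that only records what would have been sent.

```javascript
// Hypothetical helper: build a commit function for a given request.
// The (reply, dht, query) signature and the dht.request() call follow the example above.
function createCommit ({ target, command, value }) {
  return async function commit (reply, dht, query) {
    // Echo the roundtrip token back to the node that issued it
    await dht.request({ token: reply.token, target, command, value }, reply.from)
  }
}

// Stand-in for a DHT node that records what would have been sent
const sent = []
const fakeDht = { request: async (message, to) => { sent.push({ message, to }) } }

const commit = createCommit({ target: Buffer.alloc(32), command: 0, value: Buffer.from('v') })

commit({ token: Buffer.from('token'), from: { host: '127.0.0.1', port: 10001 } }, fakeDht, null)
  .then(() => console.log(sent.length)) // 1
```

With a real node the function would be passed as the `commit` option of `node.query()`.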

Other options include:

{
  nodes: [
    // start the query by querying these nodes
    // useful if you are re-doing a query from a set of closest nodes.
  ],
  replies: [
    // similar to nodes, but useful if you have an array of closest replies instead
    // from a previous query.
  ],
  map (reply) {
    // map the reply into what you want returned on the stream
    return { onlyValue: reply.value }
  }
}

The query method returns a stream encapsulating the query, which is also an async iterator. Each data event contains a DHT reply. If you just want to wait for the query to finish, you can use the await stream.finished() helper. After completion, the closest nodes are stored in the stream.closestNodes array.

If you want to access the closest replies to your provided target you can see those at stream.closestReplies.
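
Because the stream is an async iterator, generic async-iteration helpers work on it. The sketch below stops at the first reply that satisfies a predicate. `firstReply` is a hypothetical helper, and `fakeReplies` stands in for a real query stream so the code can run without a network.

```javascript
// Hypothetical helper: iterate any async iterable of replies (such as the
// stream returned by node.query()) and resolve with the first reply that
// passes `predicate`, or null if the iteration ends without a match.
async function firstReply (stream, predicate) {
  for await (const reply of stream) {
    if (predicate(reply)) return reply // returning ends the iteration early
  }
  return null
}

// Stand-in for a query stream
async function * fakeReplies () {
  yield { value: null }
  yield { value: Buffer.from('hello') }
  yield { value: Buffer.from('never reached') }
}

firstReply(fakeReplies(), (reply) => reply.value !== null)
  .then((reply) => console.log(reply.value.toString())) // prints "hello"
```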

stream = node.findNode(target, [options])

Find the node closest to the node with id target. Returns a stream encapsulating the query (see node.query()). options are the same as node.query().

node.destroy()

Shutdown the DHT node.

node.destroyed

Boolean indicating if this has been destroyed.

node.toArray()

Get the routing table peers as an array of { host, port }.

node.addNode({ host, port })

Manually add a node to the routing table.

await node.suspend()

Tell the DHT you are about to go into the background (i.e. suspend), allowing it to make preparations for that.

await node.resume()

Tell the DHT you are resuming from suspension.

License

MIT

dht-rpc's People

Contributors

andrewosh, davidmarkclements, dzdidi, gmaclennan, hdegroote, jthomas43, kasperisager, lukechilds, lukks, mafintosh, make-github-pseudonymous-again, rafapaezbas, raynos, reconbot, thomasfarstrike

dht-rpc's Issues

Immediate destroy with empty bootstrap array throws

If you run this from a repl:

const DHT = require('dht-rpc')
const node = new DHT({ bootstrap: [] }); node.destroy()

It will produce the following error:

> (node:2432030) UnhandledPromiseRejectionWarning: Error [ERR_SOCKET_DGRAM_NOT_RUNNING]: Not running
    at healthCheck (dgram.js:897:11)
    at Socket.bind (dgram.js:207:3)
    at /home/andrewosh/Development/@hyperswarm/hyperswarm/node_modules/dht-rpc/lib/rpc.js:59:9
    at new Promise (<anonymous>)
    at RPC.bind (/home/andrewosh/Development/@hyperswarm/hyperswarm/node_modules/dht-rpc/lib/rpc.js:56:21)
    at DHT.bootstrap (/home/andrewosh/Development/@hyperswarm/hyperswarm/node_modules/dht-rpc/index.js:197:20)
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
(node:2432030) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 5)

uncaught error

Running the hyperdht example for a bit

/Users/maf/dev/node_modules/dht-rpc/index.js:461
    node.next.prev = node.prev
                   ^

TypeError: Cannot set property 'prev' of null
    at remove (/Users/maf/dev/node_modules/dht-rpc/index.js:461:20)
    at DHT._removeNode (/Users/maf/dev/node_modules/dht-rpc/index.js:420:3)
    at Object.afterPing [as callback] (/Users/maf/dev/node_modules/dht-rpc/index.js:389:10)
    at UDP._cancel (/Users/maf/dev/node_modules/dht-rpc/node_modules/udp-request/index.js:149:7)
    at UDP._checkTimeouts (/Users/maf/dev/node_modules/dht-rpc/node_modules/udp-request/index.js:194:10)
    at kick [as _repeat] (/Users/maf/dev/node_modules/dht-rpc/node_modules/udp-request/index.js:58:10)
    at wrapper [as _onTimeout] (timers.js:275:11)
    at Timer.listOnTimeout (timers.js:92:15)

Crash when key length mismatches

Passing an infohash (from a torrent) of length 20 bytes crashes this package. While infohashes may not be compatible, crashing the whole app because the key length differs may not be the best idea: the data or key could be coming from any third party, and handling the error would be a better solution. Below is the crash stack:

\test\node_modules\xor-distance\index.js:4
  if (a.length !== b.length) throw new Error('Inputs should have the same length')
                             ^

Error: Inputs should have the same length
    at dist (\test\node_modules\xor-distance\index.js:4:36)      
    at QueryTable.addUnverified (\test\node_modules\dht-rpc\lib\query-table.js:15:21)
    at QueryStream._onresponse (\test\node_modules\dht-rpc\lib\query-stream.js:80:20)
    at IO._finish (\test\node_modules\dht-rpc\lib\io.js:132:9)   
    at IO._onmessage (\test\node_modules\dht-rpc\lib\io.js:104:14)
    at Socket.emit (events.js:209:13)
    at UDP.onMessage [as onmessage] (dgram.js:853:8)

Handling the exception here to ignore it could make the app more robust: https://github.com/mafintosh/dht-rpc/blob/a7752605bb18bf622a5e152bc4762b49f6525cda/lib/query-table.js#L15
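
On the caller's side, one defensive option is to validate or normalise keys before they reach `node.query()`. This is a minimal sketch, assuming 32-byte targets as the API section states. `normalizeTarget` is a hypothetical helper, and hashing a 20-byte infohash produces a new key that is not interoperable with the BitTorrent DHT.

```javascript
const crypto = require('crypto')

const TARGET_LENGTH = 32 // targets are expected to be 32-byte buffers

// Hypothetical helper: return a usable 32-byte target or throw a catchable error
function normalizeTarget (key, { hashIfWrongLength = false } = {}) {
  if (!Buffer.isBuffer(key)) throw new TypeError('Target must be a Buffer')
  if (key.length === TARGET_LENGTH) return key
  if (hashIfWrongLength) return crypto.createHash('sha256').update(key).digest()
  throw new RangeError('Target must be ' + TARGET_LENGTH + ' bytes, got ' + key.length)
}

const infohash = Buffer.alloc(20, 1) // stand-in for a 20-byte torrent infohash

try {
  normalizeTarget(infohash)
} catch (err) {
  console.log(err.message) // Target must be 32 bytes, got 20
}

console.log(normalizeTarget(infohash, { hashIfWrongLength: true }).length) // 32
```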

README examples

A friendly group of p2p enthusiasts and I had a go at exploring this very interesting library (many thanks for all your hard work!) using the examples in the README, and came up against a few difficulties. I'm sure some of it was our own inadequate understanding of the underlying networking logic, but we also wondered whether you consider the examples to show the whole picture, or whether they could be set out more clearly so that people can get to a working demo quickly.

Our concrete error was:

Error: Too few nodes responded
    at Query._endAfterCommit (/home/sandreae/Code/hyper-swarm/node_modules/dht-rpc/lib/query.js:195:20)
    at Query._flush (/home/sandreae/Code/hyper-swarm/node_modules/dht-rpc/lib/query.js:190:10)
    at Query._readMore (/home/sandreae/Code/hyper-swarm/node_modules/dht-rpc/lib/query.js:175:12)
    at Query._onerror (/home/sandreae/Code/hyper-swarm/node_modules/dht-rpc/lib/query.js:274:10)
    at Request.destroy (/home/sandreae/Code/hyper-swarm/node_modules/dht-rpc/lib/io.js:296:10)
    at Timeout.oncycle [as _onTimeout] (/home/sandreae/Code/hyper-swarm/node_modules/dht-rpc/lib/io.js:396:9)
    at listOnTimeout (node:internal/timers:559:11)
    at processTimers (node:internal/timers:500:7)

We had a publicly addressed bootstrap node running which was successfully receiving queries from nodes we were individually running but then the querying nodes timed out with the above error. We felt like we were missing an important step.

I understand there is a higher level API in https://github.com/hyperswarm/dht, but we actually chose to explore this lib as we're interested in the underlying functionality for discovery and hole-punching. Maybe you wouldn't recommend this though...

I'm afraid I don't feel I got my head around it enough to draw up any example code suggestions, but if you have pointers of another library we should be using, code we could refer to, or have time to look at the examples here then I'm very happy to test something out and give feedback.

seeking explanation of workings

I have a couple of questions about how this works, which a couple of hours studying the source did not elucidate.

  1. When a node queries the DHT (eg. a new ephemeral node queries as hash dictionary command), after bootstrapping, does it then ask around non-ephemeral nodes if they can answer the query, or what happens?

  2. When an update is performed, do all nodes in the DHT eventually receive that same command update, or only a subset of the nodes?

  3. What is a good entry point for hacking access privileges into this module?

TIA

error stack trace is useless.

The errors in lib/errors.js are pre-allocated and the stack trace is about the DHT constructor.

I have no idea which request was destroyed because it's throwing a recycled error object instead of giving me a new stack trace.
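
The effect described here can be reproduced in isolation: an Error captures its stack when it is constructed, not when it is thrown. The sketch below is a standalone illustration and is not taken from lib/errors.js. All function names are hypothetical.

```javascript
function allocateError () {
  return new Error('Request destroyed')
}

const PREALLOCATED = allocateError() // created once, up front

function failWithShared () {
  throw PREALLOCATED // the stack still points at allocateError()
}

function failWithFresh () {
  throw new Error('Request destroyed') // the stack points at failWithFresh()
}

// Run fn and return the stack of whatever it throws
function stackOf (fn) {
  try {
    fn()
  } catch (err) {
    return err.stack
  }
}

console.log(stackOf(failWithShared).includes('failWithShared')) // false
console.log(stackOf(failWithFresh).includes('failWithFresh')) // true
```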

Custom storage

Hi, am I correct? If I want to implement some storage (SQLite, Mongo, a file store, etc.), do I need to put the logic inside query (query, cb) {}?

"Too many requests failed" error occurring due to TypeError bug

When using swarm.join(discoveryKey)

Error: Too many requests failed
    at race (/node_modules/dht-rpc/lib/race.js:14:46)

If the promises in race are logged they look like so:

Promise {
    <rejected> TypeError: Cannot read property 'split' of undefined
        at Object.encode (/node_modules/dht-rpc/lib/messages.js:8:21)
        at Object.encode (/node_modules/dht-rpc/lib/messages.js:26:10)
        at Object.encode (/node_modules/compact-encoding/index.js:304:49)
        at annSignable (/node_modules/@hyperswarm/dht/lib/persistent.js:280:26)
        at Function.signAnnounce (/node_modules/@hyperswarm/dht/lib/persistent.js:264:44)
        at commit (/node_modules/@hyperswarm/dht/index.js:394:38)
        at Query.commit [as _commit] (/node_modules/@hyperswarm/dht/index.js:341:14)
        at Query._flush (/node_modules/dht-rpc/lib/query.js:164:54)
        at Query._readMore (/node_modules/dht-rpc/lib/query.js:145:12)
        at Query._onerror (/node_modules/dht-rpc/lib/query.js:206:10)
}

Line 8 of messages calls split on the ip parameter. The encode function there is called from line 26 of messages, where peer.host is passed in. Logging out the peers shows that some of them do not have a host property.

Example of a peer object that causes this error:

peer {
  version: 2,
  tid: 57237,
  from: { host: '138.68.164.198', port: 60981 },
  to: { host: '83.160.74.117', port: 52100 },
  id: <Buffer 5b 44 ad 56 71 5f a2 6e d3 5b c1 de 35 10 12 ce 96 e2 9a 8b c2 ec 34 75 f5 6e e1 94 46 8a d1 c3>,
  token: null,
  target: null,
  closerNodes: null,
  command: null,
  status: 0,
  value: null
}
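
One way to guard against this before encoding is to drop peers that do not carry an address. The sketch assumes peers are plain objects with optional `host` and `port` fields, as in the log above. `hasAddress` is a hypothetical helper, not part of dht-rpc or @hyperswarm/dht.

```javascript
// Hypothetical guard: true only for peers that carry a usable host and port
function hasAddress (peer) {
  return peer != null &&
    typeof peer.host === 'string' && peer.host.length > 0 &&
    Number.isInteger(peer.port) && peer.port > 0 && peer.port <= 65535
}

const peers = [
  { host: '138.68.164.198', port: 60981 },
  // Shaped like the object above: the address is nested under `from`, not on the peer itself
  { from: { host: '138.68.164.198', port: 60981 }, status: 0, value: null }
]

const encodable = peers.filter(hasAddress)
console.log(encodable.length) // 1
```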

IO accepts dht-rpc options but never uses socket

Since IO isn't supposed to be part of the public API, it used to be possible to provide an already established socket via the constructor. As IO is currently written, the option can be passed but it's never used as both clientsocket and serversocket are connected without considering its presence. See https://github.com/mafintosh/dht-rpc/blob/7527e8f83e7457750a0ece5469ed3b579c5799de/lib/io.js#L136.

There's a further bit of confusion in this function: it appears that you can provide either a port number or a function that provides a socket using the bind option, but this parameter is also simply passed through from the dht-rpc options. This dual use of the bind option isn't currently documented in the dht-rpc API. I understand that this is considered the RI for use in hyperswarm and that implementations in other languages are encouraged to follow that language's conventions, but I'm trying to write an implementation with the same public API. Replicating a duck-typed option in a strongly typed language is going to encourage awkward code.

mainline DHT compatibility?

Thank you for the good work. I'm wondering if this can work with the mainline DHT of BitTorrent. Can we just bootstrap with BitTorrent nodes and start using it, or is some marshalling needed?
