Coder Social home page Coder Social logo

fastly / pushpin Goto Github PK

View Code? Open in Web Editor NEW
3.6K 99.0 154.0 5.55 MB

A proxy server for adding push to your API, used at the core of Fastly's Fanout service

Home Page: https://pushpin.org

License: Apache License 2.0

Python 1.31% Shell 0.03% QMake 0.28% C++ 45.04% PHP 0.04% Rust 52.85% C 0.40% Makefile 0.02% JavaScript 0.03%
http-streaming websockets long-polling realtime push proxy api server-sent-events zeromq streaming

pushpin's Introduction

Pushpin

Website: https://pushpin.org/
Forum: https://community.fastly.com/c/pushpin/12

Pushpin is a reverse proxy server written in Rust & C++ that makes it easy to implement WebSocket, HTTP streaming, and HTTP long-polling services. The project is unique among realtime push solutions in that it is designed to address the needs of API creators. Pushpin is transparent to clients and integrates easily into an API stack.

How it works

Pushpin is placed in the network path between the backend and any clients:

pushpin-abstract

Pushpin communicates with backend web applications using regular, short-lived HTTP requests. This allows backend applications to be written in any language and use any webserver. There are two main integration points:

  1. The backend must handle proxied requests. For HTTP, each incoming request is proxied to the backend. For WebSockets, the activity of each connection is translated into a series of HTTP requests1 sent to the backend. Pushpin's behavior is determined by how the backend responds to these requests.
  2. The backend must tell Pushpin to push data. Regardless of how clients are connected, data may be pushed to them by making an HTTP POST request to Pushpin's private control API (http://localhost:5561/publish/ by default). Pushpin will inject this data into any client connections as necessary.

To assist with integration, there are libraries for many backend languages and frameworks. Pushpin has no libraries on the client side because it is transparent to clients.

Example

To create an HTTP streaming connection, respond to a proxied request with special headers Grip-Hold and Grip-Channel2:

HTTP/1.1 200 OK
Content-Type: text/plain
Content-Length: 22
Grip-Hold: stream
Grip-Channel: test

welcome to the stream

When Pushpin receives the above response from the backend, it will process it and send an initial response to the client that instead looks like this:

HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
Connection: Transfer-Encoding

welcome to the stream

Pushpin eats the special headers and switches to chunked encoding (notice there's no Content-Length). The request between Pushpin and the backend is now complete, but the request between the client and Pushpin remains held open. The request is subscribed to a channel called test.

Data can then be pushed to the client by publishing data on the test channel:

curl -d '{ "items": [ { "channel": "test", "formats": { "http-stream": \
    { "content": "hello there\n" } } } ] }' \
    http://localhost:5561/publish

The client would then see the line "hello there" appended to the response stream. Ta-da, transparent realtime push!

For more details, see the HTTP streaming section of the documentation. Pushpin also supports HTTP long-polling and WebSockets.

Example using a library

Using a library on the backend makes integration even easier. Here's another HTTP streaming example, similar to the one shown above, except using Pushpin's Django library. Please note that Pushpin is not Python/Django-specific and there are backend libraries for other languages/frameworks, too.

The Django library requires configuration in settings.py:

MIDDLEWARE_CLASSES = (
    'django_grip.GripMiddleware',
    ...
)

GRIP_PROXIES = [{'control_uri': 'http://localhost:5561'}]

Here's a simple view:

from django.http import HttpResponse
from django_grip import set_hold_stream

def myendpoint(request):
    if request.method == 'GET':
        # subscribe every incoming request to a channel in stream mode
        set_hold_stream(request, 'test')
        return HttpResponse('welcome to the stream\n', content_type='text/plain')
    ...

What happens here is the set_hold_stream() method flags the request as needing to turn into a stream, bound to channel test. The middleware will see this and add the necessary Grip-Hold and Grip-Channel headers to the response.

Publishing data is easy:

from gripcontrol import HttpStreamFormat
from django_grip import publish

publish('test', HttpStreamFormat('hello there\n'))

Example using WebSockets

Pushpin supports WebSockets by converting connection activity/messages into HTTP requests and sending them to the backend. For this example, we'll use Pushpin's Express library. As before, please note that Pushpin is not Node/Express-specific and there are backend libraries for other languages/frameworks, too.

The Express library requires configuration and setting up a middleware handler:

const express = require('express');
const { ServeGrip } = require('@fanoutio/serve-grip');

var app = express();

// Instantiate the middleware and register it with Express
const serveGrip = new ServeGrip({
    grip: { 'control_uri': 'http://localhost:5561', 'key': 'changeme' }
});
app.use(serveGrip);

// Instantiate the publisher to use from your code to publish messages
const publisher = serveGrip.getPublisher();

app.get('/hello', (req, res) => {
    res.send('hello world\n');
});

With that structure in place, here's an example of a WebSocket endpoint:

const { WebSocketMessageFormat } = require( '@fanoutio/grip' );

app.post('/websocket', async (req, res) => {
    const { wsContext } = req.grip;

    // If this is a new connection, accept it and subscribe it to a channel
    if (wsContext.isOpening()) {
        wsContext.accept();
        wsContext.subscribe('all');
    }

    while (wsContext.canRecv()) {
        var message = wsContext.recv();

        // If return value is null then connection is closed
        if (message == null) {
            wsContext.close();
            break;
        }

        // broadcast the message to everyone connected
        await publisher.publishFormats('all', WebSocketMessageFormat(message));
    }

    res.end();
});

The above code binds all incoming connections to a channel called all. Any received messages are published out to all connected clients.

What's particularly noteworthy is that the above endpoint is stateless. The app doesn't keep track of connections, and the handler code only runs whenever messages arrive. Restarting the app won't disconnect clients.

The while loop is deceptive. It looks like it's looping for the lifetime of the WebSocket connection, but what it's really doing is looping through a batch of WebSocket messages that was just received via HTTP. Often this will be one message, and so the loop performs one iteration and then exits. Similarly, the wsContext object only exists for the duration of the handler invocation, rather than for the lifetime of the connection as you might expect. It may look like socket code, but it's all an illusion. 🎩

For details on the underlying protocol conversion, see the WebSocket-Over-HTTP Protocol spec.

Example without a webserver

Pushpin can also connect to backend servers via ZeroMQ instead of HTTP. This may be preferred for writing lower-level services where a real webserver isn't needed. The messages exchanged over the ZeroMQ connection contain the same information as HTTP, encoded as TNetStrings.

To use a ZeroMQ backend, first make sure there's an appropriate route in Pushpin's routes file:

* zhttpreq/tcp://127.0.0.1:10000

The above line tells Pushpin to bind a REQ-compatible socket on port 10000 that handlers can connect to.

Activating an HTTP stream is as easy as responding on a REP socket:

import zmq
import tnetstring

zmq_context = zmq.Context()
sock = zmq_context.socket(zmq.REP)
sock.connect('tcp://127.0.0.1:10000')

while True:
    req = tnetstring.loads(sock.recv()[1:])

    resp = {
        'id': req['id'],
        'code': 200,
        'reason': 'OK',
        'headers': [
            ['Grip-Hold', 'stream'],
            ['Grip-Channel', 'test'],
            ['Content-Type', 'text/plain']
        ],
        'body': 'welcome to the stream\n'
    }

    sock.send('T' + tnetstring.dumps(resp))

Why another realtime solution?

Pushpin is an ambitious project with two primary goals:

  • Make realtime API development easier. There are many other solutions out there that are excellent for building realtime apps, but few are useful within the context of APIs. For example, you can't use Socket.io to build Twitter's streaming API. A new kind of project is needed in this case.
  • Make realtime push behavior delegable. The reason there isn't a realtime push CDN yet is because the standards and practices necessary for delegating to a third party in a transparent way are not yet established. Pushpin is more than just another realtime push solution; it represents the next logical step in the evolution of realtime web architectures.

To really understand Pushpin, you need to think of it as more like a gateway than a message queue. Pushpin does not persist data and it is agnostic to your application's data model. Your backend provides the mapping to whatever that data model is. Tools like Kafka and RabbitMQ are complementary. Pushpin is also agnostic to your API definition. Clients don't necessarily subscribe to "channels" or receive "messages". Clients make HTTP requests or send WebSocket frames, and your backend decides the meaning of those inputs. Pushpin could perhaps be awkwardly described as "a proxy server that enables web services to delegate the handling of realtime push primitives".

On a practical level, there are many benefits to Pushpin that you don't see anywhere else:

  • The proxy design allows Pushpin to fit nicely within an API stack. This means it can inherit other facilities from your REST API, such as authentication, logging, throttling, etc. It can be combined with an API management system.
  • As your API scales, a multi-tiered architecture will become inevitable. With Pushpin you can easily do this from the start.
  • It works well with microservices. Each microservice can have its own Pushpin instance. No central bus needed.
  • Hot reload. Restarting the backend doesn't disconnect clients.
  • In the case of WebSocket messages being proxied out as HTTP requests, the messages may be handled statelessly by the backend. Messages from a single connection can even be load balanced across a set of backend instances.

Install

Check out the the Install guide, which covers how to install and run. There are packages available for Linux (Debian, Ubuntu, CentOS, Red Hat), Mac (Homebrew), or you can build from source.

By default, Pushpin listens on port 7999 and requests are handled by its internal test handler. You can confirm the server is working by browsing to http://localhost:7999/. Next, you should modify the routes config file to route requests to your backend webserver. See Configuration.

Scalability

Pushpin is horizontally scalable. Instances don’t talk to each other, and sticky routing is not needed. Backends must publish data to all instances to ensure clients connected to any instance will receive the data. Most of the backend libraries support configuring more than one Pushpin instance, so that a single publish call will send data to multiple instances at once.

Optionally, ZeroMQ PUB/SUB can be used to send data to Pushpin instead of using HTTP POST. When this method is used, subscription information is forwarded to each publisher, such that data will only be published to instances that have listeners.

As for vertical scalability, Pushpin has been tested with up to 1 million concurrent connections running on a single DigitalOcean droplet with 8 CPU cores. In practice, you may want to plan for fewer connections per instance, depending on your throughput. The new connection accept rate is about 800/sec (though this also depends on the speed of your backend), and the message throughput is about 8,000/sec. The important thing is that Pushpin is horizontally scalable which is effectively limitless.

What does the name mean?

Pushpin means to "pin" connections open for "pushing".

License

Pushpin is offered under the Apache License, Version 2.0. See the LICENSE file.

Footnotes

1: Pushpin can communicate WebSocket activity to the backend using either HTTP or WebSockets. Conversion to HTTP is generally recommended as it makes the backend easier to reason about.

2: GRIP (Generic Realtime Intermediary Protocol) is the name of Pushpin's backend protocol. More about that here.

pushpin's People

Contributors

curdbecker avatar dependabot[bot] avatar deweerdt avatar fanoutscrape avatar grantjenks avatar harmony7 avatar jannic avatar jkarneges avatar maddie-boby avatar rgarg100-sec avatar sima-fastly avatar sylvestre avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

pushpin's Issues

Pushpin not able to start

When i want to start pushpin I am getting following output

starting...
starting mongrel2 (http:3000)
starting m2adapter
starting zurl
starting pushpin-proxy
starting pushpin-handler
started
Traceback (most recent call last):
  File "/usr/local/bin/pushpin", line 59, in <module>
    runner.run(exedir, config_file, verbose)
  File "/usr/local/lib/pushpin/runner/runner.py", line 111, in run
    p.wait()
  File "/usr/local/lib/pushpin/runner/processmanager.py", line 90, in wait
    p.check()
  File "/usr/local/lib/pushpin/runner/processmanager.py", line 41, in check
    raise RuntimeError("process exited unexpectedly: %s" % self.name)
RuntimeError: process exited unexpectedly: m2adapter

I checked mongrel2 and i am able to run it without any kind of issues. I checked m2adapter logs within var/www/pushpin log directory and nothing usefull there:

[INFO] 2015-04-07 14:08:25.554 starting...
[INFO] 2015-04-07 14:09:19.096 starting...
[INFO] 2015-04-07 14:12:22.728 starting...
[INFO] 2015-04-07 15:06:56.012 starting...
[INFO] 2015-04-07 19:05:10.023 starting...
[INFO] 2015-04-07 19:13:03.515 starting...
[INFO] 2015-04-07 19:15:24.216 starting...
[INFO] 2015-04-07 19:15:57.014 starting...
[INFO] 2015-04-07 19:23:19.709 starting...
[INFO] 2015-04-07 19:28:20.920 starting...
[INFO] 2015-04-07 19:29:00.137 starting...

Any kind of additional configuration for m2adapter required?

more practical logging

Default log level should produce data interesting to users. It shouldn't contain debug-ish output.

m2adapter: request attempt and initial response info
proxy: withhold log line until after proxying, then output a single line
handler: per-connection subscription changes. global subscription here/gone. incoming publish. publish being relayed.

debug mode

When there is a failure communicating with an origin server, Pushpin responds with uninformative errors. This is on purpose, to prevent leaking internal details to the outside world. However it makes diagnosing problems difficult. There should be a way to opt-in to a debug mode in which case Pushpin should respond with more verbose error responses indicating exactly what is wrong.

runner unified log

By default, the runner program should unify the output of all subprocesses into a single log file rather than creating a bunch of log files.

It should also be possible to have the runner output the logs to stdout.

Create /var/run/pushpin directory

I installed pushpin using apt-get, and got this when I tried to run it:

Traceback (most recent call last):
  File "/usr/bin/pushpin", line 56, in <module>
    runner.run(exedir, config_file, verbose)
  File "/usr/lib/pushpin/runner/runner.py", line 58, in run
    m2sqlpath = services.write_mongrel2_config(configdir, os.path.join(configdir, "mongrel2.conf.template"), rundir, logdir, http_port, https_ports, m2sh_bin)
  File "/usr/lib/pushpin/runner/services.py", line 34, in write_mongrel2_config
    compile_template(configpath, genconfigpath, vars)
  File "/usr/lib/pushpin/runner/services.py", line 11, in compile_template
    f = open(outfilename, "w")
IOError: [Errno 2] No such file or directory: '/var/run/pushpin/mongrel2.conf'

Workaround: Create /var/run/pushpin manually.

Ignore messages from self

Here's how it will likely work:

  1. The backend will assign a user ID to the connection via GRIP instructions.
  2. Published messages can contain a sending user ID as well as an indication that the message should not be delivered to any connections that match the sender.

Handle large initial content with streaming

Pushpin limits all GRIP responses to 100,000 bytes (as defined by MAX_ACCEPT_RESPONSE_BODY). However, it would be ideal if the server could send an initial response body with no size limit.

At first glance it might seem like we should just not have a limit on the response size. However this is easier said than done as Pushpin currently buffers the entire GRIP response before processing it. The code could be changed to stream out the response to the client as it is being received from the origin server, but doing that has a couple of problems:

  1. Streaming out an unbuffered initial response would not work with JSON-style instructions. It might be achievable with a streaming parse on the JSON input but this seems impractical.
  2. For the origin to properly generate GRIP instructions it may need to know the last part of the initial content before doing so. However, since instructions are provided in the headers, this might require the origin to buffer the whole response first in order to know what headers to set before beginning to send the response body. This means we'd still have the same buffer size problem, but in the origin server rather than Pushpin. Trailing HTTP headers could theoretically be used to supply instructions after sending the initial response, but this only works with chunked encoding and is probably too obscure of an HTTP feature to rely on.

Instead of allowing the origin to provide a huge response, the proposed solution is to have Pushpin "page" through a series of responses. Hold instructions would be included on the last page. This way the instructions are generated at the end, and would work fine with either headers-style or JSON-style instructions.

We could introduce a new response header telling Pushpin how to do the paging:

HTTP/1.1 200 OK
Content-Type: text/plain
Grip-Link: </stream/?after=nextID>; rel=next

{... first part of content ...}

We use a Link-style header here in case we need to support additional link types in the future. We call it Grip-Link rather than reusing Link so that Pushpin can remove the header before relaying to the client (as Pushpin does with all such Grip-* headers).

The server would still need to keep responses within the Pushpin limit. We'll add a request header to help with that:

GET /stream/?after=firstID HTTP/1.1
Grip-Response-Max-Size: 100000

Pushpin would then page through all of the response data before holding (subscribing) the request to channels. The expected usage is that the origin server would omit the Grip-Hold header from all pages until the last page. If a response contains neither Grip-Hold or a Grip-Link of type next, then Pushpin would close the response to the client after all data has been sent. If a response contains both such headers then the hold would win. The next link may still be useful as part of a reliability mechanism (to be discussed separately).

UPDATE: With the new Grip-Status feature there should no longer be much need for JSON-style instructions. We could consider streaming initial responses of any length without needing paging, for origin servers that are able to do so (and that understand the implications of having to provide hold headers in advance). This is a more practical default approach than requiring servers to limit response sizes.

route review

routes:

*,path_beg=/test localhost:8080,path_beg=/test
*,ssl=no,path_beg=/v2 acceptor:8080,path_beg=/v2,path_rem=3

In my scenario, I would like traffic to localhost:7999/test to go to localhost:8080/test (where I am not hosting somewhere, so I expect an error) and I get the expected result until I hit a route that matches the second scenario. Then the second route responds to requests mentioned in the first route.

In the second route, I would like localhost:7999/v2/abc to go to acceptor:8080/abc but it is proxying the whole path acceptor:8080/v2/abc. Also, when I make changes to the routes, it's not always reflecting the changes unless I restart pushpin -- I don't think that's expected, right?

Configure failure with Qt >= 5.10

The failure is caused by https://github.com/fanout/pushpin/blob/2a386e5ec9219003edc8185e0b91fc595dc1f6a2/configure#L216

?.?.? fails to match a two-digit minor version. So any version > 5.9 will fail.

The error is

==> ./configure --prefix=/usr/local/Cellar/pushpin/1.17.0 --configdir=/usr/local/etc --rundir=/usr/local/var/run --logdir=/usr/local/var/log --extraconf=QMAKE_MACOSX_DEPLOYMENT_TARGET=10.11
Configuring Pushpin ...
Verifying Qt build environment ... fail

Reason: Unable to find the 'qmake' tool for Qt 4 or 5.

Be sure you have a proper Qt 4.0+ build environment set up.  This means not
just Qt, but also a C++ compiler, a make tool, and any other packages
necessary for compiling C++ programs.

If you are certain everything is installed, then it could be that Qt is not
being recognized or that a different version of Qt is being detected by
mistake (for example, this could happen if $QTDIR is pointing to a Qt 3
installation).  At least one of the following conditions must be satisfied:

 1) --qtdir is set to the location of Qt
 2) $QTDIR is set to the location of Qt
 3) QtCore is in the pkg-config database
 4) qmake is in the $PATH

This script will use the first one it finds to be true, checked in the above
order.  #3 and #4 are the recommended options.  #1 and #2 are mainly for
overriding the system configuration.

I can work around it by string replacing "?.?.?" with "?.??.?"

SUB to stats socket not working

The demo Python program (pasted below) that connects to the stats socket and prints the decoded messages is stuck at the sock.recv() and does not print anything, although I can successfully have pushpin play along with clients and origin server. The permissions on the socket look good (srwxr-xr-x 1 pushpin zurl ). Am I missing anything? Thanks!

import sys
import tnetstring
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.connect('ipc://var/run/pushpin/pushpin-stats')
sock.setsockopt(zmq.SUBSCRIBE, '')

while True:
    m_raw = sock.recv()
    mtype, mdata = m_raw.split(' ', 1)
    if mdata[0] != 'T':
        print 'unsupported format'
        continue
    m = tnetstring.loads(mdata[1:])
    print '%s %s' % (mtype, m)

HTTP streaming reliability

Note: This approach falls within the scope of a patent application filed by Fanout. However, if implemented, the Pushpin license (AGPL) ensures that the necessary rights would be granted to users.

Publish-subscribe services are unreliable by design. Since Pushpin can be used to provide historical and streaming content over a single connection, there is an opportunity for the two data sources to be unified in a way that creates a reliable stream.

When creating a stream hold, any channel to be subscribed should include a prev-id value. A next link should also be provided:

HTTP/1.1 200 OK
Content-Type: text/plain
Grip-Hold: stream
Grip-Channel: fruit; prev-id=3
Grip-Link: </fruit/?after=3>; rel=next

{... initial response ...}

The prev-id here would have a similar effect as it does with response holds, in that if the ID doesn't match the last known ID published to the channel, then Pushpin will make a request again to the proxy. The difference is that with a stream hold, the subscription will be activated immediately and initial response data sent. Then the next link will be retrieved, and any content received will be dumped into the already-open response. If another next link is received then Pushpin will follow it until the end is hit (hold instruction, or no link/hold which would cause the response to close).

When Pushpin retrieves a next link as a result of a publish conflict, it will also include a Grip-Last header indicating the last known ID received on a given channel. This should take precedence over whatever checkpoint information may have been encoded in the link.

GET /fruit/?after=3 HTTP/1.1
Grip-Last: fruit; last-id=7

For example, if the origin server received the above request, then the last-id of 7 would be used as the basis for determining the response content rather than the after query param.

Note: this approach could be applied to WebSockets as well, to be expored in a separate issue.

Properly publish response status

I am trying to pass status in my publish request. I tried something like:

    {
      "items" => [{
        "channel" => 'mychannel'
        "http-response" => {
          "headers" => {
            "Content-Type" => "application/json"
          },
          "body" => body.to_json,
          "status" => 204
        }
      }]
    }

I also tried to pass this in headers section or as code attribute. Can you tell me what am I doing wrong? Also, how can I pass status code in hold instruction response?

websockets

(adapted from prior discussion)

Here's an idea for "Websocket GRIP":

  1. Websocket connections are forwarded to an origin server (or balanced to a set of origin servers). Messages would be forwarded in either direction as-is, with the exception of instructions as discussed below.

  2. Instead of supplying GRIP instructions in the body of an HTTP response, the origin server would supply them as the content of the first websocket message. Further messages would be relayed as-is back to the client. The origin could also choose to provide no instructions and let its first message be relayed back as normal. So there should be a way for the initial response message to be uniquely identified as instructions vs regular traffic. Example of instructions could be channel binding as per HTTP GRIP.

  3. The origin would be allowed to close its websocket connection with the proxy without the proxy closing its websocket connection to the client. This could be the default behavior if the connection is bound to one or more channels.

With this basic framework you can at least get something similar to HTTP streaming, where each client has a connection open with the server receiving one-way data (server->client). For example, say your website broadcasts news items to everyone visiting. On page load, a websocket connection could be made to the proxy, forwarded to origin, origin responds with binding to a shared channel (say "news"), and then origin closes the connection. Whenever the origin has data to broadcast to all listeners, it would publish on the "news" channel by making some API call to the origin (possibly via REST). The proxy would then multicast this message to all connected clients. The origin server would not need to maintain any open connections.

There is interest in having two-way communication where the inbound messages are funneled through a limited set of connections between the proxy and the origin. To go about this, a sharing policy could be configured in advance. Only the first websocket connection request would truly be forwarded (i.e. headers and all). Future connections that match on the policy would not cause additional connections to be made between the proxy and the origin (or at least, not more than one per proxy node or such).

Additionally, the connection could be configured to use multiplexing wrapping. This could be selected on the fly in the instructions response from origin. Without wrapping, the origin would not be able to tell which clients sent which messages that it receives, and messages sent from the origin would be copied out to all of the shared clients by the proxy. With wrapping, some client identification would be tagged on every incoming message sent to the origin, and outbound messages sent from origin would also need to be tagged with destination client identification.

There is also interest in some kind of stickiness. If the goal is for every message from the same client to end up going down the same proxy->origin connection then I believe this is exactly what would happen in the above design. If a client was disconnected from the proxy and reconnected to a different proxy node though, then messages from that new client connection might go down a different proxy->origin connection.

Connection reset when filesize is big

I am using hapi as api server. And testing via request.

test_request.js

fs.createReadStream('./workbooks/tmp/somefile.csv')
  .pipe(
    request({
      method: 'POST',
      url: 'https://some.domain.com/some_route',
      qs: {
        params: {
          param_a: 'some_param'
        },
        email: '[email protected]',
        prj: 'someprj'
      },
      agentOptions: {
        ca: fs.readFileSync('./ssl/cert.pem')
      }      
    },function(err,response,body){
      if (err) console.log(err);
      console.log(body);
    })
  )
  .on('error', function(err){
    console.log(err.stack);
  });

hapi_route.js

module.exports = {
  method  : "POST",
  path    : "/some_route",
  config: { 
    auth: false,
    payload: {
      maxBytes: 100*1024*1024,
      output: 'stream'
      parse: false
    }
  },
  handler : handleRoute
};

I get this error

Bad Request
Error: write ECONNRESET
    at errnoException (net.js:905:11)
    at Object.afterWrite (net.js:721:19)

if I just send the content as a chunk

test_request.js

request({
      method: 'POST',
      url: 'https://some.domain.com/some_route',
      qs: {
        params: {
          param_a: 'some_param'
        },
        email: '[email protected]',
        prj: 'someprj'
      },
      agentOptions: {
        ca: fs.readFileSync('./ssl/cert.pem')
      } ,
     body: fs.readFileSync('somefile.csv')
    },function(err,response,body){
      if (err) console.log(err);
      console.log(body);
    })

hapi_route.js

module.exports = {
  method  : "POST",
  path    : "/some_route",
  config: { 
    auth: false,
    payload: {
      maxBytes: 100*1024*1024,
      parse: false
    }
  },
  handler : handleRoute
};

I get this error

{ [Error: socket hang up] code: 'ECONNRESET' }

however, if I try with a smaller file, everything is working fine. I can't seem to find any configuration to control the max payload size in pushpin? I assume there should be some equivalent to nginx's upload_max_filesize??

WebSocket-over-HTTP retry requests

Currently, if Pushpin makes a request to the origin server and it fails, then the associated WebSocket client connection will be closed. Instead, Pushpin should retry the request a few times (with backoff delay) before failing the connection. This way, WS connections always survive origin server restarts, even if they happen to send messages while the origin is restarting.

Probably we should only retry requests if we were unable to submit the request data. If the request fails after Pushpin has submitted the POST request and application/websocket-events body, then it should not be retried, else the origin might receive duplicate events.

ability to disable message reordering

Normally, if id and prev-id fields are used on published items, Pushpin will reorder messages before delivery. To do this, Pushpin will hold on to a message before delivering it, if it is received out of order. This might not be desired if you are already ensuring in-order publishing some other way.

For example, if you have exactly one worker that only publishes one message at a time, then all messages will be received by Pushpin in order. If this worker then fails to publish for some reason, causing a gap in the message sequence, the next message received will be held on to in expectation of preceding messages that will never come. So if you already know you are publishing in order, Pushpin's own reordering could actually cause unnecessary problems.

This could be implemented as an app-level configuration or a message-level flag.

I'm not 100% sure if this is something we should implement but I wanted to file an issue so it's not forgotten.

wildcard subscriptions

It would be useful to subscribe to channels by wildcard. For example, a publisher could publish a message to channel A.B.C, and a receiver with subscription to A.* could receive the message.

Wildcard subscriptions are not uncommon in publish-subscribe systems, however they tend to work differently in each system. For example, ZeroMQ supports filtering by prefix. Bayeux supports segmented channels and trailing wildcards of either single segment or multi-segment (e.g. /a/* matches /a/b, and /a/** matches /a/b/c/d). At least in these two cases, wildcarding is effectively suffix only, which is pretty conservative and seems like something we could reasonably do in Pushpin.

Proposal:

  • If a subscribed channel ends with a single asterisk, then it should count as a wildcard. For example: Grip-Channel: foo.*.
  • Pushpin should subscribe to the channel without the asterisk on its ZeroMQ SUB socket. A subscription to foo.* would use foo. as the SUB socket filter.

Note that ZeroMQ being limited to filtering by prefix doesn't necessarily mean that we can't support more complex wildcarding. For example, a mid-string wildcard like foo*bar could be supported by setting the SUB socket to use foo as the filter, but then only deliver to clients if the full expression matches. However we should only implement something like this if there is a real need. For most applications needing wildcards, suffix-only is usually enough.

mongrel2 exits uncleanly

Sometimes Pushpin will log a message like this on exit:

[ERR] 2016-10-22 14:09:37.809 m2 http:7999: Exited uncleanly

This is a bug in Mongrel2 (a dependency of Pushpin), and not Pushpin itself, but since users may see this error I figured it would be good to have an issue filed until it is fixed. Note that the bug is harmless.

publish to close websockets

Pushpin's HTTP streaming mode enables the publisher to close connections. We should support something similar with WebSockets, e.g.:

"ws-message": {
  "action": "close"
}

command to trigger stream recovery

The HTTP streaming reliability feature makes a request to the origin server if received messages are incorrectly sequenced or after a period of inactivity. For quiet channels, this may mean that it takes awhile to recover. If the user is aware of publishing issues (e.g. the publisher crashed), it would be useful if the user could explicitly invoke the recovery request rather than having to wait for new messages to be published or for the timeout to expire.

Consider exposing a new command (via zmq command interface as well as HTTP) called recover, with potential parameters to fine tune:

  • channel: recover only subscribers of a certain channel.
  • channel-prefix: recover any subscribers that match.
  • prev-id: recover only subscribers who previously received an item with this ID.

Default would be to recover all subscribers.

ImportError: No module named runner

I'm trying pushpin with following documentation.
When I start pushpin with example config I get following error:

$ git clone https://github.com/fanout/pushpin
$ cd pushpin/examples/config/
$ pushpin --verbose
Traceback (most recent call last):
  File "/usr/local/Cellar/pushpin/1.5.0/libexec/bin/pushpin", line 57, in <module>
    from runner import runner
ImportError: No module named runner

I installed via homebrew and puspin version is 1.5.0.

$ brew install pushpin
$ pushpin --version
pushpin 1.5.0

Is any kind of additional configuration required?

optimize disconnect handling rate

Currently, m2adapter processes disconnect events at a rate of 100/sec. Ideally we'd be able to handle a larger rate. Things to consider:

  • If there are too many pending disconnects, we should remove connection structures but send nothing to other components, or even ignore disconnect events completely. Connection structures will eventually be cleaned up when sessions time out. This is pretty much how things work with TCP, when the OS can't keep up with close packets.
  • Simply removing connection structures in m2adapter can cause internal overload. This is because m2adapter responds to batched keep-alives with individual cancels, so when pushpin-handler pings a thousand connections it thinks might exist, m2adapter will slap pushpin-handler around. m2adapter should respond with batched cancels, or it should ignore keep-alives for sessions it doesn't recognize.
  • Removing session structures in pushpin-handler is somewhat expensive due to having to muck with a bunch of tables and send stats. If there are a lot of disconnects occurring at once, we should consider a cheaper processing route and don't bother sending stats. Downstream stats listeners will have to timeout connections on their own in that case.

Crashes after backend was unavailable for some time

I set up pushpin to work like this:

Nginx:38080 -> Pushpin:7999 -> rails:3000 for websockets enabled paths. Other requests are not proxied through Pushpin.

Routes:

*,sockjs=/api/v1/sjs,sockjs_as_path=/api/v1/pp rails:3000,over_http

If I stop my rails service for some time while clients were connected and restart the service again, Pushpin will crash with the following stack trace:

pushpin_1  | [DEBUG] 2017-04-05 16:10:53.973 [m2a] m2: OUT [pushpin-m2-7999 4:X 21, 26:3:ctl,16:6:cancel,4:true!}]]
pushpin_1  | [DEBUG] 2017-04-05 16:10:53.974 [m2a] m2: IN pushpin-m2-7999 21 @* 17:{"METHOD":"JSON"},21:{"type":"disconnect"},
pushpin_1  | [DEBUG] 2017-04-05 16:10:53.974 [m2a] m2: pushpin-m2-7999 id=21 disconnected
pushpin_1  | [DEBUG] 2017-04-05 16:10:53.975 [m2a] m2: IN pushpin-m2-7999 21 @* 17:{"METHOD":"JSON"},21:{"type":"disconnect"},
pushpin_1  | [DEBUG] 2017-04-05 16:10:53.975 [m2a] m2: pushpin-m2-7999 id=21 disconnected
pushpin_1  | [DEBUG] 2017-04-05 16:10:53.975 [proxy] wsproxysession: 0x55ace41db350 connected
pushpin_1  | [DEBUG] 2017-04-05 16:10:53.975 [proxy] wsproxysession: 0x55ace41db350 grip enabled, message-prefix=[m:]
pushpin_1  | [DEBUG] 2017-04-05 16:10:53.976 [proxy] wscontrol: OUT { "items": [ { "ttl": 60, "type": "here", "cid": "d2377f6b-7b87-4317-aaa7-9f40186917af", "uri": "ws://pushpin:7999/api/v1/pp?t=1491408653941" } ] }
pushpin_1  | [INFO] 2017-04-05 16:10:53.976 [proxy] GET ws://pushpin:7999/api/v1/pp?t=1491408653941 -> rails:3000[http] ref=http://localhost:38080/app code=101 0
pushpin_1  | pushpin-proxy: zhttprequest.cpp:1186: virtual void ZhttpRequest::beginResponse(int, const QByteArray&, const HttpHeaders&): Assertion `d->state == Private::ServerReceiving || d->state == Private::ServerResponseWait' failed.
pushpin_1  | [ERR] 2017-04-05 16:10:53.979 proxy: Exited unexpectedly
pushpin_1  | [INFO] 2017-04-05 16:10:53.980 stopping m2 http:7999
pushpin_1  | [INFO] 2017-04-05 16:10:53.980 [zurl] stopping...
pushpin_1  | [INFO] 2017-04-05 16:10:53.980 [zurl] stopped
pushpin_1  | [DEBUG] 2017-04-05 16:10:53.980 [zurl] curl: Closing connection 1
pushpin_1  | [DEBUG] 2017-04-05 16:10:53.980 [zurl] socketFunction: CURL_POLL_REMOVE 26
pushpin_1  | [INFO] 2017-04-05 16:10:53.980 [handler] stopping...
pushpin_1  | [INFO] 2017-04-05 16:10:53.981 [m2a] stopping...
pushpin_1  | [INFO] 2017-04-05 16:10:53.981 [m2a] stopped
nginx_1    | 172.19.0.1 - - [05/Apr/2017:16:10:53 +0000] "GET /api/v1/sjs/305/db15fqwl/websocket HTTP/1.1" 101 612 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.98 Safari/537.36" "-"
nginx_1    | 2017/04/05 16:10:53 [error] 8#8: *137 upstream prematurely closed connection while reading response header from upstream, client: 172.19.0.1, server: domain2.com, request: "POST /api/v1/sjs/601/vig4ufk2/xhr?t=1491408653941 HTTP/1.1", upstream: "http://172.19.0.3:7999/api/v1/sjs/601/vig4ufk2/xhr?t=1491408653941", host: "localhost:38080", referrer: "http://localhost:38080/app"
nginx_1    | 172.19.0.1 - - [05/Apr/2017:16:10:53 +0000] "POST /api/v1/sjs/601/vig4ufk2/xhr?t=1491408653941 HTTP/1.1" 502 575 "http://localhost:38080/app" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.98 Safari/537.36" "-"
pushpin_1  | [INFO] 2017-04-05 16:10:53.985 [handler] stopped
pushpin_1  | [ERR] 2017-04-05 16:10:53.987 m2 http:7999: Exited uncleanly
pushpin_1  | [INFO] 2017-04-05 16:10:53.988 stopped

WebSocket reliable mode

It should be possible to enable a reliable mode for WebSocket connections, similar to what can be done with HTTP connections. The server should be able to supply a prev ID when subscriptions are assigned to connections, and Pushpin should be able to query for missed messages per-channel by last known ID. The feature should work for both WebSocket targets and HTTP targets (WebSocket-over-HTTP).

The big question is what protocol to use for the recovery requests. For WebSocket targets, it could be done using WebSocket messages, however there may be value in having it work over HTTP. For HTTP targets, it could be done using WebSocket messages encapsulated in WebSocket-over-HTTP protocol, but it would be nice if requests could use GET and reuse existing constructs like Grip-Link, Grip-Last, etc.

Note that if the protocol will use WebSocket messages, we'll likely need a way for Pushpin to send control messages to the backend which is not yet possible.

Request body size bug

When pushpin gets a request, it compares all request size (header and body) with MAX_HEADERS_SIZE. So, our requests size are limited with this value.

in src/handler/httpserver.cpp:
in void processIn():

Line 219: QByteArray buf = sock->readAll(); // Reads all request
Line 223: if(inBuf.size() + buf.size() > MAX_HEADERS_SIZE) // Comparses request size with headers max size value

Examples of using stats via HTTP

Hi @jkarneges,

Having a lot of fun playing with pushpin! Love the approach, I never realised how much time I was spending solving "socket" problems vs application problems before, and it's so nice to have it handled so simply.

I saw somewhere that for scaling multiple instances one could use a stat service for querying subscriptions.

Is this an HTTP endpoint or ZMQ? If so what are it's specs? How can one get at this data?

Many Thanks!

Checking Pushpin with PVS-Studio static analyzer

Hello,
I have found some weaknesses and bugs using PVS-Studio tool. PVS-Studio is a static code analyzer for C, C++ and C#: https://www.viva64.com/en/pvs-studio/

Analyzer warnings:

In addition, I suggest having a look at the emails, sent from @pvs-studio.com.

Best regards,
Phillip Khandeliants

Feature Request: Replace Path

I want to proxy routes from http://localhost:7999/something/* to http://backend:8080/something_else/*, replacing something with something_else.

Django WebSocket Browser: connection refused

Got a django endpoint which is working well with an http curl from cli, but connection gets refused when attempting to create a websocket object in browser.

JS Console in Chrome:
var ws = new WebSocket('ws://localhost:7999/dashboard/stream/');

WebSocket connection to 'ws://localhost:7999/dashboard/stream/' failed: Error in connection establishment: net::ERR_CONNECTION_REFUSED

enabled the over_http with no luck.

Django server registers no incoming requests when attempting websocket conn from browser.

Setting hold instruction timeout

I set response on grip hold instruction, but timeout is much bigger than server timeout. Is there a way to set custom timeout (i.e. 5 seconds)?

Apache Setup

Hi,

i want to test your package but i become a little error after trying the url with curl -i or on my browser

There error contains Error while proxying to origin.

Can you give an example ob configuration the server?

Pushpin retries the requests indefinitely if channel name changes between requests

We have a unique use case in which each client gets a unique channel which is used to publish a message only once. This worked perfectly in 1.14. But in 1.15 for each new channel, the first request is always retried. So if the channel changes between requests, pushpin indefinitely retries the requests

Sample server code to reproduce the problem in ruby

require 'sinatra'
require 'json'
require 'pp'
require 'securerandom'
get '/hold' do
  #pp request.env
  headers \
    'Grip-Hold' => 'response',
    'Grip-Channel' => SecureRandom.uuid,
    'Content-Type' => 'application/json'
  '{"resp": "timeout"}'
end
curl http://localhost:7999/hold

index path_beg

Matching path_beg in routes is currently a linear search. To better support configurations with many paths on the same domain, the lookup should be improved.

In practice this probably affects nobody other than some older features in the Fanout Cloud.

client side possibility?

hello

is there a library or a way to use javascript websocket to listen on a channel?
i am getting an net::ERR_EMPTY_RESPONSE

thanks

SockJS support

In a project I need to build a chat server and I wanted to use Pushpin as a proxy between a Laravel (PHP) app and sockjs on the client.
At the moment I can connect the client successfully and receive messages broadcasted an a channel. But when I try to send a message it does not seem to call the backend (backend only get's called on initial connection).

I am using the "fanout/laravel-grip" package and the following code (for now it should be an echo server, if that works I can start building the actual functionality)...

        \Log::info('(' . time() . ') chat');

        \LaravelGrip\verify_is_websocket();

        /** @var WebSocketContext $context */
        $context = \LaravelGrip\get_wscontext();

        if ($context->is_opening()) {
            $context->accept();
            $context->subscribe('test');
        }

        while($context->can_recv()) {
            $message = $context->recv();
            \Log::info('Received message', ['message' => $message]);

            if (is_null($message)) {
                $context->close();
                break;
            }

            $context->send($message);

            \LaravelGrip\publish('test', new WebSocketMessageFormat($message));
        }

        return null;

If I look into the laravel logfile I can see the log message only being called on the initial client connection not when the client sends a message.

The route is

*,debug,sockjs=/sockjs,sockjs_as_path=/chat localhost:80,over_http

When connecting a regular websocket to /chat it works without a problem. Does sockjs require any special handling (compared to regular websockets), since I assumed that sockjs emulated websockets behave like regular ones (ie. when a message is received a post request is send to the backend, after which the backend handles the message and sends a response/publishes a message on a channel/etc)...

Docker-compose with Pushpin

Was trying out the docker image from johnjelinek, but seems to have problem getting it to work with Docker compose due to the need of a tty by mongrel2.

I got the following error:

(errno: Inappropriate ioctl for device) getlogin failed and no LOGNAME env variable, how'd you do that?

I am not too familiar with mongrel2, any ideas what I need to tweak so that an automated script can start pushpin without tty?

automatic keep-alive with websockets

Pushpin's HTTP streaming mode enables specifying keep alive instructions, e.g.:

Grip-Hold: stream
Grip-Keep-Alive: still here\n; format=cstring; timeout=30

It would be good to have something similar for WebSockets.

Proposal: a new control message type keep-alive:

c{"type": "keep-alive", "content": "still here", "timeout": 30}

If set more than once, the previous setting would be overwritten. Omit content to disable keep alives.

configure: --qtselect shouldn't be needed

If both Qt 5 and Qt 4 dev environments are installed, you need to explicitly specify --qtselect=5 when running configure. However, Pushpin requires Qt 5 so it should just do the right thing automatically. Fixing this will require fixing qconf.

Using Python sortedcontainers vs blist

First, thanks for a great package. I'm exploring real-time Django solutions and am currently evaluating this.

I noticed that the project is using the Python blist module, in particular the sorteddict data type is used for expiring subs by time. As the author of the sortedcontainers module, I wondered why. SortedContainers is a pure-Python implementation of SortedList, SortedDict, and SortedSet data types. For your use pattern of getting, setting, deleting, and iterating sortedcontainers.SortedDict looks faster than blist.sorteddict. Take a look at the benchmarks at http://www.grantjenks.com/docs/sortedcontainers/performance.html#id2

But then I noticed that your typical install instructions are a list of Debian packages. And sortedcontainers doesn't have a Debian package (though someone recently requested that (grantjenks/python-sortedcontainers#29) so I wanted to ask:

  1. How did you choose blist?
  2. Is the lack of a Debian package for sortedcontainers a deal breaker?
  3. Would you accept a pull request that changed the dependency and made the code a bit more idiomatic? In particular, I'd be really curious to test any benchmark that might be affected by such a change.

Thanks for reading.

ability to provide initial streaming content via a series of requests

If the origin needs to return a lot of initial content for an HTTP streaming request, then it may be desirable to have the origin only return a portion of the content and wait for the proxy to fetch the next part.

Some reasons this would be useful:

  1. Origin can return large initial content to the client without having to buffer the entire content at once.
  2. For the origin to properly generate GRIP instructions it may need to know the last part of the initial content before doing so. However, since instructions are provided in the headers, this might require the origin to buffer the whole response in order to know what headers to set before beginning to send the response body. Breaking up the content into a series of requests would let the origin wait until the last request before generating GRIP instructions. Trailing HTTP headers could theoretically be used to supply instructions after sending a single streamed response, but this only works with chunked encoding and is too obscure of an HTTP feature to rely on.
  3. Paging may be useful as part of a reliability mechanism (to be discussed separately).

We could introduce a new response header telling Pushpin how to do the paging:

HTTP/1.1 200 OK
Content-Type: text/plain
Grip-Link: </stream/?after=nextID>; rel=next

{... first part of content ...}

We use a Link-style header here in case we need to support additional link types in the future. We call it Grip-Link rather than reusing Link so that Pushpin can remove the header before relaying to the client (as Pushpin does with all such Grip-* headers).

Pushpin would then page through all of the response data before holding (subscribing) the request to channels. The expected usage is that the origin server would omit the Grip-Hold header from all pages until the last page. If a response contains neither Grip-Hold, nor a Grip-Link of type next, then Pushpin would close the response to the client after all data has been sent. If a response contains both such headers then the hold would win.

Release notifications

I maintain Arch Linux's AUR packages for pushpin and zurl. It would be great if there was a way to know when new versions were released (a mailing list or some other similar method) so I could update said packages in a timely fashion.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    πŸ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. πŸ“ŠπŸ“ˆπŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❀️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.