Coder Social home page Coder Social logo

centrifugal / centrifugo Goto Github PK

View Code? Open in Web Editor NEW
7.9K 7.9K 573.0 111.87 MB

Scalable real-time messaging server in a language-agnostic way. Self-hosted alternative to Pubnub, Pusher, Ably. Set up once and forever.

Home Page: https://centrifugal.dev

License: Apache License 2.0

Go 94.22% Makefile 0.27% Dockerfile 0.02% Shell 4.80% Lua 0.37% Procfile 0.01% JavaScript 0.31%
eventsource grpc http-streaming http3 messaging pubsub real-time redis scalability sockjs sse streaming websocket websockets webtransport

centrifugo's People

Contributors

alexander-emelin avatar alexshavelev44 avatar banks avatar chebyrash avatar cristaloleg avatar dependabot[bot] avatar diasjorge avatar fzambia avatar gitter-badger avatar j178 avatar khvalov avatar klauspost avatar larinel avatar lopatkinevgeniy avatar lxm avatar oleh-ozimok avatar orthographic-pedant avatar patrickkvartsaniy avatar sannis avatar sergeyparamoshkin avatar shaunco avatar silischev avatar sinimawath avatar sobolevn avatar stringnick avatar tie avatar tufanbarisyildirim avatar vitoordaz avatar vladshut avatar wanshuangcheng avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

centrifugo's Issues

Non-obvious behavior in client.HandleCmd

How to reproduce
Using JSClient provide user or timestamp as integers
What Happens
Providing user or timestamp as integers causes following errors in client.HandleCmd respectively

json: cannot unmarshal number into Go value of type libcentrifugo.UserID
json: cannot unmarshal number into Go value of type string

Solution
Probably implict type cast in Js Client (and other lang-libraries) is the best solution...

OSX version

Hi there!
How can I use the Centrifugo on OSX?
Thank you

Client has an buffer limit of 256 messages before failing

A client can currently only buffer 256 messages. When that is reached the connection is simply closed:

func (c *client) send(message string) error {
    select {
    case c.messageChan <- message:
        return nil
    default:
        c.sess.Close(3000, "error sending message")
        return ErrInternalServerError
    }
}

It makes sense that we never want send() to block, the limit seems a bit arbitrary.

I the usage scenario you describe it does make sense, but if we imagine a scenario where a huge bunch of messages is being sent to a single client. For instance a "message received" notification from 1000 other clients, suddenly the client gets disconnected.

Solutions could be:

  • Make the number configurable. I don't like this so much, since it simply pushes the problem to the server user, but it is way the easiest.
  • Add a timeout to client.send() - not good because it will block the whole server.
  • Use a slice buffer to send messages to "client.sendMessages()". Very annoying to synchronize.
  • Have 'client.sendMessages()' receive messages as it does now, but put messages on a slice with yet another go-routine to send them. That way it should be safe to make "messageChan" sized '1' and blocking send().

Last option could look something like this:

// sendMessages waits for messages from messageChan and sends them to client
func (c *client) sendMessages() {
    var mu sync.Mutex
    m := make([]string, 0, 10)
    go func() {
        for {
            mu.Lock()
            if len(m) == 0 {
                mu.Unlock()
                // TODO: Replace sleep with a notification
                time.Sleep(time.Second)
                continue
            }
            msg := m[0]
            m = m[1:]
            mu.Unlock()
            err := c.sess.Send(msg)
            if err != nil {
                c.sess.Close(3000, "error sending message")
            }
        }
    }()
    for {
        select {
        case message := <-c.messageChan:
            mu.Lock()
            m = append(m, string)
            mu.Unlock()
        case <-c.closeChan:
            return
        }
    }
}

Engine Interface optimisations

With #52 we improved throughput enough to keep up with our publish volume for now.

However publishing messages that are almost all not actually delivered to anyone nr stored (thanks to #51) is using a lot of CPU and causing delivery latency to be slower than it could be in some cases.

To fix this I suggest we consider a few different optimisations:

  1. remove separate addHistory method and instead pass history options to publish().
    • this will allow redis engine to implement publish + addHistory as a lua script for example which will require only a single round trip to redis per publish call - even with the drop_inactive optimisation.
  2. support publishing a batch of messages (possibly each to different channels) at once
    • engines that don't have a way to optimise this can just call publish in a loop and decide on semantics for partial failure (broadcast API call already has same problem)
    • redis engine can make use of pipelining for example to send all the request and then reap them without waiting for synchronous round-trip for each one.

Batch publishing in 2. will require changes further up the stack though - the application itself will need to support batch publish API requests (should we make new API call? Or breaking change to current API params?).

This is becoming more important as we increase publish load and see CPU usage climb.

Make admin websocket more functional

The goal here - make admin websocket endpoint (/socket) more advanced and similar to client endpoint in the end. This should not affect users as our web interface is embedded and will be updated together with Centrifugo.

  • Rename auth to connect
  • Support API commands (so it will be possible to remove http ActionsHandler)
  • Call stats API instead of InfoHandler (so remove InfoHandler)
  • Possibility to start communicating with admin endpoint without auth (--insecure_admin, maybe rename --insecure_web to --insecure_admin, but leave alias for backwards compatibility)

Benefits:

  • publish over pool of Websocket connections
  • less various HTTP handlers that were only required for web interface

The only problem I see at moment - how to show secret and current configuration options - so it seems extra admin method will be required to get current config.

Nginx embed location docs needs correction

I try to work with centrifugo throught embed location on my site.

When I try to follow instructions in docs I have this errors in nginx logs:

[emerg] "keepalive_timeout" directive is duplicate in /etc/nginx/sites-enabled/site.com:76
[emerg] "proxy_read_timeout" directive is duplicate in /etc/nginx/sites-enabled/site.com:78

There is a directive duplication in docs:

location /centrifugo/connection {
        rewrite ^/centrifugo(.*)        $1 break;

        proxy_next_upstream error;
        keepalive_timeout 65;  <-- HERE
        proxy_read_timeout 60s;  <-- HERE
        gzip on;
        gzip_min_length 1000;
        gzip_proxied any;

        proxy_buffering off;
        keepalive_timeout 65;  <-- HERE
        proxy_pass http://centrifugo;
        proxy_read_timeout 60s;  <-- AND HERE
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Scheme $scheme;
        proxy_set_header Host $http_host;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
    }

After this duplicates removed — every thing works fine.

Create specific types for Project/Namespace/Channels/Users

Currently are a lot of strings being thrown around to identify channels, users, etc.

If we change "string" in calls like addPresence(projectKey, channel, uid string, info ClientInfo) to typed version addPresence(projectKey pID, channel cID, uid uID, info ClientInfo) there is no risk of using the wrong value, sending them in wrong order or similar errors. Types could be:

type (
    nID string // Namespace ID
    pID  string // Project ID
    cID string // Channel ID
    uID  string // User ID
)

I wouldn't mind sending in a PR after #11 lands

Consider rewriting latency metrics

In Centrifugo 1.4.0 we deprecated timers. Here is an issue to consider making them right and put back in future releases.

This includes: client request time, API request time.

As @banks said in #68:

If we really want them we should use something like https://www.godoc.org/github.com/vulcand/vulcan/metrics#RollingHistogram which uses a robust measurement technique (HDRHistrogram) and supports rolling window of these to have an accurate distribution of the last N minutes etc. If we did that, I'd recommend having something like 15 buckets of 1 min, with each histogram having range 100µs to 10 mins (600mill µs) with 3 significant figures. That gives you very high definition for last 15 mins in a bit over 1MB per timer (on my mac). That seems very reasonable if we have ~20 timers on a modern machine. We could make it configurable if we want to support people on super low memory VPS or something. Then expose in the results from stats at least:
mean
50%ile
99%ile
99.99%ile
max
Looks like it's not threadsafe though so would need to put a mutex around it or send results on a lightly buffered channel and have it update in single goroutine

Simple request: allow disconnecting all users

There is already an admin API for disconnecting specified user.

In some cases (handling outages/load testing) it can be useful to explicitly disconnect all current users such that they won't attempt to reconnect indefinitely (in the wild we always have some people who leave the page open for days and days).

It's not vital, but it would be useful in these cases to be able to force-disconnect every user.

Could support empty userid param, but probably better would be something explicit like ** so it can't be done accidentally...

Why use "mapstructure" and not just json.Unmarshal?

It seems strange to use a third party library for something the standard library does just as well:

    var cmd connectClientCommand
    err = mapstructure.Decode(params, &cmd)

Is the same as

    var cmd connectClientCommand
    err = json.Unmarshal(params, &cmd)

Furthermore, if you want to defer parsing of some parts, simply do:

type clientCommand struct {
    Method string
    Params json.RawMessage  // Params are stored here, but not decoded
}
 // ...
    var cmd connectClientCommand
    err = json.Unmarshal([]byte(params), &cmd)

Using encoding/json is likely faster, much better tested and easier to read for "outsiders". Unless I am missing something (I could likely be), you don't use any "magic" features from mapstructure.

Redis API can't keep up with high publish volume

I'm using --redis_api option to publish messages at high volume.

I found that at about 5k messages a second average, the centrifugo.api queue in redis just grew and grew, even though my 2 centrifugo instances had vertually no CPU usage.

See this graph:

image

The problem is that centrifugos publish API isn't batched. Each message we must make at least 2 complete round trips to redis for publish and then addHistory call before we can move on to the next.

I managed to fix the problem in my case with a quick hack which was just to put the recieving goroutine here: https://github.com/centrifugal/centrifugo/blob/master/libcentrifugo/engineredis.go#L156-L179 in a loop so there were 20 of them running. Now it keeps up fine with much more CPU used:

image

Discussed with @FZambia on gitter, and he was concerned that this breaks order of delivery for single-node case.

So we come up with following proposal to fix this issue cleanly without full batching refactor (for now).

  • make a new configuration param redis_api_num_pub_chans which is default 1
  • allow client to choose how to assign messages - they can hash by channel or whatever they want to guarantee order (in single node case)
  • redis engine simply starts a separate goroutine for each one in initalizeApi and process them synchronously in order
  • leave current centrifugo.api queue as it is for any non-publish use-case that might exist
  • call new keys centrifugo.api.pub.[number]

I intend to have a PR for this built tomorrow.

Docker image wont start

[F]: 2015/10/13 14:56:50 main.go:179: Unable to locate config file, use "centrifugo genconfig -c config.json" command to generate one

json: error calling MarshalJSON for type *json.RawMessage

Hi,

I had this error message on my heroku logs and could'nt figure out what's about.

json: error calling MarshalJSON for type *json.RawMessage: invalid character ''' looking for beginning of object key string

Thanks for getting a look on.

Move connection and API request log level to DEBUG

Currently default log level is INFO. This means that every connect/disconnect and HTTP API request logged by default. I think this makes default logging too chatty. When lots of publish requests received logs look useless and a lot of work actually spent of this logging (noticed on flame graph, will try to reproduce and post picture here).

So proposal here is move those log lines to DEBUG level and write INFO level logs with aggregated information over minute - i.e how many connects/disconnects, how many requests of each type. This is maybe must be done together with #72 to show really useful information.

Any suggestions and objections are welcome.

Allow server and web admin APIs to listen on separate port to websocket

Hi

I love the web UI - useful tool. But we certainly couldn't expose it like it is now with just static password in production.

What we could do instead is run our production public instances with it turned off, and then run a separate instance connected to same redis with web on and only expose this over our internal VPN. Or alternatively put it behind our admin proxy that is hooked into our robust user authentication.

But for the second approach to work, we'd need to then make it have no password in config so that users don't need to auth twice (and remember some global made up password).

An alternative I think would be generally useful would be to be able to configure the web interface to be on, to NOT need password, but to listen on a separate port to the public websocket/sockjs server.

That would allow us to run the web UI on same nodes as production but to ensure they are not publically accessible - put them behind our authenticating proxy and/or only expose them over VPN.

While we are at it, I'd like to propose the same for the server API. I realise that the HMAC offers some security for it, but in our case we never want public access so it would be necessary to just lock it down completely.

One option would be disable the server API on the public nodes and have separate instance again only accessible internally but that would require changes. Again it would be neatly solved if you could optionally configure the API to listen on a separate port to the sockjs/websocket endpoint such that we could use same instances but keep access to those totally private at network level rather than rely on HMAC and obscurity.

tl;dr I'm going to end up modifying centrifugo to do either of the following. If you have input on which if any you like the sound of for core project I'll put my effort into that so I can contribute it back.

  1. Use separate centrifugo instance(s) for private stuff
    • allow disabling of server API, leave only websocket/sockjs exposed
    • allow no password for web UI such that it can go behind auth proxy or VPN without annoyance (might be a ticket for the web UI repo - I've not looked at the split of code yet)
  2. Separate ports for private stuff
    • allow optional configuration of separate port(s) for web ui and server API endpoints
    • also allow no password for web UI as above

Thanks for the great project by the way - I started writing something extremely similar for our large scale site after being disappointed at all the open source alternatives I tried, and my design was about as close as can possibly be imagined to the details of centrifugo. I'm very happy you already built it for us :).

Uncaught SyntaxError: Unexpected end of JSON input

Nothing has changed in my code since upgrading from v1.4.5 to v1.5.0, however, I'm getting this syntax error when sending message between clients:

chat1

The JSON is being broken somehow:

{
    "method": "join",
    "body": {
        "channel": "testsite:575e286967c52:sim1",
        "data": {
            "default_info": {
                "user_data": {
                    "id": 3,
                    "username": "sim1",
                    "rank": 100,
                    "zip": "56894",
                    "href": "http://www.testsite.com/users/sim1",
                    "last_login_at": "1467261409"
                }
            },
            "user": "3",
            "client": "855cdfc3-5dd6-4af4-b005-fc12653f780  //here string is not terminated and missing '}'

This prevents my messages from being sent.


for my app I'm using:
front-end: centrifuge-js
back-end: centrifugo-1.5.0-linux-amd64

application.handlePublishCommand: inconsistent error

In handlePublishCommand there is the following code:

    channelOptions := app.getChannelOptions(p.Name, channel)
    if channelOptions == nil {
        resp.Error = ErrNamespaceNotFound
        return resp, nil
    }

    // Client disconnects here...
    err := app.publishClientMessage(p, channel, data, nil)
    if err != nil {
        resp.Error = ErrInternalServerError
    }
  1. app.getChannelOptions is actually called in app.publishClientMessage, so there is no real need for it here.
  2. If the client disconnects around the added comment ErrInternalServerError will be returned instead of ErrNamespaceNotFound.

Where has the docker image gone

Please could you create an official docker image on dockerhub. there used to be one on centrifuge (although I see that has also gone now)

http: panic serving [::1]:57351: runtime error: invalid memory address or nil pointer dereference

OS: Centos 7

Centrufugo version: 1.4.0

After request to centrifugo, service crashes with error:

http: panic serving [::1]:57351: runtime error: invalid memory address or nil pointer dereference
goroutine 18 [running]:
net/http.(*conn).serve.func1(0x18af8240, 0xf764a130, 0x18aec1d8)
    /Users/fz/projects/go15/src/net/http/server.go:1287 +0xa2
sync/atomic.AddUint64(0x18a8c454, 0x1, 0x0, 0x806559a, 0x18aeabd0)
    /Users/fz/projects/go15/src/sync/atomic/asm_386.s:112 +0xc
github.com/centrifugal/centrifugo/libcentrifugo.(*metricCounter).Add(0x18a8c454, 0x1, 0x0, 0x9, 0x18aeabc0)
    /private/var/www/different/go/gopath/src/github.com/centrifugal/centrifugo/libcentrifugo/metrics.go:129 +0x89
github.com/centrifugal/centrifugo/libcentrifugo.(*metricCounter).Inc(0x18a8c454, 0x18aeabb0, 0xf778a25c)
    /private/var/www/different/go/gopath/src/github.com/centrifugal/centrifugo/libcentrifugo/metrics.go:118 +0x31
github.com/centrifugal/centrifugo/libcentrifugo.(*Application).APIHandler(0x18a14900, 0xf764a218, 0x18b56100, 0x18afa310)
    /private/var/www/different/go/gopath/src/github.com/centrifugal/centrifugo/libcentrifugo/handlers.go:318 +0x43
github.com/centrifugal/centrifugo/libcentrifugo.(*Application).APIHandler-fm(0xf764a218, 0x18b56100, 0x18afa310)
    /private/var/www/different/go/gopath/src/github.com/centrifugal/centrifugo/libcentrifugo/handlers.go:103 +0x38
net/http.HandlerFunc.ServeHTTP(0x18b171c8, 0xf764a218, 0x18b56100, 0x18afa310)
    /Users/fz/projects/go15/src/net/http/server.go:1422 +0x34
github.com/centrifugal/centrifugo/libcentrifugo.(*Application).WrapShutdown.func1(0xf764a218, 0x18b56100, 0x18afa310)
    /private/var/www/different/go/gopath/src/github.com/centrifugal/centrifugo/libcentrifugo/handlers.go:470 +0x9d
net/http.HandlerFunc.ServeHTTP(0x18b0b190, 0xf764a218, 0x18b56100, 0x18afa310)
    /Users/fz/projects/go15/src/net/http/server.go:1422 +0x34
github.com/centrifugal/centrifugo/libcentrifugo.(*Application).Logged.func1(0xf764a218, 0x18b56100, 0x18afa310)
    /private/var/www/different/go/gopath/src/github.com/centrifugal/centrifugo/libcentrifugo/handlers.go:486 +0x105
net/http.HandlerFunc.ServeHTTP(0x18b0b1a0, 0xf764a218, 0x18b56100, 0x18afa310)
    /Users/fz/projects/go15/src/net/http/server.go:1422 +0x34
net/http.(*ServeMux).ServeHTTP(0x18b184a0, 0xf764a218, 0x18b56100, 0x18afa310)
    /Users/fz/projects/go15/src/net/http/server.go:1699 +0x133
net/http.serverHandler.ServeHTTP(0x18b58540, 0xf764a218, 0x18b56100, 0x18afa310)
    /Users/fz/projects/go15/src/net/http/server.go:1862 +0x156
net/http.(*conn).serve(0x18af8240)
    /Users/fz/projects/go15/src/net/http/server.go:1361 +0xc05
created by net/http.(*Server).Serve
    /Users/fz/projects/go15/src/net/http/server.go:1910 +0x343
^C[I]: 2016/02/25 12:50:06 Signal received: interrupt
[I]: 2016/02/25 12:50:06 Shutting down

Config.json:

{
"secret": "secret",
      "publish": false,
      "watch": true,
      "presence": true,
      "join_leave": false,
  "insecure_api" : true,
  "history_size": 10,
  "history_lifetime": 240,
  "log_level": "debug"
}

Graceful shutdown

Right now shutting down the server, will just kill whatever it currently is doing.

I would propose that we put in a way to flush pending messages while rejecting incoming requests.

I will send a PR.

Next Centrifugo release won't support multiple projects, if you have any objections - write here

I'll try to describe why I decided to do so.

At moment Centrifugo does not support any sort of sharding and having two project registered can result in denial of two project's real-time features because of one of those projects under heavy load. I have already written about this in docs.

Also this will allow to make a little shift in Centrifugo API philosophy - i.e. API will be not per project but per Centrifugo. This means that Centrifugo can theoretically return stats and metrics via calls to HTTP API. At moment this is semantically incorrect because of multiple project support - i.e. that data must be per project.

Of course it will result in some backwards incompatible changes. Slightly modified configuration options for project settings. And no need to use project name in connect parameters, API calls, token generation etc. I think most of those problems can be quick fixed by using empty string as project name. But some functions will have another signatures in new versions of libraries (i.e. won't accept project name where it was accepted before).

In my opinion this change will open a road to next improvements in Centrifugo which is currently very difficult because of multiple projects support.

Events

In my application very need to know when someone disconnect from channel. So, will be good to have something like disconnect event from centrifugo server.

How can i configure SSL?

Hi,
I don't know how to configure centrifugo to have SSL endpoint.
I am running the app on port 8000 and i want to use https://xxxxx:8000 not http://xxxxx:8000

I have this error message : was loaded over HTTPS, but requested an insecure XMLHttpRequest endpoint This request has been blocked; the content must be served over HTTPS.

Thanks

Use sync.RWMutex in hubs.go

Just looking through the code I wondered why you don't use sync.RWMutex in your clientConnectionHub? Right now you serialize all incoming data without any reason I can see.

It would appear that these are read-only operations on clientConnectionHub:

  • getClientsCount
  • getUniqueClientsCount,
  • getUserConnections (this should return a copy either way, it is unsafe/racy as it is now AFAICT)

In clientSubscriptionHub, read only are:

  • getChannelsCount
  • getChannels
  • broadcast

In adminConnectionHub:

  • broadcast

If these were using a RWMutex, and using RLock/RUnlock, they would be able to work concurrently. Right now only one broadcast can run at the time, without any real reason.

Only store history when there are already subscribers

This is an RFC for a new (optional) optimisation that is crucial for us.

I'm going to work on implementing this today on a fork and will hopefully have a PR, but wanted to document the rationale and design first.

Problem

Right now, if history/recovery is enabled, Centrifugo will store up to history_size messages for every channel, published to in the last history_lifetime seconds.

This makes designs with high degree of fan-out impractically expensive.

In my case I have about 5k messages a second being published (after fan-out) where some messages might be published into tens of thousand of channels - one for each "watcher". Even with no subscribers active and modest history settings, that means I need tens or hundreds of GB of Redis memory which is all doing nothing - never read by anyone.

In vast majority of cases, a user who is watched by tens of thousands of people will likely only have a very small fraction of them currently online and subscribed to centrifugo; most users with few watchers will have zero watchers online most of the time. I expect this optimisation to reduce redis memory requirements (and operations per second) by several orders of magnitude, with almost no change to app behaviour (see Edge Case section).

Solution Overview

I propose an optimisation that will ensure we simply don't store history when it is not needed by "active" subscribers. "Active" means: online now, or within history_lifetime seconds (and so will possibly reconnect, expecting recovery).

The logic below is almost transparent to the application - it will behave identically to current logic, but will result in only using redis memory for data that is actually delivered.

At a high level it's simple:

  • check if engine.publish() actually delivered to anyone
  • check if history already exists (i.e. something was listening in the last history_lifetime)
  • Add message to history key if and only if at least one of the above checks indicates active subscriber for that channel

Implementation

This should be relatively simple. If I were designing from scratch I'd be tempted to not have separate publish and addHistory methods in the engine interface as they are fairly closely linked. Indeed if you made that change it would be possible to implement this entire request in the publish method and potentially with a lua script inside redis entirely.

But in interest of minimising changes to core centrifugo code I propose we implement it with these minimal changes:

  • change Engine.publish signature to publish(chID ChannelID, message []byte) (bool, error) where the bool return value is true when we delivered to at least 1 active subscriber and false otherwise.
  • change addHistory's options struct to include a new option: OnlySaveIfActive bool. If this is set to true then addHistory should only add the new event if the history key already exists.

Engine Compatibility

  • The changes to publish are efficient to implement in memory engine as you have state about subscribers
  • Also efficient in redis engine since PUBLISH returns the number of active subscriptions that were delivered to (http://redis.io/commands/publish)
  • Even if there are future engines that don't know efficiently - e.g. use a third-party service that doesn't return that info, or queue publishes asynchronously - they can still just return true for every call, and you end up with exactly the same behaviour as now - suboptimal but totally correct.
  • The semantics of current client code is that we only recover on disconnection, so messages delivered before a client first connects are never read by the client anyway. Thus it's safe to simply not store them at all if there is no client connected now, and hasn't been for the last history_lifetime. Clients end up with identical deliverability guarantees - indeed they couldn't tell the difference with and without this.

Edge Case (resolved with #148 )

I've said this is //almost// completely transparent to the client which is true. There is one case in which it is not which I regard as an edge case, and acceptable given centrifugos "best effort" general design.

Given the proposed optimisation, consider the following:

  • Client connects and subscribes to foo
  • No messages are published before client loses connection (phone goes into tunnel for example)
  • While client is offline, message A is published into foo
  • Since there are no active subscribers (dropped connection pubsub state cleaned up) AND no existing history, this message is dropped not saved
  • Client reconnects within history_lifetime with recover option. But they don't get A because it was not saved.

Personally I don't consider this a big problem. However given that it is a change, we could consider making this optimisation configurable in case others find it unacceptable.

Self-subscriptions

The one other case that doesn't affect me but should be pointed out is that if you are using the Websocket API to allow publishing (i.e. publish: true in config) then these messages will always be saved due to the client API design - a client can't publish to a socket unless they have a subscription to it which means publish() will always see at least one active subscription.

This is no worse that current behaviour and is probably expected/desirable anyway so I don't think it's an argument against the optimisation.

Packaging for fedora/centos

Idea.
It's possible to update centrifuge packaging policy:

  1. Update RPM spec to use git describe --long to get version and release number
  2. Build all packages in travis-ci using cloud build
  3. Create free repository in packagecloud.io (example)
  4. Probably add tests runs directly in docker containers (to run test for each os)
    now we support centos 6/7, fedora 20-23

How it works: https://travis-ci.org/tarantool/tarantool/builds/96720534
Results: https://packagecloud.io/tarantool/1_6

[question] Alternative engines support

in the application.go there is a hook into the engine

// engine to use - in memory or redis.
engine Engine

I have am using NSQ, as well as NATS. These are both 100% golang based message queues.
NSQ has storage.

https://github.com/nsqio/nsq

https://github.com/nats-io/gnatsd

At the moment i use NSQ for microservices and large application. You can string together micro service libraries, without having to get into the HTTP Request Response stuff, but instead just passing messages with body payloads of data between them.

The other thing is that NATS and NSQ dont have great support for interacting with the Web client, Or mobiel client, and so Centrifo would get allot of support if it integrated with these well used MQ libds like NSQ and NATS.

Just an idea at this stage, and i have not had a chance to look at the interface that woudl need to be supported.

Oh and lastly, NSQ and NATS are both in the process of getting a Discovery Service integration, so that they can run on large Kubernetes clusters.
Each Server, has a MQ daemon running with it.
There are many lookup daemons that know all the MQ daemons, and the topics and channels they all use.
So at the code level, you can ask for the topic, and publish to it, and not have to know anything about what or where that other server is.

Basically, in summary, NSQ and NATs, will allow Centrigio to scale out on clouds much easier.
I knwo Redis can also scale out on k8 (kubernettes), but its no as decoupled is my point.

Simpler response.go

Removes the need for a custom marshaller, and converting to a map[string]interface{}. Also doesn't initialize fields to zero value:

package libcentrifugo

import (
    "encoding/json"
)

// response represents an answer Centrifugo sends
// to client or API request commands
type response struct {
    Body   interface{} `json:"body"`
    Error  error       `json:"-"`
    ErrString  *string       `json:"error"`
    Method string      `json:"method"`
}

func newResponse(method string) *response {
    return &response{Method: method}
}

// multiResponse is a slice of responses in execution
// order - from first executed to last one
type multiResponse []*response

// toJson converts response into JSON
func (r *response) toJson() ([]byte, error) {
    if (r.Error != nil {
        s := r.Error.Error()
        r.ErrString = &s
    }
    return json.Marshal(r)
}

// toJson converts multiResponse into JSON
func (mr *multiResponse) toJson() ([]byte, error) {
    return json.Marshal(mr)
}

Multiple channels publish

For example:

command = {
        "method": "publish",
        "params": {"channels": ['channelA', 'channelB', 'channelC'], "data": {"input": "xxx"}}
}

One message published to many channels at once. For many scenario this dramaticly save traffic and performance (and at client-side too)

E.g. I have rss news server with 100 feed. Any user can subscribe to any feed or array of feeds. One case - each feed has own channel, server push message per channel. But if i can read 50 feed, at client we have 50 subscribers. In other case, server fetch actual user subscriptions and publish one message for all users, who have subscription.

Refactor metrics to be more accurate and usefull

I'm looking into the metrics that Centrifugo exposes via stats api call as we move to put it into production.

I have a couple of issues that are not show-stoppers but I think can be better:

  • Our typical monitoring is to use diamond collector on localhost to gather metrics once every 5 seconds from process. Current stats API is awkward because:
    • Output includes all nodes in cluster and to figure out which is current one means poking around trying to match name to _, That's not just fiddly, it feels like a hack to rely on knowing how centrifugo chooses to name nodes, and if --name is given at runtime in someone's setup it will break. Relying on --name is marginally better but requires that your centrifugo config and diamond config agree on the name to look for, and that you coordinate unique names across the cluster somehow. All of this is possible with good config management but it seems way to hard for something so simple.
    • The counter values reset periodically (every node_metrics_interval). Which is OK if you are consuming them directly, but for counters it makes it impossible for external process like diamond to track actual number of requests in between polls. I.e. you can't get better than configured granularity.
    • In addition some of the actual stats (the times) are not really very useful. Not a show stopper (I don't need to monitor those right now), but:
    • exponential decay sampled histogram is already a little difficult to reason about - it shows "recently" weighted results but you don't know _how_recent. But then Centrifugo only shows you the value once every node_metrics_interval of a moving decaying sample... So what you actually see is the "recent weighted" mean/max value as of up to 1 minute ago.
    • Actually using these for latencies is plain wrong, I went into detail but it's better to just link to: https://groups.google.com/forum/#!msg/mechanical-sympathy/I4JfZQ1GYi8/ocuzIyC3N9EJ which is talking directly about the Dropwizard implementation that go-metrics is a port of.

As well as that, there is also a minor race condition:

  • updateMetricsOnce locks the metrics mutex and copies values from counters and histograms
  • then it unlocks
  • then it clears the counters
  • any request that came in in between reading counter values and clearing will be lost (we correctly don't lock mutex to increment since go-metrics already handles concurrency)

Proposal

Here is one option that I think solves all my issues, it's quite a big change and there are certainly variations that would still work for our case if there are downsides I don't see with this solution.

  • Stop using go-metrics. It's useful and has some interesting properties, but we are already losing almost all the value by adding the interval based update to publically visible values. In addition, sampled histograms are wrong way to measure latency, and without those, the rest of what we use can just be replaced with integer counters.
  • Make there be a local_node_only param to stats OR create new api endpoint localstats
  • Get rid of node_metrics_interval and periodic updating - just return current counter values on each call to stats (or each ping message sent to other servers)
    • If others really rely on the 1 minutes summary counts we could include both, but simple counters are much more useful in general
    • Just use plain old atomic uint64 for all counters and don't worry about locking
  • Get rid of the timers. If we really want them we should use something like https://www.godoc.org/github.com/vulcand/vulcan/metrics#RollingHistogram which uses a robust measurement technique (HDRHistrogram) and supports rolling window of these to have an accurate distribution of the last N minutes etc. If we did that, I'd recommend having something like 15 buckets of 1 min, with each histogram having range 100µs to 10 mins (600mill µs) with 3 significant figures. That gives you very high definition for last 15 mins in a bit over 1MB per timer (on my mac). That seems very reasonable if we have ~20 timers on a modern machine. We could make it configurable if we want to support people on super low memory VPS or something. Then expose in the results from stats at least:
    • mean
    • 50%ile
    • 99%ile
    • 99.99%ile
    • max
    • Looks like it's not threadsafe though so would need to put a mutex around it or send results on a lightly buffered channel and have it update in single goroutine

Redis Engine has concurrency bug

At high volume I start to see redis connections drop. I've not worked out why but it's not very surprising in general.

When they do I see a lot of errors like redigo: connection closed.

The only way you can get that as far As I can see from redigo code is if you tried to issue some command to a closed pool connection (and logged the err so additional conn.Close() doesn't count).

I suspect this happens:

  1. initializePubSub is in subscribe loop
  2. redis error occurs and it return, closing e.psc (actually not it closes it twice - once explicitly and once in defer but that shouldn't matter as second is a no-op)
  3. before checkConnectionStatus manages to run and reconnect e.psc, new client subscribes and subscribe() call blindly calls e.psc.Subscribe on the closed conn.

Note that this doesn't even require multiple processors to be triggered (could happen with GOMAXPROCS=1 due to checkConnectionStatus sleeping for a second before it reconnects).

Secret required error

I'm aware that recent release (1.0.0) changed a bit how Centrifugo works. However, when I try to setup it from scratch generating a new config file using genconfig I get the following error:

$ ./centrifugo genconfig
[F]: 2015/10/26 10:25:47 main.go:368: config error: secret required

After getting that error, I manually created a config.json file and used checkconfig, but I still get the same error even though I defined a secret property in config file:

$ ./centrifugo checkconfig
[F]: 2015/10/26 10:48:03 main.go:353: config error: secret required

My config.json manually created:

{
  "secret": "very-long-secret-key"
}

When I start Centrifugo server, I don't get any error though.

Close status codes?

What is the purpose of the "status" error code ion the session interface?

Typical usage:

    if err != nil {
        c.sess.Close(3000, "error sending message")
    }

Of course the number "3000" seems very arbitrary, and for instance in the wsConn it is just ignored:

func (conn wsConn) Close(status uint32, reason string) error {
    return conn.ws.Close()
}

How i can run it as daemon?

Hi,

I starting server by command: centrifugo -w --config="/opt/centrifugo/config.json" --log_level="debug" --log_file="/var/log/cintrifugo.log" but can I run it as daemon? E.g. options -d , for examples? Any idea?

Busy loop if redis is down

Someone complained that centrifugo was eating their dev VM CPU (constant 75%+ CPU usage from centrifugo with no load).

I debugged and logs look like this:

2016-03-23_17:31:49.10868 [I]: 2016/03/23 10:31:49 Starting RedisEngine Subscriber
2016-03-23_17:31:49.10872 [I]: 2016/03/23 10:31:49 Stopping RedisEngine Subscriber
2016-03-23_17:31:49.10880 [C]: 2016/03/23 10:31:49 dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10886 [E]: 2016/03/23 10:31:49 dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10905 [I]: 2016/03/23 10:31:49 Starting worker for API queue centrifugo.api
2016-03-23_17:31:49.10910 [C]: 2016/03/23 10:31:49 dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10914 [E]: 2016/03/23 10:31:49 RedisEngine Receiver error: dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10923 [I]: 2016/03/23 10:31:49 Starting RedisEngine Subscriber
2016-03-23_17:31:49.10928 [E]: 2016/03/23 10:31:49 RedisEngine Subscriber error: dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10933 [I]: 2016/03/23 10:31:49 Stopping RedisEngine Subscriber
2016-03-23_17:31:49.10940 [C]: 2016/03/23 10:31:49 dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10945 [E]: 2016/03/23 10:31:49 dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10953 [I]: 2016/03/23 10:31:49 Starting worker for API queue centrifugo.api
2016-03-23_17:31:49.10958 [C]: 2016/03/23 10:31:49 dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10962 [E]: 2016/03/23 10:31:49 RedisEngine Receiver error: dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10970 [I]: 2016/03/23 10:31:49 Starting RedisEngine Subscriber
2016-03-23_17:31:49.10976 [E]: 2016/03/23 10:31:49 RedisEngine Subscriber error: dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10980 [I]: 2016/03/23 10:31:49 Stopping RedisEngine Subscriber
2016-03-23_17:31:49.10987 [C]: 2016/03/23 10:31:49 dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.10991 [E]: 2016/03/23 10:31:49 dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.11000 [I]: 2016/03/23 10:31:49 Starting worker for API queue centrifugo.api
2016-03-23_17:31:49.11095 [C]: 2016/03/23 10:31:49 dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.11100 [E]: 2016/03/23 10:31:49 RedisEngine Receiver error: dial tcp [::1]:6379: getsockopt: connection refused
2016-03-23_17:31:49.11110 [I]: 2016/03/23 10:31:49 Starting RedisEngine Subscriber
2016-03-23_17:31:49.11116 [E]: 2016/03/23 10:31:49 RedisEngine Subscriber error: dial tcp [::1]:6379: 

So real cause was that his local redis instance was down for some reason but in general this is a nasty way to fail if redis is down.

In production it would be unfortunate to be stuck in a busy loop like that when redis goes away - might cause dropped pings and connections to clients even though centrifugo might be able to reconnect a few seconds later to fixed or newly promoted master and keep going seamlessly otherwise.

I think the fix is simple: in all of our worker loops that fail and restart on redis connection error, add a short sleep before they actual return after an error (say 500ms). so that we don't end up eating CPU while we try to reconnect.

Audit error sources and identify ones which are really fatal.

Since my comment a few days ago, centrifugo now disconnects with reconnect: false for almost any error that occurs while processing a message. The vast majority of these are not fatal errors and should not prevent the client from retrying.

For example, now I my client connects and the call to subscribe fails due to a temporary redis networking glitch, the client is permanently disconnected.

When trying to figure out how different errors should behave I realise that there are 3 different error scopes:

  1. session level errors
    • errors that should/will never be recovered from by this client in this state
  2. connection/transport level errors
    • physical: websocket is closed/fails
    • connection protocol issue
    • unsupported protocol version (should be fatal)
  3. individual api call failure
    • bad request (don't retry API call, connection OK)
    • transient backend failure (retry API call, connection OK*)

In case of 3, the retry option actually applies to API call not whole connection in most cases. I put * next to connection OK in last case because it's arguable that you should disconnect them here. The benefit of disconnect is that a network partition where one centrifugo host is unable to toal to backend but others are will recover itself. But in general it means disconnect/reconnect overhead for no reason if the connection failure was just temporary. Could have hybrid that keeps track of how many successive API calls failed on a connection and disconnect after 2 or 3 consecutive ones? That's out of scope anyway for now current behaviour is OK as long as we have defined ways to handle each type and tell the difference.

Finally, one option would be to return distinct error codes for each type of error and let client (ours or end-user) choose to retry or not.

That might make some sense, however the downside is that there will be long-lived clients out there and you risk having them retry indefinitely when you introduce a new fatal error into server later (or vice-versa) because they don't know.

So I like the current model of returning advice on whether client should retry requests and/or reconnect in each case. As that is future proof.

Redis Sentinel support

At moment we have pretty efficient Redis engine which allows to scale Centrifugo to many nodes. In this situation setup limited by Redis throughput - this is more than enough for most cases as Redis capable to serve tens of thousands requests per second. So this is ok for most projects in Internet (I am not considering giants).

The weak point in this architecture is that Redis is single point of failure. When it comes to setting up Centrifugo on your servers and you want High Availability for Redis then you should invent tricky proxies - something like tricky Haproxy setup for example.

So I think it's time to support official way to add HA to Redis – Redis Sentinel.

At moment Redis driver we use - Redigo - does not support Sentinel, I am experimenting with its fork to add Sentinel support. We can also consider moving to another Redis client - there are many of them

memoryPresenceHub.get(..) is racy.

You return a reference to the map[string]interface{}, but you unlock when the function returns. There is no way for the caller so ensure that the map isn't modified by another goroutine while it operates on the data.

Unfortunately copying a map[string]interface{} isn't trivial, but maybe "https://godoc.org/code.google.com/p/rog-go/exp/deepcopy" can help, but the best solution would probably be to not use interface{}, but some more "controlled" type.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.