Comments (14)

FZambia commented on June 1, 2024

Yep, you can experiment with the queue_size branch where I added this change - diff, please report whether it helps to achieve the desired behavior. I think you can emulate slow consumers and make sure this is a reasonable way to go in your case.

In general I'd like to see some code which shows how you are using Centrifuge currently, and how adding a queue size helps to solve the problem. Currently I still do not fully understand the details.

FZambia commented on June 1, 2024

Hello, @icellan!

I think there is no mechanism currently which can help with this. It seems that in real-time messaging semantics, being aggressive with slow consumers is what is usually desired, and it's hard to find a better strategy. Each individual subscriber (which is usually a frontend app) should be able to handle all the load coming from the connection. Well, possibly except for some use cases where you can safely drop some data at any moment, like you described (but I think only when you operate on an individual connection basis without channel semantics at all). Curious which use case can tolerate throttling like this?

We have TransportWriteHandler which is close, but it seems you may need something different here.

The Client's current buffer queue size is not exposed now (it's only possible to globally configure its maximum size) – we can theoretically expose the current buffer size in Client's API, but I am lacking details about the use case. If you are using publications over channels, then having the queue size available won't help a lot (it's PUB/SUB semantics with broadcast, so you don't have access to all clients' queue sizes). If you are operating on a per-client basis (which I believe you are, given the issue description), then possibly it may help, but I can not suggest a good algorithm here without your help and some proposal with examples. If I understand right, in basic form it should be something like this:

go func() {
	for {
		select {
		case <-ctx.Done():
			return
		case data := <-databaseStream:
			if client.QueueSize() > 500*1024 {
				// Skip publish
			} else {
				client.Send(data)
			}
		}
	}
}()

But as I said – I don't know how you are sending data at the moment, so the example above may be completely wrong.

FZambia commented on June 1, 2024

One other assumption: if you operate on a per-client basis, have a way to stream updates from the database, and want to tolerate spikes in load and handle slow client use cases, then possibly you need pull consumers instead of push consumers. So the application requests some data, processes it, then requests the next batch. Like Kafka. In this case Centrifuge may not be the best fit, as it's not really real-time messaging, though with RPC calls over WebSocket it could theoretically be implemented, roughly as sketched below.
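For illustration only (not part of the original thread), a rough sketch of what such a pull-style consumer over Centrifuge RPC could look like. fetchNextBatch is a hypothetical application helper, the snippet assumes a *centrifuge.Node set up elsewhere, and only the OnRPC event/callback shape follows the library:

node.OnConnect(func(client *centrifuge.Client) {
	client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) {
		// fetchNextBatch is hypothetical: it reads the next chunk of data
		// from the database, using e.Data as a cursor sent by the client.
		data, err := fetchNextBatch(e.Data)
		if err != nil {
			cb(centrifuge.RPCReply{}, centrifuge.ErrorInternal)
			return
		}
		// The client asks for the next batch only after it has processed
		// this reply, which gives natural backpressure (pull, not push).
		cb(centrifuge.RPCReply{Data: data}, nil)
	})
})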

icellan commented on June 1, 2024

Hi @FZambia, thanks for the extensive answer.

To give you more information about what we are doing: we are reading blocks from a blockchain, searching for transactions in each block based on a filter, and sending the matching transactions to the user. Sometimes we find many transactions in a block, sometimes none. The search is batched per block.

What Centrifuge brings to the table (next to connect/disconnect handling, protobuf, etc.) is that it allows us to smooth out the sending of transactions to the client. When we find lots of transactions, they are queued, and in the meantime we search for new transactions in subsequent blocks.

What I would need to make this not fail is the ability to peek into the size of the queue for the channel and pause the processing for a bit before continuing. Something like this:

go func() {
	for {
		txs, err := getTransactions(block)
		while client.QueueSize(channel) > 500*1024 {
			time.Sleep(1 * time.Second)
		}
		err = send(txs)
		block++
	}
}()

FZambia commented on June 1, 2024

while client.QueueSize(channel)

QueueSize may be exposed for the whole Client connection, returning the current size of the client's message write buffer (based on which we make decisions about slow clients), not per individual channel. Client does not have per-channel buffer queues, only one []byte queue from which we read messages before sending them to Transport. So I do not understand the expectations in the snippet you provided.

FZambia commented on June 1, 2024

Thinking more, given the information you provided, I am not sure relying fully on the Centrifuge queue size is the proper way to go in this case. If you don't have a strict latency SLA here (since you can afford pausing the stream), then some sort of backpressure is needed to avoid overwhelming the client. Right now you are overwhelming it, and the Centrifuge queue is just the last queue in a chain of app buffers, network buffers, etc. This backpressure comes natively when you use polling techniques, but when pushing data it's trickier – possibly it can be a sort of message sent from the client side to control the stream. I may be wrong in some assumptions, as the application specifics can dictate their own rules.

FZambia commented on June 1, 2024

If your goal is to send as much data as possible, then possibly the current Centrifuge behavior with queueing is not a good fit, since it hides the socket write's native TCP flow control behind a queue of limited size. The queue was originally designed for channel broadcasts, where we can't wait for slow writes, and is now used for all server writes. But when operating with a single Client, it seems what you actually need is a blocking Send function, which unblocks as soon as data is written into the network socket (into the OS buffer). This way no tricks with checking queue size are required, I think.

Are you using the Client.Send method to push data to the client?

icellan commented on June 1, 2024

@FZambia I'm using the node.Publish function.

I actually really like the way we are doing this now, and we are also using Centrifuge for other communication to the client.

The problem here is indeed that at times the server is pushing data to the channel too hard, but I don't think the solution should be to block the Send or Publish functions in Centrifuge, because right now the intermittent finding of data by the server is averaged out (by the queue) and the client is able to receive at its maximum speed.

The usage of while in my pseudo code is just a simple way to wait on the client until the queue is low enough again to send new data. So Centrifuge is operating at maximum speed to the client, but getting data on the server is sometimes paused.

Adding to this, this actually happens when a client starts up and is catching up on old data (that's why it's reading from the DB); after the startup, it will just be getting new data as it comes in.

FZambia commented on June 1, 2024

I don't think the solution should be to block the Send or Publish functions in Centrifuge

Actually, I only talked about Client.Send blocking until data is written; it's not possible for Node.Publish to be sync (i.e. return after data is written to the socket) due to its broadcast semantics (and the real socket write can actually happen on another node).

The usage of while in my pseudo code is just a simple way to wait on the client until the queue is low enough

I understand this; my concern was about the fact that it's not possible to monitor queue size on a per-channel basis in the current Centrifuge implementation (i.e. using client.QueueSize(channel)). Client has a single []byte queue for all channels where already-serialized messages are put, so it could be possible to get its size using something like client.QueueSize() – note, without the channel argument.

icellan commented on June 1, 2024

@FZambia Apologies for the confusion on my side; it would not be necessary in my mind to add the channel to the queue size call. It is even better not to do it and just get the total queue size for that client.

icellan commented on June 1, 2024

@FZambia I was looking into the code. Am I correct in thinking that this is the only addition to client.go needed to make it work for me?

// QueueSize returns the current size in bytes of the client's
// outgoing message write queue.
func (c *Client) QueueSize() int {
	return c.messageWriter.messages.Size()
}
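As a usage illustration (again, not from the original thread): with such a getter, the fetch loop from the earlier pseudocode could roughly become the following, where getTransactions and send remain the hypothetical application helpers:

go func() {
	for {
		txs, err := getTransactions(block)
		if err != nil {
			return
		}
		// Pause fetching while the client's write queue is above the
		// threshold, letting the slow connection drain before producing more.
		for client.QueueSize() > 500*1024 {
			time.Sleep(1 * time.Second)
		}
		if err := send(txs); err != nil {
			return
		}
		block++
	}
}()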

icellan commented on June 1, 2024

@FZambia The access to QueueSize has solved my problem: I do not get DisconnectSlow errors anymore, since I can throttle on the server how fast new data is sent.

I am, however, getting DisconnectError now, which I think I have hunted down to the ping/pong not being delivered properly, maybe due to the throttling.

FZambia commented on June 1, 2024

It's pretty hard for me to help with this and suggest something without a code example which reproduces the case, so that I could also find the root cause and think about a possible solution.

If this helps: Centrifuge periodically puts ping frames into the same client queue, the queue is drained into the socket, and the client reads the socket and should respond with a pong frame. If the connection is slow, this may lead to a situation where the client is disconnected with a no-ping code (since it's not fast enough to process incoming data and doesn't get a chance to respond to the ping with a pong). Probably that's the issue, but it may be something different. If this is the issue, then increasing the server ping/pong intervals and increasing the max delay of server pings on the client side could help to avoid disconnects – but that seems like a rather dirty solution for WebSocket communication. WebSocket does not provide any way to prioritize data and process streams separately; it's just a single connection with head-of-line blocking issues.
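If the interval route is taken, here is a minimal sketch of what the server side could look like. The WebsocketConfig/PingPongConfig field names are an assumption based on recent centrifuge versions, so check the API of the version you run; client SDKs also expose a corresponding maximum server ping delay option.

// Assumed API shape for recent centrifuge versions (verify against your version).
wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
	PingPongConfig: centrifuge.PingPongConfig{
		// A longer ping interval and pong timeout give a slow connection
		// more time to drain its queue and answer the ping.
		PingInterval: 25 * time.Second,
		PongTimeout:  10 * time.Second,
	},
})
http.Handle("/connection/websocket", wsHandler)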

Again, probably you hit something different.

FZambia commented on June 1, 2024

Closing this one; looks like we were not able to find anything actionable here.
