
Comments (47)

derekcollison avatar derekcollison commented on June 12, 2024 1

@vtolstov I think we understand now what you are asking. We call these exclusive receivers, meaning there could be multiple but one is selected to receive all the messages until it goes away and then another subscriber will be selected.

We currently do not have this behavior built into JetStream, but you are also not the first to ask for it, so we will keep this issue open.

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

I don't think this is a feature we are planning specifically. You could use subject names and wildcards based on some information about the message and then arrange for subscriptions on those patterns.

Say foo.p1 and foo.p2 for different partitions, and have your subscribers get only messages for a specific partition on their consumers? Not sure if that's too simplistic for your actual use case.

NATS is payload agnostic, so we won't look at any UUID in a message.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

But why? You could provide in the config a func signature that receives an interface{} and returns a partition, and the user writes the distribution logic in such a func?

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

@derekcollison sorry, what can you say about such a useful feature? Mostly, in distributed services I need a guarantee that messages belonging to the same specific "resource" always go to one service (like nginx sticky sessions =))

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

I need something like this: https://godoc.org/github.com/liftbridge-io/go-liftbridge#PartitionByKey
Each message has a key/value attached, and the partitioner can use any logic to map the message to a partition via that key/value.

from jetstream.

derekcollison avatar derekcollison commented on June 12, 2024

With JetStream you do not need partitions like Kafka or LB. You can filter messages from a stream for delivery to a consumer.

For your original request, asking about sticky consumers, what happens when that consumer fails? Do you want the system to wait for it to return or, if other consumers are available, utilize those?

Would like to understand the problem a bit more deeply.

Thanks.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

If a consumer fails, I would prefer to postpone such messages for a small (configurable) amount of time and either continue if it comes back up, or rebalance that batch of messages to another consumer.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

For filtering messages from a stream, where can I find the info/interface?

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

As I saw, the jetstream cmd app is able to filter via subject, but I need to filter by some key/value attached to the message.

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

As a possible example:

Let's say you have a stream receiving data on YOURSTUFF.*. Say you have 10 workers: pick a random number 0-9 and publish your message to, for example, YOURSTUFF.p1.

You can then set up 10 consumers, each using a FilterSubject of, let's say, YOURSTUFF.p1; that worker will get all of the p1 messages and no others will receive them. This way it remains payload agnostic and you can easily grow/shrink the pool.
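As a minimal sketch of this pattern, using the later nats.go JetStream API (which postdates this thread); the stream name, partition count and durable name are illustrative assumptions, not anything prescribed by JetStream:

// Sketch only: one stream over all partition subjects, a publisher that
// picks a partition subject, and a worker filtered to a single partition.
package main

import (
    "fmt"
    "log"
    "math/rand"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // One stream capturing every partition subject.
    if _, err := js.AddStream(&nats.StreamConfig{
        Name:     "YOURSTUFF",
        Subjects: []string{"YOURSTUFF.*"},
    }); err != nil {
        log.Fatal(err)
    }

    // Publisher side: pick a partition 0-9 and publish to its subject.
    p := rand.Intn(10)
    if _, err := js.Publish(fmt.Sprintf("YOURSTUFF.p%d", p), []byte("payload")); err != nil {
        log.Fatal(err)
    }

    // Worker side: a durable push consumer filtered to partition 1 only.
    _, err = js.Subscribe("YOURSTUFF.p1", func(m *nats.Msg) {
        // process the message, then acknowledge it
        m.Ack()
    }, nats.Durable("worker-p1"), nats.ManualAck())
    if err != nil {
        log.Fatal(err)
    }

    select {} // keep the worker running
}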

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

You need to know how many partitions to define with this scheme. It's easier to add more later, since no new broker config or data rebalance is needed, but you must know how many.

In the document you linked you have to predefine the partition count when creating the stream too.

from jetstream.

derekcollison avatar derekcollison commented on June 12, 2024

Let me look into this a bit more. We have heard a few others possibly want something like this.

To summarize you would want the ability to configure a stream to have one active consumer and only deliver to that consumer until it fails and then deliver to another consumer, again in the same way.

Does that describe it?

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

@derekcollison not quite. I need to distribute messages by some attached metadata. When I publish, I add a key, for example the UUID of a user, and I need all messages for this UUID to be processed by a single consumer, while messages for another UUID go to another consumer.
Roughly: I have many messages in a topic with balance changes, in sequential order.
user1 has +100, -50, +20 messages. I need to be sure they are processed by one consumer, because if one consumer gets the +100 event and another gets the -50 event, and -50 is processed first (because that consumer has more CPU power, network speed or other conditions), I get a negative balance.
So for this user, only the consumer chosen via consistent hashing must process these messages. For another user UUID, another consumer can process them.

Does this example look better?

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

Here's a little - quite hacky - example that sets up a Stream and 2 partitions with a worker per partition.

It's got a basic user and uses the UserID to determine the partition to use, so a user is always consistently handled by the same partition.

https://gist.github.com/ripienaar/353c777e2abbf1ac0032d55345a0b3eb
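In the spirit of that gist (not reproduced here), a minimal sketch of just the partition-picking step; the hash function, subject prefix and partition count below are assumptions for illustration, not taken from the gist:

// Sketch only: deterministically map a user ID to one of N partition
// subjects so the same user is always handled by the same partition worker.
package main

import (
    "fmt"
    "hash/fnv"
)

const partitions = 2 // must match the number of partition consumers

// partitionFor hashes the user ID onto a stable partition number.
func partitionFor(userID string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(userID))
    return h.Sum32() % partitions
}

// subjectFor builds the publish subject for a given user's messages.
func subjectFor(userID string) string {
    return fmt.Sprintf("USERS.p%d", partitionFor(userID))
}

func main() {
    // All messages for this user land on the same partition subject, so the
    // worker consuming that subject sees them in stream order.
    fmt.Println(subjectFor("3f2c9f0e-user-uuid"))
}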

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

You can of course have multiple workers and use queue subscribe on a per-partition basis etc., I just did not show that - so it's easy to scale a partition horizontally. You can also use pull-based consumers to achieve the same with this design.

from jetstream.

derekcollison avatar derekcollison commented on June 12, 2024

Yes that makes sense. A few points: first and foremost, NATS is payload agnostic, and so is JetStream, so we do not know anything about the semantics of the payload.

However you can achieve what you want by placing the meta-data you want to be able to filter on in the publish subject.

So let's say I want to filter on name (over simplified I know). I would create a stream with a wildcard.
STREAM = ORDERS.* (* here is the name)

So I could have messages in the stream like ORDERS.derek or ORDERS.ri and then create a consumer that filters based on what I want, like ORDERS.derek only.
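A rough sketch of that layout with an explicitly created, filtered push consumer, using the later nats.go API; the consumer durable name and delivery subject are illustrative assumptions (error handling mostly elided):

// Sketch only: filter by a name token in the subject.
package main

import (
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    js, _ := nc.JetStream() // error handling elided for brevity

    // Stream with the name as the wildcard token: ORDERS.<name>.
    js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.*"},
    })

    // A plain subscription on the delivery subject gets only the filtered messages.
    nc.Subscribe("deliver.orders.derek", func(m *nats.Msg) {
        m.Ack() // handle, then acknowledge
    })

    // Push consumer that only receives ORDERS.derek.
    js.AddConsumer("ORDERS", &nats.ConsumerConfig{
        Durable:        "DEREK",
        FilterSubject:  "ORDERS.derek",
        DeliverSubject: "deliver.orders.derek",
        AckPolicy:      nats.AckExplicitPolicy,
    })

    select {}
}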

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

The example is cool, thanks. But how does this deal with a consumer dying? The publisher has already sent to the partitioned topic, so in that case you need to recalculate and resubscribe all consumers.
This is possible, I think, but can JetStream provide some helper? The server already knows about subscribers, so why not create some selector interface that receives the number of subscribers/partitions or something and can determine which of them a message should be delivered to?
For example, I prefer ketama to distribute across subscribers; I also have a Ceph crushmap Golang parser and a straw2 algorithm selector. So an interface that can choose the subscriber for a message would be very useful.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

@derekcollison looks better, thanks, but this needs cooperation between clients, so each client must know about the others to count them and subscribe.
I propose the ability to do this in the server, because it already knows the number of subscribers for a topic.

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

You're basically discovering the problems with this pattern. You have state in your workers in a place you don't want to manage it. We don't recommend your pattern for these reasons, and neither do typical microservices best practices.

Now, about my example: you won't have them in a for loop in reality, and you won't have to restart them all. Instead, you make a cluster of worker microservices that take as configuration which partition they manage. You have many of them for horizontal scaling, and as with any microservice your platform restarts them independently.

This way there is no state and you don't hit most of the problems you mention.

So what's left is the 4 lines devoted to picking the partition. We don't touch the payload: what if you publish JSON, someone else YAML, and someone else compressed base64-encoded JSON? We can't support all of this and we won't limit you. The problem of picking a partition is so minimal that you don't gain much from a helper from us.

The problem of how to handle a failing consumer is present with or without a helper from us; how to run those consumers, manage them, configure them etc. - all present with or without a helper from us. You can see in the example you linked that you HAVE to know how many partitions there are, you have to know which partition your consumer belongs to, and you also have to handle the "what if a consumer fails?" question.

It's not really that much harder to maintain a tiny function for picking a partition.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

OK, maybe you are right and I'm not...
Last question before I try =) - how can I run JetStream embedded in my app (like the nats or nats-streaming server)? I want to create a test stand and try it with go-micro (as I already wrote a stan broker for it, I think I can write the JetStream one too).

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

import (
    "io/ioutil"
    "testing"
    "time"

    natsd "github.com/nats-io/nats-server/v2/server"
    "github.com/nats-io/nats.go"
)

// startJSServer starts an embedded NATS server with JetStream enabled on a
// random port and returns it together with a connected client.
func startJSServer(t *testing.T) (*natsd.Server, *nats.Conn) {
    t.Helper()
    d, err := ioutil.TempDir("", "jstest")
    if err != nil {
        t.Fatalf("temp dir could not be made: %s", err)
    }
    opts := &natsd.Options{
        JetStream: true,
        StoreDir:  d,
        Port:      -1, // pick a random free port
        Host:      "localhost",
    }
    s, err := natsd.NewServer(opts)
    if err != nil {
        t.Fatal("server start failed: ", err)
    }
    go s.Start()
    if !s.ReadyForConnections(10 * time.Second) {
        t.Error("nats server did not start")
    }
    nc, err := nats.Connect(s.ClientURL())
    if err != nil {
        t.Fatalf("client connect failed: %s", err)
    }
    return s, nc
}

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

Last question for your minimal partition example - what do you recommend to do if I know that, for example, the consumer that listens on 103.5 is down and has not started for some time? I have messages in this topic. What is the best practice - start an additional consumer in an already running service, or republish the messages to a different topic based on the new partition count?

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

Since your requirement is specifically to handle P5 using consumer 5 - you have only really one option and that is to start it. Other approaches fundamentally break your requirement.

In my example I used ephemeral consumers, but in the real case you would create a durable consumer, so that later, when a new partition 5 consumer comes back, it automatically continues where the crashed one stopped.

I don't see how republishing messages would honour the basic constraint you set, that all messages for a specific partition must always be handled by the same worker - it's a fundamentally bad idea. But if that's what you need, you have to arrange for each consumer to run supervised and be restarted. Something like Docker auto-restart, Kubernetes rescheduling or even systemd restarts all handle that fine.

You should also be sure to set up max delivery attempts, so that if there is a message that repeatedly crashes a consumer we will stop delivering that message after, for example, 5 tries.
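A minimal sketch of such a durable, redelivery-limited worker, assuming the later nats.go client (imports of "github.com/nats-io/nats.go" and "time", and a JetStream context js as in the earlier sketches); the subject and durable name are assumptions:

// Sketch only: a durable, filtered worker for partition 5 that resumes where
// a crashed instance stopped, with redelivery capped at 5 attempts.
func consumePartition5(js nats.JetStreamContext) error {
    _, err := js.Subscribe("YOURSTUFF.p5", func(m *nats.Msg) {
        m.Ack() // only ack on success so failures are redelivered
    },
        nats.Durable("worker-p5"),    // durable: progress survives restarts
        nats.ManualAck(),             // we decide when a message is done
        nats.MaxDeliver(5),           // stop redelivering a poison message after 5 tries
        nats.AckWait(30*time.Second), // redeliver if not acked within this window
    )
    return err
}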

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

Since your requirement is specifically to handle P5 using consumer 5 - you have only really one option and that is to start it

Not so. If the p5 consumer dies, I can run partition 5 with another service, for example p4 (decided by some hashing or so). My requirement is that events for one user must be processed by a single (no matter which) consumer.

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

But if consumer 5 goes down midway through processing a user's data, then you can't have consumer 4 pick up where 5 left off. And you will never know if that has happened.

The only thing you can do is restart consumer 5. Otherwise you really don't need any of these partitions, if at times it's fine for messages to hop partitions.

Partitioned data is a bad idea for the reasons you are discovering :)

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

But why? If consumer 5 dies, it does not ack some messages (the ack happens after the transaction to the DB completes), so I know whether a specific message was processed and its data is in the DB. If consumer 5 dies after the commit but before the ack - this is not a problem, I skip such a message on the next iteration.
And why can't another consumer subscribe to p5 and consume messages from it, if it specifies the last-received flag?

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

Your requirement is: only 1 specific consumer should get all messages for a specific partition.

So why can't another handle the messages? Because your requirement says it cannot.

With a durable consumer, of course, anything can decide to subscribe to those messages, so you're welcome to do what you say with consumer 4 - I am just saying it breaks your requirement. And if you can break your requirement, then in fact you don't have the requirement at all.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

Maybe my English is bad, but my requirement is not to always stick p5 messages to consumer 5, but to always distribute messages between the available consumers in a consistent way.

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

I missed where you earlier said that rebalancing after some time is OK - however, we don't support automatic rebalancing or failover consumers. I know failover consumers are something Derek does have in mind, but I don't think that would manifest as consumer 4 - which is consuming partition 4 - later getting consumer 5's messages just because 5 is gone for some time; it would probably be more of a standby consumer for 5 only.

As I said earlier though, you can program something where you let 4 also take 5's traffic by additionally subscribing to that consumer's topic, but you'd need to code that.

The Liftbridge example also does not support it; I showed you how to do exactly what their partition-key behaviour does, since you said that's what you need.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

Thanks! I'll try to code something and check how it works.
P.S. How broken is JetStream, if I use it not in production but still don't want to lose data? =)

from jetstream.

derekcollison avatar derekcollison commented on June 12, 2024

JetStream is in tech preview and things may change, but it is mostly feature complete; however, it is single server only for now. It will recover messages stored to the filestore properly, etc.

Please give us feedback as you begin to work with it.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

@vtolstov I think we understand now what you are asking. We call these exclusive receivers, meaning there could be multiple but one is selected to receive all the messages until it goes away and then another subscriber will be selected.
You describe something like a hot standby. I mean all of them active, but each with a partition exclusive to it, with the ability to rebalance after a consumer comes up or goes down.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

So if you provide a hook on subscribe and on unsubscribe, this would be cool. Something like technical events from NATS. What do you think?

from jetstream.

derekcollison avatar derekcollison commented on June 12, 2024

We have that functionality internal to the server for JetStream now but we do not generate external events as of now for the transition from 0->1 and from 1->0. It is also not service import aware at the moment, but we plan to add that.

However I am not sure that would solve the problem you are looking to solve.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

@derekcollison sorry, where can I find a mini how-to on using JetStream capabilities with the Go client? I plan to test it in real-world scenarios and also write a go-micro broker.

from jetstream.

derekcollison avatar derekcollison commented on June 12, 2024

We have not put any of the syntactic sugar into the Go client yet, so it would be all raw interactions. @ripienaar may have some suggestions; he has written by far the most client code for JetStream to date.

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

OK, does NATS have some limits on topic names and their length? For example, I have notify services that differ by backend, like telegram/slack/email... so I have a topic notify and want to send a message to a specific backend.
My use case is replacing RPC in my 30 services with pub/sub. The RPC has the ability to filter such services via labels (map[string]string). So if I want to replace it, I need the ability to filter the stream from services...

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

And I don't want to subscribe to 100 topics from each service for each label =)

from jetstream.

derekcollison avatar derekcollison commented on June 12, 2024

If the label is part of the subject you can have a jetstream consumer filter on a subject or subject pattern that you like.

Can you describe some basic message flows you want to support?

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

I worry about encoding all labels in the subject. Now it's simple, but I don't know about the future.

I'm trying to improve my "saga" orchestrator for go-micro.
I have services like account, instance, sshkey, notify, project, network, volume, object, flavour, zone...

All services register their dependencies on other services at start and form a DAG.
Most of the services depend on account, so when a user registers I form an account-create request, encode it to a []byte slice and pass it to the orchestrator. The orchestrator passes messages to the services in order (DAG vertex iteration from top to bottom, some of them in parallel). The last step for user registration is to notify via email or phone; the method depends on the data entered at the registration step. The notify service doesn't know about the backend service used for notification; it only passes the notify message to a topic (and I want to filter messages in consumers, so the email provider service gets all emails, the SMS one gets all phone messages, and so on).
Also for instance I have providers like container or VM...
Also, if I have more than one zone, I want to be able to filter messages based on a zone label or host...
So if I encode all of this stuff into the subject it can get really big =)
Maybe I could create a DAG vertex for these deps with all label variants, but this is too expensive...

Now I have RPC that semi-asynchronously calls the services, but I think this is not the right design when the message count grows or I need to connect to other datacenters.

from jetstream.

derekcollison avatar derekcollison commented on June 12, 2024

Although there are possibly many labels, these would only represent one token in the subject you want to filter.

e.g.
SAGA.SVC. etc

SVC could be account, instance, sshkey like you said above.

You could then create one stream for all events and have filtered consumers (and normal consumers as well) or you could use stream templates to have streams created on the fly.
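A small sketch of that layout, assuming a JetStream context js as in the earlier sketches; the subject tokens, payloads and durable name are illustrative assumptions (error handling elided):

// Sketch only: encode the one label you filter on as a subject token,
// e.g. SAGA.<service>.<event>.
func wireSaga(js nats.JetStreamContext) {
    // One stream captures every saga event; the service is a subject token.
    js.AddStream(&nats.StreamConfig{
        Name:     "SAGA",
        Subjects: []string{"SAGA.>"},
    })

    // The orchestrator publishes with the target service as the second token.
    js.Publish("SAGA.account.create", []byte(`{"user":"..."}`))
    js.Publish("SAGA.notify.email", []byte(`{"to":"..."}`))

    // The email backend consumes only its own slice of the stream.
    js.Subscribe("SAGA.notify.email", func(m *nats.Msg) {
        m.Ack()
    }, nats.Durable("notify-email"), nats.ManualAck())
}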

from jetstream.

vtolstov avatar vtolstov commented on June 12, 2024

I played with the partitioned example and have some issues:

  1. When some subscriber dies, messages from its partition are not handled by other subscribers, because they are filtered by subject. How do I handle such a case? In Kafka, on a rebalance event I get new partitions for the consumer group, so the client (subscriber) does not need to know about the cluster; it handles only its assigned partitions. Why not add the ability to give the JetStream server a function that assigns partitions based on subscribers?

from jetstream.

derekcollison avatar derekcollison commented on June 12, 2024

Two ways to handle, and which one depends on your exact use case.

  1. You can use as many pull-based consumers as you like, and they will effectively load balance between themselves as they request new messages. You can add and remove them at will.

  2. Create a single push-based consumer but create multiple queue-based subscriptions on the delivery subject. You can pick a well-known subject for delivery that all apps know about a priori. (Both options are sketched below.)
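A sketch of both options, assuming the later nats.go client (imports of "github.com/nats-io/nats.go", "log" and "time"); the stream subject, durable and queue names are assumptions:

// Option 1 (sketch): pull-based workers fetch from one shared durable
// consumer and load balance naturally; add or remove workers at will.
func pullWorker(js nats.JetStreamContext) {
    sub, err := js.PullSubscribe("YOURSTUFF.>", "pull-workers")
    if err != nil {
        log.Fatal(err)
    }
    for {
        msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second))
        if err != nil && err != nats.ErrTimeout {
            log.Fatal(err)
        }
        for _, m := range msgs {
            m.Ack() // process, then acknowledge
        }
    }
}

// Option 2 (sketch): a single push consumer delivering to a queue group, so
// each message goes to exactly one of the subscribed workers.
func pushWorker(js nats.JetStreamContext) {
    // The queue group name doubles as the durable consumer name here.
    _, err := js.QueueSubscribe("YOURSTUFF.>", "push-workers", func(m *nats.Msg) {
        m.Ack() // process, then acknowledge
    }, nats.ManualAck())
    if err != nil {
        log.Fatal(err)
    }
}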

from jetstream.

quintans avatar quintans commented on June 12, 2024

Hi @derekcollison. I was looking for the same functionality described by @vtolstov and I don't think this is the same as exclusive receivers.
If I understand correctly, in exclusive receivers one consumer is selected to consume all the events.

What we want, considering two consumers A and B, is that some messages go to consumer A and others go to consumer B. The rule to balance would be based on the hash of a key (e.g. aggregate ID) provided when publishing. So messages published with the same key hash would go to the same consumer. When a rebalancing happens due to a change in the number of consumers, another consumer might be chosen.
In my case, my requirements are:

  • Messages with the same key from the same producer should be consumed in order.
    To guarantee the order, messages for the same key should be handled by the same consumer.

Looking at it naively, it seems a straightforward implementation. Using something like consistent hashing applied to a key provided when publishing, the NATS server could easily route the message to the corresponding consumer.
I would say that the NATS server seems better positioned to do this balancing than a client, because it already tracks the number of consumers.
I would say that this is a very useful feature for event sourcing + CQRS.

What do you think? Is there interest in implementing this functionality?
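This routing does not exist in the NATS server today; purely to illustrate the rule being proposed, here is a minimal client-side consistent-hash style mapping from key to consumer (no virtual nodes, and all names are made up):

// Sketch only: hash the aggregate ID onto a ring of consumer names so the
// same key always maps to the same consumer while the consumer set is stable.
package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

type ring struct {
    points []uint32          // sorted hash points on the ring
    owner  map[uint32]string // hash point -> consumer name
}

func hashKey(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}

func newRing(consumers []string) *ring {
    r := &ring{owner: map[uint32]string{}}
    for _, c := range consumers {
        h := hashKey(c)
        r.points = append(r.points, h)
        r.owner[h] = c
    }
    sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
    return r
}

// consumerFor returns the owner of the first ring point at or after the
// key's hash, wrapping around to the start of the ring.
func (r *ring) consumerFor(key string) string {
    k := hashKey(key)
    i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= k })
    if i == len(r.points) {
        i = 0
    }
    return r.owner[r.points[i]]
}

func main() {
    r := newRing([]string{"consumer-A", "consumer-B"})
    fmt.Println(r.consumerFor("aggregate-42")) // same key -> same consumer
}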

from jetstream.

derekcollison avatar derekcollison commented on June 12, 2024

You could put the key as a token in the subject. We would still need to only send messages to one consumer though, which we do not currently support. We are investigating, though, for sure.

from jetstream.

ripienaar avatar ripienaar commented on June 12, 2024

Closing as JS is now GA; we have this on the roadmap.

from jetstream.
