cannot cancel watch (dotnet-etcd, OPEN)

shubhamranjan commented on July 24, 2024
cannot cancel watch

from dotnet-etcd.

Comments (18)

fofewyoung commented on July 24, 2024

Hi, may I check whether you have read my question?
I think I know the cause, but I don't see a simple way to fix it.
In dotnet-etcd, each watch operation opens its own gRPC stream,
but a cancel request has to be sent on the stream that created the watch.
Cancelling works after I hacked the code so that all Watch* methods share a single stream.
Could you please help me deal with it?

shubhamranjan commented on July 24, 2024

> Hi, may I check whether you have read my question?
> I think I know the cause, but I don't see a simple way to fix it.
> In dotnet-etcd, each watch operation opens its own gRPC stream,
> but a cancel request has to be sent on the stream that created the watch.
> Cancelling works after I hacked the code so that all Watch* methods share a single stream.
> Could you please help me deal with it?

Hi, I have read your question. I will get back to you once I have reproduced the issue, and if there is a problem I will fix it.

Do let me know about your findings and how you modified the code to get it working.

shubhamranjan commented on July 24, 2024

> but a cancel request has to be sent on the stream that created the watch.

Strange. What would be the use of the watchId then? I need to dig into this a bit more.

fofewyoung commented on July 24, 2024

Hi, this is my hack code.
It is not thread-safe and not graceful.

Note the line that is commented out:
// await watcher.RequestStream.CompleteAsync();

 public partial class EtcdClient : IDisposable
 {
        // the shared watch stream, reused by every watch and cancel request
        AsyncDuplexStreamingCall<WatchRequest, WatchResponse> myWatchStreaming = null;
        AsyncDuplexStreamingCall<WatchRequest, WatchResponse> GetWatchStreaming()
        {
            if (myWatchStreaming == null)
                myWatchStreaming = _balancer.GetConnection().watchClient.Watch(null);

            return myWatchStreaming;
        }
        
         // unwatch operation
        public async void UnWatch(long wid)
        {
            WatchRequest req = new WatchRequest
            {
                CancelRequest = new WatchCancelRequest
                {
                    WatchId = wid
                }
            };

            await GetWatchStreaming().RequestStream.WriteAsync(req);
        }

        #region Watch Key
        /// <summary>
        /// Watches a key according to the specified watch request and
        /// passes the watch response to the method provided.
        /// </summary>
        /// <param name="request">Watch Request containing key to be watched</param>
        /// <param name="method">Method to which watch response should be passed on</param>
        public async void Watch(WatchRequest request, Action<WatchResponse> method, Metadata headers = null)
        {
            bool success = false;
            int retryCount = 0;
            while (!success)
            {
                try
                {
                    var watcher = GetWatchStreaming();
                    {
                        Task watcherTask = Task.Run(async () =>
                        {
                            while (await watcher.ResponseStream.MoveNext())
                            {
                                WatchResponse update = watcher.ResponseStream.Current;
                                if (update.Canceled)
                                    break;

                                method(update);
                            }
                        });

                        await watcher.RequestStream.WriteAsync(request);
                        // await watcher.RequestStream.CompleteAsync();    // left out so the shared stream is not closed
                        await watcherTask;
                    }
                    success = true;
                }
                catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable)
                {
                    retryCount++;
                    if (retryCount >= _balancer._numNodes)
                    {
                        throw; // rethrow without resetting the stack trace
                    }
                }
            }

        }
 }
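
The hack above is described as not thread-safe. One possible way to tighten it, shown here only as a minimal sketch, is to create the shared stream lazily exactly once and to serialize writes to it. `SharedStream<TStream>` and `UseExclusiveAsync` are hypothetical names invented for this illustration; they are not part of dotnet-etcd.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of dotnet-etcd: owns one lazily opened
// stream and lets only one caller touch it at a time.
public sealed class SharedStream<TStream>
{
    private readonly Lazy<TStream> _stream;
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    public SharedStream(Func<TStream> open)
    {
        // ExecutionAndPublication runs the factory at most once, even if
        // several threads ask for the stream at the same moment.
        _stream = new Lazy<TStream>(open, LazyThreadSafetyMode.ExecutionAndPublication);
    }

    // Runs the action while holding the gate, so writes never overlap.
    public async Task UseExclusiveAsync(Func<TStream, Task> action)
    {
        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            await action(_stream.Value).ConfigureAwait(false);
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

With the types from the code above, `TStream` would be `AsyncDuplexStreamingCall<WatchRequest, WatchResponse>`, and a cancel could be sent as `shared.UseExclusiveAsync(call => call.RequestStream.WriteAsync(req))`. This only covers writes; the response stream would still need a single reader loop rather than one per watch.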

shubhamranjan commented on July 24, 2024

Thanks, I will look into this. It is a busy week, so I may not be able to come up with a solution soon.

shubhamranjan commented on July 24, 2024

Update: I was able to replicate the issue. Still working on the solution.

shubhamranjan commented on July 24, 2024

Let's take an example where we have an etcd cluster and an application using this client. Suppose I have 3 instances of my application behind a load balancer, and the application connects to etcd using this client. If I make an API call to my application through the load balancer to cancel a watch, I cannot guarantee that the request lands on the instance that started the watch, which makes it difficult to cancel. I will have to look into why the watch ID is not being honored by etcd.

I will take this up with the etcd team.

yoricksmeets commented on July 24, 2024

The watchId exists to manage different watches on the same stream to the server and is only scoped to that stream. See this discussion here.

My assumption is that closing the response stream also cancels the watch (or maybe both the request and response streams must be closed, but in this client the request stream is always closed after the initial request is sent). I have not been able to confirm this yet.

shubhamranjan commented on July 24, 2024

So we will probably have to maintain request streams on our end. I will try to redesign the watch implementation here to persist request streams. The discussion also states that different streams may use the same watch ID, which makes this a bit difficult to manage on the client side.

yoricksmeets commented on July 24, 2024

@shubhamranjan I'm working on a watch manager to use in our distributed locking client. I can share the code when I have progressed a bit more. You could get some inspiration from it or we could adjust it to be included in this library if you think it fits the scope of this library.

The basic idea is as follows (WM = Watch Manager, WMS = Watch Manager Subscription):

  • The WM is created from an EtcdClient instance and uses that instance to get the AsyncDuplexStreamingCall for its request and response streams
  • The WM can issue WMS that are tracked by the WM
  • The WM will issue unique watchIds for all WMS issued by this WM
  • The WM creates a separate task that reads the response stream and dispatches each response to the correct WMS based on the watchId
  • The WM will respond to any exceptions in the response stream reading task; if the stream fails, the WM will resubscribe all WMS that it tracks on a newly created channel
  • WMS will implement IDisposable: disposing the WMS will cancel the subscription
  • WM will implement IDisposable: disposing the WM will close the gRPC channel and dispose all WMS it was tracking
  • The etcd client will track all issued WMs and will dispose them in its own Dispose function

I'm not sure whether a WMS needs to track the versions of the keys it receives, so that when the WM has to create a new stream and resubscribe it knows which start version to request. That would make sure the WMS receives every version only once, even if a new channel has to be created.

A developer can now get a WatchManager that issues multiple Subscriptions over a single duplex gRPC stream. A Subscription manages its own lifetime, and disposing the Subscription object cancels the watch on the etcd server.
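
The dispatch and disposal parts of this design could look roughly like the sketch below. It deliberately leaves out gRPC: the manager receives a `sendCancel` callback that stands in for writing a cancel request to the shared stream, and `Dispatch` stands in for what the single reader task would call per response. All type and member names here (`WatchEvent`, `WatchSubscription`, `WatchManager`, `Dispatch`) are hypothetical and not taken from dotnet-etcd or from the actual fork.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical types for illustration only.
public sealed class WatchEvent
{
    public long WatchId { get; set; }
    public string Payload { get; set; }
}

public sealed class WatchSubscription : IDisposable
{
    private readonly Action<long> _cancel;
    private int _disposed;

    public long WatchId { get; }
    public Action<WatchEvent> Handler { get; }

    internal WatchSubscription(long watchId, Action<WatchEvent> handler, Action<long> cancel)
    {
        WatchId = watchId;
        Handler = handler;
        _cancel = cancel;
    }

    // Disposing cancels the watch; repeated calls are ignored.
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) == 0)
            _cancel(WatchId);
    }
}

public sealed class WatchManager
{
    private readonly ConcurrentDictionary<long, WatchSubscription> _subs =
        new ConcurrentDictionary<long, WatchSubscription>();
    private readonly Action<long> _sendCancel; // stand-in for a cancel request on the stream
    private long _nextId;

    public WatchManager(Action<long> sendCancel)
    {
        _sendCancel = sendCancel;
    }

    public WatchSubscription Subscribe(Action<WatchEvent> handler)
    {
        // Increment-only counter: ids are unique within this manager.
        long id = Interlocked.Increment(ref _nextId);
        var sub = new WatchSubscription(id, handler, Cancel);
        _subs[id] = sub;
        return sub;
    }

    // Intended to be called by the single reader loop for each response.
    public void Dispatch(WatchEvent e)
    {
        if (_subs.TryGetValue(e.WatchId, out var sub))
            sub.Handler(e);
    }

    private void Cancel(long id)
    {
        if (_subs.TryRemove(id, out _))
            _sendCancel(id);
    }
}
```

In this sketch a caller would hold the `WatchSubscription` in a `using` block or dispose it explicitly; events that arrive for an already cancelled id are silently dropped.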

shubhamranjan commented on July 24, 2024

> The WM will issue unique watchIds for all WMS issued by this WM.

I would like to see how you plan to achieve this when the same etcd watch IDs are issued on different streams. In case of connection exceptions, I believe the stream would be recreated and the watch IDs issued by etcd would no longer be valid for the new stream.

Looks fine to me if it's included in this library. It would make the watch client better.

yoricksmeets commented on July 24, 2024

I plan on making the WM's watchId increment-only (the watchId is a long, so I don't think we will run out). Because the WM manages both the stream to the etcd server and the watchId counter, it can guarantee that watchIds are unique for the specific stream it manages. Because the etcd server scopes watchIds to a specific stream, we only need to guarantee that a watchId is unique in combination with a specific stream to the etcd server.

The WMS will store the watch creation data inside its object so the WM can issue new WatchCreateRequests on behalf of the WMS to restore its watch after a stream has been recreated. The recreated stream will be a new stream to the etcd server with no watches (and therefore no watchIds) linked to it. Because the WM only ever increments the watchId, I can reuse the old watchIds in the new WatchCreateRequests to create new watches with the same watchId.

Note: 2 different WM instances can issue the same watchId but that is not a problem since they would also manage their own stream to the server, so the guarantee is only needed per WM instance.

Note2: Because I keep the watch creation data in the WMS and have to send new WatchCreateRequests when restoring the connection, I think the WMS will have to store the last seen update version. That lets us issue a correct start version in the new WatchCreateRequests and get the same behaviour as a stream that was never recreated, where every update is delivered exactly once. For single keys this seems doable, since the WMS receives every update for that watch from the WM. For ranged watches I'm not sure how it would work, because I did not see an option to set start versions for individual keys within a key-range creation request.
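
As a rough sketch of the bookkeeping described in these notes: each tracked watch keeps its creation data plus the highest revision it has seen, and after a reconnect the manager rebuilds one create request per watch with the old watchId and a start revision just past the last delivered one. The types below (`CreateRequest`, `TrackedWatch`, `Resubscriber`) are hypothetical illustration types, not the generated Etcdserverpb classes, and the "last seen + 1" rule is an assumption about how the resume point would be chosen.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape loosely mirroring the watchId, key and start
// revision fields of a watch create request.
public sealed class CreateRequest
{
    public long WatchId { get; set; }
    public string Key { get; set; }
    public long StartRevision { get; set; }
}

public sealed class TrackedWatch
{
    public long WatchId { get; }
    public string Key { get; }
    public long LastSeenRevision { get; private set; } // 0 means nothing seen yet

    public TrackedWatch(long watchId, string key)
    {
        WatchId = watchId;
        Key = key;
    }

    // Record the modification revision of each delivered event.
    public void OnEvent(long modRevision)
    {
        if (modRevision > LastSeenRevision)
            LastSeenRevision = modRevision;
    }

    // Build the request that would restore this watch on a fresh stream.
    public CreateRequest ToRecreateRequest()
    {
        return new CreateRequest
        {
            WatchId = this.WatchId, // reused: the new stream starts with no watches
            Key = this.Key,
            // Assumption: resume right after the last delivered revision;
            // 0 is left as-is, meaning no explicit start revision.
            StartRevision = LastSeenRevision == 0 ? 0 : LastSeenRevision + 1
        };
    }
}

public static class Resubscriber
{
    public static List<CreateRequest> BuildRecreateRequests(IEnumerable<TrackedWatch> watches)
    {
        return watches.Select(w => w.ToRecreateRequest()).ToList();
    }
}
```

One gap in this sketch: a watch that has not seen any event yet is recreated without a start revision, so updates written while the stream was down could be missed. Recording the revision at which the watch was originally created would be one way to close that gap.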

shubhamranjan commented on July 24, 2024

> For single keys this seems doable, since the WMS receives every update for that watch from the WM. For ranged watches I'm not sure how it would work, because I did not see an option to set start versions for individual keys within a key-range creation request.

Watch and WatchRange are internally the same API. The revision specified in the request should mean something whether it's an individual key or a range. However, I will try these cases out over the weekend and check the behavior.

yoricksmeets commented on July 24, 2024

That would be good news and should simplify rebuilding the channel. I will start by supporting only an exact key for now, and if that works I will look at WatchRange.

I have included my fork of this client as a submodule in our solution so I can test its behaviour before making the PR to this repository. It will go through manual testing for our use cases, and I'm hoping to add some integration tests. These tests would need a running etcd server, as asserting on and mocking an etcd server at the gRPC network level is not something I'm planning to do in the foreseeable future.

shubhamranjan commented on July 24, 2024

Yeah, automation of test cases has been pending here for a long time. I should start on that soon.

kaflake commented on July 24, 2024

Is there a solution for the cancellation problem yet?

lapinbleu007 commented on July 24, 2024

Hi guys, what are the consequences of watching variables without cancelling the watch? Also, what happens if we watch the same variable again with the same ID?

shubhamranjan commented on July 24, 2024

@lapinbleu007 - For now, nothing is handled at the client level; the requests are passed as-is to the etcd server, so each watch request is basically a new watch request.
