Coder Social home page Coder Social logo

fred.rs's People

Contributors

aembke avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

fred.rs's Issues

Connection management issues

Hi - Thank you for a great library!

We have been experiencing some connection management issues (including using latest version 5.2.0).
Using fred as a redis client in a kubernetes cluster (redis is self-hosted in k8s) - if the redis instance is moved to another node, we end up in a corrupted state with connection management.

This can simply be reproduced doing kubectl scale deploy/redis --replicas 0.

We have tested with/without pipelining. With/without connection pool. Config is using exponential reconnect policy + command timeout. This is a trace of the events.

Initially we get a connection - everything is fine:

 DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Initializing connections...
 TRACE fred::protocol::types       > fred-JUKIBXKdhV: Using 10.0.78.183 among 1 possible socket addresses for redis:6379
 TRACE fred::multiplexer::utils    > fred-JUKIBXKdhV: Connecting to 10.0.78.183:6379
 TRACE mio::poll                   > registering event source with poller: token=Token(0), interests=READABLE | WRITABLE
 DEBUG fred::protocol::connection  > fred-JUKIBXKdhV: Skip setting client name.
 TRACE redis_protocol::utils       > allocating more, len: 0, amt: 24
 TRACE fred::protocol::codec       > fred-JUKIBXKdhV: Encoded 24 bytes to 10.0.78.183:6379. Buffer len: 24 (RESP2)
 TRACE tokio_util::codec::framed_impl > flushing framed transport
 TRACE tokio_util::codec::framed_impl > writing; remaining=24
 TRACE tokio_util::codec::framed_impl > framed transport flushed
 TRACE tokio_util::codec::framed_impl > attempting to decode a frame
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 4 bytes from 10.0.78.183:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Parsed 4 bytes from 10.0.78.183:6379
 TRACE tokio_util::codec::framed_impl > frame decoded from buffer
 DEBUG fred::protocol::connection     > fred-JUKIBXKdhV: Read client ID: Resp2(Integer(7))
 DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Creating new close tx sender.
 DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Set centralized connection closed sender.
 DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting connect message.

Now we scale down the deployment:

kubectl scale deploy/redis --replicas 0

Fred realizes that the connection is lost:

 DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Starting command stream...
 DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Set inner connection closed sender.
 TRACE tokio_util::codec::framed_impl > attempting to decode a frame
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
 DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Redis frame stream closed with error Redis Error - kind: Canceled, details: Canceled.
 DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emit connection closed from error: Redis Error - kind: Canceled, details: Canceled.
 TRACE fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting connection closed with 0 messages
 DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Recv reconnect message with 0 commands. State: Disconnected
 INFO  fred::multiplexer::commands    > fred-JUKIBXKdhV: Sleeping for 132 ms before reconnecting
 TRACE fred::protocol::types          > fred-JUKIBXKdhV: Using 10.0.78.183 among 1 possible socket addresses for redis:6379
 TRACE fred::multiplexer::utils       > fred-JUKIBXKdhV: Connecting to 10.0.78.183:6379
 TRACE mio::poll                      > registering event source with poller: token=Token(1), interests=READABLE | WRITABLE

Notice at this time there are no pods running redis, so the reconnect will just hang.
Now we attempt to execute a command against redis, using a client from the connection pool:

 TRACE fred::multiplexer::commands    > fred-JUKIBXKdhV: Recv command on multiplexer SET. Buffer len: 0
 DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Will block multiplexer loop waiting on SET to finish.
 TRACE fred::multiplexer              > fred-JUKIBXKdhV: Skip waiting on cluster sync.
 DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Writing command SET to redis:6379
 TRACE fred::protocol::connection     > fred-JUKIBXKdhV: Sending command and flushing the sink.
 TRACE redis_protocol::utils          > allocating more, len: 0, amt: 40
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Encoded 40 bytes to 10.0.78.183:6379. Buffer len: 40 (RESP2)
 TRACE tokio_util::codec::framed_impl > flushing framed transport
 TRACE tokio_util::codec::framed_impl > writing; remaining=40
 WARN  fred::multiplexer::commands    > fred-JUKIBXKdhV: Error writing command None: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
 DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Reconnecting or stopping due to error: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
 DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting close all sockets message: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
 WARN  fred::multiplexer::utils       > fred-JUKIBXKdhV: Error sending close message to socket streams: SendError(Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" })
 DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Waiting for client to reconnect...
resp Err(Redis Error - kind: Timeout, details: Request timed out.)

Next we scale up redis:

kubectl scale deploy/redis --replicas 1

After a while the connection pool will reconnect:

 DEBUG fred::protocol::connection     > fred-JUKIBXKdhV: Skip setting client name.
 TRACE redis_protocol::utils          > allocating more, len: 0, amt: 24
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Encoded 24 bytes to 10.0.78.183:6379. Buffer len: 24 (RESP2)
 TRACE tokio_util::codec::framed_impl > flushing framed transport
 TRACE tokio_util::codec::framed_impl > writing; remaining=24
 TRACE tokio_util::codec::framed_impl > framed transport flushed
 TRACE tokio_util::codec::framed_impl > attempting to decode a frame
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 4 bytes from 10.0.78.183:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Parsed 4 bytes from 10.0.78.183:6379
 TRACE tokio_util::codec::framed_impl > frame decoded from buffer
 DEBUG fred::protocol::connection     > fred-JUKIBXKdhV: Read client ID: Resp2(Integer(3))
 DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Set centralized connection closed sender.
 TRACE mio::poll                      > deregistering event source from poller
 DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Reconnect task finished reconnecting or syncing with: Ok(())
 DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Sending 0 commands after reconnecting.
 DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting connect message.
 TRACE tokio_util::codec::framed_impl > attempting to decode a frame
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).

And now we have the problem - if we try to execute a command using the client from the connection pool:

 TRACE fred::multiplexer::commands    > fred-JUKIBXKdhV: Recv command on multiplexer SET. Buffer len: 0
 DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Will block multiplexer loop waiting on SET to finish.
 TRACE fred::multiplexer              > fred-JUKIBXKdhV: Skip waiting on cluster sync.
 DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Writing command SET to redis:6379
 TRACE fred::protocol::connection     > fred-JUKIBXKdhV: Sending command and flushing the sink.
 TRACE redis_protocol::utils          > allocating more, len: 0, amt: 40
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Encoded 40 bytes to 10.0.78.183:6379. Buffer len: 40 (RESP2)
 TRACE tokio_util::codec::framed_impl > flushing framed transport
 TRACE tokio_util::codec::framed_impl > writing; remaining=40
 TRACE tokio_util::codec::framed_impl > framed transport flushed
 DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Waiting on last request to finish without pipelining.
 TRACE tokio_util::codec::framed_impl > attempting to decode a frame
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 5 bytes from 10.0.78.183:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Parsed 5 bytes from 10.0.78.183:6379
 TRACE tokio_util::codec::framed_impl > frame decoded from buffer
 TRACE fred::multiplexer::responses   > fred-JUKIBXKdhV: Processing response from redis:6379 to SET with frame kind SimpleString
 TRACE fred::multiplexer::responses   > fred-JUKIBXKdhV: Writing to multiplexer sender to unblock command loop.
 WARN  fred::multiplexer::responses   > fred-JUKIBXKdhV: Error sending cmd loop response: ()
 TRACE fred::multiplexer::responses   > fred-JUKIBXKdhV: Responding to caller for SET
 WARN  fred::multiplexer::responses   > fred-JUKIBXKdhV: Failed to respond to caller.
 TRACE tokio_util::codec::framed_impl > attempting to decode a frame
 TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
resp Err(Redis Error - kind: Timeout, details: Request timed out.)

The command is actually executed (we can verify this using redis-cli and checking that the key has been SET in the redis cluster.). All following requests using this client will now result in timeouts (although the command is in fact executed!) .

We use tokio v1.20.1 for reference.

If we use a headless service in k8s, we do not experience this problem (as the DNS will resolve with 0 ip-addresses when the deployment is scaled to 0 replicas, and resolve the new ip-address once the pod has been moved to the other node).

Using RedisPool in actix-web fails

actix-web = "4.2.1"
fred = "5.2.0"

Using RedisPoo in actix-web will prompt that method cannot be found, such an error

error[E0599]: no method named `set` found for struct `RedisPool` in the current scope
  --> src/service/user.rs:58:29
   |
58 |                 let _= pool.set("foo123", "1").await.unwrap();
   |                             ^^^ method not found in `RedisPool`

Running this before HttpServer::new works, but inside Request Handlers this error occurs

Change log print format [Bug]

Redis version - 6.1.0
Platform - linux|mac|windows
Using Docker and/or Kubernetes - yes|no
Deployment type - cluster|sentinel|centralized

hi,developers,I found the error log from reconnecting failed is debug format , that means I open info mode, I cannot realize this error when the redis is disconnected.

To Reproduce
Steps to reproduce the behavior:
if let Err(e) = router.connect().await {
_debug!(inner, "Failed reconnecting with error: {:?}", e);
client_utils::set_client_state(&inner.state, ClientState::Disconnected);
inner.notifications.broadcast_error(e.clone());
Err(e)
}

Logs
(Set RUST_LOG=fred=trace and run with --features debug-ids)

Additional context
I hope this _debug mode can be set to _error,thanks a lot

[Bug] Support multiple IPs result in DNS resolver

Redis version - doesn't matter
Platform - doesn't matter
Using Docker and/or Kubernetes - no
Deployment type - cluster

Describe the bug
DNS might contain multiple records, but current interface of DnsResolver expects only single address.

Don't think that this functionality should be part of implementation of DnsResolver, instead trait should be changed to return slice/vector of addresses. For example, reqwest crate has such definition of the resolver and example of handling

To Reproduce
Steps to reproduce the behavior:

  1. It should be more than 1 A or AAAA records for domain
  2. One of them doesn't work (during failover for example)

Logs
It's just connection errors, because dns-resolver can provide only one IP

[Bug?] Using `info replication` for replica discovery yields private ips

Redis version - 6.0.5 via ElastiCache
Platform - n/a
Using Docker and/or Kubernetes - n/an
Deployment type - cluster

Describe the bug
When connecting to replicas fred parses the response of the info replication command to find the replicas for a given node in the cluster [relevant pointer].

For ElastiCache (and possibly others), when compared to the cluster slots command, this yields at best, a different IP address and at worse an address that mightn't be accessible.

To Reproduce
Problematically I think this might rely on an AWS setup thing, rather than generalizable outside.

Output of cluster slots:

...
xx) 1) (integer) 16242                               
    2) (integer) 16383                               
    3) 1) "10.210.aaa.bbb"                           
       2) (integer) 6379                             
       3) "1a13137169931ad8db388fae36e96b9fbb84b672" 
    4) 1) "10.210.ccc.ddd"                            
       2) (integer) 6379                             
       3) "a57886a0ea4123a2a63a32f4c907b6c90f84d06d" 

From the master side:

❯ timeout 3 redis-cli -h 10.210.aaa.bbb -p 6379 info replication
# Replication
role:master
connected_slaves:1
slave0:ip=10.31.yyy.zzz,port=6379,state=online,offset=5541916556922,lag=1
...

And from the replica side using the ip from cluster slots:

❯ timeout 3 redis-cli -h 10.210.ccc.ddd -p 6379 info replication
# Replication
role:slave
master_host:10.210.aaa.bbb
master_port:6379
...

... and using the ip from the info replication output:

❯ timeout 3 redis-cli -h 10.31.yyy.zzz -p 6379 info replication
❯ echo $?
124

Logs
(Set RUST_LOG=fred=trace and run with --features debug-ids)
Can provide.

Additional context
Add any other context about the problem here.

Incorrect Cluster Failover Behavior

The library behaves incorrectly in a Redis cluster failover scenario.

Reproduce scenario:

  1. Run a Redis cluster in docker with failover
  2. Connect to the cluster
  3. Pause master container with data
  4. Wait for the cluster failover mechanism sync the nodes and rehash slots
  5. The library can no longer read and write data to the node, because reinitialization of slots and nodes does not occur. There is no way (public methods) to re-sync the cluster state and slots on the client side.

Problems:

  • There is no background monitoring of the cluster state on the client side. Cluster reconfiguration occurs only when the connection is explicitly disconnected or when the answer is "MOVED".
  • Timeout does not trigger reconnect.

Bottom line: If the connection to the master node of the cluster is lost, the library endlessly unsuccessfully tries to connect to it. This does not reconfigure the cluster and slots on the client side

[Bug] Not receiving invalidation messages on RedisPool

Redis version - 7.0.5
Platform - mac
Using Docker and/or Kubernetes - no
Deployment type - centralized (pool)
fred.rs: 6.3.0

Invalidation messages are received with RedisClient but not with RedisPool.

The following code works:

    // redis_config - server: centralised(localhost), version: resp3, all else default

    let basic = RedisClient::new(redis_config.clone(), None, None);
    let _ = basic.connect();
    basic.wait_for_connect().await?;

    basic
        .start_tracking(
            vec!["data"], // PREFIX
            true,         // BCAST
            false,        // OPTIN  (not compatible with BCAST)
            false,        // OPTOUT (not compatible with BCAST)
            false,        // NOLOOP (doesn't matter, we never modify keys)
        )
        .await?;

    {
        let mut invalidations = basic.on_invalidation();
        tokio::spawn(async move {
            while let Ok(invalidation) = invalidations.recv().await {
                println!("invalidation event");
            }
        });
    }

This prints "invalidation event" when you change a key starting with "data".

However, replace the first line to use RedisPool and it no longer prints anything.

    let basic = RedisPool::new(redis_config.clone(), None, None, 5)?;

Logs
I didn't get any log output 🤔

about how to set key-value

fred_error_1
i write the code like the example, but it error, i do not know why
fred_error_2
now, match it , it said:
--> src\controller.rs:87:18
|
87 | match client.set(key, val, None, None, false).await {
| ^^^ cannot infer type for type parameter R declared on the associated function set

i look the code R is FromRedis, It has been implemented for several types
i do not know how to fix it

Behavior during reconnect

Wanted to check some behavior we're seeing when we have a node in our cluster fails over, or loses connection.

For context, we run what is essentially a proxy service for many different storage backends, taking requests for items, modifying and routing them to the correct backend.

We see three things we wanted to check --

  1. Connections are re-established to all nodes, not just the failed node
    We see this manifest with logs (from the on_error channel) for each node of the form Ending replica reader task from xxx.xxx.xxx.xxx:6379 due to None, with the original first error being related to the first connection dropping.

  2. During a connection failover we see the memory usage of the containers for our service increase over time, often with two phases (highlighted example).

image 240226962-dc212736-630c-4e63-b569-fd805bd01d86
  1. During these periods of connection issues we see every request hitting our internal timeouts (using tokio's timeout with a setting about 20ms higher than fred's PerformanceConfig.default_command_timeout_ms), e.g. ->
image

(The yellow and purple lines are for the same underlying redis cluster, using amazon's data-tiering cache product, while the blue is a regular memory redis (also via AWS elasticache).

The settings that we're using are mostly defaulted...

   let mut redis_config = RedisConfig::from_url(&format!("redis-cluster://{}", host))     
       .expect("Could not create `RedisConfig`");                                         
   redis_config.replica = fred::types::ReplicaConfig {                                              
       lazy_connections: false,                                                                     
       connection_error_count: 1,                                                                   
       ..Default::default()                                                                         
   };                                                                                               
   let performance_config = fred::types::PerformanceConfig {                                        
       default_command_timeout_ms: 50,                                                             
       max_command_attempts: 1,                                                                     
       backpressure: fred::types::BackpressureConfig {                                              
           disable_auto_backpressure: true,                                                         
           ..Default::default()                                                                     
       },                                                                                           
       ..Default::default()                                                                         
   };                                                                                               
   let reconnect_policy = fred::types::ReconnectPolicy::default();                                  

Any guidance for whether this is expected? And if it is then any settings that we've missed that could help alleviate these observed problems would be really helpful.

Can provide more logs/metrics that you think might be useful. TIA 🙇

PUBLISH is called on random node when used inside MULTI within the cluster

Hi,
Running MULTI, SET, PUBLISH, and EXEC (in that order) on 3 nodes Redis cluster is not always atomically executed. PUBLISH is not executed on the same node as MULTI, therefore it is not queued.

I believe that it is caused by the fact that fred considers PUBLISH as "non cluster" command and executes it on the random node.

let trx = client.multi(true).await?;

let res: RedisValue = trx.set("foox", "bar", None, None, false).await?;
assert!(res.is_queued());

let res2: RedisValue = trx.publish("foox", "this is body x").await?;
assert!(res2.is_queued()); // this fails depending on selected random node

trx.exec().await?;

[Bug] 6.0.0: `blocking` example hangs with `--partial-tracing` or `--full-tracing`

Redis version: 7.0.5
Platform - Mac
Using Docker and/or Kubernetes - no
Deployment type - centralized

Describe the bug

blocking example connects to redis and does nothing forever with --partial-tracing or --full-tracing features enabled.
Race condition?

To Reproduce
Steps to reproduce the behavior:

  1. RUST_LOG=trace cargo run --example blocking --features="full-tracing"
**Logs**

   Compiling fred v6.0.0 (/Users/kika/Scratch/fred.rs)
    Finished dev [unoptimized + debuginfo] target(s) in 24.43s
     Running `target/debug/examples/blocking`
 DEBUG fred::multiplexer::commands > fred-OuyjL80Ik2: Initializing multiplexer with policy: None
 DEBUG fred::multiplexer::centralized > fred-OuyjL80Ik2: Initializing centralized connection.
 TRACE fred::protocol::connection     > fred-OuyjL80Ik2: Checking connection type. Native-tls: false, Rustls: false
 DEBUG fred::multiplexer::commands    > fred-7EMjtIUIYO: Initializing multiplexer with policy: None
 DEBUG fred::multiplexer::centralized > fred-7EMjtIUIYO: Initializing centralized connection.
 TRACE fred::protocol::connection     > fred-7EMjtIUIYO: Checking connection type. Native-tls: false, Rustls: false
 TRACE fred::protocol::types          > fred-OuyjL80Ik2: Using 127.0.0.1 among 1 possible socket addresses for 127.0.0.1:6379
 TRACE fred::protocol::types          > fred-7EMjtIUIYO: Using 127.0.0.1 among 1 possible socket addresses for 127.0.0.1:6379
 DEBUG fred::protocol::connection     > fred-OuyjL80Ik2: Creating TCP connection to 127.0.0.1 at 127.0.0.1:6379
 TRACE mio::poll                      > registering event source with poller: token=Token(0), interests=READABLE | WRITABLE
 DEBUG fred::protocol::connection     > fred-7EMjtIUIYO: Creating TCP connection to 127.0.0.1 at 127.0.0.1:6379
 TRACE mio::poll                      > registering event source with poller: token=Token(1), interests=READABLE | WRITABLE
 TRACE fred::protocol::connection     > fred-7EMjtIUIYO: Skip authentication without credentials.
 DEBUG fred::protocol::connection     > fred-7EMjtIUIYO: Setting client name.
 TRACE redis_protocol::utils          > allocating more, len: 0, amt: 51
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Encoded 51 bytes to 127.0.0.1:6379. Buffer len: 51 (RESP2)
 TRACE fred::protocol::connection     > fred-OuyjL80Ik2: Skip authentication without credentials.
 DEBUG fred::protocol::connection     > fred-OuyjL80Ik2: Setting client name.
 TRACE redis_protocol::utils          > allocating more, len: 0, amt: 51
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Encoded 51 bytes to 127.0.0.1:6379. Buffer len: 51 (RESP2)
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Parsed 5 bytes from 127.0.0.1:6379
 DEBUG fred::protocol::connection     > fred-OuyjL80Ik2: Successfully set Redis client name.
 TRACE redis_protocol::utils          > allocating more, len: 0, amt: 24
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Encoded 24 bytes to 127.0.0.1:6379. Buffer len: 24 (RESP2)
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Parsed 5 bytes from 127.0.0.1:6379
 DEBUG fred::protocol::connection     > fred-7EMjtIUIYO: Successfully set Redis client name.
 TRACE redis_protocol::utils          > allocating more, len: 0, amt: 24
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Encoded 24 bytes to 127.0.0.1:6379. Buffer len: 24 (RESP2)
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Parsed 5 bytes from 127.0.0.1:6379
 DEBUG fred::protocol::connection     > fred-OuyjL80Ik2: Read client ID: Number { data: 41, attributes: None }
 TRACE redis_protocol::utils          > allocating more, len: 0, amt: 26
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Encoded 26 bytes to 127.0.0.1:6379. Buffer len: 26 (RESP2)
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Parsed 5 bytes from 127.0.0.1:6379
 DEBUG fred::protocol::connection     > fred-7EMjtIUIYO: Read client ID: Number { data: 42, attributes: None }
 TRACE redis_protocol::utils          > allocating more, len: 0, amt: 26
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Encoded 26 bytes to 127.0.0.1:6379. Buffer len: 26 (RESP2)
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Recv 598 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Parsed 598 bytes from 127.0.0.1:6379
 DEBUG fred::protocol::connection     > fred-OuyjL80Ik2: Read server version Some(Version { major: 7, minor: 0, patch: 5 })
 DEBUG fred::modules::inner           > fred-OuyjL80Ik2: No `on_reconnect` listeners.
 DEBUG fred::multiplexer::commands    > fred-OuyjL80Ik2: Starting command processing stream...
 TRACE fred::protocol::codec          > fred-OuyjL80Ik2: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Recv 598 bytes from 127.0.0.1:6379 (RESP2).
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Parsed 598 bytes from 127.0.0.1:6379
 DEBUG fred::protocol::connection     > fred-7EMjtIUIYO: Read server version Some(Version { major: 7, minor: 0, patch: 5 })
 DEBUG fred::modules::inner           > fred-7EMjtIUIYO: No `on_connect` listeners.
 DEBUG fred::modules::inner           > fred-7EMjtIUIYO: No `on_reconnect` listeners.
 DEBUG fred::multiplexer::commands    > fred-7EMjtIUIYO: Starting command processing stream...
 TRACE fred::protocol::codec          > fred-7EMjtIUIYO: Recv 0 bytes from 127.0.0.1:6379 (RESP2).

Works as expected without tracing features

No subscriber message_rx stop after quit

Redis version - 7.0.8
Platform - linux
Using Docker and/or Kubernetes - yes
Deployment type - centralized

Describe the bug
Trying code from https://docs.rs/fred/latest/fred/clients/struct.SubscriberClient.html example wait forever on command:
let _ = jh.await;
No quit notification on message_rx.recv().
With version 5.2 the example exit correctly.
This issue block the correct shutdown of programs build on fred.

To Reproduce
Steps to reproduce the behavior:

  1. Create a main on https://docs.rs/fred/latest/fred/clients/struct.SubscriberClient.html
  2. Run the program

Logs
2023-04-07T23:01:02.440 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Initializing router with policy: Some(Constant { attempts: 0, max_attempts: 0, delay: 1000, jitter: 100 })
2023-04-07T23:01:02.440 [tokio-runtime-worker-2] DEBUG fred::router::centralized - fred-joVadhOLVQ: Initializing centralized connection.
2023-04-07T23:01:02.441 [tokio-runtime-worker-2] TRACE fred::protocol::connection - fred-joVadhOLVQ: Checking connection type. Native-tls: false, Rustls: false
2023-04-07T23:01:02.441 [tokio-runtime-worker-2] TRACE fred::utils - Using timeout: 60000 ms
2023-04-07T23:01:02.441 [tokio-runtime-worker-10] TRACE fred::protocol::types - fred-joVadhOLVQ: Using 127.0.0.1 among 1 possible socket addresses for 127.0.0.1:6379
2023-04-07T23:01:02.442 [tokio-runtime-worker-2] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Creating TCP connection to 127.0.0.1 at 127.0.0.1:6379
2023-04-07T23:01:02.443 [tokio-runtime-worker-8] TRACE fred::utils - Using timeout: 60000 ms
2023-04-07T23:01:02.443 [tokio-runtime-worker-8] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Authenticating Redis client...
2023-04-07T23:01:02.444 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 28 bytes to 127.0.0.1:6379. Buffer len: 28 (RESP2)
2023-04-07T23:01:02.446 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.447 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 5 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.447 [tokio-runtime-worker-8] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Setting client name.
2023-04-07T23:01:02.448 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 51 bytes to 127.0.0.1:6379. Buffer len: 51 (RESP2)
2023-04-07T23:01:02.448 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.449 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.449 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 5 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.449 [tokio-runtime-worker-8] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Successfully set Redis client name.
2023-04-07T23:01:02.449 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 24 bytes to 127.0.0.1:6379. Buffer len: 24 (RESP2)
2023-04-07T23:01:02.450 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.450 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.450 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 5 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.451 [tokio-runtime-worker-8] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Read client ID: Number { data: 14, attributes: None }
2023-04-07T23:01:02.451 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 26 bytes to 127.0.0.1:6379. Buffer len: 26 (RESP2)
2023-04-07T23:01:02.451 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.452 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 628 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.452 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 628 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.452 [tokio-runtime-worker-8] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Read server version Some(Version { major: 7, minor: 0, patch: 8 })
2023-04-07T23:01:02.452 [tokio-runtime-worker-8] DEBUG fred::modules::inner - fred-joVadhOLVQ: No on_reconnect listeners.
2023-04-07T23:01:02.452 [tokio-runtime-worker-8] DEBUG fred::router::commands - fred-joVadhOLVQ: Starting command processing stream...
2023-04-07T23:01:02.452 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command SUBSCRIBE (5) to router.
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "SUBSCRIBE" }
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check SUBSCRIBE: true
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::router::utils - fred-joVadhOLVQ: Writing SUBSCRIBE (5). Timed out: false, Force flush: false
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command SUBSCRIBE (5) to 127.0.0.1:6379
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.454 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 28 bytes to 127.0.0.1:6379. Buffer len: 28 (RESP2)
2023-04-07T23:01:02.454 [tokio-runtime-worker-8] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 32 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 32 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to SUBSCRIBE (5)
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for SUBSCRIBE
2023-04-07T23:01:02.456 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for SUBSCRIBE
2023-04-07T23:01:02.456 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.456 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command PSUBSCRIBE (6) to router.
2023-04-07T23:01:02.456 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "PSUBSCRIBE" }
2023-04-07T23:01:02.457 [tokio-runtime-worker-2] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check PSUBSCRIBE: false
2023-04-07T23:01:02.457 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Writing PSUBSCRIBE (6). Timed out: false, Force flush: false
2023-04-07T23:01:02.457 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command PSUBSCRIBE (6) to 127.0.0.1:6379
2023-04-07T23:01:02.457 [tokio-runtime-worker-2] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.457 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 31 bytes to 127.0.0.1:6379. Buffer len: 31 (RESP2)
2023-04-07T23:01:02.458 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.458 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Waiting on router channel.
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 35 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 35 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to PSUBSCRIBE (6)
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for PSUBSCRIBE
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for PSUBSCRIBE
Tracking channels: {"foo"}
Tracking patterns: {"bar*"}
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.460 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command SUBSCRIBE (7) to router.
2023-04-07T23:01:02.460 [tokio-runtime-worker-9] DEBUG fred::router::commands - fred-joVadhOLVQ: Recv router response.
2023-04-07T23:01:02.460 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "SUBSCRIBE" }
2023-04-07T23:01:02.460 [tokio-runtime-worker-9] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check SUBSCRIBE: true
2023-04-07T23:01:02.460 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Writing SUBSCRIBE (7). Timed out: false, Force flush: false
2023-04-07T23:01:02.460 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command SUBSCRIBE (7) to 127.0.0.1:6379
2023-04-07T23:01:02.461 [tokio-runtime-worker-9] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.461 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 28 bytes to 127.0.0.1:6379. Buffer len: 28 (RESP2)
2023-04-07T23:01:02.461 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 32 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 32 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to SUBSCRIBE (7)
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for SUBSCRIBE
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for SUBSCRIBE
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.463 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command PSUBSCRIBE (8) to router.
2023-04-07T23:01:02.463 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "PSUBSCRIBE" }
2023-04-07T23:01:02.463 [tokio-runtime-worker-9] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check PSUBSCRIBE: false
2023-04-07T23:01:02.463 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Writing PSUBSCRIBE (8). Timed out: false, Force flush: false
2023-04-07T23:01:02.463 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command PSUBSCRIBE (8) to 127.0.0.1:6379
2023-04-07T23:01:02.463 [tokio-runtime-worker-9] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.464 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 31 bytes to 127.0.0.1:6379. Buffer len: 31 (RESP2)
2023-04-07T23:01:02.464 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.464 [tokio-runtime-worker-9] DEBUG fred::router::commands - fred-joVadhOLVQ: Waiting on router channel.
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 35 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 35 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to PSUBSCRIBE (8)
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for PSUBSCRIBE
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for PSUBSCRIBE
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Recv router response.
2023-04-07T23:01:02.467 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command UNSUBSCRIBE (9) to router.
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "UNSUBSCRIBE" }
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check UNSUBSCRIBE: true
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Writing UNSUBSCRIBE (9). Timed out: false, Force flush: false
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command UNSUBSCRIBE (9) to 127.0.0.1:6379
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.468 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 31 bytes to 127.0.0.1:6379. Buffer len: 31 (RESP2)
2023-04-07T23:01:02.468 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 35 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 35 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to UNSUBSCRIBE (9)
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for UNSUBSCRIBE
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for UNSUBSCRIBE
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.469 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command PUNSUBSCRIBE (10) to router.
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "PUNSUBSCRIBE" }
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check PUNSUBSCRIBE: false
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Writing PUNSUBSCRIBE (10). Timed out: false, Force flush: false
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command PUNSUBSCRIBE (10) to 127.0.0.1:6379
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 33 bytes to 127.0.0.1:6379. Buffer len: 33 (RESP2)
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.471 [tokio-runtime-worker-9] DEBUG fred::router::commands - fred-joVadhOLVQ: Waiting on router channel.
2023-04-07T23:01:02.471 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 37 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 37 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to PUNSUBSCRIBE (10)
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for PUNSUBSCRIBE
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for PUNSUBSCRIBE
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.472 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command SUNSUBSCRIBE (11) to router.
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Recv router response.
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "SUNSUBSCRIBE" }
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check SUNSUBSCRIBE: false
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Writing SUNSUBSCRIBE (11). Timed out: false, Force flush: false
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command SUNSUBSCRIBE (11) to 127.0.0.1:6379
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::protocol::connection - fred-joVadhOLVQ: Skip adding SUNSUBSCRIBE command to response buffer (no expected responses).
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.473 [main-1] DEBUG fred::commands::impls::server - fred-joVadhOLVQ: Closing Redis connection with Quit command.
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 23 bytes to 127.0.0.1:6379. Buffer len: 23 (RESP2)
2023-04-07T23:01:02.473 [main-1] DEBUG fred::modules::inner - fred-joVadhOLVQ: No close listeners.
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.474 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command QUIT (12) to router.
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Waiting on router channel.
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Recv router response.
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "QUIT" }
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check QUIT: false
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Writing QUIT (12). Timed out: false, Force flush: false
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command QUIT (12) to 127.0.0.1:6379
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 14 bytes to 127.0.0.1:6379. Buffer len: 14 (RESP2)
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Waiting on router channel.
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 32 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 32 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] DEBUG fred::router::utils - fred-joVadhOLVQ: Dropping extra unsubscribe response.
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 5 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to QUIT (12)
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Respond
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Respond to caller from 127.0.0.1:6379 for QUIT with SimpleString
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] DEBUG fred::router::responses - fred-joVadhOLVQ: Ending reader task from 127.0.0.1:6379 due to None
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] DEBUG fred::modules::inner - fred-joVadhOLVQ: Checking reconnect state. Has policy: true, Is intentionally disconnecting: true
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] DEBUG fred::router::centralized - fred-joVadhOLVQ: Ending reader task from 127.0.0.1:6379
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Recv router response.
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Ending command loop after QUIT or SHUTDOWN.
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Disconnecting after command stream closes.
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] DEBUG fred::router - fred-joVadhOLVQ: Disconnecting from 127.0.0.1:6379
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] TRACE fred::router - fred-joVadhOLVQ: Clearing retry buffer with 0 commands.
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] TRACE fred::interfaces - fred-joVadhOLVQ: Ending connection task with Ok(())

Ping fails with SubscriberClient

Ping seems to be failing with the SubscriberClient consistently with an error: "Could not convert multiple frames to RedisValue".
The stack seems to show the response is parsed as resp3, but the server AFAIK only supports resp2.

Environment:

redis_version:5.0.7 (Seems to fail the same with latest 7.0)
fred.rs: 5.1.0

Logs:

2022-08-28 02:11:57.234867 [00d] [D] [connection]:0287 fred-JeHtXwyITZ: Read client ID: Resp2(Integer(82))
2022-08-28 02:12:57.236755 [00c] [D] [utils ]:0400 fred-JeHtXwyITZ: Writing command PING to localhost:6379
2022-08-28 02:12:57.237310 [00a] [D] [server ]:0094 fred-JeHtXwyITZ: Recv ping response.

Stack:

  * frame #0: 0x00005555562aaa94 test`fred::protocol::utils::frame_to_single_result(frame=redis_protocol::resp3::types::Frame::Array @ 0x00007ffff649ede0) at utils.rs:547:20
    frame #1: 0x000055555582dfb9 test`fred::commands::impls::server::ping::{{closure}}((null)=core::future::ResumeTy @ 0x00007ffff649efc8) at server.rs:95:3
    frame #2: 0x0000555555a7e497 test`<core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll(self=core::pin::Pin<&mut core::future::from_generator::GenFuture<fred::commands::impls::server::ping::{async_fn_env#0}>> @ 0x00007ffff649f460, cx=0x00007ffff64a93d0) at mod.rs:91:19
    frame #3: 0x00005555558197d9 test`fred::interfaces::ClientLike::ping::{{closure}}::{{closure}}((null)=core::future::ResumeTy @ 0x00007ffff64a2948) at interfaces.rs:256:58

Data:

print frame
(redis_protocol::resp3::types::Frame::Array) frame = {
  data = (2) vec![{...}, {...}] {}
  attributes = None {}
}

print frame.data.0
(alloc::raw_vec::RawVec<redis_protocol::resp3::types::Frame, alloc::alloc::Global>) buf = {
  ptr = {
    pointer = {}
    _marker = {}
  }
  cap = 2
  alloc = {}
}

[Feature-request] Improve `watch` interface for transactions (`multi`)

New interface for transactions in 6.0.0 is much better and allowed to relax my personal customization around it.
But there is one issue, sometimes it's needed to combine watch and multi.

But watch can't be called inside multi block, so I wonder if it possible to somehow combine them and prevent re-ordering of related watch and multi?
Currently it's possible to watch in parallel, so order will be:

watch K1
watch K2
multi
set K1 1
exec

# watch K2 is also was unwatched

I mean exec of multi block automatically unwatch keys on redis side, so it's possible that within one fred-connection I will call N watch operations and then any exec will discard them together which is incorrect/unexpected behavior for this combination.

For now I have to manually lock connection or create new one for such operations, it's a bit counterproductive. Instead I like to order their execution and combine them as atomic block.

I mean something like connection.multi_with_watch(keys) and it will be applied in this order:

... commands before ....

watch
multi .... exec 

... commands after ....

It could be tricky one, but I'd like to have similar interface to avoid collision between multi/exec and watches related to another transaction

[Bug] blocking (blpop) timeout lead to confusing RedisErrors

Redis version - 7.0.9
Platform - windows
Using Docker and/or Kubernetes - yes
Deployment type - centralized

Describe the bug
The easy/trivial client.blpop::<(String, String), _>("key", timeout).await can, in my opinion, return three things:

  • A value from the given list
  • A timeout
  • A generic Redis error

Right now, there doesn't seem to be a good way to separate the last two. Fred returns a RedisError with "Redis Error - kind: Parse, details: Could not convert to tuple." which .. is fair, I guess. There wasn't a tuple. There was a timeout. Which doesn't quite feel like an error case to me and IF it should be a RedisError then the message is still confusing/wrong.

To Reproduce
Steps to reproduce the behavior:

  1. blpop a string, string tuple from an empty list
  2. wait for timeout

Waiting for exec to return from server

Hi,
I don't know if it's an intended behavior, but I ran into following situation:

Suppose you have 10ms latency to Redis. Every 5ms you application needs to run

MULTI
HSET key value
PUBLISH message
EXEC

I am using the following code to execute this sequence in fred:

let trx = redis.multi(true).await?;
trx.hset(key, hashmap! { field => value.into()}).await?;
trx.publish(channel, message).await?;
trx.exec().await?;

I'd like to have a writer that executes this code, but waits for the response in a new tokio task every 5ms. This way there will be 2 running requests waiting for responses.
I don't know how to achieve this with fred.

There are 2 issues:

  • Fred allows to call another MULTI once the response from EXEC has arrived. I think new MULTI can be allowed once the EXEC has been sent
  • It's not possible to split the sending the commands and receiving the response. I don't know when it is safe to call second MULTI

Thanks for help

Connection to Redis Cluster in Minikube

Hello Alec, I have a problem connecting to a local Redis cluster created in Minikube. If the problem isn't on Fred's side let me know :)

The setup is relatively simple: there's a single LoadBalancer exposed via minikube tunnel that redirects to a cluster node, and the cluster itself isn't accessible from outside (so the nodes can only talk to each other and the load balancer). Here's the commands I used:

Using redis-cli on the host with load balancer IP 10.102.101.213 like so:

redis-cli -c -h 10.102.101.213 -a $REDIS_PASSWORD CLUSTER NODES
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
aceb5bf17a706440a297c5be764f4e4b6a60eb61 172.17.0.7:6379@16379 slave 16da11b0423cafe75261b5adbc2eb13a90358cc5 0 1640566876716 2 connected
a67bf5b0d784c9be05fba218bdc456b484f37c86 172.17.0.2:6379@16379 myself,slave 2008db068b8a58a28595b37060d52d3736d04a6d 0 1640566873000 1 connected
a10e1fe9800e7fb2a376f5ec5e1734658b15c6c9 172.17.0.5:6379@16379 master - 0 1640566875000 3 connected 10923-16383
2008db068b8a58a28595b37060d52d3736d04a6d 172.17.0.6:6379@16379 master - 0 1640566874000 1 connected 0-5460
53cf7c4159f5896deb1c42478c82a6404ac5f56e 172.17.0.3:6379@16379 slave a10e1fe9800e7fb2a376f5ec5e1734658b15c6c9 0 1640566876000 3 connected
16da11b0423cafe75261b5adbc2eb13a90358cc5 :0@0 master,noaddr - 1640566845731 1640566845730 2 disconnected 5461-10922

all works as expected.

Trying to connect to the same cluster with Fred (env variable REDIS_URI is the LoadBalancer IP):

let config = RedisConfig {
            server: ServerConfig::new_clustered([(dotenv!("REDIS_URI").to_string(), 6379 as u16)].to_vec()),
            fail_fast: true,
            pipeline: true,
            blocking: Blocking::Block,
            username: None,
            password: Some(dotenv!("REDIS_PASSWD").to_string()),
            tls: None,
            tracing: false
        };
        println!("Creating client...");
        let client = RedisClient::new(config);
        println!("Created client!");

        let policy = ReconnectPolicy::default();

        tokio::spawn(client
            .on_error()
            .for_each(|e| async move {
                println!("Client received connection error: {:?}", e);
            }));

        println!("Checked errors...");

        tokio::spawn(client
            .on_reconnect()
            .for_each(move |client| async move {
                println!("Client {} reconnected.", client.id());
                // select the database each time we connect or reconnect
                let _ = client.select(REDIS_SESSION_DB).await;
            }));


        println!("Checked reconnections...");

        client.connect(Some(policy));
        println!("Trying to connect Redis...");
        client.wait_for_connect().await.unwrap();
        println!("Redis connected!");

makes it throw this error:

Client received connection error: Redis Error - kind: IO, details: Os { code: 113, kind: HostUnreachable, message: "No route to host" }

Given that the info about nodes in cluster is gathered with CLUSTER NODES (per https://docs.rs/fred/latest/fred/types/enum.ServerConfig.html#variant.Clustered), I assume passing a single IP is fine.
I guess this is some kind of a DNS/redirect resolution problem. I see that you're currently working on supporting custom DNS resolvers on the client, could this be related?

Keep up the great work, this is probably the best Redis driver for Rust at the moment!

P.S. When do you plan to release Streams functionality by the way (in terms of time)?

Can't resume automatically after redis cluster reboot

I encountered a problem when using fred. I use the fred cluster mode. If the cluster redis restarts, fred cannot recover automatically. How to solve this problem.

[WARN] [2023.04.04 01:13:08.914] [127.0.0.1] [fred::multiplexer::utils] [/root/.cargo/registry/src/github.com-1ecc6299db9ec823/fred-5.2.0/src/multiplexer/utils.rs(1229)] - fred-mbn8GkCsjF: Error creating or using backchannel for cluster nodes: Redis Error - kind: IO, details: Os { code: 110, kind: TimedOut, message: "Connection timed out" }

let config = RedisConfig { server: ServerConfig::Clustered { hosts: nodes, }, .. };

Version 5

Hello Alec, it's me again. I'd just like to thank you again for making this driver and for incredible help in my last issue ( #21 ). Fred is awesome, the best among Redis drivers I've encountered. Never thought that someone could make something better pretty much single-handedly than the teams/communities behind other drivers, especially the "official" one (https://github.com/mitsuhiko/redis-rs).

As a thank you I'd really like to support this library, you've got Patreon or something like that?

Can't wait for the 5.0.0 release, hope it will be ready soon!

Client not responding

Description

Hi! Thanks for the awesome library.

I'm having a problem with the Redis client getting stuck before sending the command to the Redis server (it happens with any command).
This is happening in a Stream, but didn't try to replicate outside of one.

The pattern of getting stuck is random, sometimes it runs for tens of stream values and gets stuck and other times it gets stuck on the first value of the stream.

What I tried

I tried to track to the best of my abilities where it was getting stuck and I think it's at wait_for_response(rx) in src/utils.rs, but I'm not sure.

When setting a default timeout it gets hit, otherwise it waits forever.

I also ran redis-cli monitor on the Redis server and confirmed that the client gets stuck before sending the command.

Reproducibility

The minimal code to reproduce it is (UPDATE: this doesn't reproduce the issue, see #13 (comment)):

use std::time::Duration;

use futures::StreamExt;
use fred::prelude::*;
use fred::pool::StaticRedisPool;

#[tokio::main]
async fn main() {
  let pool = StaticRedisPool::new(RedisConfig::default(), 5);
  pool.connect(Some(ReconnectPolicy::default()));
  pool.wait_for_connect().await.unwrap();

  let stream = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_millis(100)))
    .then(move |_| {
      let pool = pool.clone();

      async move {
        let value: Option<String> = pool.get("key").await.unwrap(); // This call never responds.

        value
      }
    });

  futures::pin_mut!(stream);

  while let Some(value) = stream.next().await {
    println!("{:?}", value);
  }
}

Cargo.toml:

[package]
edition = "2021"

[dependencies]
fred = { version = "4.2.1", default-features = false, features = [
    "pool-prefer-active",
    "ignore-auth-error",
] }
futures = { version = "0.3.17", default-features = false }
tokio = { version = "1.13.0", features = ["rt", "macros", "time"] }
tokio-stream = "0.1.8"

OS and versions

Rust: 1.56.0
fred: 4.2.1
Redis: 6.2.6 inside Docker
OS: Ubuntu 21.04

`DynamicRedisPool` blocked on `wait_for_connect`

Description

I am attempting to use a dynamic pool in an actix web app (btw I am using a v4 beta so am on the correct tokio version) and my code hangs on wat_for_connect consistently. I know similar things have happened previously but as those were closed issues I started this one.

Reproduction

To reproduce I am able to just run the code from the example here https://github.com/aembke/fred.rs/blob/main/examples/dynamic_pool.rs

Here is my app main

use fred::pool::DynamicRedisPool;
use fred::prelude::*;

#[tokio::main]
async fn main() -> Result<(), RedisError> {
    let config = RedisConfig {
        // whether to skip reconnect logic when first connecting
        fail_fast: true,
        // server configuration
        server: ServerConfig::new_centralized("127.0.0.1", 6379),
        // whether to automatically pipeline commands
        pipeline: true,
        // how to handle commands sent while a connection is blocked
        blocking: Blocking::Block,
        // an optional username, if using ACL rules
        username: None,
        // an optional authentication key or password
        password: None,
        // optional TLS settings
        tls: None,
        // whether to enable tracing
    };
    // the max size isn't a hard limit - it just determines the size of the client array when the pool is initialized
    let pool = DynamicRedisPool::new(config, None, 5, 10);

    println!("I am printed");
    let _ = pool.connect();
    println!("Me too");
    let _ = pool.wait_for_connect().await?;
    println!("Not Me :(");

    // modify the size of the pool at runtime
    let (new_client, _) = pool.scale_up().await;
    if let Some(old_client) = pool.scale_down(true).await {
        assert_eq!(new_client.id(), old_client.id());
    }

    for client in pool.clients() {
        println!("Client ID {} in pool.", client.id());
    }

    // due to the locking required by the resizing operations the Deref trait cannot be used with this pool implementation.
    // if modifications to the pool are not required at runtime the static pool is usually easier to use
    let _ = pool.next().get("foo").await?;
    let _ = pool.next().set("foo", "bar", None, None, false).await?;
    let _ = pool.next().get("foo").await?;

    // if the pool can be empty a function exists that will lazily create a new client, if needed.
    // if the pool is not empty this just calls `next` without creating a new client.
    let _ = pool.next_connect(true).await.get("foo").await?;

    let _ = pool.quit_pool().await;
    Ok(())
}

Hope this helps and please ask any questions.

System

MacOS, Redis via Docker, Rust 1.57

Documentation: Improve Documentation

This library is pretty opinionated and should have better documentation on implementation details behind the following design decisions:

  • Pipelining
  • Connection management
  • Retry
  • Backpressure
  • Cluster failure mode handling (how and when CLUSTER NODES is used)
  • Read replica usage, or lack thereof

Disallowing nested items interferes with `serde_json::Value`

It is exteremly unclear how can you write an impl of FromRedis for a custom struct in the case that you want to set a key:value with the value being a JSON. So instead when using the .set() it is possible to pass serde_json::Value - but it seems that it does not work because it cannot deal with nesting because of https://github.com/aembke/fred.rs/blob/main/src/utils.rs#L530. This makes it impossible to set a value of a custom struct as a json.

Creating connection hangs when connecting in clustered mode

I am trying to connect to a redis cluster, running locally via docker with:

let cfg = ServerConfig::new_clustered(vec![(settings.host, settings.port)])
let client = RedisClient::new(cfg);
let _jh = client.connect(None);
let _ = client.wait_for_connect().await;

but it's just hanging a wait_for_connect. If I change to a single node and use ServerConfig::new_centralized(settings.host, settings.port) I am able to connect and issue commands.

How can I configure the server parameters?

Hello! I need to create an instance of RedisClient with my own parameters. But I only see the default function. Where I can set up my host and port to RedisConfig?

Also, where i can set a new client name?

Thank you!

Authentication failure in RESP3

When trying to connect to a password-protected Redis instance using RESP3, I get the following error:

NOAUTH HELLO must be called with the client already authenticated, otherwise the HELLO AUTH <user> <pass> option can be used to authenticate the client and select the RESP protocol version at the same time

It works just fine if I turn off RESP3 and go back to RESP2.

I've devised a quick patch that seems to fix this problem by ensuring the client is authenticated before attempting to switch protocols:

diff --git a/src/protocol/connection.rs b/src/protocol/connection.rs
index f936197..5565039 100644
--- a/src/protocol/connection.rs
+++ b/src/protocol/connection.rs
@@ -332,8 +332,9 @@ pub async fn create_authenticated_connection_tls(
   let socket = TcpStream::connect(addr).await?;
   let tls_stream = tls::create_tls_connector(&inner.config)?;
   let socket = tls_stream.connect(domain, socket).await?;
-  let framed = switch_protocols(inner, Framed::new(socket, codec)).await?;
+  let framed = Framed::new(socket, codec);
   let framed = authenticate(framed, &client_name, username, password, inner.is_resp3()).await?;
+  let framed = switch_protocols(inner, framed).await?;
   let framed = select_database(inner, framed).await?;
 
   client_utils::set_client_state(&inner.state, ClientState::Connected);
@@ -360,8 +361,9 @@ pub async fn create_authenticated_connection(
   let username = inner.config.read().username.clone();
 
   let socket = TcpStream::connect(addr).await?;
-  let framed = switch_protocols(inner, Framed::new(socket, codec)).await?;
+  let framed = Framed::new(socket, codec);
   let framed = authenticate(framed, &client_name, username, password, inner.is_resp3()).await?;
+  let framed = switch_protocols(inner, framed).await?;
   let framed = select_database(inner, framed).await?;
 
   client_utils::set_client_state(&inner.state, ClientState::Connected);

After applying this patch, it works as intended. If this is deemed to be an appropriate solution, I am happy to send a PR, or you can apply it yourself if it is more convenient.

Question: Is fred.rs a thread-safe?

Thank you very much for developing and maintaining awesome Redis library. This is a question, not the bug report. I am wondering if fred.rs is a thread-safe?

[Bug] Type `ClusterRouting` is not `pub` thus `ClusterLike::cached_cluster_state()` can not be used

Describe the bug

The following code does not compile:

use fred::{prelude::*, protocol::types::ClusterRouting};

#[tokio::main]
async fn main() -> Result<(), RedisError> {
    pretty_env_logger::init();

    let config = RedisConfig::from_url("redis-cluster://127.0.0.1:7000").expect("Creating config");
    let cluster_client = RedisClient::new(config, None, None);
    let _ = cluster_client.connect();
    let _ = cluster_client.wait_for_connect().await?;
    let cluster_state: Option<ClusterRouting> = cluster_client.cached_cluster_state();
    if let Some(state) = cluster_state  {
        println!("Cluster state: {:?}", state);
    }
    Ok(())
}

because fred::protocol is private.

it could be made compile by inferring the return type of cached_cluster_state() but that doesn't work in structs.

Change this to pub mod and it works:

mod protocol;

[Bug?] 6.0.0: Reading concurrently from multiple connections

It's either a bug in my DNA (© old programming joke) or in fred.

Redis version - 7.0.5
Platform - Mac
Using Docker and/or Kubernetes - no
Deployment type - centralized

Describe the bug

A brief explanation to why I'm doing things the way the repro is written: I need to read a lot of STREAMs from a Redis Cluster. XREAD is able to read many streams at once, just not in the cluster environment. In cluster environment it can only read from streams that hash into the same slot (not the same node!). So to minimize the number of XREADs I group the channel names by the result of redis_keyslot(), for every slot that has streams I open a connection to the cluster node (not a clustered connection, but a result of the clone_new() from one of split_cluster()) and run XREAD on this connection. And that doesn't work the way I intended. After I run a test case with just a few streams only one stream works as supposed. The others wake up the connection but do not trigger XREADs. If you keep trying to XADD to these "ignored" streams, they suddenly start working (from 1-2 attempts, sometimes more). The original stream that was working stops working. But the data sent to the "ignored" channels to wake them up is lost which probably means that these other XREADs are not even started because otherwise they'd have captured the data due to their "$" wildcard ID. Phew.

To Reproduce

This repro simulates this behavior without the cluster and that's why it looks the way it is. The easiest way to produce the repro is to put it into the examples folder and RUST_LOG=trace cargo run --example bug84. Then from another terminal run redis-cli and issue a few commands:

xadd test0 * v test0
xadd test1 * v test1
xadd test0 * v test2
etc

I had this problem with 5.2.0 and moved to 6.0.0 hoping it's going to be better.

Quite possibly it's my insufficient understanding on how the async Rust works...

Repro itself:

use std::sync::Arc;

use fred::{prelude::*, types::XReadResponse};
use futures::stream::FuturesUnordered;
use tokio_stream::StreamExt;

async fn _read(
        client: Arc<RedisClient>,
        stream_name: &String,
    ) -> Result<Vec<(String, String)>, RedisError> {
        let r: XReadResponse<String, String, String, String> =
            client.xread_map(None, Some(0), stream_name, "$".to_string()).await?;
        let mut result = vec![];
        for (stream_name, records) in r.iter() {
            for (last_id, _record) in records {
                result.push((stream_name.to_string(), last_id.to_string()));
            }
        }
        Ok(result)
    }

#[tokio::main]
async fn main() -> Result<(), RedisError> {
    pretty_env_logger::init();

    let config = RedisConfig::default();
    let mut clients: Vec<Arc<RedisClient>> = vec![];
    let mut stream_names: Vec<String> = vec![];

    for i in 0..3 {
        let c = RedisClient::new(config.clone(), None, None);
        c.connect();
        c.wait_for_connect().await?;
        clients.push(Arc::new(c));
        stream_names.push(format!("test{i}"));
    }

    loop {
        let mut xreads = FuturesUnordered::new();
        for (i, stream_name) in stream_names.iter().enumerate() {
            xreads.push(_read(clients.get(i).unwrap().clone(), stream_name))
        }
        tokio::select! {
            Some(Ok(events)) = xreads.next() => {
                for event in events {
                    println!("Stream {} last ID {}", event.0, event.1);
                }
            }
        }
    }
}

Intermittent failure to connect with wait_for_connect on StaticPool

Hello, I have written a test using testcontainer with the following code and I'm seeing an error of: Redis Error - kind: Unknown, details: Redis connection is not initialized every once in a while. For the most part it works fine, but sometimes that .expect fails at the call to .ping. It seemed to me that the rate of failure decreased with an increase in the size of the StaticRedisPool so I wonder if there's some condition under which .wait_for_connect returns before every client has connected?

    use testcontainers::{clients::Cli, images::redis::Redis, Docker};

    #[tokio::test]
    async fn test_condition() {
        let docker = Cli::default();
        let container = docker.run(Redis::default());

        let port = container
            .get_host_port(6379)
            .expect("should be able to get host port");

        let pool = StaticRedisPool::new(
            RedisConfig {
                server: ServerConfig::new_centralized("127.0.0.1", port),
                ..Default::default()
            },
            3,
        )
        .unwrap();

        let _ = pool.connect(None);
        pool.wait_for_connect().await.unwrap();

        pool.ping().await.expect("should be able to ping");
    }

Please let me know if you would benefit from any further details or I missed something.

Dangerous configuration through environment variables

Hello, and thanks for all your effort on Fred!

We are looking to use fred in our projects, but one of the most significant issues is an environment variables-based configuration. At first, it's not a library's responsibility to read environment variables - it should be under the complete control of the application. I know there are some counter-examples like env-logger. Still, it's a library whose only goal is to read and parse env var for simplification purposes.
Configuring through env vars is especially sensitive for such an essential behavior as the security configuration. In my application, I would hardcode sensitive behavior and never allow insecure connections in the production environment. It's possible to unset environment variables on the app level, but still easy to forget and quite unergonomic.
Considering that it's breaking some isolation rules and affecting the security, I suggest removing env variables support, or at least hiding it behind the non-default feature, and moving this configuration to a code level so that the application has complete control over such a sensitive configuration.

Unable to reconnect to Redis Cluster in Kubernetes after cluster rollout

Thanks for the library! We're trying to use this library in a server running in Kubernetes connecting to a Redis Cluster running in the same Kubernetes cluster. We're connecting to Redis using redis-cluster://my-redis-cluster:6379 as the RedisConfig with a RedisPool.

We noticed that when we go to roll out a new version of the Redis deployment that our fred-using server will hang trying to send commands to the Redis Cluster. The logs we're seeing are of the form:

Error creating or using backchannel for cluster nodes: Redis Error - kind: IO, details: Os { code: 110, kind: TimedOut, message: "Connection timed out" }
Failed to reconnect with error Redis Error - kind: Cluster, details: Failed to read cluster nodes on all possible backchannel servers.

Restarting our fred-using server with the same configuration is able to connect to the Redis Cluster just fine.

After adding some logging, it seems that what's happening is that the IP addresses on the cluster cycle out to different ones, but IIUC fred doesn't try to re-resolve the DNS address to reconnect to the cluster.

Different auth for sentinel and redis

Hi. I saw in the example how to connect through sentinel. However, the example seems to rely on the same auth used for both sentinel and redis itself. My company sets it up such that the auth (i.e. username/password) between the two are different, for security purpose. Is there a way I can specific different auths for the two? Thanks!

[Feature request] Support for client-side caching

This will require supporting commands CLIENT TRACKING and CLIENT CACHING plus notification channel for RESP2 and RESP3.
I can probably take a stab at a PR if this is considered a good idea, but I'll need some guidance because inner workings of the crate a very clearly written but not documented much.

The first guidance question is the design of the OPTIN case: should I implement CLIENT CACHING as a high-level command/method and just issue it before every read operation I want to be notified about? Or we'd better have some abstraction built in that allows to mark the commands or keys we're interested in and automatically issue CLIENT CACHING before each command? Maybe RedisConfig option?

Mocking layer missing but still referenced in Cargo.toml

The mocks feature still exists in Cargo.toml but is no longer present in the crate.

It's mentioned in the prior repository's README.md, and was located under /src/mocks, but has since been removed.

Two items to address:

  • Remove the feature flag or mark it as something to be supported in the future
  • Suggest an alternative method of testing for developers using this library, so people don't reinvent the wheel or write tests with IO dependencies and race conditions on external servers

How to set a key:value where value is a complex struct

I want to set a key which is String and a value which is a struct that has all kinds of types for its fields like Option<T> where T in String, serde_json::Value, i64, f64, bool etc. I didn't see that in the documentation. How can I do that?

Questions: Read selection strategy and failing replica node handling

This is actually questions, not issues. I can't seem to find the info around this.

  1. For reading, what's the reading strategy among replica nodes? Round-robin? Or something different? Can it be configured?

  2. How does the RedisClient handle a failing replica node?

Thank you.
Ed

[Bug] replicas feature broken with pooling?

Redis version - 6.2.6
Platform - linux
Using Docker and/or Kubernetes - both
Deployment type - cluster with replicas

I've been using RedisPool with a Clustered ServerConfig (all else in RedisConfig defaulted) for a while. Recently we've realised that we should be preferably sending all commands (get, and mget) to replica nodes, so I tried out the replicas feature of this crate, and called .replicas() on the RedisPool. This worked fine in an environment without replica nodes, but in an environment with replica nodes, all commands timed out (!). The clustered config's .host has a list of all nodes in both scenarios.

fred (and redis-protocol) uses wrong assumption that keys are utf8

Hello,
First of all thanks for the great library!

I just want to raise an issue about wrong assumption (and behaviour) that Redis keys are valid utf8 strings.
According the Redis data types doc

Redis keys are binary safe, this means that you can use any binary sequence as a key, from a string like "foo" to the content of a JPEG file.

Although in the redis-protocol library the function redis_keyslot wrongly accepts key as &str type.

In combination with String::from_utf8_lossy conversion in https://github.com/aembke/fred.rs/blob/main/src/commands/lua.rs#L26 it produces wrong keys.

Unfortuantely this is not a single issue. In https://github.com/aembke/redis-protocol.rs/blob/main/src/utils.rs#L360 utf8 string key is indexed using byte index which not always correct because index can be not a UTF-8 code point boundary (which generates panic).

How to set a map value

Hi all:

I want to set a map data with RedisMap

let redis_key = "KEYS".into();
let mut redis_map_value = RedisMap::new();
redis_map_value.insert(
       "foo_key".to_string(),
      RedisValue::Integer(123),
 );
let value = RedisValue::Map(redis_map_value);

let _ = redis_client
                .set::<RedisValue, String, RedisValue>(
                    redis_key,
                    value,
                    Some(Expiration::EX(10)),
                    None,
                    false,
                )
                .await;

No error was found, but I can not get the key and value in Redis server also, what's happened, and how to do that?

Cluster Redis should reconnect to hosts other than those configured

Great work btw!

When connecting to a Redis cluster, if it connects to a master node the connection will not update itself with the other nodes. If the master fails at this point the client will lose all connection as it did not update the RedisInnerClient. It seems as though sync_cluster is only called on handle_connection_closed.

I feel like this should be a fairly simple fix of adding a call to sync_cluster, and I could open a PR if you want, but I could be overlooking something trivial.

Also I just want to confirm that this line does not contain a a bug:

if parts[2].contains("master") {

The third column of the CLUSTER NODES command can also contain a list of flags such as master,fail. Would it be the right move here to not include nodes that are failed?

Stats values units are undocumented

Hello,
I tried to use the metrics feature and paid attention that Stats struct used to get latency numbers has undocumented units.
I figured out that it's milliseconds after checking the source code.
Just curious, maybe using the standard Duration type could be more useful to get desired units.

Get on non-existent key returns wrong error

Hello, when I try to get a non-existent key I receive this error: Redis Error - kind: Parse, details: Cannot convert to number., but I believe this is the wrong type of error for the triggered action.

What I would expect

Since the Redis CLI returns nil maybe the library should just return an Option::None, or a different kind of RedisError like NotFound.

Personally I think both solutions are right and in an ideal world there should be an option for both behaviors, but for my use case a different kind of error it's preferred since I want to do a specific action on specific errors and the error propagation operator (?) would help me achieve this more simply and in a cleaner way.

Get rid of global configs

I think it's necessary to drop global configs and move them to structure-based ones.

Motivation: there are cases when an application should use different clients (I mean several fred-clients to different clusters/nodes), and they might have different configs (e.g default_connection_timeout_ms)

[`6.0.0-beta.1`] panic with `partial-tracing` for `debug` lvl on reconnect

When I tested re-connection with current version in the main branch, I faced with panic when I enabled debug level tracing.

2022-12-07T09:55:31.463755Z DEBUG fred::multiplexer::clustered: fred-mpjrO24y5w: Finish synchronizing cluster connections.
    at /usr/local/cargo/git/checkouts/fred.rs-3fb6d817444a1870/4f0c3f4/src/multiplexer/clustered.rs:622 on reactor
thread 'reactor' panicked at 'tried to clone Id(2251799813685263), but no span exists with that ID
This may be caused by consuming a parent span (`parent: span`) rather than borrowing it (`parent: &span`).', /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tracing-subscriber-0.3.16/src/registry/sharded.rs:311:32
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

It doesn't appear when I use info level and reconnection works totally fine (with this fix)
Thanks for all your efforts @aembke!

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.