aembke / fred.rs
An async Redis client for Rust.
License: Apache License 2.0
Hi - Thank you for a great library!
We have been experiencing some connection management issues (including with the latest version, 5.2.0).
We use fred as a Redis client in a Kubernetes cluster (Redis is self-hosted in k8s). If the Redis instance is moved to another node, we end up with connection management in a corrupted state.
This can be reproduced simply by running kubectl scale deploy/redis --replicas 0.
We have tested with and without pipelining, and with and without a connection pool. The config uses an exponential reconnect policy plus a command timeout. This is a trace of the events.
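For reference, the reconnect policy is built roughly like this (a sketch only; the delay values here are illustrative rather than our exact settings, and the constructor is fred 5.x's ReconnectPolicy::new_exponential):

```rust
use fred::types::ReconnectPolicy;

// Sketch: exponential backoff, where 0 attempts means retry forever.
// The min delay, max delay, and multiplier values are illustrative.
let policy = ReconnectPolicy::new_exponential(0, 100, 30_000, 2);
```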
Initially we get a connection - everything is fine:
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Initializing connections...
TRACE fred::protocol::types > fred-JUKIBXKdhV: Using 10.0.78.183 among 1 possible socket addresses for redis:6379
TRACE fred::multiplexer::utils > fred-JUKIBXKdhV: Connecting to 10.0.78.183:6379
TRACE mio::poll > registering event source with poller: token=Token(0), interests=READABLE | WRITABLE
DEBUG fred::protocol::connection > fred-JUKIBXKdhV: Skip setting client name.
TRACE redis_protocol::utils > allocating more, len: 0, amt: 24
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Encoded 24 bytes to 10.0.78.183:6379. Buffer len: 24 (RESP2)
TRACE tokio_util::codec::framed_impl > flushing framed transport
TRACE tokio_util::codec::framed_impl > writing; remaining=24
TRACE tokio_util::codec::framed_impl > framed transport flushed
TRACE tokio_util::codec::framed_impl > attempting to decode a frame
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Recv 4 bytes from 10.0.78.183:6379 (RESP2).
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Parsed 4 bytes from 10.0.78.183:6379
TRACE tokio_util::codec::framed_impl > frame decoded from buffer
DEBUG fred::protocol::connection > fred-JUKIBXKdhV: Read client ID: Resp2(Integer(7))
DEBUG fred::multiplexer::utils > fred-JUKIBXKdhV: Creating new close tx sender.
DEBUG fred::multiplexer::utils > fred-JUKIBXKdhV: Set centralized connection closed sender.
DEBUG fred::multiplexer::utils > fred-JUKIBXKdhV: Emitting connect message.
Now we scale down the deployment:
kubectl scale deploy/redis --replicas 0
Fred realizes that the connection is lost:
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Starting command stream...
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Set inner connection closed sender.
TRACE tokio_util::codec::framed_impl > attempting to decode a frame
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
DEBUG fred::multiplexer::utils > fred-JUKIBXKdhV: Redis frame stream closed with error Redis Error - kind: Canceled, details: Canceled.
DEBUG fred::multiplexer::utils > fred-JUKIBXKdhV: Emit connection closed from error: Redis Error - kind: Canceled, details: Canceled.
TRACE fred::multiplexer::utils > fred-JUKIBXKdhV: Emitting connection closed with 0 messages
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Recv reconnect message with 0 commands. State: Disconnected
INFO fred::multiplexer::commands > fred-JUKIBXKdhV: Sleeping for 132 ms before reconnecting
TRACE fred::protocol::types > fred-JUKIBXKdhV: Using 10.0.78.183 among 1 possible socket addresses for redis:6379
TRACE fred::multiplexer::utils > fred-JUKIBXKdhV: Connecting to 10.0.78.183:6379
TRACE mio::poll > registering event source with poller: token=Token(1), interests=READABLE | WRITABLE
Note that at this point there are no pods running Redis, so the reconnect attempt just hangs.
Now we attempt to execute a command against Redis, using a client from the connection pool:
TRACE fred::multiplexer::commands > fred-JUKIBXKdhV: Recv command on multiplexer SET. Buffer len: 0
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Will block multiplexer loop waiting on SET to finish.
TRACE fred::multiplexer > fred-JUKIBXKdhV: Skip waiting on cluster sync.
DEBUG fred::multiplexer::utils > fred-JUKIBXKdhV: Writing command SET to redis:6379
TRACE fred::protocol::connection > fred-JUKIBXKdhV: Sending command and flushing the sink.
TRACE redis_protocol::utils > allocating more, len: 0, amt: 40
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Encoded 40 bytes to 10.0.78.183:6379. Buffer len: 40 (RESP2)
TRACE tokio_util::codec::framed_impl > flushing framed transport
TRACE tokio_util::codec::framed_impl > writing; remaining=40
WARN fred::multiplexer::commands > fred-JUKIBXKdhV: Error writing command None: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Reconnecting or stopping due to error: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
DEBUG fred::multiplexer::utils > fred-JUKIBXKdhV: Emitting close all sockets message: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
WARN fred::multiplexer::utils > fred-JUKIBXKdhV: Error sending close message to socket streams: SendError(Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" })
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Waiting for client to reconnect...
resp Err(Redis Error - kind: Timeout, details: Request timed out.)
Next we scale up redis:
kubectl scale deploy/redis --replicas 1
After a while the connection pool will reconnect:
DEBUG fred::protocol::connection > fred-JUKIBXKdhV: Skip setting client name.
TRACE redis_protocol::utils > allocating more, len: 0, amt: 24
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Encoded 24 bytes to 10.0.78.183:6379. Buffer len: 24 (RESP2)
TRACE tokio_util::codec::framed_impl > flushing framed transport
TRACE tokio_util::codec::framed_impl > writing; remaining=24
TRACE tokio_util::codec::framed_impl > framed transport flushed
TRACE tokio_util::codec::framed_impl > attempting to decode a frame
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Recv 4 bytes from 10.0.78.183:6379 (RESP2).
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Parsed 4 bytes from 10.0.78.183:6379
TRACE tokio_util::codec::framed_impl > frame decoded from buffer
DEBUG fred::protocol::connection > fred-JUKIBXKdhV: Read client ID: Resp2(Integer(3))
DEBUG fred::multiplexer::utils > fred-JUKIBXKdhV: Set centralized connection closed sender.
TRACE mio::poll > deregistering event source from poller
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Reconnect task finished reconnecting or syncing with: Ok(())
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Sending 0 commands after reconnecting.
DEBUG fred::multiplexer::utils > fred-JUKIBXKdhV: Emitting connect message.
TRACE tokio_util::codec::framed_impl > attempting to decode a frame
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
And now we have the problem - if we try to execute a command using the client from the connection pool:
TRACE fred::multiplexer::commands > fred-JUKIBXKdhV: Recv command on multiplexer SET. Buffer len: 0
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Will block multiplexer loop waiting on SET to finish.
TRACE fred::multiplexer > fred-JUKIBXKdhV: Skip waiting on cluster sync.
DEBUG fred::multiplexer::utils > fred-JUKIBXKdhV: Writing command SET to redis:6379
TRACE fred::protocol::connection > fred-JUKIBXKdhV: Sending command and flushing the sink.
TRACE redis_protocol::utils > allocating more, len: 0, amt: 40
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Encoded 40 bytes to 10.0.78.183:6379. Buffer len: 40 (RESP2)
TRACE tokio_util::codec::framed_impl > flushing framed transport
TRACE tokio_util::codec::framed_impl > writing; remaining=40
TRACE tokio_util::codec::framed_impl > framed transport flushed
DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Waiting on last request to finish without pipelining.
TRACE tokio_util::codec::framed_impl > attempting to decode a frame
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Recv 5 bytes from 10.0.78.183:6379 (RESP2).
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Parsed 5 bytes from 10.0.78.183:6379
TRACE tokio_util::codec::framed_impl > frame decoded from buffer
TRACE fred::multiplexer::responses > fred-JUKIBXKdhV: Processing response from redis:6379 to SET with frame kind SimpleString
TRACE fred::multiplexer::responses > fred-JUKIBXKdhV: Writing to multiplexer sender to unblock command loop.
WARN fred::multiplexer::responses > fred-JUKIBXKdhV: Error sending cmd loop response: ()
TRACE fred::multiplexer::responses > fred-JUKIBXKdhV: Responding to caller for SET
WARN fred::multiplexer::responses > fred-JUKIBXKdhV: Failed to respond to caller.
TRACE tokio_util::codec::framed_impl > attempting to decode a frame
TRACE fred::protocol::codec > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
resp Err(Redis Error - kind: Timeout, details: Request timed out.)
The command is actually executed (we can verify this using redis-cli and checking that the key has been set in the Redis cluster). All subsequent requests using this client now result in timeouts (even though the commands are in fact executed!).
We use tokio v1.20.1 for reference.
If we use a headless service in k8s, we do not experience this problem (the DNS resolves to zero IP addresses while the deployment is scaled to 0 replicas, then resolves to the new IP address once the pod has been moved to the other node).
actix-web = "4.2.1"
fred = "5.2.0"
Using RedisPool in actix-web produces a method-not-found error:
error[E0599]: no method named `set` found for struct `RedisPool` in the current scope
--> src/service/user.rs:58:29
|
58 | let _= pool.set("foo123", "1").await.unwrap();
| ^^^ method not found in `RedisPool`
Running this before HttpServer::new works, but inside request handlers this error occurs.
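In fred the command methods are defined on traits, so this error usually means the relevant trait is not in scope at the call site rather than anything actix-web-specific. A minimal sketch of the likely fix (assuming fred 5.x, where set comes from the KeysInterface trait; module paths may differ between versions):

```rust
// `set` is defined on the `KeysInterface` trait, which must be in scope.
// `use fred::prelude::*;` also re-exports the common interface traits.
use fred::interfaces::KeysInterface;
use fred::pool::RedisPool;

async fn handler(pool: &RedisPool) -> Result<(), fred::error::RedisError> {
    // Annotate the response type so the generic parameter `R` can be inferred.
    let _: () = pool.set("foo123", "1", None, None, false).await?;
    Ok(())
}
```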
Is there a way to disable the CLIENT SETNAME call during authentication?
Proxies like twemproxy don't support it.
Redis version - 6.1.0
Platform - linux|mac|windows
Using Docker and/or Kubernetes - yes|no
Deployment type - cluster|sentinel|centralized
Hi developers, I found that the error log for a failed reconnect uses the debug level. That means when I run at the info level, I can't see this error when Redis is disconnected.
To Reproduce
Steps to reproduce the behavior:
if let Err(e) = router.connect().await {
_debug!(inner, "Failed reconnecting with error: {:?}", e);
client_utils::set_client_state(&inner.state, ClientState::Disconnected);
inner.notifications.broadcast_error(e.clone());
Err(e)
}
Logs
(Set RUST_LOG=fred=trace and run with --features debug-ids)
Additional context
I hope this _debug! call can be changed to _error!. Thanks a lot.
Redis version - doesn't matter
Platform - doesn't matter
Using Docker and/or Kubernetes - no
Deployment type - cluster
Describe the bug
DNS might contain multiple records, but the current DnsResolver interface expects only a single address.
I don't think this functionality should be part of a DnsResolver implementation; instead, the trait should be changed to return a slice/vector of addresses. For example, the reqwest crate defines its resolver this way and includes an example of handling multiple addresses.
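As an illustration of the proposed trait shape, here is a synchronous sketch (fred's real resolver trait is async; this only shows the multi-address return type, using std's resolver, which already yields every record):

```rust
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};

/// Hypothetical trait shape: resolve a host to ALL of its addresses
/// (every A/AAAA record), so the caller can try each one in turn.
pub trait Resolve {
    fn resolve(&self, host: &str, port: u16) -> io::Result<Vec<SocketAddr>>;
}

/// Example implementation backed by the std resolver, which already
/// returns an iterator over all resolved addresses.
pub struct StdResolver;

impl Resolve for StdResolver {
    fn resolve(&self, host: &str, port: u16) -> io::Result<Vec<SocketAddr>> {
        Ok((host, port).to_socket_addrs()?.collect())
    }
}
```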
To Reproduce
Steps to reproduce the behavior: use a domain with multiple A or AAAA records.
Logs
It's just connection errors, because the DNS resolver can provide only one IP.
Redis version - 6.0.5 via ElastiCache
Platform - n/a
Using Docker and/or Kubernetes - n/a
Deployment type - cluster
Describe the bug
When connecting to replicas, fred parses the response of the info replication command to find the replicas for a given node in the cluster [relevant pointer].
For ElastiCache (and possibly others), when compared to the cluster slots command, this yields at best a different IP address and at worst an address that might not be accessible.
To Reproduce
Unfortunately, I think this might depend on an AWS setup detail rather than being reproducible elsewhere.
Output of cluster slots:
...
xx) 1) (integer) 16242
2) (integer) 16383
3) 1) "10.210.aaa.bbb"
2) (integer) 6379
3) "1a13137169931ad8db388fae36e96b9fbb84b672"
4) 1) "10.210.ccc.ddd"
2) (integer) 6379
3) "a57886a0ea4123a2a63a32f4c907b6c90f84d06d"
From the master side:
❯ timeout 3 redis-cli -h 10.210.aaa.bbb -p 6379 info replication
# Replication
role:master
connected_slaves:1
slave0:ip=10.31.yyy.zzz,port=6379,state=online,offset=5541916556922,lag=1
...
And from the replica side, using the IP from cluster slots:
❯ timeout 3 redis-cli -h 10.210.ccc.ddd -p 6379 info replication
# Replication
role:slave
master_host:10.210.aaa.bbb
master_port:6379
...
... and using the IP from the info replication output:
❯ timeout 3 redis-cli -h 10.31.yyy.zzz -p 6379 info replication
❯ echo $?
124
Logs
(Set RUST_LOG=fred=trace and run with --features debug-ids)
Can provide.
The library behaves incorrectly in a Redis cluster failover scenario.
Reproduce scenario:
Problems:
Bottom line: if the connection to the cluster's master node is lost, the library endlessly and unsuccessfully tries to reconnect to it. The cluster topology and slots are never re-synced on the client side.
Redis version - 7.0.5
Platform - mac
Using Docker and/or Kubernetes - no
Deployment type - centralized (pool)
fred.rs: 6.3.0
Invalidation messages are received with RedisClient but not with RedisPool.
The following code works:
// redis_config - server: centralised(localhost), version: resp3, all else default
let basic = RedisClient::new(redis_config.clone(), None, None);
let _ = basic.connect();
basic.wait_for_connect().await?;
basic
.start_tracking(
vec!["data"], // PREFIX
true, // BCAST
false, // OPTIN (not compatible with BCAST)
false, // OPTOUT (not compatible with BCAST)
false, // NOLOOP (doesn't matter, we never modify keys)
)
.await?;
{
let mut invalidations = basic.on_invalidation();
tokio::spawn(async move {
while let Ok(invalidation) = invalidations.recv().await {
println!("invalidation event");
}
});
}
This prints "invalidation event" when you change a key starting with "data".
However, if you replace the first line to use RedisPool, it no longer prints anything:
let basic = RedisPool::new(redis_config.clone(), None, None, 5)?;
Logs
I didn't get any log output 🤔
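A possible workaround until this is addressed (an untested sketch; it assumes the pool exposes its underlying clients via a clients() accessor, which may differ by fred version): register the invalidation listener on each pooled client, since each connection tracks invalidations independently.

```rust
// Untested sketch: subscribe on every client in the pool rather than on
// the pool handle. Assumes `pool.clients()` returns the pooled RedisClients.
for client in pool.clients() {
    let mut invalidations = client.on_invalidation();
    tokio::spawn(async move {
        while let Ok(_invalidation) = invalidations.recv().await {
            println!("invalidation event");
        }
    });
}
```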
I wrote the code like the example, but it errors and I don't know why.
When I match on the result, the compiler says:
--> src\controller.rs:87:18
|
87 | match client.set(key, val, None, None, false).await {
| ^^^ cannot infer type for type parameter R
declared on the associated function set
Looking at the code, R is FromRedis, which has been implemented for several types, but I don't know how to fix this.
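The compiler cannot infer the response type parameter R here, so it needs an explicit annotation at the call site. A minimal sketch (using () as the response type; any type implementing FromRedis should work):

```rust
// Specify R explicitly with a turbofish; the key and value type
// parameters can still be inferred.
match client.set::<(), _, _>(key, val, None, None, false).await {
    Ok(_) => println!("set ok"),
    Err(e) => eprintln!("set failed: {:?}", e),
}
```

Alternatively, annotating the binding (e.g. `let res: Result<(), RedisError> = ...`) gives the compiler the same information.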
We wanted to check some behavior we're seeing when a node in our cluster fails over or loses its connection.
For context, we run what is essentially a proxy service for many different storage backends, taking requests for items, then modifying and routing them to the correct backend.
We see three things we wanted to check:
Connections are re-established to all nodes, not just the failed node. We see this manifest in logs (from the on_error channel) for each node of the form Ending replica reader task from xxx.xxx.xxx.xxx:6379 due to None, with the original first error being related to the first connection dropping.
During a connection failover we see the memory usage of the containers for our service increase over time, often with two phases (highlighted example).
We see client-side timeouts (we wrap calls in a timeout set about 20 ms higher than fred's PerformanceConfig.default_command_timeout_ms). The yellow and purple lines are for the same underlying Redis cluster, using Amazon's data-tiering cache product, while the blue is a regular in-memory Redis (also via AWS ElastiCache).
The settings that we're using are mostly defaulted...
let mut redis_config = RedisConfig::from_url(&format!("redis-cluster://{}", host))
.expect("Could not create `RedisConfig`");
redis_config.replica = fred::types::ReplicaConfig {
lazy_connections: false,
connection_error_count: 1,
..Default::default()
};
let performance_config = fred::types::PerformanceConfig {
default_command_timeout_ms: 50,
max_command_attempts: 1,
backpressure: fred::types::BackpressureConfig {
disable_auto_backpressure: true,
..Default::default()
},
..Default::default()
};
let reconnect_policy = fred::types::ReconnectPolicy::default();
Any guidance on whether this is expected? If it is, any settings we've missed that could help alleviate these problems would be really helpful.
We can provide more logs/metrics that you think might be useful. TIA 🙇
Hi,
Running MULTI, SET, PUBLISH, and EXEC (in that order) on a 3-node Redis cluster is not always executed atomically. PUBLISH is not executed on the same node as MULTI, therefore it is not queued.
I believe this is caused by the fact that fred considers PUBLISH a "non-cluster" command and executes it on a random node.
let trx = client.multi(true).await?;
let res: RedisValue = trx.set("foox", "bar", None, None, false).await?;
assert!(res.is_queued());
let res2: RedisValue = trx.publish("foox", "this is body x").await?;
assert!(res2.is_queued()); // this fails depending on selected random node
trx.exec().await?;
Redis version: 7.0.5
Platform - Mac
Using Docker and/or Kubernetes - no
Deployment type - centralized
Describe the bug
The blocking example connects to Redis and then does nothing forever with the --partial-tracing or --full-tracing features enabled.
Race condition?
To Reproduce
Steps to reproduce the behavior:
RUST_LOG=trace cargo run --example blocking --features="full-tracing"
Compiling fred v6.0.0 (/Users/kika/Scratch/fred.rs)
Finished dev [unoptimized + debuginfo] target(s) in 24.43s
Running `target/debug/examples/blocking`
DEBUG fred::multiplexer::commands > fred-OuyjL80Ik2: Initializing multiplexer with policy: None
DEBUG fred::multiplexer::centralized > fred-OuyjL80Ik2: Initializing centralized connection.
TRACE fred::protocol::connection > fred-OuyjL80Ik2: Checking connection type. Native-tls: false, Rustls: false
DEBUG fred::multiplexer::commands > fred-7EMjtIUIYO: Initializing multiplexer with policy: None
DEBUG fred::multiplexer::centralized > fred-7EMjtIUIYO: Initializing centralized connection.
TRACE fred::protocol::connection > fred-7EMjtIUIYO: Checking connection type. Native-tls: false, Rustls: false
TRACE fred::protocol::types > fred-OuyjL80Ik2: Using 127.0.0.1 among 1 possible socket addresses for 127.0.0.1:6379
TRACE fred::protocol::types > fred-7EMjtIUIYO: Using 127.0.0.1 among 1 possible socket addresses for 127.0.0.1:6379
DEBUG fred::protocol::connection > fred-OuyjL80Ik2: Creating TCP connection to 127.0.0.1 at 127.0.0.1:6379
TRACE mio::poll > registering event source with poller: token=Token(0), interests=READABLE | WRITABLE
DEBUG fred::protocol::connection > fred-7EMjtIUIYO: Creating TCP connection to 127.0.0.1 at 127.0.0.1:6379
TRACE mio::poll > registering event source with poller: token=Token(1), interests=READABLE | WRITABLE
TRACE fred::protocol::connection > fred-7EMjtIUIYO: Skip authentication without credentials.
DEBUG fred::protocol::connection > fred-7EMjtIUIYO: Setting client name.
TRACE redis_protocol::utils > allocating more, len: 0, amt: 51
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Encoded 51 bytes to 127.0.0.1:6379. Buffer len: 51 (RESP2)
TRACE fred::protocol::connection > fred-OuyjL80Ik2: Skip authentication without credentials.
DEBUG fred::protocol::connection > fred-OuyjL80Ik2: Setting client name.
TRACE redis_protocol::utils > allocating more, len: 0, amt: 51
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Encoded 51 bytes to 127.0.0.1:6379. Buffer len: 51 (RESP2)
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Parsed 5 bytes from 127.0.0.1:6379
DEBUG fred::protocol::connection > fred-OuyjL80Ik2: Successfully set Redis client name.
TRACE redis_protocol::utils > allocating more, len: 0, amt: 24
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Encoded 24 bytes to 127.0.0.1:6379. Buffer len: 24 (RESP2)
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Parsed 5 bytes from 127.0.0.1:6379
DEBUG fred::protocol::connection > fred-7EMjtIUIYO: Successfully set Redis client name.
TRACE redis_protocol::utils > allocating more, len: 0, amt: 24
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Encoded 24 bytes to 127.0.0.1:6379. Buffer len: 24 (RESP2)
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Parsed 5 bytes from 127.0.0.1:6379
DEBUG fred::protocol::connection > fred-OuyjL80Ik2: Read client ID: Number { data: 41, attributes: None }
TRACE redis_protocol::utils > allocating more, len: 0, amt: 26
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Encoded 26 bytes to 127.0.0.1:6379. Buffer len: 26 (RESP2)
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Parsed 5 bytes from 127.0.0.1:6379
DEBUG fred::protocol::connection > fred-7EMjtIUIYO: Read client ID: Number { data: 42, attributes: None }
TRACE redis_protocol::utils > allocating more, len: 0, amt: 26
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Encoded 26 bytes to 127.0.0.1:6379. Buffer len: 26 (RESP2)
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Recv 598 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Parsed 598 bytes from 127.0.0.1:6379
DEBUG fred::protocol::connection > fred-OuyjL80Ik2: Read server version Some(Version { major: 7, minor: 0, patch: 5 })
DEBUG fred::modules::inner > fred-OuyjL80Ik2: No `on_reconnect` listeners.
DEBUG fred::multiplexer::commands > fred-OuyjL80Ik2: Starting command processing stream...
TRACE fred::protocol::codec > fred-OuyjL80Ik2: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Recv 598 bytes from 127.0.0.1:6379 (RESP2).
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Parsed 598 bytes from 127.0.0.1:6379
DEBUG fred::protocol::connection > fred-7EMjtIUIYO: Read server version Some(Version { major: 7, minor: 0, patch: 5 })
DEBUG fred::modules::inner > fred-7EMjtIUIYO: No `on_connect` listeners.
DEBUG fred::modules::inner > fred-7EMjtIUIYO: No `on_reconnect` listeners.
DEBUG fred::multiplexer::commands > fred-7EMjtIUIYO: Starting command processing stream...
TRACE fred::protocol::codec > fred-7EMjtIUIYO: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
It works as expected without the tracing features.
Redis version - 7.0.8
Platform - linux
Using Docker and/or Kubernetes - yes
Deployment type - centralized
Describe the bug
The code from the example at https://docs.rs/fred/latest/fred/clients/struct.SubscriberClient.html waits forever on this command:
let _ = jh.await;
There is no quit notification on message_rx.recv().
With version 5.2 the example exits correctly.
This issue blocks the correct shutdown of programs built on fred.
To Reproduce
Steps to reproduce the behavior:
Logs
2023-04-07T23:01:02.440 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Initializing router with policy: Some(Constant { attempts: 0, max_attempts: 0, delay: 1000, jitter: 100 })
2023-04-07T23:01:02.440 [tokio-runtime-worker-2] DEBUG fred::router::centralized - fred-joVadhOLVQ: Initializing centralized connection.
2023-04-07T23:01:02.441 [tokio-runtime-worker-2] TRACE fred::protocol::connection - fred-joVadhOLVQ: Checking connection type. Native-tls: false, Rustls: false
2023-04-07T23:01:02.441 [tokio-runtime-worker-2] TRACE fred::utils - Using timeout: 60000 ms
2023-04-07T23:01:02.441 [tokio-runtime-worker-10] TRACE fred::protocol::types - fred-joVadhOLVQ: Using 127.0.0.1 among 1 possible socket addresses for 127.0.0.1:6379
2023-04-07T23:01:02.442 [tokio-runtime-worker-2] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Creating TCP connection to 127.0.0.1 at 127.0.0.1:6379
2023-04-07T23:01:02.443 [tokio-runtime-worker-8] TRACE fred::utils - Using timeout: 60000 ms
2023-04-07T23:01:02.443 [tokio-runtime-worker-8] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Authenticating Redis client...
2023-04-07T23:01:02.444 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 28 bytes to 127.0.0.1:6379. Buffer len: 28 (RESP2)
2023-04-07T23:01:02.446 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.447 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 5 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.447 [tokio-runtime-worker-8] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Setting client name.
2023-04-07T23:01:02.448 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 51 bytes to 127.0.0.1:6379. Buffer len: 51 (RESP2)
2023-04-07T23:01:02.448 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.449 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.449 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 5 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.449 [tokio-runtime-worker-8] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Successfully set Redis client name.
2023-04-07T23:01:02.449 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 24 bytes to 127.0.0.1:6379. Buffer len: 24 (RESP2)
2023-04-07T23:01:02.450 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.450 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.450 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 5 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.451 [tokio-runtime-worker-8] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Read client ID: Number { data: 14, attributes: None }
2023-04-07T23:01:02.451 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 26 bytes to 127.0.0.1:6379. Buffer len: 26 (RESP2)
2023-04-07T23:01:02.451 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.452 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 628 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.452 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 628 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.452 [tokio-runtime-worker-8] DEBUG fred::protocol::connection - fred-joVadhOLVQ: Read server version Some(Version { major: 7, minor: 0, patch: 8 })
2023-04-07T23:01:02.452 [tokio-runtime-worker-8] DEBUG fred::modules::inner - fred-joVadhOLVQ: No on_reconnect listeners.
2023-04-07T23:01:02.452 [tokio-runtime-worker-8] DEBUG fred::router::commands - fred-joVadhOLVQ: Starting command processing stream...
2023-04-07T23:01:02.452 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command SUBSCRIBE (5) to router.
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "SUBSCRIBE" }
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check SUBSCRIBE: true
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::router::utils - fred-joVadhOLVQ: Writing SUBSCRIBE (5). Timed out: false, Force flush: false
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command SUBSCRIBE (5) to 127.0.0.1:6379
2023-04-07T23:01:02.453 [tokio-runtime-worker-8] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.454 [tokio-runtime-worker-8] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 28 bytes to 127.0.0.1:6379. Buffer len: 28 (RESP2)
2023-04-07T23:01:02.454 [tokio-runtime-worker-8] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 32 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 32 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to SUBSCRIBE (5)
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.455 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for SUBSCRIBE
2023-04-07T23:01:02.456 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for SUBSCRIBE
2023-04-07T23:01:02.456 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.456 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command PSUBSCRIBE (6) to router.
2023-04-07T23:01:02.456 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "PSUBSCRIBE" }
2023-04-07T23:01:02.457 [tokio-runtime-worker-2] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check PSUBSCRIBE: false
2023-04-07T23:01:02.457 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Writing PSUBSCRIBE (6). Timed out: false, Force flush: false
2023-04-07T23:01:02.457 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command PSUBSCRIBE (6) to 127.0.0.1:6379
2023-04-07T23:01:02.457 [tokio-runtime-worker-2] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.457 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 31 bytes to 127.0.0.1:6379. Buffer len: 31 (RESP2)
2023-04-07T23:01:02.458 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.458 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Waiting on router channel.
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 35 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 35 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to PSUBSCRIBE (6)
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for PSUBSCRIBE
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for PSUBSCRIBE
Tracking channels: {"foo"}
Tracking patterns: {"bar*"}
2023-04-07T23:01:02.459 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.460 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command SUBSCRIBE (7) to router.
2023-04-07T23:01:02.460 [tokio-runtime-worker-9] DEBUG fred::router::commands - fred-joVadhOLVQ: Recv router response.
2023-04-07T23:01:02.460 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "SUBSCRIBE" }
2023-04-07T23:01:02.460 [tokio-runtime-worker-9] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check SUBSCRIBE: true
2023-04-07T23:01:02.460 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Writing SUBSCRIBE (7). Timed out: false, Force flush: false
2023-04-07T23:01:02.460 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command SUBSCRIBE (7) to 127.0.0.1:6379
2023-04-07T23:01:02.461 [tokio-runtime-worker-9] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.461 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 28 bytes to 127.0.0.1:6379. Buffer len: 28 (RESP2)
2023-04-07T23:01:02.461 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 32 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 32 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to SUBSCRIBE (7)
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for SUBSCRIBE
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for SUBSCRIBE
2023-04-07T23:01:02.462 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.463 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command PSUBSCRIBE (8) to router.
2023-04-07T23:01:02.463 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "PSUBSCRIBE" }
2023-04-07T23:01:02.463 [tokio-runtime-worker-9] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check PSUBSCRIBE: false
2023-04-07T23:01:02.463 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Writing PSUBSCRIBE (8). Timed out: false, Force flush: false
2023-04-07T23:01:02.463 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command PSUBSCRIBE (8) to 127.0.0.1:6379
2023-04-07T23:01:02.463 [tokio-runtime-worker-9] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.464 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 31 bytes to 127.0.0.1:6379. Buffer len: 31 (RESP2)
2023-04-07T23:01:02.464 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.464 [tokio-runtime-worker-9] DEBUG fred::router::commands - fred-joVadhOLVQ: Waiting on router channel.
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 35 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 35 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to PSUBSCRIBE (8)
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.466 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for PSUBSCRIBE
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for PSUBSCRIBE
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Recv router response.
2023-04-07T23:01:02.467 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command UNSUBSCRIBE (9) to router.
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "UNSUBSCRIBE" }
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check UNSUBSCRIBE: true
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Writing UNSUBSCRIBE (9). Timed out: false, Force flush: false
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command UNSUBSCRIBE (9) to 127.0.0.1:6379
2023-04-07T23:01:02.467 [tokio-runtime-worker-2] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.468 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 31 bytes to 127.0.0.1:6379. Buffer len: 31 (RESP2)
2023-04-07T23:01:02.468 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 35 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 35 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to UNSUBSCRIBE (9)
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for UNSUBSCRIBE
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for UNSUBSCRIBE
2023-04-07T23:01:02.469 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.469 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command PUNSUBSCRIBE (10) to router.
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "PUNSUBSCRIBE" }
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check PUNSUBSCRIBE: false
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Writing PUNSUBSCRIBE (10). Timed out: false, Force flush: false
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command PUNSUBSCRIBE (10) to 127.0.0.1:6379
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 33 bytes to 127.0.0.1:6379. Buffer len: 33 (RESP2)
2023-04-07T23:01:02.470 [tokio-runtime-worker-9] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.471 [tokio-runtime-worker-9] DEBUG fred::router::commands - fred-joVadhOLVQ: Waiting on router channel.
2023-04-07T23:01:02.471 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 37 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 37 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to PUNSUBSCRIBE (10)
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Multiple
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Handling multiple response from 127.0.0.1:6379 for PUNSUBSCRIBE
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Finishing multiple response from 127.0.0.1:6379 for PUNSUBSCRIBE
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.472 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command SUNSUBSCRIBE (11) to router.
2023-04-07T23:01:02.472 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Recv router response.
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "SUNSUBSCRIBE" }
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check SUNSUBSCRIBE: false
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Writing SUNSUBSCRIBE (11). Timed out: false, Force flush: false
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command SUNSUBSCRIBE (11) to 127.0.0.1:6379
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::protocol::connection - fred-joVadhOLVQ: Skip adding SUNSUBSCRIBE command to response buffer (no expected responses).
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.473 [main-1] DEBUG fred::commands::impls::server - fred-joVadhOLVQ: Closing Redis connection with Quit command.
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 23 bytes to 127.0.0.1:6379. Buffer len: 23 (RESP2)
2023-04-07T23:01:02.473 [main-1] DEBUG fred::modules::inner - fred-joVadhOLVQ: No close listeners.
2023-04-07T23:01:02.473 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.474 [main-1] TRACE fred::interfaces - fred-joVadhOLVQ: Sending command QUIT (12) to router.
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Waiting on router channel.
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Recv router response.
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Recv command: RouterCommand { kind: "Command", command: "QUIT" }
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::protocol::command - fred-joVadhOLVQ: Pipeline check QUIT: false
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Writing QUIT (12). Timed out: false, Force flush: false
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::router::utils - fred-joVadhOLVQ: Sending command QUIT (12) to 127.0.0.1:6379
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::protocol::connection - Writing and flushing 127.0.0.1:6379
2023-04-07T23:01:02.474 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Encoded 14 bytes to 127.0.0.1:6379. Buffer len: 14 (RESP2)
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Sent command to 127.0.0.1:6379. Flushed: true
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Waiting on router channel.
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 32 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 32 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] DEBUG fred::router::utils - fred-joVadhOLVQ: Dropping extra unsubscribe response.
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 5 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.475 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Parsed 5 bytes from 127.0.0.1:6379
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Parsing response frame from 127.0.0.1:6379
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Checking response to QUIT (12)
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::router::centralized - fred-joVadhOLVQ: Handling centralized response kind: Respond
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::protocol::responders - fred-joVadhOLVQ: Respond to caller from 127.0.0.1:6379 for QUIT with SimpleString
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] TRACE fred::protocol::codec - fred-joVadhOLVQ: Recv 0 bytes from 127.0.0.1:6379 (RESP2).
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] DEBUG fred::router::responses - fred-joVadhOLVQ: Ending reader task from 127.0.0.1:6379 due to None
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] DEBUG fred::modules::inner - fred-joVadhOLVQ: Checking reconnect state. Has policy: true, Is intentionally disconnecting: true
2023-04-07T23:01:02.476 [tokio-runtime-worker-2] DEBUG fred::router::centralized - fred-joVadhOLVQ: Ending reader task from 127.0.0.1:6379
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Recv router response.
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] TRACE fred::router::commands - fred-joVadhOLVQ: Ending command loop after QUIT or SHUTDOWN.
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] DEBUG fred::router::commands - fred-joVadhOLVQ: Disconnecting after command stream closes.
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] DEBUG fred::router - fred-joVadhOLVQ: Disconnecting from 127.0.0.1:6379
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] TRACE fred::router - fred-joVadhOLVQ: Clearing retry buffer with 0 commands.
2023-04-07T23:01:02.477 [tokio-runtime-worker-2] TRACE fred::interfaces - fred-joVadhOLVQ: Ending connection task with Ok(())
Ping seems to fail consistently with the SubscriberClient, with the error "Could not convert multiple frames to RedisValue".
The stack trace suggests the response is parsed as RESP3, but AFAIK the server only supports RESP2.
redis_version: 5.0.7 (it seems to fail the same way with the latest 7.0)
fred.rs: 5.1.0
2022-08-28 02:11:57.234867 [00d] [D] [connection]:0287 fred-JeHtXwyITZ: Read client ID: Resp2(Integer(82))
2022-08-28 02:12:57.236755 [00c] [D] [utils ]:0400 fred-JeHtXwyITZ: Writing command PING to localhost:6379
2022-08-28 02:12:57.237310 [00a] [D] [server ]:0094 fred-JeHtXwyITZ: Recv ping response.
* frame #0: 0x00005555562aaa94 test`fred::protocol::utils::frame_to_single_result(frame=redis_protocol::resp3::types::Frame::Array @ 0x00007ffff649ede0) at utils.rs:547:20
frame #1: 0x000055555582dfb9 test`fred::commands::impls::server::ping::{{closure}}((null)=core::future::ResumeTy @ 0x00007ffff649efc8) at server.rs:95:3
frame #2: 0x0000555555a7e497 test`<core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll(self=core::pin::Pin<&mut core::future::from_generator::GenFuture<fred::commands::impls::server::ping::{async_fn_env#0}>> @ 0x00007ffff649f460, cx=0x00007ffff64a93d0) at mod.rs:91:19
frame #3: 0x00005555558197d9 test`fred::interfaces::ClientLike::ping::{{closure}}::{{closure}}((null)=core::future::ResumeTy @ 0x00007ffff64a2948) at interfaces.rs:256:58
print frame
(redis_protocol::resp3::types::Frame::Array) frame = {
data = (2) vec![{...}, {...}] {}
attributes = None {}
}
print frame.data.0
(alloc::raw_vec::RawVec<redis_protocol::resp3::types::Frame, alloc::alloc::Global>) buf = {
ptr = {
pointer = {}
_marker = {}
}
cap = 2
alloc = {}
}
The new interface for transactions in 6.0.0 is much better and allowed me to drop my personal customization around it. But there is one issue: sometimes it's necessary to combine watch and multi. watch can't be called inside a multi block, so I wonder if it's possible to somehow combine them and prevent re-ordering of related watch and multi calls.
Currently it's possible to watch in parallel, so the order will be:
watch K1
watch K2
multi
set K1 1
exec
# watch K2 was also unwatched
I mean that the exec of a multi block automatically unwatches keys on the Redis side, so within one fred connection I could call N watch operations and then any exec will discard them all together, which is incorrect/unexpected behavior for this combination.
For now I have to manually lock the connection or create a new one for such operations, which is a bit counterproductive. Instead I'd like to order their execution and combine them into an atomic block.
I mean something like connection.multi_with_watch(keys), applied in this order:
... commands before ...
watch
multi ... exec
... commands after ...
It could be a tricky one, but I'd like to have an interface like this to avoid collisions between multi/exec and watches related to another transaction.
Redis version - 7.0.9
Platform - windows
Using Docker and/or Kubernetes - yes
Deployment type - centralized
Describe the bug
The easy/trivial client.blpop::<(String, String), _>("key", timeout).await
can, in my opinion, return three things:
Right now there doesn't seem to be a good way to separate the last two. Fred returns a RedisError with "Redis Error - kind: Parse, details: Could not convert to tuple." which... is fair, I guess. There wasn't a tuple. There was a timeout. That doesn't quite feel like an error case to me, and if it should be a RedisError then the message is still confusing/wrong.
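One hedged sketch of a workaround (assuming the conversion traits map a nil reply to None, which I haven't verified specifically for blpop): decode into an Option so a timeout is distinguishable from a real parse error.

```rust
// Sketch, not verified: a BLPOP timeout (nil reply) would come back as
// None instead of failing the tuple conversion.
let popped: Option<(String, String)> = client.blpop("key", timeout).await?;
match popped {
    Some((list, value)) => println!("popped {} from {}", value, list),
    None => println!("timed out"),
}
```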
To Reproduce
Steps to reproduce the behavior:
Hi,
I don't know if it's an intended behavior, but I ran into following situation:
Suppose you have 10ms latency to Redis, and every 5ms your application needs to run
MULTI
HSET key value
PUBLISH message
EXEC
I am using the following code to execute this sequence in fred:
let trx = redis.multi(true).await?;
trx.hset(key, hashmap! { field => value.into()}).await?;
trx.publish(channel, message).await?;
trx.exec().await?;
I'd like to have a writer that starts this sequence every 5ms but waits for each response in a new tokio task. This way there would be 2 requests in flight waiting for responses.
I don't know how to achieve this with fred.
There are 2 issues:
Thanks for help
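Roughly, the pattern I'm after could be sketched like this (an unverified sketch: it assumes the client handle is cheaply cloneable and reuses the transaction calls from the snippet above; error handling is elided, and next_payload is a hypothetical helper producing owned values):

```rust
// Unverified sketch: kick off the transaction every 5ms and await each
// response in its own task, so responses can overlap with 10ms latency.
let mut ticker = tokio::time::interval(Duration::from_millis(5));
loop {
    ticker.tick().await;
    let redis = redis.clone();
    let (key, field, value, channel, message) = next_payload(); // hypothetical helper
    tokio::spawn(async move {
        let trx = redis.multi(true).await?;
        trx.hset(key, hashmap! { field => value.into() }).await?;
        trx.publish(channel, message).await?;
        trx.exec().await?;
        Ok::<_, RedisError>(())
    });
}
```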
Hello Alec, I have a problem connecting to a local Redis cluster created in Minikube. If the problem isn't on Fred's side let me know :)
The setup is relatively simple: there's a single LoadBalancer exposed via minikube tunnel that redirects to a cluster node, and the cluster itself isn't accessible from outside (so the nodes can only talk to each other and the load balancer). Here are the commands I used:
Using redis-cli on the host with load balancer IP 10.102.101.213, like so:
redis-cli -c -h 10.102.101.213 -a $REDIS_PASSWORD CLUSTER NODES
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
aceb5bf17a706440a297c5be764f4e4b6a60eb61 172.17.0.7:6379@16379 slave 16da11b0423cafe75261b5adbc2eb13a90358cc5 0 1640566876716 2 connected
a67bf5b0d784c9be05fba218bdc456b484f37c86 172.17.0.2:6379@16379 myself,slave 2008db068b8a58a28595b37060d52d3736d04a6d 0 1640566873000 1 connected
a10e1fe9800e7fb2a376f5ec5e1734658b15c6c9 172.17.0.5:6379@16379 master - 0 1640566875000 3 connected 10923-16383
2008db068b8a58a28595b37060d52d3736d04a6d 172.17.0.6:6379@16379 master - 0 1640566874000 1 connected 0-5460
53cf7c4159f5896deb1c42478c82a6404ac5f56e 172.17.0.3:6379@16379 slave a10e1fe9800e7fb2a376f5ec5e1734658b15c6c9 0 1640566876000 3 connected
16da11b0423cafe75261b5adbc2eb13a90358cc5 :0@0 master,noaddr - 1640566845731 1640566845730 2 disconnected 5461-10922
all works as expected.
Trying to connect to the same cluster with Fred (env variable REDIS_URI is the LoadBalancer IP):
let config = RedisConfig {
    server: ServerConfig::new_clustered([(dotenv!("REDIS_URI").to_string(), 6379 as u16)].to_vec()),
    fail_fast: true,
    pipeline: true,
    blocking: Blocking::Block,
    username: None,
    password: Some(dotenv!("REDIS_PASSWD").to_string()),
    tls: None,
    tracing: false
};
println!("Creating client...");
let client = RedisClient::new(config);
println!("Created client!");
let policy = ReconnectPolicy::default();
tokio::spawn(client
    .on_error()
    .for_each(|e| async move {
        println!("Client received connection error: {:?}", e);
    }));
println!("Checked errors...");
tokio::spawn(client
    .on_reconnect()
    .for_each(move |client| async move {
        println!("Client {} reconnected.", client.id());
        // select the database each time we connect or reconnect
        let _ = client.select(REDIS_SESSION_DB).await;
    }));
println!("Checked reconnections...");
client.connect(Some(policy));
println!("Trying to connect Redis...");
client.wait_for_connect().await.unwrap();
println!("Redis connected!");
makes it throw this error:
Client received connection error: Redis Error - kind: IO, details: Os { code: 113, kind: HostUnreachable, message: "No route to host" }
Given that the info about nodes in cluster is gathered with CLUSTER NODES (per https://docs.rs/fred/latest/fred/types/enum.ServerConfig.html#variant.Clustered), I assume passing a single IP is fine.
I guess this is some kind of a DNS/redirect resolution problem. I see that you're currently working on supporting custom DNS resolvers on the client, could this be related?
Keep up the great work, this is probably the best Redis driver for Rust at the moment!
P.S. When do you plan to release Streams functionality by the way (in terms of time)?
I encountered a problem when using fred in cluster mode: if the cluster Redis restarts, fred cannot recover automatically. How can I solve this?
[WARN] [2023.04.04 01:13:08.914] [127.0.0.1] [fred::multiplexer::utils] [/root/.cargo/registry/src/github.com-1ecc6299db9ec823/fred-5.2.0/src/multiplexer/utils.rs(1229)] - fred-mbn8GkCsjF: Error creating or using backchannel for cluster nodes: Redis Error - kind: IO, details: Os { code: 110, kind: TimedOut, message: "Connection timed out" }
let config = RedisConfig { server: ServerConfig::Clustered { hosts: nodes, }, .. };
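For comparison, the centralized examples earlier in this thread pass a reconnect policy into connect; a hedged sketch of the same thing for this clustered config (whether the policy actually covers a full cluster restart is exactly what this issue is about):

```rust
let client = RedisClient::new(config);
// the policy drives automatic reconnect attempts after the connection drops
let policy = ReconnectPolicy::default();
let _ = client.connect(Some(policy));
client.wait_for_connect().await?;
```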
Hello Alec, it's me again. I'd just like to thank you again for making this driver and for incredible help in my last issue ( #21 ). Fred is awesome, the best among Redis drivers I've encountered. Never thought that someone could make something better pretty much single-handedly than the teams/communities behind other drivers, especially the "official" one (https://github.com/mitsuhiko/redis-rs).
As a thank you I'd really like to support this library, you've got Patreon or something like that?
Can't wait for the 5.0.0 release, hope it will be ready soon!
Hi! Thanks for the awesome library.
I'm having a problem with the Redis client getting stuck before sending the command to the Redis server (it happens with any command).
This is happening in a Stream, but I didn't try to replicate it outside of one.
The pattern of getting stuck is random: sometimes it runs for tens of stream values before getting stuck, and other times it gets stuck on the first value of the stream.
I tried to track down where it gets stuck to the best of my abilities, and I think it's at wait_for_response(rx) in src/utils.rs, but I'm not sure.
When a default timeout is set it gets hit; otherwise it waits forever.
I also ran redis-cli monitor on the Redis server and confirmed that the client gets stuck before sending the command.
The minimal code to reproduce it is (UPDATE: this doesn't reproduce the issue, see #13 (comment)):
use std::time::Duration;
use futures::StreamExt;
use fred::prelude::*;
use fred::pool::StaticRedisPool;
#[tokio::main]
async fn main() {
    let pool = StaticRedisPool::new(RedisConfig::default(), 5);
    pool.connect(Some(ReconnectPolicy::default()));
    pool.wait_for_connect().await.unwrap();

    let stream = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_millis(100)))
        .then(move |_| {
            let pool = pool.clone();
            async move {
                let value: Option<String> = pool.get("key").await.unwrap(); // This call never responds.
                value
            }
        });
    futures::pin_mut!(stream);

    while let Some(value) = stream.next().await {
        println!("{:?}", value);
    }
}
Cargo.toml:
[package]
edition = "2021"
[dependencies]
fred = { version = "4.2.1", default-features = false, features = [
"pool-prefer-active",
"ignore-auth-error",
] }
futures = { version = "0.3.17", default-features = false }
tokio = { version = "1.13.0", features = ["rt", "macros", "time"] }
tokio-stream = "0.1.8"
Rust: 1.56.0
fred: 4.2.1
Redis: 6.2.6 inside Docker
OS: Ubuntu 21.04
I am attempting to use a dynamic pool in an actix web app (btw I am using a v4 beta, so I am on the correct tokio version) and my code hangs on wait_for_connect consistently. I know similar things have happened previously, but as those were closed issues I started this one.
To reproduce I am able to just run the code from the example here https://github.com/aembke/fred.rs/blob/main/examples/dynamic_pool.rs
Here is my app's main:
use fred::pool::DynamicRedisPool;
use fred::prelude::*;
#[tokio::main]
async fn main() -> Result<(), RedisError> {
let config = RedisConfig {
    // whether to skip reconnect logic when first connecting
    fail_fast: true,
    // server configuration
    server: ServerConfig::new_centralized("127.0.0.1", 6379),
    // whether to automatically pipeline commands
    pipeline: true,
    // how to handle commands sent while a connection is blocked
    blocking: Blocking::Block,
    // an optional username, if using ACL rules
    username: None,
    // an optional authentication key or password
    password: None,
    // optional TLS settings
    tls: None,
    // whether to enable tracing
    tracing: false,
};
// the max size isn't a hard limit - it just determines the size of the client array when the pool is initialized
let pool = DynamicRedisPool::new(config, None, 5, 10);
println!("I am printed");
let _ = pool.connect();
println!("Me too");
let _ = pool.wait_for_connect().await?;
println!("Not Me :(");
// modify the size of the pool at runtime
let (new_client, _) = pool.scale_up().await;
if let Some(old_client) = pool.scale_down(true).await {
assert_eq!(new_client.id(), old_client.id());
}
for client in pool.clients() {
println!("Client ID {} in pool.", client.id());
}
// due to the locking required by the resizing operations the Deref trait cannot be used with this pool implementation.
// if modifications to the pool are not required at runtime the static pool is usually easier to use
let _ = pool.next().get("foo").await?;
let _ = pool.next().set("foo", "bar", None, None, false).await?;
let _ = pool.next().get("foo").await?;
// if the pool can be empty a function exists that will lazily create a new client, if needed.
// if the pool is not empty this just calls `next` without creating a new client.
let _ = pool.next_connect(true).await.get("foo").await?;
let _ = pool.quit_pool().await;
Ok(())
}
Hope this helps and please ask any questions.
MacOS, Redis via Docker, Rust 1.57
This library is pretty opinionated and should have better documentation on implementation details behind the following design decisions:
It is extremely unclear how you can write an impl of FromRedis for a custom struct in the case where you want to set a key/value pair with the value being JSON. Passing a serde_json::Value to .set() instead does not work, because it cannot deal with nesting (see https://github.com/aembke/fred.rs/blob/main/src/utils.rs#L530). This makes it impossible to set a value of a custom struct as JSON.
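One hedged workaround sketch until nested values convert cleanly: serialize the struct to a JSON string yourself, so the client only ever sees a flat String (this assumes serde/serde_json in the project, and the set signature used elsewhere in this thread):

```rust
use serde::Serialize;

#[derive(Serialize)]
struct Session {
    user_id: u64,
    token: String,
}

let session = Session { user_id: 42, token: "abc".into() };
// flatten to a JSON string before handing the value to the client
let payload = serde_json::to_string(&session)?;
let _ = client.set("session:42", payload, None, None, false).await?;
```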
"If RESP3 is used (see HELLO), a client can issue any commands while in the subscribed state" (from https://redis.io/docs/manual/pubsub/). So maybe SubscriberClient could implement the same traits as RedisClient, and we wouldn't need 2 clients when working with SubscriberClient. What do you think?
I am trying to connect to a redis cluster, running locally via docker with:
let cfg = ServerConfig::new_clustered(vec![(settings.host, settings.port)]);
let client = RedisClient::new(cfg);
let _jh = client.connect(None);
let _ = client.wait_for_connect().await;
but it's just hanging at wait_for_connect. If I change to a single node and use ServerConfig::new_centralized(settings.host, settings.port), I am able to connect and issue commands.
Hello! I need to create an instance of RedisClient with my own parameters, but I only see the default function. Where can I set my host and port in RedisConfig?
Also, where can I set a new client name?
Thank you!
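For reference, other snippets in this thread build the config explicitly rather than via the default function; a sketch assuming RedisConfig implements Default (the host/port here are placeholders, and the exact RedisClient::new arity differs across fred versions):

```rust
let config = RedisConfig {
    server: ServerConfig::new_centralized("my-redis-host", 6380),
    ..RedisConfig::default()
};
let client = RedisClient::new(config);
let _ = client.connect(None);
client.wait_for_connect().await?;
```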
When trying to connect to a password-protected Redis instance using RESP3, I get the following error:
NOAUTH HELLO must be called with the client already authenticated, otherwise the HELLO AUTH <user> <pass> option can be used to authenticate the client and select the RESP protocol version at the same time
It works just fine if I turn off RESP3 and go back to RESP2.
I've devised a quick patch that seems to fix this problem by ensuring the client is authenticated before attempting to switch protocols:
diff --git a/src/protocol/connection.rs b/src/protocol/connection.rs
index f936197..5565039 100644
--- a/src/protocol/connection.rs
+++ b/src/protocol/connection.rs
@@ -332,8 +332,9 @@ pub async fn create_authenticated_connection_tls(
let socket = TcpStream::connect(addr).await?;
let tls_stream = tls::create_tls_connector(&inner.config)?;
let socket = tls_stream.connect(domain, socket).await?;
- let framed = switch_protocols(inner, Framed::new(socket, codec)).await?;
+ let framed = Framed::new(socket, codec);
let framed = authenticate(framed, &client_name, username, password, inner.is_resp3()).await?;
+ let framed = switch_protocols(inner, framed).await?;
let framed = select_database(inner, framed).await?;
client_utils::set_client_state(&inner.state, ClientState::Connected);
@@ -360,8 +361,9 @@ pub async fn create_authenticated_connection(
let username = inner.config.read().username.clone();
let socket = TcpStream::connect(addr).await?;
- let framed = switch_protocols(inner, Framed::new(socket, codec)).await?;
+ let framed = Framed::new(socket, codec);
let framed = authenticate(framed, &client_name, username, password, inner.is_resp3()).await?;
+ let framed = switch_protocols(inner, framed).await?;
let framed = select_database(inner, framed).await?;
client_utils::set_client_state(&inner.state, ClientState::Connected);
After applying this patch, it works as intended. If this is deemed to be an appropriate solution, I am happy to send a PR, or you can apply it yourself if it is more convenient.
I got the error below when calling from an actix-web request. Any ideas?
thread 'actix-rt:worker:0' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', D:\RUST\CARGO\registry\src\github.com-1ecc6299db9ec823\fred-4.2.0\src\client.rs:179:5
Thank you very much for developing and maintaining this awesome Redis library. This is a question, not a bug report: is fred.rs thread-safe?
Describe the bug
The following code does not compile:
use fred::{prelude::*, protocol::types::ClusterRouting};

#[tokio::main]
async fn main() -> Result<(), RedisError> {
    pretty_env_logger::init();

    let config = RedisConfig::from_url("redis-cluster://127.0.0.1:7000").expect("Creating config");
    let cluster_client = RedisClient::new(config, None, None);
    let _ = cluster_client.connect();
    cluster_client.wait_for_connect().await?;

    let cluster_state: Option<ClusterRouting> = cluster_client.cached_cluster_state();
    if let Some(state) = cluster_state {
        println!("Cluster state: {:?}", state);
    }
    Ok(())
}
because fred::protocol is private. It could be made to compile by inferring the return type of cached_cluster_state(), but that doesn't work in structs.
Change this to pub mod and it works:
Line 74 in 8c3131b
It's either a bug in my DNA (© old programming joke) or in fred.
Redis version - 7.0.5
Platform - Mac
Using Docker and/or Kubernetes - no
Deployment type - centralized
Describe the bug
A brief explanation of why I'm doing things the way the repro is written: I need to read a lot of STREAMs from a Redis Cluster. XREAD is able to read many streams at once, just not in a cluster environment; there it can only read from streams that hash into the same slot (not the same node!). So to minimize the number of XREADs I group the channel names by the result of redis_keyslot(), and for every slot that has streams I open a connection to the cluster node (not a clustered connection, but the result of clone_new() on one of the clients from split_cluster()) and run XREAD on that connection. And that doesn't work the way I intended. After running a test case with just a few streams, only one stream works as expected. The others wake up the connection but do not trigger XREADs. If you keep XADDing to these "ignored" streams, they suddenly start working (after 1-2 attempts, sometimes more), while the original stream that was working stops working. But the data sent to the "ignored" channels to wake them up is lost, which probably means these other XREADs are never even started; otherwise they would have captured the data due to their "$" wildcard ID. Phew.
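The grouping step described above can be sketched as follows. This is a minimal, self-contained example: keyslot here is a hypothetical stand-in for redis_protocol's redis_keyslot (which implements the real CRC16-based hashing with hash-tag handling), and the stream names are made up.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for redis_protocol's redis_keyslot, which maps a key to
// one of Redis Cluster's 16384 hash slots via CRC16. Any deterministic function
// works for illustrating the grouping logic.
fn keyslot(key: &str) -> u16 {
    (key.bytes()
        .fold(0u32, |acc, b| acc.wrapping_mul(31).wrapping_add(b as u32))
        % 16384) as u16
}

fn main() {
    let streams = ["orders:eu", "orders:us", "payments:eu", "payments:us"];

    // Group stream names by hash slot so each group can be read with a single
    // XREAD on a connection to the node owning that slot.
    let mut by_slot: HashMap<u16, Vec<&str>> = HashMap::new();
    for name in streams {
        by_slot.entry(keyslot(name)).or_default().push(name);
    }

    // Every stream lands in exactly one group.
    let total: usize = by_slot.values().map(Vec::len).sum();
    assert_eq!(total, streams.len());

    for (slot, names) in &by_slot {
        println!("slot {slot}: {names:?}");
    }
}
```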
To Reproduce
This repro simulates the behavior without the cluster, which is why it looks the way it does. The easiest way to run it is to put it into the examples folder and RUST_LOG=trace cargo run --example bug84. Then from another terminal run redis-cli and issue a few commands:
xadd test0 * v test0
xadd test1 * v test1
xadd test0 * v test2
etc.
I had this problem with 5.2.0 and moved to 6.0.0 hoping it would be better.
Quite possibly it's just my insufficient understanding of how async Rust works...
Repro itself:
use std::sync::Arc;

use fred::{prelude::*, types::XReadResponse};
use futures::stream::FuturesUnordered;
use tokio_stream::StreamExt;

async fn _read(
    client: Arc<RedisClient>,
    stream_name: &String,
) -> Result<Vec<(String, String)>, RedisError> {
    let r: XReadResponse<String, String, String, String> =
        client.xread_map(None, Some(0), stream_name, "$".to_string()).await?;

    let mut result = vec![];
    for (stream_name, records) in r.iter() {
        for (last_id, _record) in records {
            result.push((stream_name.to_string(), last_id.to_string()));
        }
    }
    Ok(result)
}

#[tokio::main]
async fn main() -> Result<(), RedisError> {
    pretty_env_logger::init();
    let config = RedisConfig::default();

    let mut clients: Vec<Arc<RedisClient>> = vec![];
    let mut stream_names: Vec<String> = vec![];
    for i in 0..3 {
        let c = RedisClient::new(config.clone(), None, None);
        let _ = c.connect();
        c.wait_for_connect().await?;
        clients.push(Arc::new(c));
        stream_names.push(format!("test{i}"));
    }

    loop {
        let mut xreads = FuturesUnordered::new();
        for (i, stream_name) in stream_names.iter().enumerate() {
            xreads.push(_read(clients.get(i).unwrap().clone(), stream_name));
        }

        tokio::select! {
            Some(Ok(events)) = xreads.next() => {
                for event in events {
                    println!("Stream {} last ID {}", event.0, event.1);
                }
            }
        }
    }
}
Hello, I have written a test using testcontainers with the following code, and every once in a while I see this error: Redis Error - kind: Unknown, details: Redis connection is not initialized. For the most part it works fine, but sometimes that .expect fails at the call to .ping. The rate of failure seemed to decrease as I increased the size of the StaticRedisPool, so I wonder if there's some condition under which .wait_for_connect returns before every client has connected?
use testcontainers::{clients::Cli, images::redis::Redis, Docker};

#[tokio::test]
async fn test_condition() {
    let docker = Cli::default();
    let container = docker.run(Redis::default());
    let port = container
        .get_host_port(6379)
        .expect("should be able to get host port");

    let pool = StaticRedisPool::new(
        RedisConfig {
            server: ServerConfig::new_centralized("127.0.0.1", port),
            ..Default::default()
        },
        3,
    )
    .unwrap();

    let _ = pool.connect(None);
    pool.wait_for_connect().await.unwrap();
    pool.ping().await.expect("should be able to ping");
}
Please let me know if you would benefit from any further details or if I missed something.
Hello, and thanks for all your effort on Fred!
We are looking to use fred in our projects, but one of the most significant issues for us is the environment-variable-based configuration. First, it's not a library's responsibility to read environment variables; that should be under the complete control of the application. I know there are counter-examples like env_logger, but that's a library whose only goal is to read and parse env vars for simplification purposes.
Configuring through env vars is especially sensitive for something as essential as security configuration. In my application, I would hardcode security-sensitive behavior and never allow insecure connections in the production environment. It's possible to unset environment variables at the app level, but that is easy to forget and quite unergonomic.
Considering that it breaks some isolation rules and affects security, I suggest removing env variable support, or at least hiding it behind a non-default feature, and moving this configuration to the code level so that the application has complete control over such sensitive configuration.
Thanks for the library! We're trying to use it in a server running in Kubernetes, connecting to a Redis Cluster running in the same Kubernetes cluster. We're connecting using redis-cluster://my-redis-cluster:6379 as the RedisConfig with a RedisPool.
We noticed that when we go to roll out a new version of the Redis deployment that our fred-using server will hang trying to send commands to the Redis Cluster. The logs we're seeing are of the form:
Error creating or using backchannel for cluster nodes: Redis Error - kind: IO, details: Os { code: 110, kind: TimedOut, message: "Connection timed out" }
Failed to reconnect with error Redis Error - kind: Cluster, details: Failed to read cluster nodes on all possible backchannel servers.
Restarting our fred-using server with the same configuration is able to connect to the Redis Cluster just fine.
After adding some logging, it seems that what's happening is that the IP addresses on the cluster cycle out to different ones, but IIUC fred doesn't try to re-resolve the DNS address to reconnect to the cluster.
Hi. I saw in the example how to connect through sentinel. However, the example seems to rely on the same auth being used for both sentinel and Redis itself. My company sets it up such that the auth (i.e. username/password) differs between the two, for security purposes. Is there a way I can specify different auth for the two? Thanks!
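If I recall the docs correctly (hedged, worth verifying), fred has a sentinel-auth feature for exactly this: with it enabled, the Sentinel variant of ServerConfig gains its own username/password fields for the sentinel nodes, while the top-level RedisConfig credentials apply to the Redis servers. Roughly, as a config sketch with made-up values:

```rust
// Requires the `sentinel-auth` feature; field names are from memory, check the docs.
let config = RedisConfig {
    // Credentials for the Redis servers themselves.
    username: Some("redis_user".into()),
    password: Some("redis_pass".into()),
    server: ServerConfig::Sentinel {
        hosts: vec![("sentinel-1".into(), 26379), ("sentinel-2".into(), 26379)],
        service_name: "mymaster".into(),
        // Separate credentials for the sentinel nodes.
        username: Some("sentinel_user".into()),
        password: Some("sentinel_pass".into()),
    },
    ..Default::default()
};
```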
I suspect this may be some sort of deadlock issue, as this is resolved by running in a single-threaded runtime. You will find minimum viable examples of the multi- and single-threaded behaviors at https://github.com/bin/fred_broken.git
@aembke fred also uses nom and redis-protocol; they need to be upgraded to the latest versions as well, since both internally use bitvec.
This will require supporting the commands CLIENT TRACKING and CLIENT CACHING, plus a notification channel for RESP2 and RESP3.
I can probably take a stab at a PR if this is considered a good idea, but I'll need some guidance because the inner workings of the crate are very clearly written but not documented much.
The first guidance question is the design of the OPTIN case: should I implement CLIENT CACHING as a high-level command/method and just issue it before every read operation I want to be notified about? Or would we be better off with some built-in abstraction that allows marking the commands or keys we're interested in and automatically issues CLIENT CACHING before each command? Maybe a RedisConfig option?
The mocks feature still exists in Cargo.toml but is no longer present in the crate. It's mentioned in the prior repository's README.md and was located under /src/mocks, but has since been removed.
Two items to address:
I want to set a key which is a String and a value which is a struct with all kinds of types for its fields, like Option<T> where T is String, serde_json::Value, i64, f64, bool, etc. I didn't see that in the documentation. How can I do that?
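Redis values are ultimately strings/bytes, so a struct with mixed fields usually has to be serialized first (commonly with serde_json) and deserialized on read. Below is a stdlib-only sketch of the idea that hand-rolls the JSON serde would normally produce; the Profile struct and its fields are made up for illustration:

```rust
// A value type with mixed optional fields, like the one described above.
struct Profile {
    name: String,
    age: Option<i64>,
    verified: Option<bool>,
}

impl Profile {
    // Hand-rolled JSON for illustration only; in practice derive serde::Serialize
    // and call serde_json::to_string, then SET the resulting string.
    fn to_json(&self) -> String {
        format!(
            "{{\"name\":\"{}\",\"age\":{},\"verified\":{}}}",
            self.name,
            self.age.map_or("null".to_string(), |v| v.to_string()),
            self.verified.map_or("null".to_string(), |v| v.to_string()),
        )
    }
}

fn main() {
    let p = Profile { name: "ed".into(), age: Some(42), verified: None };
    let json = p.to_json();
    assert_eq!(json, "{\"name\":\"ed\",\"age\":42,\"verified\":null}");
    // Then store the string, e.g. client.set("profile:ed", json, None, None, false).await?
}
```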
These are actually questions, not issues; I can't seem to find the info on this.
For reading, what's the read strategy among replica nodes? Round-robin, or something different? Can it be configured?
How does the RedisClient handle a failing replica node?
Thank you.
Ed
Redis version - 6.2.6
Platform - linux
Using Docker and/or Kubernetes - both
Deployment type - cluster with replicas
I've been using RedisPool with a clustered ServerConfig (all else in RedisConfig defaulted) for a while. Recently we realized that we should preferably be sending all commands (GET and MGET) to replica nodes, so I tried out the replicas feature of this crate and called .replicas() on the RedisPool. This worked fine in an environment without replica nodes, but in an environment with replica nodes, all commands timed out (!). The clustered config's .host has a list of all nodes in both scenarios.
Hello,
First of all thanks for the great library!
I just want to raise an issue about a wrong assumption (and behaviour): that Redis keys are valid UTF-8 strings.
According to the Redis data types doc:
Redis keys are binary safe, this means that you can use any binary sequence as a key, from a string like "foo" to the content of a JPEG file.
However, in the redis-protocol library the function redis_keyslot wrongly accepts the key as a &str.
In combination with the String::from_utf8_lossy conversion in https://github.com/aembke/fred.rs/blob/main/src/commands/lua.rs#L26 this produces wrong keys.
Unfortunately this is not the only issue. In https://github.com/aembke/redis-protocol.rs/blob/main/src/utils.rs#L360 a UTF-8 string key is indexed by byte index, which is not always correct because the index may not fall on a UTF-8 code point boundary (which causes a panic).
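A small, self-contained illustration of both failure modes (stdlib only; the byte sequence is made up):

```rust
fn main() {
    // A binary-safe Redis key that is not valid UTF-8 (Redis allows any byte sequence).
    let key: &[u8] = &[b'f', b'o', b'o', 0xff, 0xfe];

    // A lossy conversion replaces the invalid bytes with U+FFFD, so the string
    // no longer round-trips back to the original key bytes: a wrong key.
    let lossy = String::from_utf8_lossy(key);
    assert_ne!(lossy.as_bytes(), key);

    // Byte-indexing the converted string can also panic: U+FFFD is 3 bytes wide,
    // so an index computed from the raw key may not land on a char boundary.
    assert!(!lossy.is_char_boundary(4));
}
```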
Hi all:
I want to set a map value using RedisMap:

let redis_key = "KEYS".into();
let mut redis_map_value = RedisMap::new();
redis_map_value.insert(
    "foo_key".to_string(),
    RedisValue::Integer(123),
);
let value = RedisValue::Map(redis_map_value);

let _ = redis_client
    .set::<RedisValue, String, RedisValue>(
        redis_key,
        value,
        Some(Expiration::EX(10)),
        None,
        false,
    )
    .await;

No error is surfaced (though note the result is discarded with let _), but I also cannot find the key or value on the Redis server. What happened, and how should I do this?
Great work btw!
When connecting to a Redis cluster, if the client connects to a master node it will not update itself with the other nodes. If that master then fails, the client loses all connectivity because it never updated the RedisInnerClient. It seems as though sync_cluster is only called in handle_connection_closed.
I feel like this should be a fairly simple fix of adding a call to sync_cluster, and I could open a PR if you want, but I could be overlooking something trivial.
Also, I just want to confirm that this line does not contain a bug:
Line 150 in 93ddcad
The third column of the CLUSTER NODES output can also contain a list of flags such as master,fail. Would it be the right move here to exclude nodes that are flagged as failed?
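For reference, the flags column can be checked with a simple comma split. A hedged sketch (the helper name is mine, not fred's; per the Redis docs, fail? is the unconfirmed PFAIL state and fail the confirmed one):

```rust
/// Return true when the flags column of a CLUSTER NODES line (e.g. "master,fail")
/// marks the node as failed; "fail?" is the unconfirmed (PFAIL) state.
fn is_failed(flags: &str) -> bool {
    flags.split(',').any(|flag| flag == "fail" || flag == "fail?")
}

fn main() {
    assert!(is_failed("master,fail"));
    assert!(is_failed("slave,fail?"));
    assert!(!is_failed("master"));
    println!("ok");
}
```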
Hello,
I tried the metrics feature and noticed that the Stats struct used to report latency numbers has undocumented units. I figured out that they are milliseconds after checking the source code.
Just curious: maybe using the standard Duration type would be more useful for getting the desired units.
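To illustrate the suggestion: wrapping the raw millisecond value in std::time::Duration at the API boundary makes the unit explicit and lets callers convert freely (the avg_ms variable below is a made-up stand-in for a Stats field):

```rust
use std::time::Duration;

fn main() {
    // Suppose a Stats-like struct reports average latency as a bare number of
    // milliseconds, as the source code currently does.
    let avg_ms: u64 = 12;

    // Converting at the boundary documents the unit in the type itself and lets
    // callers pick whatever unit they need without guessing.
    let avg = Duration::from_millis(avg_ms);
    assert_eq!(avg.as_millis(), 12);
    assert_eq!(avg.as_micros(), 12_000);
}
```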
Hello, when I try to get a non-existent key I receive this error: Redis Error - kind: Parse, details: Cannot convert to number. I believe this is the wrong type of error for the triggered action.
Since the Redis CLI returns nil, maybe the library should just return Option::None, or a different kind of RedisError such as NotFound.
Personally I think both solutions are valid, and in an ideal world there would be an option for either behavior. But for my use case a distinct error kind is preferred, since I want to take a specific action on specific errors, and the error propagation operator (?) would help me achieve this more simply and cleanly.
I think it's necessary to drop the global configs and move them to structure-based ones.
Motivation: there are cases where an application uses several fred clients (to different clusters/nodes), and they might need different configs (e.g. default_connection_timeout_ms).
When I tested reconnection with the current version on the main branch, I hit a panic with debug-level tracing enabled.
2022-12-07T09:55:31.463755Z DEBUG fred::multiplexer::clustered: fred-mpjrO24y5w: Finish synchronizing cluster connections.
at /usr/local/cargo/git/checkouts/fred.rs-3fb6d817444a1870/4f0c3f4/src/multiplexer/clustered.rs:622 on reactor
thread 'reactor' panicked at 'tried to clone Id(2251799813685263), but no span exists with that ID
This may be caused by consuming a parent span (`parent: span`) rather than borrowing it (`parent: &span`).', /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tracing-subscriber-0.3.16/src/registry/sharded.rs:311:32
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
It doesn't appear when I use the info level, and reconnection works totally fine (with this fix).
Thanks for all your efforts @aembke!