redis-protocol.rs's Introduction

Redis Protocol

A Rust implementation of the Redis protocol.

Features

  • Owned and zero-copy Bytes-based parsing interfaces.
  • Supports RESP2 and RESP3 frames, including streaming frames.
  • Publish-subscribe message utilities.
  • A cluster hash slot interface.
  • RESP2 and RESP3 codec interfaces.
  • Utility functions for converting from RESP2 to RESP3.
  • Traits for converting frames into other types.
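
As background for the cluster hash slot feature, the Redis Cluster specification maps a key to one of 16384 slots by taking CRC16 (XMODEM variant) of the key, or of the hash tag between the first `{` and the next `}` when that tag is non-empty. The following is a minimal std-only sketch of that rule. The function names `crc16`, `hash_tag`, and `key_slot` are hypothetical and are not this crate's API.

```rust
/// CRC16/XMODEM: polynomial 0x1021, initial value 0, no bit reflection.
fn crc16(data: &[u8]) -> u16 {
    let mut crc: u16 = 0;
    for &byte in data {
        crc ^= (byte as u16) << 8;
        for _ in 0..8 {
            crc = if crc & 0x8000 != 0 {
                (crc << 1) ^ 0x1021
            } else {
                crc << 1
            };
        }
    }
    crc
}

/// Returns the bytes that are hashed: the content of the first `{...}` pair
/// when it is non-empty, otherwise the whole key.
fn hash_tag(key: &[u8]) -> &[u8] {
    if let Some(open) = key.iter().position(|&b| b == b'{') {
        if let Some(len) = key[open + 1..].iter().position(|&b| b == b'}') {
            if len > 0 {
                return &key[open + 1..open + 1 + len];
            }
        }
    }
    key
}

/// Maps a key to a cluster slot in the range 0..16384.
fn key_slot(key: &[u8]) -> u16 {
    crc16(hash_tag(key)) % 16384
}

fn main() {
    // Keys that share a hash tag land in the same slot.
    let a = key_slot(b"{user1000}.following");
    let b = key_slot(b"{user1000}.followers");
    println!("slots: {} and {}", a, b);
}
```

Hash tags exist so that callers can force related keys into one slot, which multi-key commands in a cluster require.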

Examples

use redis_protocol::resp2::{
  decode::decode,
  encode::encode,
  types::{OwnedFrame as Frame, Resp2Frame}
};

fn main() {
  let frame = Frame::BulkString("foobar".into());
  let mut buf = vec![0; frame.encode_len()];

  let len = encode(&mut buf, &frame).expect("Error encoding frame");
  println!("Encoded {} bytes into buffer with contents {:?}", len, buf);

  // ["Foo", nil, "Bar"]
  let buf: &[u8] = b"*3\r\n$3\r\nFoo\r\n$-1\r\n$3\r\nBar\r\n";
  match decode(&buf).unwrap() {
    Some((frame, amt)) => println!("Parsed {:?} and read {} bytes", frame, amt),
    None => println!("Incomplete frame."),
  };
}
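
To show what the encoded buffer in the example above contains, here is a minimal std-only sketch of the RESP2 bulk string layout: `$`, the payload length in decimal, CRLF, the payload, and a final CRLF. The helper names `bulk_string_encode_len` and `encode_bulk_string` are hypothetical and only mirror the idea of `encode_len` and `encode`; they are not the crate's implementation.

```rust
/// Number of bytes needed to encode `data` as a RESP2 bulk string.
fn bulk_string_encode_len(data: &[u8]) -> usize {
    // '$' + decimal length digits + CRLF + payload + CRLF
    1 + data.len().to_string().len() + 2 + data.len() + 2
}

/// Appends `data` to `buf` as a RESP2 bulk string and returns the bytes written.
fn encode_bulk_string(buf: &mut Vec<u8>, data: &[u8]) -> usize {
    let start = buf.len();
    buf.push(b'$');
    buf.extend_from_slice(data.len().to_string().as_bytes());
    buf.extend_from_slice(b"\r\n");
    buf.extend_from_slice(data);
    buf.extend_from_slice(b"\r\n");
    buf.len() - start
}

fn main() {
    let mut buf = Vec::with_capacity(bulk_string_encode_len(b"foobar"));
    let len = encode_bulk_string(&mut buf, b"foobar");
    println!("Encoded {} bytes: {:?}", len, String::from_utf8_lossy(&buf));
}
```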

Build Features

Name         Default  Description
std          x        Enable stdlib features and most dependency default features.
resp2        x        Enable the RESP2 interface.
resp3        x        Enable the RESP3 interface.
bytes                 Enable the zero-copy parsing interface via Bytes types.
decode-logs           Enable extra debugging TRACE logs during the frame decoding process.
codec                 Enable a RESP2 and RESP3 Tokio codec interface.
convert               Enable the FromResp2 and FromResp3 trait interfaces.
index-map             Use IndexMap types instead of HashMap. This is useful for testing and may also be useful for callers.

no_std

no_std builds are supported by disabling the std feature. However, a few optional dependencies must be activated as a substitute.

redis-protocol = { version = "X.X.X", default-features = false, features = ["libm", "hashbrown", "alloc"] }

Decoding

Both the RESP2 and RESP3 interfaces support three different Frame interfaces, each designed for a different use case:

  • OwnedFrame types use core container types to implement owned frame variants. This is the easiest frame interface to use, but often requires moving or copying the underlying buffer contents when decoding.
  • BytesFrame types use Bytes as the backing container. The bytes feature flag enables this frame type and a set of associated functions that avoid moving or copying BytesMut contents.
  • RangeFrame types represent ranges into an associated buffer and are typically used to implement forms of zero-copy parsing. This is the lowest level interface.
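
The range-based idea in the last bullet can be sketched without the crate: instead of copying the payload, the decoder returns a half-open `(start, end)` index pair into the caller's buffer. The following is a minimal std-only illustration for bulk strings only. `find_crlf` and `bulk_string_range` are hypothetical names, and the sketch assumes that "incomplete" and "invalid" can both be reported as `None`, which a real decoder would distinguish.

```rust
/// Index of the first CRLF in `buf`, if any.
fn find_crlf(buf: &[u8]) -> Option<usize> {
    buf.windows(2).position(|w| w == b"\r\n")
}

/// Parses `$<len>\r\n<payload>\r\n` and returns the payload range plus the
/// total number of bytes consumed. No payload bytes are copied.
fn bulk_string_range(buf: &[u8]) -> Option<((usize, usize), usize)> {
    if buf.first() != Some(&b'$') {
        return None;
    }
    let header_end = find_crlf(buf)?;
    let len: usize = std::str::from_utf8(&buf[1..header_end]).ok()?.parse().ok()?;
    let start = header_end + 2;
    let end = start.checked_add(len)?;
    let total = end.checked_add(2)?;
    // The payload and its trailing CRLF must already be in the buffer.
    if buf.len() < total || &buf[end..total] != b"\r\n" {
        return None;
    }
    Some(((start, end), total))
}

fn main() {
    let buf: &[u8] = b"$6\r\nfoobar\r\n";
    if let Some(((i, j), amt)) = bulk_string_range(buf) {
        // Borrow the payload straight out of the original buffer.
        let payload = &buf[i..j];
        println!("payload {:?}, read {} bytes", String::from_utf8_lossy(payload), amt);
    }
}
```

Because the result is only a pair of indices, the caller decides whether to borrow, copy, or wrap the bytes in another type.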

RESP2 OwnedFrame Decoding Example

Simple array decoding example adapted from the tests

use redis_protocol::resp2::{
  decode::decode,
  types::{OwnedFrame, Resp2Frame}
};

fn should_decode_array() {
  // ["Foo", nil, "Bar"]
  let buf: &[u8] = b"*3\r\n$3\r\nFoo\r\n$-1\r\n$3\r\nBar\r\n";

  let (frame, amt) = decode(&buf).unwrap().unwrap();
  assert_eq!(frame, OwnedFrame::Array(vec![
    OwnedFrame::BulkString("Foo".into()),
    OwnedFrame::Null,
    OwnedFrame::BulkString("Bar".into())
  ]));
  assert_eq!(amt, buf.len());
}

RESP2 BytesFrame Decoding Example

Array decoding example adapted from the tests

use redis_protocol::resp2::{
  decode::decode_bytes_mut,
  types::{BytesFrame, Resp2Frame}
};
use bytes::BytesMut;

fn should_decode_array_no_nulls() {
  let expected = (
    BytesFrame::Array(vec![
      BytesFrame::SimpleString("Foo".into()),
      BytesFrame::SimpleString("Bar".into()),
    ]),
    16,
  );
  let mut bytes: BytesMut = "*2\r\n+Foo\r\n+Bar\r\n".into();
  let total_len = bytes.len();

  let (frame, amt, buf) = match decode_bytes_mut(&mut bytes) {
    Ok(Some(result)) => result,
    Ok(None) => panic!("Expected complete frame"),
    Err(e) => panic!("{:?}", e)
  };

  assert_eq!(frame, expected.0, "decoded frame matched");
  assert_eq!(amt, expected.1, "decoded frame len matched");
  assert_eq!(buf.len(), expected.1, "output buffer len matched");
  assert_eq!(buf.len() + bytes.len(), total_len, "total len matched");
}

RESP2 RangeFrame Decoding Example

Implement a custom borrowed frame type that can only represent BulkString and SimpleString

use redis_protocol::resp2::{
  decode::decode_range,
  types::RangeFrame
};
use std::str;

enum MyBorrowedFrame<'a> {
  BulkString(&'a [u8]),
  SimpleString(&'a str),
}

fn decode_borrowed(buf: &[u8]) -> Option<MyBorrowedFrame> {
  match decode_range(buf).ok()? {
    Some((RangeFrame::BulkString((i, j)), _)) => {
      Some(MyBorrowedFrame::BulkString(&buf[i..j]))
    }
    Some((RangeFrame::SimpleString((i, j)), _)) => {
      let parsed = str::from_utf8(&buf[i..j]).ok()?;
      Some(MyBorrowedFrame::SimpleString(parsed))
    }
    _ => None,
  }
}

redis-protocol.rs's Issues

Update the nom version

Hi, we are using nom = "6.1", which internally uses an old version of bitvec (bitvec = 0.19). bitvec has since had breaking changes and now has a stable 1.0.0 release. Because bitvec is used across many projects, the old version creates conflicts with libraries that use the latest bitvec.

error[E0034]: multiple applicable items in scope
   --> /home/user1/.cargo/registry/src/github.com-1ecc6299db9ec823/bitvec-0.19.4/src/order.rs:315:15
    |
315 |         if ct == R::BITS {
    |                     ^^^^ multiple `BITS` found
    |
note: candidate #1 is defined in the trait `IsNumber`
   --> /home/user1/.cargo/registry/src/github.com-1ecc6299db9ec823/funty-1.2.0/src/lib.rs:144:2
    |
144 |     const BITS: u32;
    |     ^^^^^^^^^^^^^^^^
note: candidate #2 is defined in the trait `BitMemory`
   --> /home/user1/.cargo/registry/src/github.com-1ecc6299db9ec823/bitvec-0.19.4/src/mem.rs:29:2
    |
29  |     const BITS: u8 = mem::size_of::<Self>() as u8 * 8;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
help: disambiguate the associated constant for candidate #1
    |
315 |         if ct == <R as IsNumber>::BITS {
    |                  ~~~~~~~~~~~~~~~~~
help: disambiguate the associated constant for candidate #2
    |
315 |         if ct == <R as BitMemory>::BITS {
    |                  ~~~~~~~~~~~~~~~~~~

Please check the compatibility with nom=7.1

Invalid frame kind on Frame::Error

First of all, thank you very much for this crate; it helped a lot!

Now, I've set up a tokio server that uses Framed to decode and encode using this crate. The encode function is:

impl Encoder for RedisCodec {
    type Item = Frame;
    type Error = io::Error;

    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
        match encode_bytes(dst, &item) {
            Ok(_l) => return Ok(()),
            Err(e) => {
                println!("{:?} {:?}", e, item);
                return Err(io::Error::new(ErrorKind::Other, e.description()));
            }
        }
    }
}

When I try to send this:

framed_socket.send(Frame::Error("foo".into())).await;

I get this:

RedisProtocolError { desc: "Invalid frame kind.", kind: EncodeError, context: None }

Sending bulk strings works fine, but I wanted to send an error when the command was unrecognized. I've even tried different strings, such as ERR test, to match what Redis sends, but with no luck. Is there a way to better understand what the encoding error is?

Update: btw I'm using nightly-2019-09-23 (to be able to use tokio 0.2)
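
For reference, a RESP2 error frame on the wire is `-`, the message, and CRLF, so `ERR test` would be sent as `-ERR test\r\n`. The following std-only sketch shows that layout. `encode_error` is a hypothetical helper, not the crate's encoder, and it does not explain the "Invalid frame kind." error reported above.

```rust
/// Appends `message` to `buf` as a RESP2 error frame and returns the bytes written.
/// Simple errors are line-based, so CR and LF are rejected.
fn encode_error(buf: &mut Vec<u8>, message: &str) -> Result<usize, String> {
    if message.contains('\r') || message.contains('\n') {
        return Err("simple errors cannot contain CR or LF".to_string());
    }
    let start = buf.len();
    buf.push(b'-');
    buf.extend_from_slice(message.as_bytes());
    buf.extend_from_slice(b"\r\n");
    Ok(buf.len() - start)
}

fn main() {
    let mut buf = Vec::new();
    let len = encode_error(&mut buf, "ERR unknown command").expect("valid error message");
    println!("wrote {} bytes: {:?}", len, String::from_utf8_lossy(&buf));
}
```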

Consider using Bytes/Str instead of BytesMut/StrMut in Frame struct

I'm not super familiar with where it's best to use Bytes vs BytesMut, but my thinking goes like this:
Once parsed, it would be really rare for someone to want to directly modify the value in a Frame.
Instead, it's much more likely that they would convert it to a new type entirely or combine it into a separate BytesMut.
If Frame uses Bytes, then it's a lot easier to construct a Frame, as we can use either a Bytes or a BytesMut value (freeze is free).
But when Frame uses BytesMut, going from Bytes to BytesMut requires a new allocation.
Another advantage of Bytes is being able to construct Frames from Bytes::from_static(b"OK"), which performs no allocation.

The implementation of the parser could be the same but just call freeze https://docs.rs/bytes/latest/bytes/struct.BytesMut.html#method.freeze

In fact, I wonder if BytesMut is needed at all in the decoder if it's just splitting up the bytes of the message, which Bytes can do just fine. Possibly this is just due to the nom API? The version of nom used is out of date, so this may be fixed in a later version.

Bytes parser performance

This isn't really an issue for redis-protocol, but raising an issue here seemed like the easiest way to start a discussion on this topic.

I was experimenting with writing a parser for CQL (Cassandra Query Language) using nom and https://github.com/aembke/redis-protocol.rs/blob/main/src/nom_bytes.rs
The experiment is available here: https://github.com/rukai/cqlparser

I did the following:

  1. Implemented the nom parser the simple way: just pass around an &[u8] and output an AST that allocates Strings everywhere.
  2. Wrote benchmarks for this.
  3. Ported my parser to use nom_bytes + Bytes.
  4. Ran the benchmarks for both the old and new implementations and noticed significant performance regressions.

git clone https://github.com/rukai/cqlparser
cd cqlparser
git checkout main
cargo bench
git checkout convert_to_bytes
cargo bench

I can kind of see why this would occur: a Bytes is more expensive to clone than an &[u8], and nom does a lot of cloning as it goes through all the combinators.
Also, maybe the logic in the Bytes Clone implementation prevents optimizations that normally keep nom performant?

Did you benchmark equivalent Bytes and non-Bytes parsers before settling on a Bytes parser?
It might be worth investigating if you haven't already.

If you did observe performance issues when combining nom and Bytes and found workarounds, I would be interested in hearing them.
I noticed that redis-protocol tends to manually implement some things that nom does for you. Is that related to this problem?
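
To make the suspected cost difference concrete, here is a std-only sketch. `SharedSlice` is a hypothetical stand-in for a reference-counted buffer handle, assuming that cloning such a handle bumps a shared counter; it is not how Bytes is actually implemented, and it does not by itself prove that cloning explains the regression above.

```rust
use std::sync::Arc;

/// A reference-counted view into a shared buffer (illustrative stand-in only).
#[derive(Clone)]
struct SharedSlice {
    data: Arc<[u8]>,
    start: usize,
    end: usize,
}

impl SharedSlice {
    fn new(data: &[u8]) -> Self {
        SharedSlice { data: Arc::from(data), start: 0, end: data.len() }
    }

    /// Creates a sub-view. This shares the allocation but updates the
    /// atomic reference count, as does every `clone`.
    fn slice(&self, start: usize, end: usize) -> Self {
        assert!(start <= end && self.start + end <= self.end);
        SharedSlice {
            data: Arc::clone(&self.data),
            start: self.start + start,
            end: self.start + end,
        }
    }

    fn as_bytes(&self) -> &[u8] {
        &self.data[self.start..self.end]
    }

    /// Number of live handles that share the underlying allocation.
    fn owners(&self) -> usize {
        Arc::strong_count(&self.data)
    }
}

fn main() {
    let whole = SharedSlice::new(b"SELECT * FROM t");
    let keyword = whole.slice(0, 6);
    let copy = keyword.clone();
    println!("{:?} has {} owners", String::from_utf8_lossy(copy.as_bytes()), whole.owners());

    // A borrowed slice is just a pointer and a length; copying it touches no counter.
    let raw: &[u8] = b"SELECT * FROM t";
    let borrowed = &raw[0..6];
    let borrowed_copy = borrowed;
    println!("{:?}", String::from_utf8_lossy(borrowed_copy));
}
```

Parser combinators copy their input handle at nearly every step, so a per-copy counter update is paid many times per parsed token, whereas a borrowed slice is copied for free.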

Fix CI

It looks like CircleCI for this repo is no longer running.
Should it be fixed? Or should we port the repo to use GitHub Actions instead?

Error

expected struct bytes::bytes::BytesMut, found struct bytes::BytesMut

cannot decode Hello message

Using the streaming codec example:

use bytes::BytesMut;
use redis_protocol::resp3::decode::streaming::decode_mut;
use redis_protocol::resp3::encode::complete::encode_bytes;
use redis_protocol::resp3::types::*;
use redis_protocol::types::{RedisProtocolError, RedisProtocolErrorKind};
use tokio_util::codec::{Decoder, Encoder};

#[derive(Default)]
pub struct RedisCodec {
    decoder_stream: Option<StreamedFrame>,
}

impl Encoder<Frame> for RedisCodec {
    type Error = RedisProtocolError;

    fn encode(&mut self, item: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // In this example we only show support for encoding complete frames. See the resp3 encoder
        // documentation for examples showing how to encode streaming frames.
        let _ = encode_bytes(dst, &item)?;
        Ok(())
    }
}

impl Decoder for RedisCodec {
    type Item = Frame;
    type Error = RedisProtocolError;

    // Buffer the results of streamed frame before returning the complete frame to the caller.
    // Callers that want to surface streaming frame chunks up the stack would simply return after calling `decode` here.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }

        if let Some((frame, _, _)) = decode_mut(src)? {
            if self.decoder_stream.is_some() && frame.is_streaming() {
                // it doesn't make sense to start a stream while inside another stream
                return Err(RedisProtocolError::new(
                  RedisProtocolErrorKind::DecodeError,
                  "Cannot start a stream while already inside a stream.",
                ));
            }

            let result = if let Some(ref mut streamed_frame) = self.decoder_stream {
                // we started receiving streamed data earlier

                // we already checked for streams within streams above
                let frame = frame.into_complete_frame()?;
                streamed_frame.add_frame(frame);

                if streamed_frame.is_finished() {
                    // convert the inner stream buffer into the final output frame
                    Some(streamed_frame.into_frame()?)
                } else {
                    // buffer the stream in memory until it completes
                    None
                }
            } else {
                // we're processing a complete frame or starting a new streamed frame
                if frame.is_streaming() {
                    // start a new stream, saving the internal buffer to the codec state
                    self.decoder_stream = Some(frame.into_streaming_frame()?);
                    // don't return anything to the caller until the stream finishes (shown above)
                    None
                } else {
                    // we're not in the middle of a stream and we found a complete frame
                    Some(frame.into_complete_frame()?)
                }
            };

            if result.is_some() {
                // we're either done with the stream or we found a complete frame. either way clear the buffer.
                let _ = self.decoder_stream.take();
            }

            Ok(result)
        } else {
            Ok(None)
        }
    }
}

With this processor function:

    async fn process(&self, stream: TcpStream) -> Result<()> {
        let codec = RedisCodec::default();
        let mut conn = codec.framed(stream);
        // Read frames until the peer disconnects (`next()` yields `None`).
        while let Some(message) = conn.next().await {
            match message {
                Ok(message) => tracing::info!(message = ?message, "received"),
                Err(error) => tracing::error!(message = ?error, "error received"),
            }
        }
        Ok(())
    }

I was not able to decode the HELLO message:

peer connected: 127.0.0.1:52744
  2022-05-21T03:38:25.378855Z TRACE tokio_util::codec::framed_impl: attempting to decode a frame
    at C:\Users\steve\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-util-0.7.2\src\codec\framed_impl.rs:197 on tokio-runtime-worker ThreadId(2)

  2022-05-21T03:38:25.379105Z TRACE tokio_util::codec::framed_impl: Got an error, going to errored state
    at C:\Users\steve\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-util-0.7.2\src\codec\framed_impl.rs:200 on tokio-runtime-worker ThreadId(2)

  2022-05-21T03:38:25.379334Z ERROR sledis::server: error received, RedisProtocolError { desc: "frame_type: Invalid frame type prefix.", kind: DecodeError }
    at src\server.rs:31 on tokio-runtime-worker ThreadId(2)

  2022-05-21T03:38:25.379489Z TRACE tokio_util::codec::framed_impl: Returning None and setting paused
    at C:\Users\steve\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-util-0.7.2\src\codec\framed_impl.rs:163 on tokio-runtime-worker ThreadId(2)

It turns out the message does not have a tag?

  2022-05-21T03:45:18.455055Z  INFO sledis::server: got data, str: Ok("HELLO 3\r\n")

I had to file this as an issue because there is no Discussions tab; in my opinion it would be better placed there.
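
The logged payload `HELLO 3\r\n` has the shape of what the Redis protocol documentation calls an inline command: space-separated arguments ending in CRLF, with no leading type byte. The following std-only sketch shows one way a server could detect that case and rewrite it as an array of bulk strings before handing it to a RESP decoder. `has_resp_prefix` and `inline_to_resp` are hypothetical helpers, not part of this crate, and the sketch ignores quoting rules for inline arguments.

```rust
/// True when the buffer starts with a RESP2 or RESP3 type byte.
fn has_resp_prefix(buf: &[u8]) -> bool {
    matches!(
        buf.first().copied(),
        Some(
            b'+' | b'-' | b':' | b'$' | b'*' | b'_' | b'#' | b',' | b'(' | b'!' | b'='
                | b'%' | b'~' | b'|' | b'>' | b';' | b'.'
        )
    )
}

/// Rewrites one CRLF-terminated inline command as an array of bulk strings.
/// Returns `None` when the line is unterminated or has no arguments.
fn inline_to_resp(line: &[u8]) -> Option<Vec<u8>> {
    if !line.ends_with(b"\r\n") {
        return None;
    }
    let line = &line[..line.len() - 2];
    let args: Vec<&[u8]> = line
        .split(|b| b.is_ascii_whitespace())
        .filter(|arg| !arg.is_empty())
        .collect();
    if args.is_empty() {
        return None;
    }

    let mut out = format!("*{}\r\n", args.len()).into_bytes();
    for arg in args {
        out.extend_from_slice(format!("${}\r\n", arg.len()).as_bytes());
        out.extend_from_slice(arg);
        out.extend_from_slice(b"\r\n");
    }
    Some(out)
}

fn main() {
    let input: &[u8] = b"HELLO 3\r\n";
    if !has_resp_prefix(input) {
        if let Some(frame) = inline_to_resp(input) {
            println!("{:?}", String::from_utf8_lossy(&frame));
        }
    }
}
```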
