
avro-rs's Introduction

avro-rs


⚠️ avro-rs is no longer maintained here. Please refer to the apache/avro/rust repository instead. Make sure you read the Apache Avro guidelines here. ⚠️

A library for working with Apache Avro in Rust.

Please check our documentation for examples, tutorials and API reference.

Apache Avro is a data serialization system which provides rich data structures and a compact, fast, binary data format.

All data in Avro is schematized, as in the following example:

{
    "type": "record",
    "name": "test",
    "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"}
    ]
}

There are two ways of handling Avro data in Rust:

  • as Avro-specialized data types based on an Avro schema;
  • as generic Rust serde-compatible types implementing/deriving Serialize and Deserialize.

avro-rs provides a way to read and write both these data representations easily and efficiently.

Installing the library

Add to your Cargo.toml:

[dependencies]
avro-rs = "x.y"

Or in case you want to leverage the Snappy codec:

[dependencies.avro-rs]
version = "x.y"
features = ["snappy"]

Upgrading to a newer minor version

The library is still in beta, so there might be backward-incompatible changes between minor versions. If you have troubles upgrading, check the version upgrade guide.

Defining a schema

Avro data cannot exist without an Avro schema. Schemas must be used when writing and can be used when reading; they carry the information about the type of data we are handling. Avro schemas are used both for validation and for resolution of Avro data.

Avro schemas are defined in JSON format and can just be parsed out of a raw string:

use avro_rs::Schema;

let raw_schema = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"}
        ]
    }
"#;

// if the schema is not valid, this function will return an error
let schema = Schema::parse_str(raw_schema).unwrap();

// schemas can be printed for debugging
println!("{:?}", schema);

Additionally, a list of definitions (which may depend on each other) can be given, and all of them will be parsed into the corresponding schemas.

use avro_rs::Schema;

let raw_schema_1 = r#"{
        "name": "A",
        "type": "record",
        "fields": [
            {"name": "field_one", "type": "float"}
        ]
    }"#;

// This definition depends on the definition of A above
let raw_schema_2 = r#"{
        "name": "B",
        "type": "record",
        "fields": [
            {"name": "field_one", "type": "A"}
        ]
    }"#;

// if the schemas are not valid, this function will return an error
let schemas = Schema::parse_list(&[raw_schema_1, raw_schema_2]).unwrap();

// schemas can be printed for debugging
println!("{:?}", schemas);

N.B. The composition of schema definitions requires schemas with names. For this reason, only schemas of type Record, Enum, and Fixed should be passed to this function.

The library also provides a programmatic interface to define schemas without encoding them in JSON (for advanced use), but we highly recommend the JSON interface. Please read the API reference if you are interested.

For more information about schemas and what kind of information you can encapsulate in them, please refer to the appropriate section of the Avro Specification.

Writing data

Once we have defined a schema, we are ready to serialize data in Avro, validating it against the provided schema in the process. As mentioned before, there are two ways of handling Avro data in Rust.

NOTE: The library also provides a low-level interface for encoding a single datum in Avro bytecode, without generating markers and headers (for advanced use), but we highly recommend the Writer interface to be fully Avro-compatible. Please read the API reference if you are interested.
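
For the curious, here is a minimal sketch of that low-level interface, assuming the to_avro_datum and to_value functions exported by the crate (they also appear in the issue reports further down). The output is a naked datum, with no container header or sync markers:

use avro_rs::{to_avro_datum, to_value, Schema};

// Sketch only: encode a single i64 datum against a bare "long" schema.
// The resulting bytes are a raw datum, not an Avro object container file.
let schema = Schema::parse_str(r#""long""#).unwrap();
let datum = to_value(27i64).unwrap();
let encoded: Vec<u8> = to_avro_datum(&schema, datum).unwrap();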

The avro way

Given that the schema we defined above is that of an Avro Record, we are going to use the associated type provided by the library to specify the data we want to serialize:

use avro_rs::types::Record;
use avro_rs::Writer;
// a writer needs a schema and something to write to
let mut writer = Writer::new(&schema, Vec::new());

// the Record type models our Record schema
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");

// schema validation happens here
writer.append(record).unwrap();

// this is how to get back the resulting avro bytecode
// this performs a flush operation to make sure data has been written, so it can fail
// you can also call `writer.flush()` yourself without consuming the writer
let encoded = writer.into_inner().unwrap();

Most of the time, schemas define a record as a top-level container encapsulating all the values to convert as fields and providing documentation for them, but in case we want to directly define an Avro value, the library offers that capability via the Value interface.

use avro_rs::types::Value;

let mut value = Value::String("foo".to_string());

The serde way

Given that the schema we defined above is an Avro Record, we can directly use a Rust struct deriving Serialize to model our data:

use avro_rs::Writer;
use serde::Serialize;

#[derive(Debug, Serialize)]
struct Test {
    a: i64,
    b: String,
}

// a writer needs a schema and something to write to
let mut writer = Writer::new(&schema, Vec::new());

// the structure models our Record schema
let test = Test {
    a: 27,
    b: "foo".to_owned(),
};

// schema validation happens here
writer.append_ser(test).unwrap();

// this is how to get back the resulting avro bytecode
// this performs a flush operation to make sure data is written, so it can fail
// you can also call `writer.flush()` yourself without consuming the writer
let encoded = writer.into_inner().unwrap();

Most of the time, schemas define a record as a top-level container encapsulating all the values to convert as fields and providing documentation for them, but in case we want to directly define an Avro value, any type implementing Serialize should work.

let mut value = "foo".to_string();

Using codecs to compress data

Avro supports three different compression codecs when encoding data:

  • Null: leaves data uncompressed;
  • Deflate: writes the data block using the deflate algorithm as specified in RFC 1951, typically implemented with the zlib library. Note that this format (unlike the "zlib format" in RFC 1950) does not have a checksum;
  • Snappy: uses Google's Snappy compression library. Each compressed block is followed by the 4-byte, big-endian CRC32 checksum of the uncompressed data in the block. You must enable the snappy feature to use this codec.

To use a particular codec to compress data, just specify it when creating the Writer:

use avro_rs::Writer;
use avro_rs::Codec;
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

Reading data

As far as reading Avro encoded data goes, we can just use the schema encoded with the data to read it. The library will do it automatically for us, as it already does for the compression codec:

use avro_rs::Reader;
// reader creation can fail in case the input to read from is not Avro-compatible or malformed
let reader = Reader::new(&input[..]).unwrap();

If, instead, we want to read the data with a different (but compatible) reader schema than the one it was written with, we can do the following:

use avro_rs::Schema;
use avro_rs::Reader;

let reader_raw_schema = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {"name": "c", "type": "long", "default": 43}
        ]
    }
"#;

let reader_schema = Schema::parse_str(reader_raw_schema).unwrap();

// reader creation can fail in case the input to read from is not Avro-compatible or malformed
let reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();

The library will also automatically perform schema resolution while reading the data.

For more information about schema compatibility and resolution, please refer to the Avro Specification.

As usual, there are two ways to handle Avro data in Rust, as you can see below.

NOTE: The library also provides a low-level interface for decoding a single datum in Avro bytecode, without markers and header (for advanced use), but we highly recommend the Reader interface to leverage all Avro features. Please read the API reference if you are interested.
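
As a rough sketch (assuming the from_avro_datum function that also appears in the issue reports below), decoding a single naked datum takes the writer schema, a reader, and an optional reader schema:

use avro_rs::types::Value;
use avro_rs::{from_avro_datum, Schema};

// Sketch only: decode one raw datum (no container header) written with a "long" schema.
let writer_schema = Schema::parse_str(r#""long""#).unwrap();
let mut encoded: &[u8] = &[54]; // 54 is the zig-zag/varint encoding of 27
let value = from_avro_datum(&writer_schema, &mut encoded, None).unwrap();
assert_eq!(value, Value::Long(27));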

The avro way

We can read Value instances directly out of the Reader iterator:

use avro_rs::Reader;
let reader = Reader::new(&input[..]).unwrap();

// value is a Result of an Avro Value, in case the read operation fails
for value in reader {
    println!("{:?}", value.unwrap());
}

The serde way

Alternatively, we can use a Rust type implementing Deserialize and representing our schema to read the data into:

use avro_rs::Reader;
use avro_rs::from_value;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Test {
    a: i64,
    b: String,
}

let reader = Reader::new(&input[..]).unwrap();

// value is a Result in case the read operation fails
for value in reader {
    println!("{:?}", from_value::<Test>(&value.unwrap()));
}

Putting everything together

The following is an example of how to combine everything shown so far, and it is meant to be a quick reference of the library interface:

use avro_rs::{Codec, Reader, Schema, Writer, from_value, types::Record, Error};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
struct Test {
    a: i64,
    b: String,
}

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"}
            ]
        }
    "#;

    let schema = Schema::parse_str(raw_schema)?;

    println!("{:?}", schema);

    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");

    writer.append(record)?;

    let test = Test {
        a: 27,
        b: "foo".to_owned(),
    };

    writer.append_ser(test)?;

    let input = writer.into_inner()?;
    let reader = Reader::with_schema(&schema, &input[..])?;

    for record in reader {
        println!("{:?}", from_value::<Test>(&record?));
    }
    Ok(())
}

avro-rs also supports the logical types listed in the Avro specification:

  1. Decimal using the num_bigint crate
  2. UUID using the uuid crate
  3. Date, Time (milli) as i32 and Time (micro) as i64
  4. Timestamp (milli and micro) as i64
  5. Duration as a custom type with months, days and millis accessor methods each of which returns an i32

Note that the on-disk representation is identical to the underlying primitive/complex type.

Read and write logical types

use avro_rs::{
    types::Record, types::Value, Codec, Days, Decimal, Duration, Millis, Months, Reader, Schema,
    Writer, Error,
};
use num_bigint::ToBigInt;

fn main() -> Result<(), Error> {
    let raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "decimal_fixed",
          "type": {
            "type": "fixed",
            "size": 2,
            "name": "decimal"
          },
          "logicalType": "decimal",
          "precision": 4,
          "scale": 2
        },
        {
          "name": "decimal_var",
          "type": "bytes",
          "logicalType": "decimal",
          "precision": 10,
          "scale": 3
        },
        {
          "name": "uuid",
          "type": "string",
          "logicalType": "uuid"
        },
        {
          "name": "date",
          "type": "int",
          "logicalType": "date"
        },
        {
          "name": "time_millis",
          "type": "int",
          "logicalType": "time-millis"
        },
        {
          "name": "time_micros",
          "type": "long",
          "logicalType": "time-micros"
        },
        {
          "name": "timestamp_millis",
          "type": "long",
          "logicalType": "timestamp-millis"
        },
        {
          "name": "timestamp_micros",
          "type": "long",
          "logicalType": "timestamp-micros"
        },
        {
          "name": "duration",
          "type": {
            "type": "fixed",
            "size": 12,
            "name": "duration"
          },
          "logicalType": "duration"
        }
      ]
    }
    "#;

    let schema = Schema::parse_str(raw_schema)?;

    println!("{:?}", schema);

    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("decimal_fixed", Decimal::from(9936.to_bigint().unwrap().to_signed_bytes_be()));
    record.put("decimal_var", Decimal::from((-32442.to_bigint().unwrap()).to_signed_bytes_be()));
    record.put("uuid", uuid::Uuid::new_v4());
    record.put("date", Value::Date(1));
    record.put("time_millis", Value::TimeMillis(2));
    record.put("time_micros", Value::TimeMicros(3));
    record.put("timestamp_millis", Value::TimestampMillis(4));
    record.put("timestamp_micros", Value::TimestampMicros(5));
    record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));

    writer.append(record)?;

    let input = writer.into_inner()?;
    let reader = Reader::with_schema(&schema, &input[..])?;

    for record in reader {
        println!("{:?}", record?);
    }
    Ok(())
}

Calculate Avro schema fingerprint

This library supports calculating the following fingerprints:

  • SHA-256
  • MD5
  • Rabin

An example of computing the supported fingerprints:

use avro_rs::rabin::Rabin;
use avro_rs::{Schema, Error};
use md5::Md5;
use sha2::Sha256;

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"}
            ]
        }
    "#;
    let schema = Schema::parse_str(raw_schema)?;
    println!("{}", schema.fingerprint::<Sha256>());
    println!("{}", schema.fingerprint::<Md5>());
    println!("{}", schema.fingerprint::<Rabin>());
    Ok(())
}

Ill-formed data

In order to ease decoding, the Binary Encoding specification of Avro data requires some fields to have their length encoded alongside the data.

If encoded data passed to a Reader is ill-formed, the bytes meant to contain the length of data may be bogus and could result in extravagant memory allocation.

To shield users from ill-formed data, avro-rs sets a limit (default: 512MB) to any allocation it will perform when decoding data.

If you expect some of your data fields to be larger than this limit, be sure to make use of the max_allocation_bytes function before reading any data (we leverage Rust's std::sync::Once mechanism to initialize this value; if any call to decode is made before a call to max_allocation_bytes, the limit will be 512MB throughout the lifetime of the program).

use avro_rs::max_allocation_bytes;

max_allocation_bytes(2 * 1024 * 1024 * 1024);  // 2GB

// ... happily decode large data

Check schemas compatibility

This library supports checking schema compatibility.

Note: It does not yet support named schemas (more on #76).

Examples of checking for compatibility:

  1. Compatible schemas

Explanation: an int array schema can be read by a long array schema, because an int (32-bit signed integer) fits into a long (64-bit signed integer).

use avro_rs::{Schema, schema_compatibility::SchemaCompatibility};

let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
assert_eq!(true, SchemaCompatibility::can_read(&writers_schema, &readers_schema));

  2. Incompatible schemas (a long array schema cannot be read by an int array schema)

Explanation: a long array schema cannot be read by an int array schema, because a long (64-bit signed integer) does not fit into an int (32-bit signed integer).

use avro_rs::{Schema, schema_compatibility::SchemaCompatibility};

let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
assert_eq!(false, SchemaCompatibility::can_read(&writers_schema, &readers_schema));

License

This project is licensed under MIT License. Please note that this is not an official project maintained by Apache Avro.

Contributing

Everyone is encouraged to contribute! You can contribute by forking the GitHub repo and making a pull request or opening an issue. All contributions will be licensed under MIT License.

Please consider adding documentation, tests and a line for your change under the Unreleased section in the CHANGELOG. If you introduce a backward-incompatible change, please consider adding migration instructions in the Migration Guide. If you modify the crate documentation in lib.rs, run make readme to sync the README file.

avro-rs's Issues

Properly support recursive types

Porting tests over from the official Avro python package (see #18), I noticed that recursive types aren't properly supported (for very simple use cases the schema parsing doesn't crash, but I am not even sure encoding would actually work).

Let's support recursive types for real.

There are already some tests to uncomment which cover the issue; they can be found grepping for the issue number.

Error types

Most of the APIs are returning failure::Error. How about re-exporting it? Or implementing more concrete errors?

Avoid calling .resolve when encoding datum

Due to the way enums work right now, we are forced to make a call to Value::resolve before encoding a datum in Avro format. This might hurt performance (mostly for arrays, maps and records) by doing extra allocations.

We should probably treat enum as a special case (or come up with a magical solution) and avoid calling .resolve :)

Unsafe code to transmute floats

Unsafe code is used to transmute data between floating point numbers and bytes. This is unsafe for a good reason, as it encourages not caring about endianness. Better use the safe methods f32::from_bits, f32::to_bits, f64::from_bits, f64::to_bits, u32::to_le_bytes, u32::from_le_bytes, u64::from_le_bytes and u64::to_le_bytes.

Ok(Value::Float(unsafe { transmute::<[u8; 4], f32>(buf) }))

Ok(Value::Double(unsafe { transmute::<[u8; 8], f64>(buf) }))

Value::Float(x) => buffer.extend_from_slice(&unsafe { transmute::<f32, [u8; 4]>(*x) }),

Value::Double(x) => buffer.extend_from_slice(&unsafe { transmute::<f64, [u8; 8]>(*x) }),
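
For illustration, a minimal sketch of the safe replacement the issue suggests, using the named from_bits/to_bits and from_le_bytes/to_le_bytes methods (this is not the library's actual code):

// Safe, endianness-explicit conversion between an f32 and its byte representation.
let buf: [u8; 4] = [0, 0, 128, 63];               // 1.0f32 in little-endian IEEE 754
let x = f32::from_bits(u32::from_le_bytes(buf));  // decode: bytes -> f32, no transmute
let bytes = x.to_bits().to_le_bytes();            // encode: f32 -> bytes, no transmute
assert_eq!(x, 1.0f32);
assert_eq!(bytes, buf);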

Is it possible to Serialize/Deserialize rust enums?

Support for rust enum?

I appear to be able to serialize a struct with an enum, but can't deserialize. Is this feature supported?

Code below gives

Err(Error { message: "not an enum" })

Test code.

use avro_rs::{Codec, Reader, Schema, Writer, from_value};
use failure::Error;
use serde::{Serialize, Deserialize};

#[derive(Debug, Deserialize, Serialize)]
enum Suit {
    Diamonds,
    Spades,
    Clubs,
    Hearts
}

#[derive(Debug, Deserialize, Serialize)]
struct Test {
    a: i64,
    b: String,
    c: Suit
}

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"},
                {
                    "name": "c",
                    "type": {
                        "type": "enum",
                        "name": "Suit",
                        "symbols": ["Diamonds", "Spades", "Clubs", "Hearts"]
                    },
                    "default": "Spades"
                }
            ]
        }
    "#;
    let schema = Schema::parse_str(raw_schema).unwrap();
    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);

    let test = Test {
        a: 1,
        b: "Name".to_string(),
        c: Suit::Spades
    };
    
    writer.append_ser(test)?;
    writer.flush()?;
    let input = writer.into_inner();
    let reader = Reader::with_schema(&schema, &input[..]).unwrap();
    
    for record in reader {
        println!("{:?}", from_value::<Test>(&record?));
    }
    Ok(())
}

Investigate failures in tests covering default values

Porting tests over from the official Avro python package (see #18), I found out that we are failing 4 test cases covering default values assigned to fields while reading. This is probably the symptom of one or more bugs in the way the library implemented support for default values.

The test cases can be found grepping for the issue number.

Auto Schema Creation from the struct

It would be beneficial to generate the schema automatically from the structure. For example,

#[derive(Serializable, Deserializable)]
struct Test {
    a: i32,
    b: f32,
}
let schema = schema::from_ser(Test::avro_schema()).unwrap();

I am not 100% sure on how the design should look like. Do you have any suggestions?

Problem encoding large values in long

This code appears to fail the test. It's a round trip encoding/decoding of a large integer value. The value is larger than 2^32 but smaller than 2^63.

    use avro_rs::{from_avro_datum, from_value, to_avro_datum, to_value, Schema};

    pub const JSON: &str = r#"
    { "name": "athing", "type": "long" }
    "#;

    #[test]
    fn serialize_round_trip2() {
        let json_scheme: serde_json::Value = serde_json::from_str(JSON).unwrap();
        let scheme = Schema::parse(&json_scheme).unwrap();
        let a_thing = 1553201383562400 as i64;
        let bytes = to_avro_datum(&scheme, to_value(a_thing).unwrap()).unwrap();
        let rehy: i64 =
            from_value(&from_avro_datum(&scheme, &mut &bytes[..], None).unwrap())
                .unwrap();
        assert_eq!(rehy, 1553201383562400 as i64);
    }

When run with cargo test the result is:

---- tests::serialize_round_trip2 stdout ----
thread 'tests::serialize_round_trip2' panicked at 'assertion failed: `(left == right)`
  left: `-1553201383790593`,
 right: `1553201383562400`', src/lib.rs:502:9
note: Run with `RUST_BACKTRACE=1` environment variable to display a backtrace.

Not using the Codec?

So I am trying to write the Snappy-compressed Avro to a file. I am giving the writer from the given example the Snappy codec.

let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Snappy);

When I print what into_inner() returns, I see a vector printed with integers in it. I can only assume that is the encoded Avro or something. However, when I change the codec to Codec::Null the vector is no different. I am a little confused about how the codec is used and how I can get the compressed Avro written to a file.

I should also note that I checked the vector and it's not valid Snappy.

Union index is zig-zag long

The Avro spec says that a Union value is encoded by first encoding the index of the Union entry as a long, meaning zig-zag encoded. In decode.rs, it reads the index as a single byte, directly encoded as an integer. I think this change needs to be made:

diff --git a/src/decode.rs b/src/decode.rs
index 28980c2..f43005d 100644
--- a/src/decode.rs
+++ b/src/decode.rs
@@ -112,12 +112,11 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> Result<Value, Error>
             Ok(Value::Map(items))
         },
         &Schema::Union(ref inner) => {
-            let mut buf = [0u8; 1];
-            reader.read_exact(&mut buf)?;
+            let index = zag_i64(reader)?;
 
-            match buf[0] {
-                0u8 => Ok(Value::Union(None)),
-                1u8 => decode(inner, reader),
+            match index {
+                0 => Ok(Value::Union(None)),
+                1 => decode(inner, reader),
                 _ => Err(DecodeError::new("union index out of bounds").into()),
             }
         },

Update readme example to reflect rust 2018 and smooth onboarding.

To make the example on the front page work, we say to add to your Cargo.toml:

[dependencies]
avro-rs = "^0.6"

Actually I needed to do this.

[dependencies]
serde = "1.0"
serde_derive = "1.0"
failure = "0.1.5"
avro-rs = "^0.6"

Then the example code in the readme uses the older rust syntax

extern crate avro_rs;

#[macro_use]
extern crate serde_derive;
extern crate failure;

use avro_rs::{Codec, Reader, Schema, Writer, from_value, types::Record};
use failure::Error;

With rust 2018 we can replace the above with

use serde_derive::{Deserialize, Serialize};
use avro_rs::{Codec, Reader, Schema, Writer, from_value, types::Record};
use failure::Error;

It would also be nice to add enums and some other types to the example to show them working.

Thanks.

Support string values for Enum

Avro Enum are represented as integers but they should work with string values as well. Since they are the only ones in the spec to have such a disparity between representation and usage, this feature isn't supported yet. Let's make that happen.

Write a tutorial

The library does have an example in lib.rs but that should be expanded in at least a short tutorial.

Error: ParseSchemaError("Unknown type: enum")

I'm not sure of the status of enum, but it's not working for me with the following example using avro-rs 0.6.5:

use avro_rs::Schema;
use failure::Error;

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"},
                { "type" : "enum",  "name" : "Numbers", "namespace": "data", "symbols" : [ "ONE", "TWO", "THREE", "FOUR" ]}
            ]
        }
    "#;
    let _schema = Schema::parse_str(raw_schema)?;
    Ok(())
}

Gives

Error: ParseSchemaError("Unknown type: enum")

Record serialization is sensitive to order of fields in struct

I'm not sure this is not intended behavior, but I just got tripped up by this so I thought I'd point it out. If you declare a schema like this:

{
  "type": "record",
  "name": "my_record",
  "fields": [
    {"name": "a", "type": "long"},
    {"name": "b", "type": "string"},
  ]
}

and then try to de/serialize it into a rust struct of this type:

#[derive(Deserialize, Serialize)]
pub struct MyRecord {
  b: String,
  a: i64,
}

You will get an error, "value does not match schema". I'm not sure that should be an error; it seems like there's an obvious way to translate from one to the other.

The operation that is erroring for me is:

to_avro_datum(&record_scheme, to_value(record)?)?

Support same types but with different names in unions

Porting tests over from the official Avro python package (see #18), I found out that Avro allows unions to contain more than once the same type (only for record, fixed and enum) as long as they specify different names.

Quoting the spec:

Unions may not contain more than one schema with the same type, except for the types record, fixed and enum. For example, unions containing two array types or two map types are not permitted, but two types with different names are permitted. (Names permit efficient resolution when reading and writing unions.)

There is already a test to uncomment which covers the issue; it can be found grepping for the issue number.

Code to serialize derived struct is commented out

There's code that's commented out in types.rs:

impl<S: Serialize> ToAvro for S {
    fn avro(self) -> Value {
        use ser::Serializer;
        self.serialize(&mut Serializer::new()).unwrap()
    }
}

This appears to do exactly what I need to do (create an avro-rs Value from a struct that has derived Serialize). Is there a preferred way to do this, or can this code be un-commented?

Rethink the Writer support - Add DatumWriter

Pretty much like both the Python and Java Avro client libraries do, it would be nice to rethink the way data is written and encoded.

The current Writer implementation is limited, and we had to add the SingleWriter interface to allow single-message writing support. The approach taken by the official Avro clients is really different and it would be nice to align the Rust library with them.

Proposed changes:

  • Add a DatumWriter interface (or keep the Writer name /shrug) that has a schema and is able to write datum, if the datum is valid according to the schema
  • Move header, sync marker, etc... support to another place?

There is already support for encoding data without a schema in encode.rs; that should be enough to mimic the Encoder interface in Java/Python.

Errors hold strings

All errors hold a string as their contents. It would be better to hold an enum to make them more semantic and to avoid unnecessary memory allocation and string copying, or at least to use a &'static str.

message: String,

message: String,

pub struct ParseSchemaError(String);

pub struct AllocationError(String);

pub struct DecodeError(String);
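
As a purely hypothetical sketch of the more semantic shape the issue is asking for (this is not the crate's actual error type), an enum carries structured data instead of a formatted String and needs no allocation at construction time:

// Hypothetical example only: a structured error enum instead of free-form strings.
#[derive(Debug)]
pub enum DecodeError {
    UnionIndexOutOfBounds(i64),
    UnexpectedEndOfInput,
    AllocationTooLarge { requested: usize, limit: usize },
}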

Consuming avro messages from kafka

It looks like the crate today supports the use case of reading Avro data from a file. I think the ability to deserialize messages from kafka would be nice. Is this a use case that would fit into this crate or would it be better served in a standalone crate?

After some research it appears this functionality would end up needing at least a schema registry client in addition to the ability to parse the wire format described here.
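
As a hedged sketch, assuming the Confluent schema-registry wire format (one zero magic byte, a 4-byte big-endian schema id, then the Avro-encoded datum), splitting a Kafka message before handing the payload to from_avro_datum could look like this:

// Assumption: Confluent wire format (magic byte 0, 4-byte big-endian schema id, payload).
fn split_confluent_message(msg: &[u8]) -> Option<(u32, &[u8])> {
    if msg.len() < 5 || msg[0] != 0 {
        return None; // not a Confluent-framed message
    }
    let schema_id = u32::from_be_bytes([msg[1], msg[2], msg[3], msg[4]]);
    Some((schema_id, &msg[5..]))
}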

Union tags are cast unchecked

Union tags are cast from i64 to usize when decoding without checking that the conversion makes sense. This has weird effects, such as on 32 bit platforms interpreting the tag 4294967300 as a 4.

match variants.get(index as usize) {
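
A hedged sketch of the checked conversion the issue is asking for (not the library's actual code): use usize::try_from so an out-of-range tag becomes an error instead of silently wrapping:

use std::convert::TryFrom;

// Sketch: convert the decoded i64 union tag to usize, rejecting values that don't fit.
fn union_index(tag: i64) -> Result<usize, String> {
    usize::try_from(tag).map_err(|_| format!("union index out of bounds: {}", tag))
}

assert_eq!(union_index(1), Ok(1));
assert!(union_index(-1).is_err());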

Combination of Option type and doc attribute causes parse error.

I'm getting an issue when combining nullable types, i.e. Option, and doc comments. The code below will fail with

Err(Error { message: "not a union" })

However, if you remove the doc comment from the raw schema it will start working again.

Thanks.

use avro_rs::{Codec, Reader, Schema, Writer, from_value, types::Record};
use failure::Error;
use serde::{Serialize, Deserialize};

#[derive(Debug, Deserialize, Serialize)]
struct Test {
    a: Option<i64>,
    b: String,
}

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {
                    "name": "a", 
                    "type": ["null", "long"], 
                    "default": 42
                },
                {
                    "name": "b", 
                    "type": "string",
                    "doc" : "Remove this line and it will work again."
                }
            ]
        }
    "#;

    let schema = Schema::parse_str(raw_schema)?;

    println!("{:?}", schema);

    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("a", Some(27i64));
    record.put("b", "foo");

    writer.append(record)?;

    let test = Test {
        a: Some(26),
        b: "foo".to_owned(),
    };

    writer.append_ser(test)?;

    writer.flush()?;

    let input = writer.into_inner();
    let reader = Reader::with_schema(&schema, &input[..])?;

    for record in reader {
        println!("{:?}", from_value::<Test>(&record?));
    }
    Ok(())
}

Compatibility

Are there any plans to implement compatibility functions?

On the off chance that anyone reading this has already done this in some other repo, could you point me to it?

Support default values for type Enum

The Avro type Enum is missing support for default value due to its weird nature of being represented as integers but having default values as string. The library should support that use case to be feature complete.

Publish library to crates.io

The library must be published to crates.io before we can ask serde to add it to its main page. We should also check that documentation builds correctly on the website.

Snappy codec does not work

When writing Avro data with the Snappy codec, it cannot be read by the Java implementation from Apache. And vice versa.

To reproduce, run this Rust program and redirect stdout to a file (snappy.avro):

[dependencies.avro-rs]
version = "0.6.1"
features = ["snappy"]

extern crate avro_rs;

use avro_rs::Codec;
use avro_rs::Schema;
use avro_rs::types::Record;
use avro_rs::Writer;
use std::io::{self, Write};

fn main() {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"}
            ]
        }
    "#;

    let schema = Schema::parse_str(raw_schema).unwrap();

    let mut writer = Writer::with_codec(&schema, io::stdout(), Codec::Snappy);
    let mut record = Record::new(writer.schema()).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");
    writer.append(record).unwrap();
    writer.flush().unwrap();
}

Then run

java -jar avro-tools-1.8.2.jar tojson snappy.avro

I get this error:

Exception in thread "main" org.apache.avro.AvroRuntimeException: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
        at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:210)
        at org.apache.avro.tool.DataFileReadTool.run(DataFileReadTool.java:76)
        at org.apache.avro.tool.Main.run(Main.java:87)
        at org.apache.avro.tool.Main.main(Main.java:76)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
        at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
        at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
        at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
        at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
        at org.apache.avro.file.SnappyCodec.decompress(SnappyCodec.java:60)
        at org.apache.avro.file.DataFileStream$DataBlock.decompressUsing(DataFileStream.java:355)
        at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:199)
        ... 3 more

(If I use Null or Deflate codec, it works as expected.)

Review public interface

The library has a tendency to declare most things public, as happened with the de and ser modules. Let's review the public interface and check whether something that shouldn't be part of it is still public.

Add library to serde ecosystem

Serde has a list of serde formats specifying the main library supporting each of those. Let's ask to be added to the list for avro.

Union with many types

I'm new to avro and its schema format but I'm diving in and trying to read a data stream using an avro schema. It's metric data, a timestamp, a scalar, and some tags:

{"type"=>"record",
 "name"=>"ut",
 "namespace"=>"vnoportal",
 "fields"=>
  [{"name"=>"timestamp",
    "type"=>
     ["long",
      "int",
      "float",
      "double",
      {"type"=>"fixed", "name"=>"uint64_t", "size"=>8},
      {"type"=>"fixed", "name"=>"int64_t", "size"=>8}]},
   {"name"=>"metric", "type"=>"string"},
   {"name"=>"value",
    "type"=>
     ["long",
      "int",
      "float",
      "double",
      {"type"=>"fixed", "name"=>"uint8_t", "size"=>1},
      {"type"=>"fixed", "name"=>"uint16_t", "size"=>2},
      {"type"=>"fixed", "name"=>"uint32_t", "size"=>4},
      "uint64_t",
      {"type"=>"fixed", "name"=>"int8_t", "size"=>1},
      {"type"=>"fixed", "name"=>"int16_t", "size"=>2},
      {"type"=>"fixed", "name"=>"int32_t", "size"=>4},
      "int64_t"]},
   {"name"=>"tags", "type"=>["null", {"type"=>"map", "values"=>"string"}]},
   {"name"=>"metadata",
    "type"=>["null", {"type"=>"map", "values"=>"string"}]}]}

it's very "generous" in terms of what kind of data can come in, any flavor of scalars for the most part. How could I convert the schema parser to handle this?

Support logical types

Avro defines a set of special types built on top of regular types called logical types which this package doesn't yet support.

There are already some tests to uncomment which cover the issue that have been backported from the official Avro Python package in #18; they can be found grepping for the issue number.

Figure out why "type": "error" seems to be valid and support it

Porting tests over from the official Avro python package (see #18), I found out that there is one specifying "type": "error" instead of "type": "record". The spec doesn't mention it explicitly, but there is an example where this mysterious type is used.

Let's figure out why this type is valid and support it.

There is already a test to uncomment which covers the issue; it can be found grepping for the issue number.

Support user-defined attributes

Avro allows users to specify arbitrary attributes (as long as they don't clash with reserved keywords and attributes) that can be attached to types and schemas, which this library doesn't yet support.

Please note that these are different from the attributes like precision or scale which are attributes defined by logical types and which are already covered by #93.

There are already some tests to uncomment which cover the issue that have been backported from the official Avro Python package in #18; they can be found grepping for the issue number.

Support for writing Option values as Avro union types

Using avro-rs version 0.6.5:

use serde::Serialize;
use avro_rs::{Schema, Writer};

#[derive(Serialize)]
struct Foo {
    bar: Option<String>,
}

fn main() {
    let schema = Schema::parse_str(r#"{
        "name": "Foo",
        "type": "record",
        "fields": [
            {"name": "bar", "type": ["null", "string"]}
        ]
    }"#).unwrap();
    let mut writer = Writer::new(&schema, Vec::new());
    writer.append_ser(Foo { bar: None }).unwrap();
}

This would be the canonical way to indicate a nullable field in Avro. However, when serializing, it produces the error ValidationError("value does not match schema").

Even if I change the definition of bar to be just String instead of Option<String> I get the same error, although strings are a subset of legal fields according to the schema.

(As an aside, it would also be useful if the error specified which value did not match which schema.)

Lengths are cast to unsigned unchecked

Lengths are first decoded as i64 and then cast to usize without checking that the conversion makes sense. This has weird effects, such as producing an allocation error instead of a parse error, or on 32 bit platforms interpreting the length 4294967300 as a nice little array of four items.

zag_i64(reader).and_then(|len| safe_len(len as usize))

Improve readme example

I notice that in the example provided in the README (and elsewhere), there are two serializations done of the Test struct, once with types::Record, and once with a native Rust struct that derives Serialize. Both of those serializations use the same data (a: 27, b: "foo"), however when I change the data in the second record to:

let test = Test {
    a: 999,
    b: "bar".to_owned(),
};

I get this output:

...
Ok(Test { a: 27, b: "foo" })
Ok(Test { a: 27, b: "foo" })

Because I'm new to Rust and Avro, I'm not sure whether I'm doing something wrong. Shouldn't I be getting:

...
Ok(Test { a: 27, b: "foo" })
Ok(Test { a: 999, b: "bar" })

Partial deserialisation - ignore unknown?

Hi,

Apologies if I have missed anything but is partial deserialisation currently supported/in the pipeline?

I.e. if I have a complex schema containing records with many fields, can I deserialise into a struct with only 2 of the fields declared, with the others simply skipped? Equivalent to "ignore unknown"?

Error serializing/deserializing record with consecutive arrays

It appears that a record with two consecutive array fields is not getting serialized (or maybe deserialized) correctly. The following test fails, saying that the rehydrated value of secondvec is [] where it should be [3,4]. I could only get the test to fail when firstvec is empty. If you uncomment the line "my_struct.firstvec = vec![8,9];" the test passes.

extern crate serde;
#[macro_use] extern crate serde_derive;
extern crate serde_json;
extern crate avro_rs;
extern crate serde_bytes;


fn main() {
    println!("Hello, world!");
}

#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub struct MyType {
    pub firstvec: Vec<i64>,
    pub secondvec: Vec<i64>,
}

pub const JSON: &str = r#"
{
  "type": "record",
  "name": "mytype",
  "fields": [
    {"name": "firstvec", "type":{"type":"array", "items":"long"}},
    {"name": "secondvec", "type":{"type":"array", "items":"long"}}
  ]
}
"#;


#[cfg(test)]
mod tests {
    use super::*;
    use avro_rs::{from_avro_datum, from_value, to_avro_datum, to_value, Schema};

    #[test]
    fn serialize_round_trip() {
        let json_scheme: serde_json::Value = serde_json::from_str(JSON).unwrap();
        let scheme = Schema::parse(&json_scheme).unwrap();
        let mut my_struct: MyType = Default::default();
	//my_struct.firstvec = vec![8,9];
        my_struct.secondvec = vec![3,4];
        let bytes = to_avro_datum(&scheme, to_value(my_struct).unwrap()).unwrap();
        let rehy_struct: MyType =
            from_value(&from_avro_datum(&scheme, &mut &bytes[..], None).unwrap())
                .unwrap();
        assert_eq!{rehy_struct.secondvec, [3,4]};

    }
}

Crash on bogus input

Avro-rs has a tendency to crash the process when given invalid input. The typical error message that I've seen (I don't have it right now) mentions an out of memory condition. My guess is it's trying to allocate an array of size -7 or something. I'd like to start using this crate in a project of mine, but the crashes are an issue.

I've actually created a fuzzer project with afl.rs, that I can share if that would be helpful, but at its core it's just:

extern crate afl;
extern crate avro_rs;

use avro_rs::{Schema, from_avro_datum};

    static SCHEMA: &'static str = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#;


fn main() {
    afl::read_stdio_bytes(|bs| {
	let schema = Schema::parse_str(SCHEMA).unwrap();
        let _ = from_avro_datum(&schema, &mut &bs[..], None);
    });
}

And here are the stats (4 crashes) after running for a minute or two:

start_time        : 1529615156
last_update       : 1529615246
fuzzer_pid        : 27636
cycles_done       : 32
execs_done        : 178688
execs_per_sec     : 2081.67
paths_total       : 18
paths_favored     : 7
paths_found       : 17
paths_imported    : 0
max_depth         : 2
cur_path          : 0
pending_favs      : 0
pending_total     : 0
variable_paths    : 18
stability         : 98.40%
bitmap_cvg        : 1.62%
unique_crashes    : 4
unique_hangs      : 0
last_path         : 1529615159
last_crash        : 1529615157
last_hang         : 0
execs_since_crash : 178078
exec_timeout      : 20
afl_banner        : avro-fuzz
afl_version       : 2.52b
target_mode       : default

Record type deserialisation performance

I've been doing work with very large Avro datasets and comparing performance with an existing Java implementation; originally performance was very poor, over 3x slower for the same workload. I implemented zero-copy deserialisation, which helped substantially, but performance was still 2x slower for the Rust impl. I have since modified my fork of avro-rs to treat Record field names as Rc<String> to stop a string allocation for each record row; this has provided a substantial performance boost, to the point that the Rust implementation for my workload is now 30% faster than the Java.

Is Rc<String> the best approach for this? My logic was that record field names are fixed at Schema load - would it be possible to remove the Rc completely in favour of &'schema str?

And are there any other places where there's scope for performance improvements?
