Coder Social home page Coder Social logo

parquet-go / parquet-go Goto Github PK

View Code? Open in Web Editor NEW

This project forked from segmentio/parquet-go

248.0 7.0 41.0 8.09 MB

High-performance Go package to read and write Parquet files

Home Page: https://pkg.go.dev/github.com/parquet-go/parquet-go

License: Apache License 2.0

Go 86.26% Assembly 13.73% Makefile 0.02%
columnar-format golang parquet parquet-files performance

parquet-go's People

Contributors

achille-roussel avatar annanay25 avatar asubiotto avatar bartleyg avatar bprosnitz avatar brancz avatar cyriltovena avatar dependabot[bot] avatar derekperkins avatar forsaken628 avatar fpetkovski avatar gernest avatar hhoughgg avatar javierhonduco avatar jeffail avatar joe-elliott avatar kevinburkesegment avatar mapno avatar mdisibio avatar michaelurman avatar ngotchac avatar pelletier avatar pryz avatar shinena1998 avatar stoewer avatar thorfour avatar tschaub avatar yonesko avatar zolstein avatar zolstein-clumio avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

parquet-go's Issues

Corrupt parquet files due to overflow in UncompressedPageSize

I came across an issue where parquet files written by parquet-go were found to be corrupted, leading to an inability to read them using parquet-go. The panic occurred within the filePages.ReadPage() function, more specifically at bufferPool.get(bufferSize int).

Upon debugging, it was evident that the bufferSize causing the panic had the value -1926395834. The buffer size is derived from header.UncompressedPageSize.

Based on my investigation, it appears that an overflow may be occurring in the conversion of the uncompressed page size value from int to int32 in writer.go.

Unable to read nested map value with schema generated from NewSchema()

When reading parquet data that contains a map with a nested value and using a schema generated with parquet.NewSchema() I'm seeing a panic from the library. The panic happens during the read (writing is fine):

panic: reflect: call of reflect.Value.MapIndex on struct Value

The following test illustrates the issue.

func TestMapValue(t *testing.T) {
	// Main struct used to mashal/unmarshal data.
	type NestedStruct struct {
		Val string
	}
	type MapValue struct {
		Nested NestedStruct
	}
	type Message struct {
		TestMap map[string]MapValue
	}

	testKey, testValue := "test-key", "test-value"
	in := Message{
		TestMap: map[string]MapValue{
			"test-key": {
				Nested: NestedStruct{
					Val: testValue,
				},
			},
		},
	}

	var f bytes.Buffer

	// Generate a schema that matches Message exactly.
	schema := parquet.NewSchema("Message", parquet.Group{
		"TestMap": parquet.Group{
			"key_value": parquet.Repeated(parquet.Group{
				"key": parquet.String(),
				"value": parquet.Group{
					"Nested": parquet.Group{
						"Val": parquet.String(),
					},
				},
			}),
		},
	})

	pw := parquet.NewGenericWriter[Message](&f, schema)
	_, err := pw.Write([]Message{in})
	if err != nil {
		t.Fatal(err)
	}

	err = pw.Close()
	if err != nil {
		t.Fatal(err)
	}

	// The goal here is to not use a schema generated from Message with
	// parquet.SchemaOf(Message), but one made with parquet.NewSchema(...).
	pr := parquet.NewGenericReader[*Message](bytes.NewReader(f.Bytes()), schema)

	out := make([]*Message, 1)
	_, err = pr.Read(out)
	if err != nil {
		t.Fatal(err)
	}
	pr.Close()
	if want, got := testValue, out[0].TestMap[testKey].Nested.Val; want != got {
		t.Error("failed to read map value")
	}
}

Is it possible to write the schema in such a way that the test above passes? In my case I can not use parquet.SchemaOf(Message) as the Message type is not available/usable for this purpose.

[PowerBI] Yet implemented: Unsupported encoding.

I encountered an error while attempting to open a file with a string column in PowerBI. The specific error message I received was:

Parquet: class parquet::ParquetException (message: 'Not yet implemented: Unsupported encoding.')

Interestingly, the issue does not occur when the Parquet file contains only numeric columns; it works perfectly in that case.

However, I successfully converted data.csv, which contains strings, to Parquet format using parquet-tools, and I was able to open it in PowerBI without any problems.

Here is the code I used to test the scenario:

// Not works
type RowString struct{ Data string }
parquet.WriteFile("file_with_string.parquet", []RowString{
	{Data: "Bob"},
})

// Not works
group := parquet.Group{
	"Data": parquet.Optional(parquet.String()),
}
schema := parquet.NewSchema("schema", group)
output, _ := os.OpenFile("file_with_string2.parquet", os.O_RDWR|os.O_CREATE, 0755)
pw := parquet.NewGenericWriter[any](output, schema)
defer pw.Close()
builder := parquet.NewRowBuilder(group)
builder.Add(0, parquet.ByteArrayValue([]byte(`Bob`)))
pw.WriteRows([]parquet.Row{builder.Row()})

// Works
type RowInt struct{ Data int }
parquet.WriteFile("file_with_int.parquet", []RowInt{
	{Data: 1},
})

Problems with dynamic schemas and writing with optional fields

I am new to this package, and have a requirement for building dynamic schema's at runtime, but am having problems getting the whole solution to work. I have seen many other issues regarding this, but have been unable to piece together a full solution. Most the other issues are either unanswered, or don't have a full code snippet that works. As part of this, I am also needing Optional fields.

These are all the similar issues I have read and have not been able to figure it out.


I would like to create a schema at runtime. Both of these two options appear to work correctly for me when I create the schema.

schema := parquet.NewSchema("runtime_schema", parquet.Group{
	"id":  parquet.Optional(parquet.Int(32)),
	"age": parquet.Optional(parquet.Int(32)),
})
structFields := []reflect.StructField{}

tp := reflect.TypeOf(int32(0))

structFields = append(structFields, reflect.StructField{
	Name: strings.ToUpper("age"),
	Type: tp,
	Tag:  reflect.StructTag(fmt.Sprintf(`parquet:"%v,optional"`, "age")),
})
structFields = append(structFields, reflect.StructField{
	Name: strings.ToUpper("id"),
	Type: tp,
	Tag:  reflect.StructTag(fmt.Sprintf(`parquet:"%v,optional"`, "id")),
})

structType := reflect.StructOf(structFields)
structElem := reflect.New(structType)

schema := parquet.NewSchema("runtime_schema", parquet.SchemaOf(structElem.Interface()))

Both of those options create the same schema as expected:

message runtime_schema {
        optional int32 age (INT(32,true));
        optional int32 id (INT(32,true));
}

I am not sure on which option is better? I have seen both ways suggested in different issues: #30 (comment), segmentio#311 (comment).

Now, the problem I am running in to, is using the the GenericWriter with this schema. I am unsure of what to put for the type parameter, so I have been using any. It compiles, and effectively "works", but nothing I have tries works with Optional fields.

When trying to use WriteRows with optional fields, in any way the schema is defined(typed struct, Group, or reflection), the optional fields are always null. This bug was noted here:#25 (comment), although the comment made it seem like it was only on the deprecated Writer api, however it appears to also apply to the GenericWriter as well. Example of the bug with GenericWriter: #30 (comment).

Now, I don't have a specific need for WriteRows right now, but it's the only way I could get it to compile, or not panic when writing. When trying to use Write instead, I run into panics. Usually of some reflection call. I'm sure I'm just making a simple mistake.

I am able to use Write like this:

func main() {
	schema := parquet.NewSchema("runtime_schema", parquet.Group{
		"id":  parquet.Optional(parquet.Int(32)),
		"age": parquet.Optional(parquet.Int(32)),
	})

	var f bytes.Buffer
	pw := parquet.NewGenericWriter[any](&f, schema)

	a := []any{
		map[string]int32{
			"id":  20,
			"age": 22,
		},
	}

	_, err := pw.Write(a)
	if err != nil {
		log.Fatal(err)
	}
	if err := pw.Close(); err != nil {
		log.Fatal(err)
	}

	reader := parquet.NewGenericReader[any](bytes.NewReader(f.Bytes()), schema)
	out := make([]parquet.Row, 1)
	if _, err := reader.ReadRows(out); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v", out)
}

Prints: [[C:0 D:1 R:0 V:22 C:1 D:1 R:0 V:20]]

however this very simplified schema will not work for my use case. Because the schema isn't just of one type. So I would need to use a map[string]any, but it panics.

Example:

func main() {
	schema := parquet.NewSchema("runtime_schema", parquet.Group{
		"id":   parquet.Optional(parquet.Int(32)),
		"age":  parquet.Optional(parquet.Int(32)),
		"name": parquet.Optional(parquet.String()),
	})

	var f bytes.Buffer
	pw := parquet.NewGenericWriter[any](&f, schema)

	a := []any{
		map[string]any{
			"id":   20,
			"age":  22,
			"name": "bob",
		},
	}

	_, err := pw.Write(a)
	if err != nil {
		log.Fatal(err)
	}
	if err := pw.Close(); err != nil {
		log.Fatal(err)
	}

	reader := parquet.NewGenericReader[any](bytes.NewReader(f.Bytes()), schema)
	out := make([]parquet.Row, 1)
	if _, err := reader.ReadRows(out); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v", out)
}

This panics with:

panic: cannot create parquet value of type INT32 from go value of type interface {}

goroutine 1 [running]:
github.com/parquet-go/parquet-go.makeValue(0x1, 0xc00018a930, {0x796100?, 0xc0000254a0?, 0xc000193a60?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/value.go:339 +0xd89
github.com/parquet-go/parquet-go.deconstructFuncOfLeaf.func1({0xc00009c3c0, 0x3, 0xc0000a7410?}, {0xd0?, 0x71?, 0x41?}, {0x796100?, 0xc0000254a0?, 0x198?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/row.go:538 +0x99
github.com/parquet-go/parquet-go.deconstructFuncOfOptional.func1({0xc00009c3c0?, 0x3?, 0x3?}, {0x0?, 0x0?, 0x0?}, {0x796100?, 0xc0000254a0?, 0xc000193bd0?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/row.go:439 +0x103
github.com/parquet-go/parquet-go.deconstructFuncOfGroup.func1({0xc00009c3c0, 0x3, 0x3}, {0xc0?, 0x3b?, 0x19?}, {0x79d840?, 0xc0000a7410?, 0x7fb0e0?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/row.go:515 +0x19c
github.com/parquet-go/parquet-go.(*Schema).deconstructValueToColumns(0xc000092300, {0xc00009c3c0, 0x3, 0x3}, {0x79d840?, 0xc0000a7410?, 0x7a8400?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/schema.go:236 +0x114
github.com/parquet-go/parquet-go.(*Schema).Deconstruct(0xc000092300, {0x0, 0x0, 0x0}, {0x79d840?, 0xc0000a7410})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/schema.go:224 +0x1f4
github.com/parquet-go/parquet-go.(*Writer).Write(0xc0000925a0, {0x79d840, 0xc0000a7410})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:379 +0x1d6
github.com/parquet-go/parquet-go.(*GenericWriter[...]).writeAny(0x0, {0xc000025490?, 0x1, 0x10})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:233 +0x65
github.com/parquet-go/parquet-go.(*GenericWriter[...]).Write.func1(0xc000193e30?)
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:169 +0x62
github.com/parquet-go/parquet-go.(*writer).writeRows(0xc000188160, 0x1, 0xc000193e28)
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:993 +0xbb
github.com/parquet-go/parquet-go.(*GenericWriter[...]).Write(0xc0000a7410?, {0xc000025490?, 0x4?, 0xc0000686b0?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:168 +0x65
main.main()
        /home/kason/nuspire/nusiem-log-mover/cmd/parquet2/main.go:36 +0x4cd
exit status 2

Any help would be greatly appreciated! Thank you.

map/list are not properly supported when using `any` and `GenericReader`

Hello,

it seems to me that there is a discrepancy when reading maps or lists with the generic any.
Here is a simple testcase:

func TestParquetReaderSegmentRepro(t *testing.T) {
	type RowType struct {
		FirstName string
		Thing     map[string]int
	}

	file, err := os.CreateTemp(t.TempDir(), "fakeFilename")
	if err != nil {
		t.Fail()
	}
	writer := segmentparquet.NewGenericWriter[RowType](file)
	n, err := writer.Write([]RowType{
		{"John", map[string]int{"a": 1}},
		{"George", map[string]int{"b": 2}},
	})
	_ = n
	if err != nil {
		t.Fail()
	}
	err = writer.Close()
	if err != nil {
		t.Fail()
	}

	reader := segmentparquet.NewGenericReader[any](file)
	for {
		hi := make([]any, 1)
		howMany, err := reader.Read(hi)
		if err != nil {
			break
		}
		if howMany != 1 {
			t.Fail()
		}
	}
}

Here I create a new file with a struct that contains a map, and then try to read it with any, I get back an object that it is like this:

{"FirstName": "John", "Thing": {"key_value": [{"key":"a","value":1}]}}

instead of:

{"FirstName": "John", "Thing" :{"a":1}}

Note, that if I read it again by providing the RowType in the generic input, it is correct.
Am I missing something? 🤔

Support writing 8-bit and 16-bit integers

Attempting to serialize signed or unsigned 8-bit or 16-bit results in: "panic: cannot convert Go values of type to parquet value". Having the ability to serialize smaller integers into parquet (presumably as physical int32 but logical int(bit-width, sign) would help enforce logical bounds on numbers and to reduce the in-memory footprint of values to be written (and read, if the same struct is reused for reading).

I'm happy to do most of the legwork on the implementation if needed, though I may need a bit of direction.

parquet-go can read nested objects, but not parquet cli (Parquet/Avro schema mismatch)

Hello,
Copying the issue from the previous repo (segmentio#483), just tested against 0.19 and the issue is still present.

I'm seeing a weird situation which I can't explain. I've created a parquet file in go, I can read it in go, I can read the schema from the cli, but I get an error when I cat it.

Note: I've never used the parquet command before, could be a user error too, I'm using the brew install parquet-cli
Note2: If I remove the nested structure it works fine

code:

package main

import (
	"bytes"
	"flag"
	"os"
	"time"

	"github.com/davecgh/go-spew/spew"
	"github.com/golang/glog"
	"github.com/parquet-go/parquet-go"
)

type ParquetOtherElements struct {
	ElementId   string   `parquet:"elementId"`
	Type        string   `parquet:"type,enum"`
	RehomedFrom []string `parquet:"rehomedFrom,list"`
}

type ParquetElement struct {
	ElementId string `parquet:"elementId"`
	Type      string `parquet:"type,enum"`
	Label     string `parquet:"label"`
	Status    string `parquet:"status,enum"`
	Element   string `parquet:"element,json"`
}

type ParquetStruct struct {
	TenantId             string                 `parquet:"tenantId"`
	TriggerProviderId    string                 `parquet:"triggerProviderId"`
	TriggerEventId       string                 `parquet:"triggerEventId"`
	TriggerProcessorType string                 `parquet:"triggerProcessorType"`
	EventTime            int64                  `parquet:"eventTime,timestamp(nanosecond)"`
	EventFlagged         bool                   `parquet:"eventFlagged"`
	Nodes                []ParquetElement       `parquet:"nodes,list"`
	Edges                []ParquetElement       `parquet:"edges,list"`
	OtherElements        []ParquetOtherElements `parquet:"otherElements,optional,list"`
}

func main() {
	flag.Set("alsologtostderr", "true")
	flag.Set("v", "3")
	flag.Parse()

	content := new(bytes.Buffer)
	w := parquet.NewGenericWriter[ParquetStruct](content)
	p := []ParquetStruct{
		{
			TenantId:             "tenantIdV",
			TriggerProviderId:    "triggerProviderIdV",
			TriggerEventId:       "triggerEventIdV",
			TriggerProcessorType: "triggerProcessorTypeV",
			EventFlagged:         false,
			EventTime:            time.Now().UnixNano(),
			Nodes:                []ParquetElement{{ElementId: "elementIdV", Type: "typeV", Label: "labelV", Status: "statusV", Element: "{}"}},
		},
	}
	spew.Dump(p)
	if _, err := w.Write(p); err != nil {
		glog.Fatal(err)
	}
	if err := w.Close(); err != nil {
		glog.Fatal(err)
	}

	// write file from content buffer
	f, err := os.Create("myfile.parquet")
	if err != nil {
		glog.Fatal(err)
	}
	defer f.Close()
	if _, err := f.Write(content.Bytes()); err != nil {
		glog.Fatal(err)
	}

	glog.Infoln("reading buffer....")
	file := bytes.NewReader(content.Bytes())
	rows, err := parquet.Read[ParquetStruct](file, file.Size())
	if err != nil {
		glog.Fatal(err)
	}
	spew.Dump(rows)
}
 bob  ~/scratch/parquet  go run .                                                                                                                                                                                                                                                                                              
([]main.ParquetStruct) (len=1 cap=1) {
 (main.ParquetStruct) {
  TenantId: (string) (len=9) "tenantIdV",
  TriggerProviderId: (string) (len=18) "triggerProviderIdV",
  TriggerEventId: (string) (len=15) "triggerEventIdV",
  TriggerProcessorType: (string) (len=21) "triggerProcessorTypeV",
  EventTime: (int64) 1678235792424865000,
  EventFlagged: (bool) false,
  Nodes: ([]main.ParquetElement) (len=1 cap=1) {
   (main.ParquetElement) {
    ElementId: (string) (len=10) "elementIdV",
    Type: (string) (len=5) "typeV",
    Label: (string) (len=6) "labelV",
    Status: (string) (len=7) "statusV",
    Element: (string) (len=2) "{}"
   }
  },
  Edges: ([]main.ParquetElement) <nil>,
  OtherElements: ([]main.ParquetOtherElements) <nil>
 }
}
I0307 16:36:32.428573   62343 main.go:76] reading buffer....
([]main.ParquetStruct) (len=1 cap=1) {
 (main.ParquetStruct) {
  TenantId: (string) (len=9) "tenantIdV",
  TriggerProviderId: (string) (len=18) "triggerProviderIdV",
  TriggerEventId: (string) (len=15) "triggerEventIdV",
  TriggerProcessorType: (string) (len=21) "triggerProcessorTypeV",
  EventTime: (int64) 1678235792424865000,
  EventFlagged: (bool) false,
  Nodes: ([]main.ParquetElement) (len=1 cap=1) {
   (main.ParquetElement) {
    ElementId: (string) (len=10) "elementIdV",
    Type: (string) (len=5) "typeV",
    Label: (string) (len=6) "labelV",
    Status: (string) (len=7) "statusV",
    Element: (string) (len=2) "{}"
   }
  },
  Edges: ([]main.ParquetElement) {
  },
  OtherElements: ([]main.ParquetOtherElements) <nil>
 }
}
 bob  ~/scratch/parquet  parquet cat myfile.parquet                                                                                                                                                                                                                                                                           
Unknown error
java.lang.RuntimeException: Failed on record 0
        at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:86)
        at org.apache.parquet.cli.Main.run(Main.java:157)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
        at org.apache.parquet.cli.Main.main(Main.java:187)
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'elementId' not found
        at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
        at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:539)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:489)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
        at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
        at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
        at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
        at org.apache.parquet.cli.BaseCommand$1$1.advance(BaseCommand.java:363)
        at org.apache.parquet.cli.BaseCommand$1$1.<init>(BaseCommand.java:344)
        at org.apache.parquet.cli.BaseCommand$1.iterator(BaseCommand.java:342)
        at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:73)
        ... 3 more
 1  bob  ~/scratch/parquet  parquet schema myfile.parquet                                                                                                                                                                                                                                                                     
{
  "type" : "record",
  "name" : "ParquetStruct",
  "fields" : [ {
    "name" : "tenantId",
    "type" : "string"
  }, {
    "name" : "triggerProviderId",
    "type" : "string"
  }, {
    "name" : "triggerEventId",
    "type" : "string"
  }, {
    "name" : "triggerProcessorType",
    "type" : "string"
  }, {
    "name" : "eventTime",
    "type" : "long"
  }, {
    "name" : "eventFlagged",
    "type" : "boolean"
  }, {
    "name" : "nodes",
    "type" : {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "label",
              "type" : "string"
            }, {
              "name" : "status",
              "type" : "string"
            }, {
              "name" : "element",
              "type" : "bytes"
            } ]
          }
        } ]
      }
    }
  }, {
    "name" : "edges",
    "type" : {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "namespace" : "list2",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "namespace" : "element2",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "label",
              "type" : "string"
            }, {
              "name" : "status",
              "type" : "string"
            }, {
              "name" : "element",
              "type" : "bytes"
            } ]
          }
        } ]
      }
    }
  }, {
    "name" : "otherElements",
    "type" : [ "null", {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "namespace" : "list3",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "namespace" : "element3",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "rehomedFrom",
              "type" : {
                "type" : "array",
                "items" : {
                  "type" : "record",
                  "name" : "list",
                  "namespace" : "list4",
                  "fields" : [ {
                    "name" : "element",
                    "type" : "string"
                  } ]
                }
              }
            } ]
          }
        } ]
      }
    } ],
    "default" : null
  } ]
}
 bob  ~/scratch/parquet 

Allow opening a single RowGroup

The OpenFile function parses the page index of all row groups, which for large files (100GB+) can already take a couple of seconds. It would be great to have an API which allows the user to open a set of row groups instead opening all of them.

Support for protobuf messages

It'd be great if this library was able to support reading and writing protobuf messages. Protobufs are generated and there is no (clean) way to control field tags which would be necessary to set field names to something other than camel-case. There are however json: tags being added by default as in the example below:

type SomeMessage struct {
  ...
  UserAgent string                 `protobuf:"bytes,7,opt,name=user_agent,json=userAgent,proto3" json:"user_agent,omitempty"`
}

Perhaps it'd be possible to add a function similar to SchemaOf() that would generate a schema for a protobuf, with options to control field names.

Spark can't read parquet-go generated files: can not read class org.apache.parquet.format.PageHeader

Hi!

I have a go program that generate parquet files. (I am using go 1.17)

func Example() {
	// Creating writer:
	writer := parquet.NewWriter(
		localParquetFile,
		parquet.SchemaOf(pblogger.ParquetEventV3{}),
		parquet.Compression(parquet.LookupCompressionCodec(format.Snappy)),
		parquet.WriteBufferSize(50_000),
	)

	// Write ~10k rows
	writer.Write(myData)

	// Close
	writer.Close()
	localParquetFile.Close()
}

This is working well. I can read the generated files with the parquet-go library with no issues.

I generate around 1000 files / hours. Each files is around 5mb.

I have a spark job ingesting those files and running aggregates on the data.
This spark job fails because for some files (not all of them, ~10% of them), it can't read the PageHeader
Error: can not read class org.apache.parquet.format.PageHeader: Socket is closed by peer
Full trace HERE

When I try to read this corrupted file with parquet-go, it still work.

Tools like https://github.com/apache/parquet-mr also shows the same error on corrupted files when running command like cat. The rowcount command works well.

How can I fix this?
Thanks!

go: 1.17.5
parquet-go: github.com/segmentio/parquet-go v0.0.0-20220720215406-8d9a5b560e39
spark: Spark 3.3.2

Data corruption in page with CRC32 checksum of value 0

A data page with a CRC32 checksum of value 0x0000 seems to be triggering a false-positive corruption check. The error is ErrCorrupted, which is thrown when reading the file page.

The data page where this has happened belongs to a string column with a dictionary (see definition). While trying to debug this error, we've noticed that the CRC32 checksum from the data page outputs 0. While that is a valid value in CRC32, further debugging showed that the CRC stored in the header of the page is the same as in the dictionary page of that column, which seems very suspicious.

Disabling the checksum verification allows to read the page's value normally, which suggests that there may not be actual corruption of the data.

I wasn't able to get much further than this. Is it possible that there is a check where if CRC is equal to 0, a different value is used? I've tried a couple of things such as manually disabling caching of PageHeaders and manually disabling caching of the encoders, in case an object used during read had data from previous uses, but didn't find anything.

LIST and MAP types not properly preserved in read after write

See the following code that writes, then reads, a parquet file. The schema doesn't seem to preserve the LIST and MAP types:

package main

import (
	"github.com/parquet-go/parquet-go"
	"log"
	"os"
)

func main() {
	fileName := "schema-issue.parquet"

	var root parquet.Node = parquet.Group{
		"aaa": parquet.Uint(32),
		"bbb": parquet.List(
			parquet.List(
				parquet.String(),
			),
		),
		"ccc": parquet.Group{
			"ddd": parquet.Leaf(parquet.ByteArrayType),
			"eee": parquet.Leaf(parquet.BooleanType),
		},
		"fff": parquet.Map(
			parquet.Int(32),
			parquet.Group{
				"ggg": parquet.Date(),
			},
		),
	}
	writtenSchema := parquet.NewSchema("mySchema", root)

	writtenFile, _ := os.Create(fileName)
	writer := parquet.NewGenericWriter[any](writtenFile, writtenSchema)
	writer.Close()

	openedFile, _ := os.Open(fileName)
	reader := parquet.NewReader(openedFile)

	log.Printf("Written schema: %s", writtenSchema.String())
	log.Printf("Opened schema: %s", reader.Schema().String())
}

Here's the output:

Written schema: message mySchema {
        required int32 aaa (INT(32,false));
        required group bbb (LIST) {
                repeated group list {
                        required group element (LIST) {
                                repeated group list {
                                        required binary element (STRING);
                                }
                        }
                }
        }
        required group ccc {
                required binary ddd;
                required boolean eee;
        }
        required group fff (MAP) {
                repeated group key_value {
                        required int32 key (INT(32,true));
                        required group value {
                                required int32 ggg (DATE);
                        }
                }
        }
}

Opened schema: message mySchema {
        required int32 aaa (INT(32,false));
        required group bbb {
                repeated group list {
                        required group element {
                                repeated group list {
                                        required binary element (STRING);
                                }
                        }
                }
        }
        required group ccc {
                required binary ddd;
                required boolean eee;
        }
        required group fff {
                repeated group key_value {
                        required int32 key (INT(32,true));
                        required group value {
                                required int32 ggg (DATE);
                        }
                }
        }
}

See how the LIST and MAP types are missing in the read schema:
2023-09-01_13-42-01

Is that a bug or am I maybe missing something?

I'm sorry

Based on #1 , I had the wrong impression I was the only one who stepped up and temporary taking over maintaining this project.

Fixing issues and ensuring the healthy of the library has been my core objective. I raised a ticket to outline first steps I had in mind for the project here #19.

Unfortunately, I was operating under the assumption I can make correct judgement while executing my duties. I had no impression that there are other maintainers out there who were actively involved for this I'm terribly sorry.

I love this project and I want it to thrive, If my actions hurts the project in any way I apologise to anyone affected by it.

For those who were wondering.

I merged a fix to a critical issue #31 without giving time for maintainers to review.

That being said, I have some questions.

  • Is there governance for this project that I'm not aware of ?
  • Is there a list of active maintainers ?
  • Should I step down ?
  • What should be done moving forward ? I'm happy to revert all non approved changes, Including removing the release just please let me know who I should talk to.

Thanks and have a good day.

Sorting for nested structure column

Hey, i have a problem with sorting parquet file by nested column.
For example look at these structs

type Data struct {
ID string
Time int64
Value Value
}

type Value struct {
Freq float64
Power float64
}

The string of column name looks like "Value.Freq"

When i try to use
parquet.Ascending("Value.Freq")
It doesnt work.

Is there any other way to sort parquet file by nested value?

GenericWriter should write map keys to matching columns

Given a parquet file with a schema generated from the following model:

type Inner struct {
	FieldB int
	FieldC string
}

type Model struct {
	FieldA string
	Nested Inner  
}

The GenericWriter should be able to write data from an an alternative model, where Nested is represented as map, as long as the map keys match column names from the original schema:

type AltModel struct {
	FieldA string
	Nested map[string]any
}

data := []AltModel{
	{
		FieldA: "a",
		Nested: map[string]any{"FieldB": 11, "FieldC": "c"},
	},
}

schema := parquet.SchemaOf(new(Model))
w := parquet.NewGenericWriter[AltModel](f, schema)
w.Write(data)

In its current implementation GenericWriter panics for the above code. Here is a gist that reproduces the error

This feature is useful when writing data to a schema where parts of the schema (i.e. the struct Inner) are defined dynamically at runtime.

Merging row groups throws panic when rows from one group are empty

Merging multiple sorted row groups will throw a panic when one of the groups returns empty rows during ReadRows, even if n is correctly returned as 0. The reason why we sometimes return empty rows is because we apply filtering during the merge process, and occasionally no rows will match the filter. I think this only happens when the entire row group is empty.

Here is a PR reproducing the issue: #67.

How to keep columns order definition ?

How can I maintain the order of column definitions?
It appears to have been resolved by PR 140, but I have yet to discover how to do this.

package main

import (
	"fmt"

	"github.com/parquet-go/parquet-go"
)

func main() {

	group := parquet.Group{
		"C": parquet.String(), // 0
		"B": parquet.String(), // 1
		"A": parquet.String(), // 2
	}

	schema := parquet.NewSchema("runtime_schema", group)

	for i, column := range schema.Columns() {
		fmt.Println(i, column[0])
	}

	/*
		0 A
		1 B
		2 C
	*/
}

Who is responsible for cleaning up files done in `fileBufferPool.GetBuffer` ?

Hi there! Thanks for the amazing library. One question I have is related to ability to store parquet pages in files. From the docs:

writer := parquet.NewGenericWriter[Row](oFile, writerConfig, parquet.ColumnPageBuffers(
    parquet.NewFileBufferPool(dir, "buffers.*"),
))

It seems like the underlying fileBufferPool.GetBuffer opens temporary file and returns io.ReadWriteSeeker, so it seems to me this is not closed anywhere. I'm currently experiencing issues related to the number of open file descriptors. Does it make sense to close the buffers as soon as they are used?

Support nested array fields

Descriptions

It seems to have some problems if the Golang struct has a nested array field(e.g. [][]string). You can check the following example code.
NestedStringArrayWithTag has a Field with [][]int type, however when I use NewGenericWriter() to write to a file, then Field become []int type. It is rather unexcepted.

package main

import (
	"log"
	"os"

	"github.com/parquet-go/parquet-go"
)

type NestedArrayWithTag struct {
	Field [][]int `parquet:"field,list"`
}

func main() {
	output, _ := os.Create("file.parquet")
	defer output.Close()

	writer := parquet.NewGenericWriter[NestedArrayWithTag](output)
	_, err := writer.Write([]NestedArrayWithTag{
		{
			Field: [][]int{{1}},
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := writer.Close(); err != nil {
		log.Fatal(err)
	}

	// contents of file.parquet

	// ##################
	// File: file.parquet
	// ##################
	// {field: [1]}

        // should be {field: [[1]]}
}

PS. I have used a list tag to prevent this problem.

If I use parquet-cli to cat the file, it raises the following exception

parquet cat file.parquet
Unknown error
java.io.FileNotFoundException: File file:/Users/xxx/others/parquet-go/file.parquet does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:668)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:989)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:658)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:458)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:148)
        at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:349)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
        at org.apache.parquet.cli.util.SeekableFSDataInputStream.<init>(SeekableFSDataInputStream.java:38)
        at org.apache.parquet.cli.BaseCommand.openSeekable(BaseCommand.java:240)
        at org.apache.parquet.cli.BaseCommand.getAvroSchema(BaseCommand.java:399)
        at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:66)
        at org.apache.parquet.cli.Main.run(Main.java:163)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
        at org.apache.parquet.cli.Main.main(Main.java:193)

Expected Result:

  • should output data correctly for nested array type

Data corruption when recording byte array column statistics

Discovered while implementing #52

parquet-cli showed that for one of the sample columns containing ["Solo", "Skywalker", "Skywalker"] the min and max values recorded were ("Skywalker", "Skywalker"), not ("Skywalker", "Solo") as they should have been.

I spent a bit more time investigating this. The problem comes from these lines in (*writerColumn).recordPageStats (writer.go:1560):

if existingMaxValue.isNull() || c.columnType.Compare(maxValue, existingMaxValue) > 0 {
    c.columnChunk.MetaData.Statistics.MaxValue = maxValue.Bytes()
}

For byte-array-backed Values, Bytes() returns an unsafe cast of the value ptr to a byte-slice. In this case, the ptr points into the values slice of the columnBuffer's page, and that slice gets saved directly into the statistics. It seems like after we finish flushing this page, the page's values slice gets reused - as the next page is written, it overwrites the MaxValue stored in the statistics. In this particular case, the stored MaxValue was "Solo" (&values[0], 4) but "Skywalker" is the first value of the next data page, which gets written to &values[0]. When we update the statistics for this page, we compare "Skywalker" (&values[0], 9) against "Skyw" (&values[0], 4), find it's larger, and updates it. In general, though, this could behave even more erratically.

Fundamentally, it seems like the solution is that the stored value needs to be copied into a slice backed by a new array, not left pointing to the one inside the page. The question is: how should we do it?

Option 1: Change the implementation of (Value).Bytes() to always copy the bytes into a new array. This makes the API less dangerous and may prevent other bugs like this, but it will negatively impact performance anywhere Bytes() is called and creating a copy is not dangerous. It's hard to assess how big a deal this is from either perspective. Bytes() is currently only called in (*writerColumn).makePageStatistics and (*writerColumn).recordPageStats. We have to add the copy in recordPageStats, but makePageStatistics seems to work as-is (I think this is because this value isn't held past the page flush in this case.) However, because Value is exported outside the package, the level of safety and performance impact is probably overwhelmed by the effects on usage outside the package, which are hard to know. I worry a little bit that from outside the package it's nearly impossible to tell when the value returned by Bytes is safe or when it needs to be copied, unless for some reason all values exposed outside the module are safe.

Option 2: We could do the copy inside (*writerColumn).recordPageStats. This has the smallest impact on the system, affecting only the one site where we know there's a problem. However, it leaves us and others open to similar bugs in the future. It also imposes a small performance/complexity hit on recordPageStats, though in practice this probably won't matter much. EDIT: I realized that using (Value).AppendBytes actually works better than (Value).Bytes in all cases, and requires the fewest unnecessary allocations.

Option 3: Implement a new method (Value).SafeBytes() (name TBD) that always copies the data into a new slice, and let the caller decide on the behavior. IMO this probably pollutes the API more than it provides useful options, but I include it for completeness.

Add documentation about single row writes and compression

I was using the GenericWriter and writing one row at a time as I read in the data, thinking that compression would happen after calling close. What actually happened was that when writing the same html data 1,000 times, one row at a time, ended with a 192 MB file. Writing all the rows at once resulted in a 7 MB file, while also using 70% less RAM.

I'm not saying that this is a bug, but it would be helpful to have in the documentation. Maybe using GenericBuffer would have worked. I don't feel like I understand well enough what happened to add it to the readme. It feels like compression is applied for each Write call separately, but the output file said I had 1 row group, so I thought meant the full row group was compressed together.

Thanks for a great library.

`SortingWriter` only writes a single row group

While using the SortingWriter API I noticed that my files were awfully small. I expected them to be >1GiB but they were around 2 MiB. This did not happen previously when I used it a few months ago.

I wrote up a short repro in Go.
I used lz4_raw_compressed_larger.parquet from the apache/parquet-testing repo. The script reads this file, which has 10,000 rows, and then writes a new copy of the input using a SortingWriter. We set sortRowCount to 1000 in the SortingWriter to reproduce the issue. The issue is not observed when sortRowCount is greater than 10,000.
With these settings, we observe only 1024 rows being written, where we expected all 10,000.

Code
package main

import (
	"os"

	"github.com/parquet-go/parquet-go"
	"github.com/sirupsen/logrus"
)

func main() {
	type Record struct {
		A string `parquet:"a"`
	}

	fi, err := os.Open("lz4_raw_compressed_larger.parquet")
	if err != nil {
		logrus.WithError(err).Fatal("couldn't open input")
	}

	pr := parquet.NewGenericReader[Record](fi, parquet.DefaultReaderConfig())
	defer pr.Close()

	out, err := os.Create("test.parquet")
	if err != nil {
		logrus.WithError(err).Fatal("couldn't create output file")
	}

	pw := parquet.NewSortingWriter[Record](
		out,
		1000,
		parquet.SortingWriterConfig(
			parquet.SortingColumns(parquet.Ascending("a")),
		),
	)

	if _, err := parquet.CopyRows(pw, pr); err != nil {
		logrus.WithError(err).Fatal("couldn't copy rows")
	}

	if err := pw.Close(); err != nil {
		logrus.WithError(err).Fatal("couldn't close parquet writer")
	}

	if err := out.Sync(); err != nil {
		logrus.WithError(err).Fatal("couldn't sync to disk")
	}

	result := parquet.NewGenericReader[Record](out, parquet.DefaultReaderConfig())
	defer result.Close()
	if result.NumRows() != pr.NumRows() {
		logrus.Fatalf("resulting parquet file did not have expected rows (expected %d, got %d)", pr.NumRows(), result.NumRows())
	}
}

// ❯ go run .
// FATA[0000] resulting parquet file did not have expected rows (expected 10000, got 1024)

While digging into the issue, I noticed a piece of code that was added recently:

parquet-go/merge.go

Lines 111 to 118 in d42d339

func (r *mergedRowGroupRows) WriteRowsTo(w RowWriter) (n int64, err error) {
b := newMergeBuffer()
b.setup(r.rows, r.merge.compare)
n, err = b.WriteRowsTo(w)
r.rowIndex += int64(n)
b.release()
return
}

It appears that it was added to fix a data corruption issue in #31. In any case, I commented out this piece of code, so as to make CopyRows fall back to the existing RowReader API, and that fixed the above repro script I made.

My conclusion is that the addition of mergeBuffer probably introduced the bug I am seeing right now.

Panic when reading nested map-values with a schema

In the example test below, I'm trying to write/read a nested map-value using an equivalent but different schema. The struct used for writing/reading has the same field as the one that is used to generate the schema, but it uses different types. This currently fails with:

panic: reflect.Value.Convert: value of type parquet_test.SchemaMapValue cannot be converted to type parquet_test.MapValue

It's interesting this only happens when using a nested map-value. It's just fine in a slice or regular structs. Using struct as map-value is fine too, it's just when nesting is involved.

While the example below seems contrived since the SchemaMessage technically isn't necessary, I have a use-case that involves generating a new type using reflection which is then used with parquet.SchemaOf() to generate a schema that runs into the same issue.

func TestMapValue(t *testing.T) {
	// Main struct used to mashal/unmarshal data.
	type NestedStruct struct {
		Val string
	}
	type MapValue struct {
		Nested NestedStruct
	}
	type Message struct {
		TestMap map[string]MapValue
	}

	// Clone of the main struct used exclusively to generate the schema.
	type SchemaNestedStruct struct {
		Val string
	}
	type SchemaMapValue struct {
		Nested SchemaNestedStruct
	}
	type SchemaMessage struct {
		TestMap map[string]SchemaMapValue
	}

	testKey, testValue := "test-key", "test-value"
	in := Message{
		TestMap: map[string]MapValue{
			"test-key": {
				Nested: NestedStruct{
					Val: testValue,
				},
			},
		},
	}

	var f bytes.Buffer

	// Generate a schema for reading/writing using an exact clone of data.
	schema := parquet.SchemaOf(SchemaMessage{})

	pw := parquet.NewGenericWriter[Message](&f, schema)
	_, err := pw.Write([]Message{in})
	if err != nil {
		t.Fatal(err)
	}

	err = pw.Close()
	if err != nil {
		t.Fatal(err)
	}

	pr := parquet.NewGenericReader[*Message](bytes.NewReader(f.Bytes()), schema)

	out := make([]*Message, 1)
	_, err = pr.Read(out)
	if err != nil {
		t.Fatal(err)
	}
	pr.Close()
	if want, got := testValue, out[0].TestMap[testKey].Nested.Val; want != got {
		t.Error("failed to read map value")
	}
}

I haven't yet tried to debug this further myself yet. Any pointers or suggestions for things to try out are appreciated.

Documentation and/or updates to running writer_test.go

The tests in writer_test.go seem very useful as a check of the writers. However, it relies on the parquet-tools command and so can pass or fail depending on environment configuration. A number of utilities called "parquet-tools" exist, and the one that I infer it's based on may be deprecated?

It would be really nice to have clear documentation of how to set up one's environment to run these tests, and if it relies on a deprecated utility it would be good to find a maintained replacement.

Panic in page.go on bigendian arch

Hi, we are seeing the following error on IBM Z 64bit with Grafana Tempo and Parquet format. Weird is that the exact same setup works well on other architectures.

I have tried rebuilding Tempo with -tags purego and got the same panic.

Any help would be appreciated.

panic: runtime error: index out of range [7] with length 7

goroutine 1221 [running]:
github.com/segmentio/parquet-go.(*byteArrayPage).index(...)
	/remote-source/tempo/app/vendor/github.com/segmentio/parquet-go/page.go:982
github.com/segmentio/parquet-go.(*byteArrayDictionary).lookupString(0xc0002201c0, {0xc001108000, 0x1b, 0x400}, {{0xc001068000, 0x1b, 0x18}})
	/remote-source/tempo/app/vendor/github.com/segmentio/parquet-go/dictionary_purego.go:42 +0x162
github.com/segmentio/parquet-go.(*byteArrayDictionary).Lookup(0xc0002201c0, {0xc001108000, 0x1b, 0x400}, {0xc001068000, 0x1b, 0x3e8})
	/remote-source/tempo/app/vendor/github.com/segmentio/parquet-go/dictionary.go:748 +0x142
github.com/segmentio/parquet-go.(*indexedPageValues).ReadValues(0xc000fe5490, {0xc001068000, 0x1b, 0x3e8})
	/remote-source/tempo/app/vendor/github.com/segmentio/parquet-go/dictionary.go:1332 +0xd6
github.com/segmentio/parquet-go.(*repeatedPageValues).ReadValues(0xc000aa3e40, {0xc001068000, 0x3e8, 0x3e8})
	/remote-source/tempo/app/vendor/github.com/segmentio/parquet-go/page_values.go:98 +0x192
github.com/grafana/tempo/pkg/parquetquery.(*ColumnIterator).iterate.func3.2(0xc00115ff48, 0xc00115fec8, 0xc00018f740, {0xc001068000, 0x3e8, 0x3e8}, 0x3e8, {0x2815978, 0xc000ef6fc0}, {0x2829640, ...})
	/remote-source/tempo/app/pkg/parquetquery/iters.go:406 +0x26c
github.com/grafana/tempo/pkg/parquetquery.(*ColumnIterator).iterate.func3(0xc00018f740, 0xc00115ff48, 0xc00115fec8, {0xc001068000, 0x3e8, 0x3e8}, 0x3e8, {0x2815978, 0xc000ef6fc0}, {0x2821868, ...})
	/remote-source/tempo/app/pkg/parquetquery/iters.go:459 +0x1e4
github.com/grafana/tempo/pkg/parquetquery.(*ColumnIterator).iterate(0xc00018f740, {0x2815978, 0xc000ef6fc0}, 0x3e8)
	/remote-source/tempo/app/pkg/parquetquery/iters.go:464 +0x676
github.com/grafana/tempo/pkg/parquetquery.NewColumnIterator.func1()
	/remote-source/tempo/app/pkg/parquetquery/iters.go:299 +0x42
created by github.com/grafana/tempo/pkg/parquetquery.(*ColumnIterator).next
	/remote-source/tempo/app/pkg/parquetquery/iters.go:485 +0x60

Performance regression on lazy load commit

We are seeing a performance regression on the lazy load commit.

We have a holistic benchmark test suite that runs a handful of TraceQL queries over a block.

Bench results:

name                                             old time/op    new time/op    delta
BackendBlockTraceQL/spanAttValNoMatch-8            8.90ms ± 0%   28.06ms ± 1%  +215.32%  (p=0.008 n=5+5)
BackendBlockTraceQL/spanAttIntrinsicNoMatch-8      8.46ms ± 0%   28.20ms ± 0%  +233.27%  (p=0.008 n=5+5)
BackendBlockTraceQL/resourceAttValNoMatch-8        8.29ms ± 1%    8.35ms ± 2%      ~     (p=0.421 n=5+5)
BackendBlockTraceQL/resourceAttIntrinsicMatch-8    22.2ms ± 1%    22.3ms ± 1%      ~     (p=0.548 n=5+5)
BackendBlockTraceQL/mixedValNoMatch-8               252ms ± 1%     278ms ± 3%   +10.19%  (p=0.008 n=5+5)
BackendBlockTraceQL/mixedValMixedMatchAnd-8        8.33ms ± 0%    8.36ms ± 1%      ~     (p=0.690 n=5+5)
BackendBlockTraceQL/mixedValMixedMatchOr-8          265ms ± 1%     269ms ± 2%      ~     (p=0.056 n=5+5)
BackendBlockTraceQL/count-8                         472ms ±13%     495ms ± 9%      ~     (p=0.222 n=5+5)
BackendBlockTraceQL/struct-8                        776ms ± 2%     774ms ± 2%      ~     (p=0.690 n=5+5)
BackendBlockTraceQL/||-8                            153ms ± 1%     154ms ± 2%      ~     (p=0.548 n=5+5)
BackendBlockTraceQL/mixed-8                        28.7ms ± 1%    29.0ms ± 1%      ~     (p=0.056 n=5+5)

Profiles:

profiles.tar.gz

is it possible to convert any json file to parquet file. Schema of json varies from file to file.

something like
var m map[string]interface{}
json.Unmarshal(inputJson,&m)
if err := parquet.WriteFile("test1.parquet",[]map[string]interface{}{m});err!=nil{
fmt.Println(err)
}

Getting the error as
panic: cannot construct parquet schema from value of type map[string]interface {}

goroutine 1 [running]:
github.com/parquet-go/parquet-go.schemaOf({0x1025d8a18?, 0x10256f020})
/Users/sree/workspace/go/pkg/mod/github.com/parquet-go/[email protected]/schema.go:111 +0x184
github.com/parquet-go/parquet-go.NewGenericWriter[...]({0x1025d0000, 0x1400000e080?}, {0x14000159e28, 0x14000159e38?, 0x1024a5da4?})
/Users/sree/workspace/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:95 +0x94
github.com/parquet-go/parquet-go.Write[...]({0x1025d0000, 0x1400000e080}, {0x1400000e078, 0x1, 0x1}, {0x0, 0x102899fe0?, 0x1400000e078?})
/Users/sree/workspace/go/pkg/mod/github.com/parquet-go/[email protected]/parquet.go:75 +0x7c
github.com/parquet-go/parquet-go.WriteFile[...]({0x1024a9441, 0x14000159f48?}, {0x1400000e078?, 0x1, 0x1}, {0x0, 0x0, 0x0})

Sorting writer is not able to create multiple row groups

I am trying to control number of rows written per row group in an output file but it does not seem to work. Input file has >570000 records. Not sure if I am missing anything in below code. Can someone help please?

package main

import (
	"fmt"
	"os"
	"reflect"

	"github.com/parquet-go/parquet-go"
)

func main() {
	file1 := "large_file.parquet"
	file2 := "large_file_control_row_group.parquet"
	f1, err := os.Open(file1)
	if err != nil {
		panic(err)
	}
	defer f1.Close()

	s1, err := os.Stat(file1)
	if err != nil {
		panic(err)
	}

	p1, err := parquet.OpenFile(f1, s1.Size())
	if err != nil {
		panic(err)
	}
	if size := p1.Size(); size != s1.Size() {
		fmt.Errorf("file size mismatch: want=%d got=%d", s1.Size(), size)
		panic(err)
	}
	schema := p1.Schema()
	fmt.Println("file1 schema: ", schema)
	fmt.Println("file1 gotype", schema.GoType())

	reader := parquet.NewGenericReader[any](f1, &parquet.ReaderConfig{
		Schema: schema,
	})
	numRows := reader.NumRows()
	out := make([]parquet.Row, 0)
	for _, rowgroup := range p1.RowGroups() {
		out1 := make([]parquet.Row, rowgroup.NumRows())
		_, err := reader.ReadRows(out1)
		if err != nil {
			panic(err)
		}
		out = append(out, out1...)
	}
	fmt.Println("file1 numRows", numRows)
	fmt.Println("file1 out= ", len(out))
	fmt.Println("file1 out= ", reflect.TypeOf(out[0]))

	f2, err := os.Create(file2)
	if err != nil {
		panic(err)
	}
	defer f2.Close()

	pw := parquet.NewSortingWriter[any](f2, int64(50000),
		&parquet.WriterConfig{
			//	PageBufferSize: 2560,
			Schema:      schema,
			Compression: &parquet.Zstd,
		})
	n, err := pw.WriteRows(out)
	if err != nil {
		panic( fmt.Sprintf("failed to write parquet file  err: ", file2, err))
	}
	fmt.Println("written rows to parquet file : ", n, file2)
	if err := pw.Close(); err != nil {
		panic( fmt.Sprintf("failed to close parquet file, err:", file2, err))
	}
	fmt.Println("closed parquet file", file2)
}

Drop support for Go 1.17

segmentio#428

  • Remove 1.17 from github workflow
  • Update go.mod to declare the Go 1.18 requirement
  • Migrate code from *_go18.go and *_go18_test.go files into the corresponding *.go and *_test.go files

Unlike the original ticket, there is no need to rename the Generic* apis yet.

Consider appending pages to on-disk column buffers when writing

The Parquet writer can use filesystem buffers to offload encoded and compressed pages to disk before writing them to the target file. When using FS buffers, we often hit a limit on number of open file descriptors and I suspect it's because each page is flushed to a separate file.

I wonder if we can keep one file per column and append to the same file when writing. During the writer flush process, we could copy column files back to back into the target parquet file.

I will add a "need more info" label since I am not sure if this approach is feasible.

Write Parquet file with custom schema

Hello,

is that possible to produce parquet files whose schema is not known at compile time? I'm trying like in the example below but getting NULL in all fields in each row.

package main

import (
	"os"

	"github.com/parquet-go/parquet-go"
)

func main() {
	file, err := os.Create("test.parquet")
	if err != nil {
		panic(err)
	}
	defer file.Close()

	s := parquet.NewSchema("Record", parquet.Group{
		"Text": parquet.Optional(parquet.String()),
	})
	w := parquet.NewGenericWriter[any](file, s)
	_, err = w.WriteRows([]parquet.Row{{
		parquet.ByteArrayValue([]byte("Hello")),
	}})
	if err != nil {
		panic(err)
	}

	err = w.Close()
	if err != nil {
		panic(err)
	}
}

The schema generated is

Metadata for file: test.parquet

version: 1
num of rows: 1
created by: github.com/parquet-go/parquet-go version 0.0.0(build bb12c19a1110)
metadata:
message Record {
  OPTIONAL BYTE_ARRAY Text (STRING);
}

And when I output content of the produced file

#################################################
File: test.parquet
#################################################

{Text: null}

Add support to int16

When trying to write an int16 field

panic: cannot convert Go values of type int16 to parquet value
goroutine 6 [running]:
github.com/parquet-go/parquet-go.writeRowsFuncOf({0x100cfa8c8, 0x100884200}, 0x0?, {0xc000614600, 0x1, 0x1})
/Users/c12326a/git/credit-header/ch-build-conmaster/vendor/github.com/parquet-go/parquet-go/column_buffer.go:2016 +0x588

ARM64 runners

Previously we were sharing runners with other internal Segment projects. At the moment we don't have a way to run ARM64 tests in a CI environment.

versioned 0.x.x releases

There is a note on the README that we are on 0.x.x and things are subject to change. A new home needs a new start. One of the things I would like to do is start tagging our releases.

I intentionally didn't break things on #16 relating to renaming GenericReader to Reader[T] for this reason. No need to break things without a clear migration path.

The plan is to

  • After consolidating current work I will make the first release 0.x.x
  • I will remove all deprecated features and embrace generics, possibly move requirements to go1.20 then increment the minor component of the semver and drop a new release (which will be the way forward for the package development).

This way , users can pin to specific tags, and we can always back port bug fixes.

note:

I haven't decided on which tag version to start with (v0.1.0 sounds silly lol!) , any suggestions are welcome.

Prepare for v0.20

I have created a milestone to track things going into release v0.20 here https://github.com/parquet-go/parquet-go/milestone/1

This ticket is for gathering feedback in case there is a ticket or PR that I missed and someone would like to add . So far only sorting writer fix Me and @fpetkovski are looking into is pending.

@asubiotto regarding #96 is it urgent/important to you? I can hunt down and help address the use after free bug that was corrupting your data. I'm always into perf stuff. Just let me know if you want it to be part of v0.20

@fpetkovski @achille-roussel What do you think ?

zstd encoding is not useful

given struct:

type Meta struct {
	Name        string   `parquet:",zstd,dict"`
	PeriodType  string   `parquet:",zstd,dict"`
	PeriodUnit  string   `parquet:",zstd,dict"`
	SampleType  string   `parquet:",zstd,dict"`
	SampleUnit  string   `parquet:",zstd,dict"`
	Timestamp   int64    `parquet:",zstd,timestamp,delta"` // time.Time is not supported
	Duration    int64    `parquet:",zstd,delta"`
	Period      int64    `parquet:",zstd,delta"`
	ProfileID   string   `parquet:",zstd,"`
	LabelsKey   []string `parquet:",zstd,list,dict"` // this value definition level is always 1
	LabelsValue []string `parquet:",zstd,list,dict"`
	TotalValue  int64    `parquet:",zstd"`
	SampleLen   int64    `parquet:",zstd"`
}

use it to write:

parquet.NewGenericWriter[Meta](&b)

i find the encoding is not useful

Test failure

Attempting to get the tests passing to merge a rewrite references change, I get the following error:

FAIL: TestOpenFile (2.01s)
    --- FAIL: TestOpenFile/testdata/issue368.parquet (0.00s)
        file_test.go:38: reading parquet file metadata: decoding thrift payload: -10:FIELD<?>: skipping unsupported thrift type 15
FAIL
FAIL	github.com/parquet-go/parquet-go	39.395s
ok  	github.com/parquet-go/parquet-go/bloom	0.02[7](https://github.com/parquet-go/parquet-go/actions/runs/5534777733/jobs/10100177839#step:5:8)s
ok  	github.com/par

SortingWriter writes corrupted data for String and FixedLenByteArray

The SortingWriter writes corrupted data for String and FixedLenByteArray types for any meaningfully large number of Rows. I think this has been present since the introduction of the SortingWriter. In the new tests in the sortingwriter-corruption branch (PR #24), sometimes the mismatched row is found at another index in the array: we found the row at index 106 in want and sometimes it's just corrupted non-row data: got row index 43 isn't found in want rows, and is therefore corrupted data.

=== RUN   TestSortingWriterCorruptedString
    sorting_test.go:239: rows mismatch at index 42 :
    sorting_test.go:240:  want: parquet_test.Row{Tag:"NOjZhG1MpXf4naQFRqE25pCr4EszfVExuT9BGf4znMjAl82X081NXl51t7hYFh1ESB9HGrgtGns949cECbwr0WFcOQ7hQii7s418"}
    sorting_test.go:241:   got: parquet_test.Row{Tag:"zipEs765yzHuaW9s7YAXu23ORm8DgLpmlJeZLy7l4z5yBd4AqtXLfgnjiOarORSiywx8yuzgZwJBmwLu0XIjB1IOmkkMJdtgE91L"}
    sorting_test.go:247:   we found the row at index 106 in want.
    sorting_test.go:239: rows mismatch at index 43 :
    sorting_test.go:240:  want: parquet_test.Row{Tag:"ONxgPm4crYY7e5So1q5PJBZmoP6edxQHM5Qcb9iPmPBb1EPejC2gbNTCw3VYO1v4tDxey9OF4Dya6VdeuAVkyNG8xSmIXeLnHWDs"}
    sorting_test.go:241:   got: parquet_test.Row{Tag:"lJeZLy7l4z5yBd4AqtXLfgnjiOarORSiywx8yuzgZwJBmwLu0XIjB1IOmkkMJdtgE91Ly9OF4Dya6VdeuAVkyNG8xSmIXeLnHWDs"}
    sorting_test.go:253:   got row index 43 isn't found in want rows, and is therefore corrupted data.
    sorting_test.go:259: 2 rows mismatched out of 107 total
--- FAIL: TestSortingWriterCorruptedString (0.12s)

=== RUN   TestSortingWriterCorruptedFixedLenByteArray
    sorting_test.go:239: rows mismatch at index 168 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x43, 0x63, 0x76, 0x6c, 0x6a, 0x70, 0x4d, 0x51, 0x76, 0x79, 0x71, 0x6b, 0x73, 0x32, 0x6c, 0x4d}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x53, 0x75, 0x7a, 0x77, 0x6b, 0x58, 0x44, 0x44, 0x76, 0x77, 0x32, 0x4d, 0x67, 0x4a, 0x77, 0x4a}}
    sorting_test.go:247:   we found the row at index 360 in want.
    sorting_test.go:239: rows mismatch at index 169 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x43, 0x6c, 0x6b, 0x70, 0x77, 0x32, 0x56, 0x38, 0x6c, 0x33, 0x4e, 0x6f, 0x55, 0x50, 0x70, 0x53}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x31, 0x46, 0x6a, 0x35, 0x66, 0x59, 0x35, 0x56, 0x79, 0x58, 0x48, 0x45, 0x33, 0x6c, 0x41}}
    sorting_test.go:247:   we found the row at index 361 in want.
    sorting_test.go:239: rows mismatch at index 170 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x43, 0x70, 0x53, 0x78, 0x35, 0x50, 0x4b, 0x34, 0x4f, 0x64, 0x6d, 0x36, 0x6f, 0x74, 0x30, 0x77}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x32, 0x6c, 0x66, 0x61, 0x42, 0x74, 0x37, 0x56, 0x71, 0x47, 0x74, 0x55, 0x42, 0x37, 0x70}}
    sorting_test.go:247:   we found the row at index 362 in want.
    sorting_test.go:239: rows mismatch at index 171 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x43, 0x7a, 0x64, 0x68, 0x70, 0x38, 0x47, 0x5a, 0x47, 0x58, 0x6d, 0x4e, 0x68, 0x43, 0x4c, 0x52}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x46, 0x4d, 0x55, 0x50, 0x5a, 0x4c, 0x6d, 0x62, 0x36, 0x6c, 0x57, 0x52, 0x7a, 0x6d, 0x4c}}
    sorting_test.go:247:   we found the row at index 363 in want.
    sorting_test.go:239: rows mismatch at index 172 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x39, 0x4e, 0x6d, 0x79, 0x36, 0x47, 0x71, 0x6a, 0x76, 0x6b, 0x32, 0x4b, 0x6e, 0x30, 0x78}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x4a, 0x47, 0x49, 0x72, 0x4b, 0x6d, 0x4e, 0x48, 0x50, 0x57, 0x63, 0x4b, 0x31, 0x78, 0x77}}
    sorting_test.go:247:   we found the row at index 364 in want.
    sorting_test.go:239: rows mismatch at index 173 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x43, 0x79, 0x52, 0x46, 0x4b, 0x6c, 0x34, 0x5a, 0x79, 0x61, 0x71, 0x50, 0x6d, 0x6e, 0x34}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x4b, 0x42, 0x47, 0x78, 0x6e, 0x64, 0x69, 0x49, 0x31, 0x38, 0x4d, 0x77, 0x67, 0x4c, 0x76}}
    sorting_test.go:247:   we found the row at index 365 in want.
    sorting_test.go:239: rows mismatch at index 174 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x4b, 0x4d, 0x6f, 0x37, 0x4e, 0x79, 0x43, 0x43, 0x44, 0x65, 0x67, 0x62, 0x43, 0x30, 0x4c}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x4e, 0x33, 0x51, 0x48, 0x5a, 0x32, 0x73, 0x41, 0x47, 0x68, 0x79, 0x6e, 0x72, 0x63, 0x32}}
    sorting_test.go:247:   we found the row at index 366 in want.
    sorting_test.go:239: rows mismatch at index 175 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x53, 0x6c, 0x51, 0x65, 0x65, 0x79, 0x39, 0x52, 0x59, 0x59, 0x75, 0x4c, 0x32, 0x63, 0x50}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x51, 0x7a, 0x71, 0x7a, 0x64, 0x4f, 0x4a, 0x5a, 0x37, 0x38, 0x51, 0x43, 0x45, 0x43, 0x36}}
    sorting_test.go:247:   we found the row at index 367 in want.
    sorting_test.go:239: rows mismatch at index 176 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x5a, 0x4d, 0x38, 0x51, 0x75, 0x55, 0x58, 0x6a, 0x4d, 0x79, 0x58, 0x71, 0x67, 0x77, 0x4a}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x56, 0x39, 0x44, 0x63, 0x35, 0x71, 0x79, 0x6b, 0x4e, 0x70, 0x36, 0x54, 0x4a, 0x64, 0x48}}
    sorting_test.go:247:   we found the row at index 368 in want.
    sorting_test.go:239: rows mismatch at index 177 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x66, 0x31, 0x53, 0x4d, 0x50, 0x39, 0x51, 0x50, 0x57, 0x35, 0x64, 0x4b, 0x4f, 0x47, 0x69}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x61, 0x67, 0x7a, 0x41, 0x49, 0x33, 0x41, 0x56, 0x73, 0x45, 0x6c, 0x41, 0x73, 0x36, 0x69}}
    sorting_test.go:247:   we found the row at index 369 in want.
    sorting_test.go:239: rows mismatch at index 178 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x66, 0x41, 0x75, 0x69, 0x48, 0x76, 0x77, 0x56, 0x69, 0x52, 0x43, 0x56, 0x56, 0x77, 0x45}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x62, 0x64, 0x35, 0x5a, 0x5a, 0x65, 0x72, 0x73, 0x64, 0x44, 0x30, 0x58, 0x6d, 0x31, 0x35}}
    sorting_test.go:247:   we found the row at index 370 in want.
    sorting_test.go:239: rows mismatch at index 179 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x6b, 0x30, 0x47, 0x47, 0x49, 0x5a, 0x57, 0x7a, 0x73, 0x7a, 0x5a, 0x34, 0x57, 0x72, 0x47}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x6d, 0x6c, 0x69, 0x55, 0x70, 0x49, 0x30, 0x74, 0x6a, 0x50, 0x64, 0x47, 0x6a, 0x4a, 0x35}}
    sorting_test.go:247:   we found the row at index 371 in want.
    sorting_test.go:239: rows mismatch at index 180 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x6d, 0x4b, 0x36, 0x43, 0x77, 0x41, 0x6e, 0x6f, 0x6f, 0x4e, 0x32, 0x51, 0x4a, 0x38, 0x71}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x6e, 0x38, 0x35, 0x6c, 0x44, 0x6b, 0x68, 0x62, 0x67, 0x6a, 0x70, 0x69, 0x6a, 0x37, 0x61}}
    sorting_test.go:247:   we found the row at index 372 in want.
    sorting_test.go:239: rows mismatch at index 181 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x74, 0x39, 0x59, 0x74, 0x6b, 0x41, 0x70, 0x4e, 0x6f, 0x36, 0x33, 0x57, 0x72, 0x69, 0x4f}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x33, 0x58, 0x45, 0x6e, 0x43, 0x57, 0x77, 0x51, 0x30, 0x4f, 0x52, 0x48, 0x42, 0x70, 0x51}}
    sorting_test.go:247:   we found the row at index 373 in want.
    sorting_test.go:239: rows mismatch at index 182 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x76, 0x47, 0x4f, 0x54, 0x66, 0x62, 0x58, 0x4f, 0x79, 0x53, 0x65, 0x63, 0x47, 0x56, 0x62}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x34, 0x57, 0x71, 0x77, 0x41, 0x67, 0x33, 0x38, 0x4e, 0x50, 0x70, 0x61, 0x75, 0x6e, 0x52}}
    sorting_test.go:247:   we found the row at index 374 in want.
    sorting_test.go:239: rows mismatch at index 183 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x78, 0x6f, 0x39, 0x71, 0x74, 0x36, 0x57, 0x37, 0x48, 0x6e, 0x49, 0x64, 0x62, 0x71, 0x74}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x38, 0x71, 0x6d, 0x76, 0x77, 0x33, 0x36, 0x69, 0x6d, 0x38, 0x70, 0x45, 0x44, 0x4c, 0x48}}
    sorting_test.go:247:   we found the row at index 375 in want.
    sorting_test.go:239: rows mismatch at index 184 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x38, 0x55, 0x4a, 0x71, 0x69, 0x70, 0x63, 0x58, 0x54, 0x46, 0x52, 0x4b, 0x73, 0x39, 0x65}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x4a, 0x44, 0x6e, 0x58, 0x38, 0x4a, 0x58, 0x59, 0x38, 0x61, 0x43, 0x6a, 0x50, 0x63, 0x4b}}
    sorting_test.go:247:   we found the row at index 376 in want.
    sorting_test.go:239: rows mismatch at index 185 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x44, 0x4a, 0x30, 0x36, 0x4e, 0x70, 0x46, 0x4c, 0x67, 0x68, 0x66, 0x4e, 0x62, 0x66, 0x4b}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x52, 0x69, 0x66, 0x43, 0x68, 0x52, 0x72, 0x39, 0x53, 0x42, 0x67, 0x7a, 0x4f, 0x63, 0x56}}
    sorting_test.go:247:   we found the row at index 377 in want.
    sorting_test.go:239: rows mismatch at index 186 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x4c, 0x78, 0x4f, 0x54, 0x57, 0x4b, 0x59, 0x73, 0x44, 0x57, 0x63, 0x68, 0x34, 0x34, 0x32}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x57, 0x36, 0x76, 0x37, 0x76, 0x4b, 0x6e, 0x69, 0x68, 0x54, 0x46, 0x48, 0x59, 0x41, 0x73}}
    sorting_test.go:247:   we found the row at index 378 in want.
    sorting_test.go:239: rows mismatch at index 187 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x4d, 0x6b, 0x47, 0x4b, 0x6a, 0x69, 0x30, 0x58, 0x68, 0x38, 0x6e, 0x6c, 0x6e, 0x79, 0x42}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x58, 0x65, 0x4f, 0x56, 0x44, 0x6f, 0x72, 0x32, 0x78, 0x70, 0x65, 0x57, 0x56, 0x49, 0x51}}
    sorting_test.go:247:   we found the row at index 379 in want.
    sorting_test.go:239: rows mismatch at index 188 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x4e, 0x54, 0x71, 0x63, 0x37, 0x4e, 0x78, 0x47, 0x76, 0x63, 0x54, 0x5a, 0x59, 0x49, 0x64}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x64, 0x5a, 0x7a, 0x55, 0x6b, 0x55, 0x33, 0x41, 0x4b, 0x43, 0x33, 0x4c, 0x34, 0x6c, 0x4b}}
    sorting_test.go:247:   we found the row at index 380 in want.
    sorting_test.go:239: rows mismatch at index 189 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x70, 0x6f, 0x69, 0x4f, 0x7a, 0x34, 0x77, 0x47, 0x4e, 0x68, 0x63, 0x78, 0x56, 0x72, 0x70}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x65, 0x31, 0x4b, 0x69, 0x36, 0x55, 0x4d, 0x6e, 0x51, 0x75, 0x53, 0x53, 0x79, 0x44, 0x37}}
    sorting_test.go:247:   we found the row at index 381 in want.
    sorting_test.go:239: rows mismatch at index 190 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x71, 0x6c, 0x77, 0x67, 0x56, 0x32, 0x73, 0x42, 0x34, 0x62, 0x43, 0x44, 0x4c, 0x30, 0x51}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x72, 0x58, 0x32, 0x56, 0x73, 0x63, 0x71, 0x49, 0x37, 0x69, 0x48, 0x53, 0x33, 0x72, 0x64}}
    sorting_test.go:247:   we found the row at index 382 in want.
    sorting_test.go:239: rows mismatch at index 191 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x74, 0x54, 0x4e, 0x63, 0x4e, 0x78, 0x54, 0x4b, 0x7a, 0x38, 0x6c, 0x77, 0x65, 0x57, 0x6e}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x56, 0x32, 0x62, 0x54, 0x39, 0x72, 0x76, 0x73, 0x62, 0x45, 0x63, 0x72, 0x57, 0x48, 0x5a, 0x36}}
    sorting_test.go:247:   we found the row at index 383 in want.
    sorting_test.go:239: rows mismatch at index 378 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x55, 0x57, 0x36, 0x76, 0x37, 0x76, 0x4b, 0x6e, 0x69, 0x68, 0x54, 0x46, 0x48, 0x59, 0x41, 0x73}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x51, 0x74, 0x55, 0x37, 0x54, 0x48, 0x47, 0x69, 0x74, 0x67, 0x57, 0x4d, 0x7a, 0x31, 0x37}}
    sorting_test.go:247:   we found the row at index 570 in want.
    sorting_test.go:239: rows mismatch at index 379 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x55, 0x58, 0x65, 0x4f, 0x56, 0x44, 0x6f, 0x72, 0x32, 0x78, 0x70, 0x65, 0x57, 0x56, 0x49, 0x51}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x53, 0x32, 0x59, 0x61, 0x5a, 0x67, 0x6c, 0x53, 0x72, 0x50, 0x57, 0x37, 0x45, 0x67, 0x79}}
    sorting_test.go:247:   we found the row at index 571 in want.
    sorting_test.go:239: rows mismatch at index 380 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x55, 0x64, 0x5a, 0x7a, 0x55, 0x6b, 0x55, 0x33, 0x41, 0x4b, 0x43, 0x33, 0x4c, 0x34, 0x6c, 0x4b}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x5a, 0x73, 0x4a, 0x75, 0x41, 0x38, 0x70, 0x49, 0x6b, 0x4f, 0x47, 0x7a, 0x67, 0x77, 0x65}}
    sorting_test.go:247:   we found the row at index 572 in want.
    sorting_test.go:239: rows mismatch at index 381 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x55, 0x65, 0x31, 0x4b, 0x69, 0x36, 0x55, 0x4d, 0x6e, 0x51, 0x75, 0x53, 0x53, 0x79, 0x44, 0x37}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x64, 0x39, 0x61, 0x45, 0x72, 0x44, 0x42, 0x49, 0x4f, 0x6a, 0x55, 0x70, 0x77, 0x6b, 0x43}}
    sorting_test.go:247:   we found the row at index 573 in want.
    sorting_test.go:239: rows mismatch at index 382 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x55, 0x72, 0x58, 0x32, 0x56, 0x73, 0x63, 0x71, 0x49, 0x37, 0x69, 0x48, 0x53, 0x33, 0x72, 0x64}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x6f, 0x43, 0x74, 0x49, 0x32, 0x6f, 0x4c, 0x78, 0x48, 0x7a, 0x71, 0x61, 0x41, 0x41, 0x50}}
    sorting_test.go:247:   we found the row at index 574 in want.
    sorting_test.go:239: rows mismatch at index 383 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x56, 0x32, 0x62, 0x54, 0x39, 0x72, 0x76, 0x73, 0x62, 0x45, 0x63, 0x72, 0x57, 0x48, 0x5a, 0x36}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x70, 0x36, 0x48, 0x47, 0x56, 0x51, 0x6d, 0x4c, 0x43, 0x55, 0x41, 0x49, 0x35, 0x33, 0x77}}
    sorting_test.go:247:   we found the row at index 575 in want.
    sorting_test.go:259: 30 rows mismatched out of 700 total
--- FAIL: TestSortingWriterCorruptedFixedLenByteArray (0.00s)

Dynamic schemas: panic: cannot create parquet value of type BYTE_ARRAY from go value of type interface {}

I am trying to read an existing parquet file and write the records into new parquet file but hitting above error. can someone help identify what I am doing wrong here? I have tried same with ReadRow and WriteRows where it works fine. My use case is, I would like to read records from existing parquet files, append new records to it and write back to parquet file (all merged records). Alternatively if there is a way to convert new records (not from parquet file) to []parquet.Row, that would work too. Please suggest.

Appreciate your help!

package main

import (
	"fmt"
	"os"
	"reflect"

	"github.com/parquet-go/parquet-go"
)

func main() {
	file1 := "file1.parquet"
	file2 := "file2.parquet"
	f1, err := os.Open(file1)
	if err != nil {
		panic(err)
	}
	defer f1.Close()

	s1, err := os.Stat(file1)
	if err != nil {
		panic(err)
	}

	p1, err := parquet.OpenFile(f1, s1.Size())
	if err != nil {
		panic(err)
	}
	if size := p1.Size(); size != s1.Size() {
		fmt.Errorf("file size mismatch: want=%d got=%d", s1.Size(), size)
		panic(err)
	}
	schema := p1.Schema()
	fmt.Println("file1 schema: ", schema)
	fmt.Println("file1 gotype", schema.GoType())

	reader := parquet.NewGenericReader[any](f1, &parquet.ReaderConfig{
		Schema: schema,
	})
	numRows := reader.NumRows()
	out := make([]any, numRows)
	n, err := reader.Read(out)
	if err != nil {
		panic(err)
	}
	fmt.Println("file1 out= ", n)
	fmt.Println("file1 out= ", reflect.TypeOf(out[0]))

	f2, err := os.Create(file2)
	if err != nil {
		panic(err)
	}
	defer f2.Close()

	pw := parquet.NewGenericWriter[any](f2, &parquet.WriterConfig{
		Compression: &parquet.Zstd,
		Schema:      schema,
	})
	written, err := pw.Write(out)
	if err != nil {
		panic(err)
	}
	fmt.Println("written:", written)
	if err := pw.Close(); err != nil {
		panic(err)
	}
	fmt.Println("file2 written successfully")
}

output:

{14:53}~/src/duckdb-test ➭ go run read-write.go
file1 schema:  message duckdb_schema {
        optional binary field1 (STRING);
        optional double field2;
}
file1 gotype struct { Field1 *[]uint8; Field2 *float64 }
file1 out=  264550
file1 out=  map[string]interface {}
panic: cannot create parquet value of type BYTE_ARRAY from go value of type interface {}

goroutine 1 [running]:
github.com/parquet-go/parquet-go.makeValue(0x6, 0x1400021b810, {0x104fe97a0?, 0x14001c182c0?, 0x140001cd968?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/value.go:339 +0xb4c
github.com/parquet-go/parquet-go.deconstructFuncOfLeaf.func1({0x140079a3b30, 0x2, 0x105688108?}, {0x20?, 0xb6?, 0xfd?}, {0x104fe97a0?, 0x14001c182c0?, 0x0?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/row.go:538 +0x74
github.com/parquet-go/parquet-go.deconstructFuncOfOptional.func1({0x140079a3b30?, 0x2?, 0x2?}, {0x40?, 0xc2?, 0x1a?}, {0x104fe97a0?, 0x14001c182c0?, 0x0?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/row.go:439 +0xdc
github.com/parquet-go/parquet-go.deconstructFuncOfGroup.func1({0x140079a3b30, 0x2, 0x2}, {0x54?, 0xe1?, 0xec?}, {0x104ff1240?, 0x140016231a0?, 0x14001c18201?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/row.go:515 +0x134
github.com/parquet-go/parquet-go.(*Schema).deconstructValueToColumns(0x140001ac240, {0x140079a3b30, 0x2, 0x2}, {0x104ff1240?, 0x140016231a0?, 0x1053377e0?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/schema.go:236 +0x128
github.com/parquet-go/parquet-go.(*Schema).Deconstruct(0x140001ac240, {0x0, 0x0, 0x0}, {0x104ff1240?, 0x140016231a0})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/schema.go:224 +0x1d8
github.com/parquet-go/parquet-go.(*Writer).Write(0x140000802a0, {0x104ff1240, 0x140016231a0})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/writer.go:379 +0x1c4
github.com/parquet-go/parquet-go.(*GenericWriter[...]).writeAny(0x140000802a0?, {0x14000280000, 0x40, 0x140001cdcd8})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/writer.go:233 +0x70
github.com/parquet-go/parquet-go.(*GenericWriter[...]).Write.func1(0x1050123e0?)
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/writer.go:169 +0x60
github.com/parquet-go/parquet-go.(*writer).writeRows(0x140079b2000, 0x40966, 0x140001cdd58)
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/writer.go:993 +0xb4
github.com/parquet-go/parquet-go.(*GenericWriter[...]).Write(0x105057520?, {0x14000280000?, 0x140001cde58?, 0x2?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/writer.go:168 +0x58
main.main()
        src/duckdb-test/read-write.go:59 +0x478
exit status 2

RLE encoding cannot be specified in parquet struct tags

When generating a schema for a type, there's no struct tag value to specify regular RLE encoding. It seems like all this requires is an additional case when checking the tags and an update to the documentation.

I'm happy to submit a PR for this if it's as simple as it seems.

panic: reflect: call of reflect.Value.Field on zero Value

I'm trying to read records into a struct that contains a slice of elements with nested struct-pointers and seeing the following panic:

panic: reflect: call of reflect.Value.Field on zero Value

Below a test I wrote to reproduce the issue. It writes a single record to a parquet file, then tries to read it.

func TestNestedPointer(t *testing.T) {
	type InnerStruct struct {
		InnerField string
	}

	type SliceElement struct {
		Inner *InnerStruct
	}

	type Outer struct {
		Slice []*SliceElement
	}

	in := &Outer{
		Slice: []*SliceElement{
			{
				Inner: &InnerStruct{
					InnerField: "inner-string",
				},
			},
		},
	}

	tmpDir := t.TempDir()
	parquetFile := filepath.Join(tmpDir, "data.parquet")
	f, err := os.Create(parquetFile)
	if err != nil {
		t.Fatal(err)
	}

	pw := parquet.NewGenericWriter[*Outer](f)
	_, err = pw.Write([]*Outer{in})
	if err != nil {
		t.Fatal(err)
	}

	err = pw.Close()
	if err != nil {
		t.Fatal(err)
	}

	err = f.Close()
	if err != nil {
		t.Fatal(err)
	}

	f, err = os.Open(parquetFile)
	if err != nil {
		t.Fatal(err)
	}
	defer f.Close()

	pr := parquet.NewGenericReader[*Outer](f)

	out := make([]*Outer, 1)
	_, err = pr.Read(out)
	if err != nil {
		t.Fatal(err)
	}

	pr.Close()
}

The code above works as expected if InnerStruct is not a pointer in the slice element.

	type SliceElement struct {
		Inner InnerStruct
	}

Any suggestions on how to fix this? I can't remove the pointers since I'm using generated protobuf messages which always use pointers. The pointers don't appear to be a problem in other places, only if used in a struct that's part of a slice from what I can tell. Any help with this is appreciated.

Looking for contributors

While Segment has invested a lot of resources in parquet-go, it is not a direction that we have the time or ability to maintain.

We are looking for contributors or maintainers who can help keep the library up to date. If you are interested or are using parquet-go in production, please reach out either here or via email at [email protected].

List-type columns should able to write null in parquet file #513

Descriptions:

We have a program that read json file, do some operation, then write to a new parquet file. Some problems happened when there are list-type columns.

Originally, this list-type may have a null value in the json source(e.g.{"list":null}), we want the parquet file to remain this characteristic(remain null if the json source is null). However, we cannot achieve this using the current parquet struct tag this library provided.

package main

import (
	"log"

	"github.com/parquet-go/parquet-go"
)

type RowType1 struct {
	ListTag []int32 `json:"list_tag" parquet:"list_tag,list"`
}

func main() {
	var rows []RowType1
	strs := []string{
		`{}`,
		`{"list_tag":null}`,
		`{"list_tag":[]}`,
		`{"list_tag":[1,2]}`,
	}
	for _, s := range strs {
		var r RowType1
		json.Unmarshal([]byte(s), &r)
		rows = append(rows, r)
	}
	if err := parquet.WriteFile("file.parquet", rows); err != nil {
		log.Fatalln("error")
	}
}

//The result is printed by `pqrs cat file.parquet`
// {list_tag: []} -> expect {list_tag: null}
// {list_tag: []} -> expect {list_tag: null}
// {list_tag: []}
// {list_tag: [1, 2]}

I have tried the following methods:

  • to make the list-type columns optional
type RowType1 struct {
	ListTag []int32 `json:"list_tag" parquet:"list_tag,list,optional"`
}

However, it seems to be all rows will be either null or empty lists, no matter whether the origin json file has value (e.g.{"list":[1,2]}) or not.(Wonder if list + optional is not excepted, or it is just a bug)

  • make the list-type columns a pointer
type RowType1 struct {
	ListTag *[]int32 `json:"list_tag" parquet:"list_tag,list"`
}

// panic: list is an invalid parquet tag: list_tag *[]int32 [list]
However, it seems that the list tag currently only support Slice type

Expected Result:

  • have a way to write parquet with a null value for the list-type column

Is there any possibility to achieve it?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.