In this issue, I would like to propose modifying the `parquet.RowReader` and `parquet.RowWriter` interfaces to improve the usability of those abstractions, and to reduce their overhead to improve application performance.
Context
At this time, the APIs in question have the following definitions:
```go
package parquet

type Row []Value

type RowReader interface {
	ReadRow(Row) (Row, error)
}

type RowWriter interface {
	WriteRow(Row) error
}
```
Qualities
The goal of these APIs is to allow the expression of generic algorithms working on sequences of parquet rows. For example, when writing rows to a `parquet.Writer`, applications can call `WriteRow` repeatedly to append rows and construct a parquet file. When reading rows, the API allows stream-like consumption of the content of a parquet file, which is useful when the dataset cannot be loaded entirely in memory (a common case when working with large parquet files).
Simplicity of the design
The use of `parquet.Row` offers a simple abstraction (a slice doesn't hide much about the implementation details), and enables constructs that leverage language features like `append` or `for ... range`. For example, some programs can simply iterate over the values of a row with a construct like this:
```go
for _, value := range row {
	...
}
```
Similarities with the `io` package
Being somewhat similar in design to `io.Reader` and `io.Writer`, this model also reuses concepts that Go developers are often familiar with, helping them ramp up when learning to use the package. The parallels go beyond the interfaces into APIs like `parquet.CopyRows`, which mimics the design of `io.Copy`, creating a strong sense of familiarity for developers who have worked with the standard `io` package.
Flaws
There are limitations in the design of these APIs that seem to have a greater impact than originally anticipated, and which we might be able to mitigate by striking different balances in the design.
Asymmetry of `parquet.RowReader` and `parquet.RowWriter`
When writing rows to a `parquet.RowWriter`, the application forms a full `parquet.Row` and passes it to the writer, allowing the `WriteRow` method to offer very simple input and output. The intuitive corresponding API for `parquet.RowReader` would be to consume and return the next row, for example:
```go
interface {
	ReadRow() (Row, error)
}
```
Performance considerations arise with this model, since the `parquet.Row` type needs to be somewhat flexible in the amount of memory that it consumes (e.g. repeated columns may cause the number of values in a row to vary). Returning a newly allocated `parquet.Row` value on every call would greatly increase GC pressure on the application, as well as impact the compute footprint due to extra malloc calls.
To address these concerns, the contract of `parquet.RowReader` could establish that the returned `parquet.Row` remains valid only until the next call to `ReadRow`, letting the reader reuse the buffer it had returned. While effective at addressing the performance concerns, this model weakens the safety of the application, because nothing in the language prevents it from retaining the `parquet.Row` and having the inner buffers unexpectedly mutated on the next call to `ReadRow`. Explicit copies of the `parquet.Row` values would be required when they need to be retained, but developers would need to remember to copy the rows, since the programs would appear correct either way from the compiler's perspective.
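To make the hazard concrete, here is a minimal self-contained sketch of this hypothetical buffer-reusing `ReadRow()` model, using toy `Value`/`Row` stand-ins and a toy `toyReader` type rather than the real package APIs:

```go
package main

import "fmt"

type Value int
type Row []Value

// toyReader mimics a hypothetical ReadRow() (Row, error) implementation
// that reuses a single internal buffer between calls.
type toyReader struct {
	rows [][]Value
	buf  Row
	next int
}

func (r *toyReader) ReadRow() (Row, error) {
	row := append(r.buf[:0], r.rows[r.next]...)
	r.next++
	r.buf = row
	return row, nil
}

func main() {
	r := &toyReader{rows: [][]Value{{1, 2}, {3, 4}}}

	first, _ := r.ReadRow()
	retained := first // aliases the reader's internal buffer!

	safe := make(Row, len(first))
	copy(safe, first) // an explicit copy survives the next call

	r.ReadRow() // overwrites the shared backing array

	fmt.Println(retained[0]) // 3: silently mutated
	fmt.Println(safe[0])     // 1: still valid
}
```

The compiler accepts both the aliasing and the copying version; only the copying one is correct, which is exactly the weakness described above.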
The current approach taken by parquet-go is to allow the application to pass a pre-allocated `parquet.Row` value that the reader will append values to. This makes memory management more explicit, allowing the application to choose where to allocate the buffer, or to pass `nil` and let the `append` calls allocate memory on demand. This model better leverages the language infrastructure to achieve greater efficiency, but at the expense of usability: the asymmetry between the writer and reader interfaces forces more complexity onto the application on the reader side.
Performance overhead of the API
While the interfaces are named `parquet.RowReader` and `parquet.RowWriter`, the models are more similar to iterators than readers/writers, since applications can only read or write a single row at a time. By contrast, the `io.Reader` and `io.Writer` interfaces allow bulk operations on arrays of bytes; they don't require applications to read or write bytes one by one.
Bulk operations yield performance benefits over iterator-like APIs, as they allow applications to balance memory and compute utilization: by using larger memory buffers, we can reduce the number of calls through the abstraction layers, amortizing the runtime cost of the abstractions.
This becomes more important the more wrappers we use, which is a common approach in Go programs. For example, one might wrap an `io.Reader` to transparently count the bytes read from an underlying medium. The cost of the interface calls increases linearly with the number of wrappers, quickly becoming a measurable portion of the compute time consumed by an application. Reducing the number of calls by allowing bulk operations is an effective way to get the best of both worlds: leveraging the power of Go interfaces while keeping the cost of going through the abstraction layers to a minimum.
The following screen capture shows a subset of a CPU profile measuring compute utilization during the `MergeRowGroups` benchmark. Because we are reading a single row at a time, we must cross through the abstraction layer each time to reach the column that we need to read values from. In this case, it accounts for ~16% of the CPU time used by this code path (and tends to increase linearly with the number of columns):
Proposal
With this proposal, we are attempting to strike a better balance in the design of the row readers and writers: one that retains the qualities of the existing design while also offering solutions to the issues highlighted above.
The change would consist of modifying the `parquet.RowReader` and `parquet.RowWriter` interfaces to adopt the following definitions:
```go
package parquet

type RowReader interface {
	ReadRows([]Row) (int, error)
}

type RowWriter interface {
	WriteRows([]Row) (int, error)
}
```
The interfaces would receive slices of rows, returning the number of rows that were successfully read or written (starting from the first).
On addressing the lack of symmetry between the reader and writer
As we can see in the interface definitions, the methods have the same inputs and outputs, removing the asymmetry that exists today between the interfaces.
This model mimics the standard `io.Reader` and `io.Writer` interfaces more closely, as well as the `parquet.ValueReader` and `parquet.ValueWriter` interfaces from this package. It would result in APIs that are better understood by developers, smoothing the learning curve and reducing the cognitive overhead of using parquet-go.
On addressing the performance overhead of the API
With the proposed change, multiple rows can be read or written in each call to `ReadRows` or `WriteRows`. Applications that intend to achieve high throughput of row processing (whether reading or writing) can increase the row buffer size to read or write more rows per call, amortizing the cost of going through the abstraction layers.
To prevent allocations of individual rows on each call to `ReadRows`, the readers must be allowed to append to the `Row` values if entries of the `[]parquet.Row` slice are set to non-nil values. This contract not only optimizes memory allocations by reusing the backing arrays of the slices, it also allows for a simpler and less error-prone approach than the current model. The following example shows how code scanning rows combines both simplicity and efficiency:
```go
var rows [100]parquet.Row
for {
	n, err := reader.ReadRows(rows[:])
	// The call to ReadRows has reused the inner buffers of the `rows`
	// array, setting each row to the column values while retaining the
	// backing array of each slice to be reused in the next call.
	for _, row := range rows[:n] {
		...
	}
	if err != nil {
		...
	}
}
```
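On the writer side, a symmetric loop can drain a batch of rows, handling partial writes much as `io.Writer` callers do. The `memWriter` below is a toy in-memory implementation of the proposed contract (with toy `Value`/`Row` types), not part of the package:

```go
package main

import "fmt"

type Value int
type Row []Value

// memWriter is a toy RowWriter implementing the proposed
// WriteRows([]Row) (int, error) contract.
type memWriter struct {
	rows []Row
}

func (w *memWriter) WriteRows(rows []Row) (int, error) {
	w.rows = append(w.rows, rows...)
	return len(rows), nil
}

func main() {
	w := &memWriter{}
	buf := []Row{{1, 2}, {3}, {4, 5, 6}}

	// One call can write the whole batch; callers grow the buffer
	// to amortize the cost of crossing the interface.
	for len(buf) > 0 {
		n, err := w.WriteRows(buf)
		if err != nil {
			break
		}
		buf = buf[n:] // advance past rows successfully written
	}
	fmt.Println(len(w.rows)) // 3
}
```

Returning the count of rows written (starting from the first) lets a wrapper stop mid-batch while still reporting progress, exactly like the `io.Writer` convention.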
Other considerations
Non-contiguous grouping of column values in memory
The proposed solution highlights another missed opportunity: optimizing the layout of column values in memory. The `[]Row` type keeps the values of each row contiguous in memory, but the values of each column are spread across random locations.
A contiguous memory layout of columnar data can be achieved by reading values from columns individually through `parquet.ValueReader` (and reciprocally writing them through `parquet.ValueWriter`). Constructing an API that offers both the qualities of an efficient columnar layout AND the simplicity of manipulating rows is a balance that seems hard to strike, and would likely require much more advanced software constructs. It also seems to overlap with the goals of other projects such as Apache Arrow, which may be better suited to solve these problems than duplicating the efforts in parquet-go.
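To make the layout difference concrete, here is a small sketch (with toy types, assuming all rows have the same number of columns) that regroups row-oriented values into contiguous per-column slices:

```go
package main

import "fmt"

type Value int
type Row []Value

// transpose regroups row-oriented values into per-column slices,
// giving each column a single contiguous backing array.
// It assumes every row has the same width as the first one.
func transpose(rows []Row) [][]Value {
	if len(rows) == 0 {
		return nil
	}
	cols := make([][]Value, len(rows[0]))
	for i := range cols {
		cols[i] = make([]Value, 0, len(rows))
	}
	for _, row := range rows {
		for i, v := range row {
			cols[i] = append(cols[i], v)
		}
	}
	return cols
}

func main() {
	rows := []Row{{1, 10}, {2, 20}, {3, 30}}
	fmt.Println(transpose(rows)) // [[1 2 3] [10 20 30]]
}
```

Real parquet data complicates this considerably (repeated and optional columns make rows ragged), which is part of why a combined row-and-column API is hard to design.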
I believe there is room for both: the `parquet.RowReader` and `parquet.RowWriter` interfaces should be the logical medium that the package is built upon, while different APIs allow optimizations that accelerate processing through columnar data layouts.