polarsignals / frostdb
❄️ Coolest database around 🧊 Embeddable column database written in Go.
License: Apache License 2.0
While working on #118 I realized that we create a goroutine per compaction for each granule.
I think this should be bounded, and we should probably also recycle goroutines, since these can grow pretty large in terms of memory.
I suggest we create 8 goroutines that take care of compactions and send granules to compact via a buffered channel of size 32, meaning it can block on the write path.
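For illustration, a minimal sketch of this bounded worker-pool approach (granule here is just a placeholder for frostdb's Granule type, and the names are made up):

package main

import (
	"fmt"
	"sync"
)

// granule is a stand-in for frostdb's Granule type in this sketch.
type granule struct{ id int }

// compactionPool bounds compaction concurrency: a fixed number of worker
// goroutines drain a buffered channel of granules to compact. Once the buffer
// is full, enqueue blocks, applying backpressure on the write path.
type compactionPool struct {
	work chan *granule
	wg   sync.WaitGroup
}

func newCompactionPool(workers, backlog int, compact func(*granule)) *compactionPool {
	p := &compactionPool{work: make(chan *granule, backlog)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for g := range p.work {
				compact(g) // the same goroutine is reused for every compaction it picks up
			}
		}()
	}
	return p
}

// enqueue hands a granule to the pool; it blocks once the backlog is full.
func (p *compactionPool) enqueue(g *granule) { p.work <- g }

// close stops accepting work and waits for in-flight compactions to finish.
func (p *compactionPool) close() {
	close(p.work)
	p.wg.Wait()
}

func main() {
	pool := newCompactionPool(8, 32, func(g *granule) { fmt.Println("compacting granule", g.id) })
	for i := 0; i < 100; i++ {
		pool.enqueue(&granule{id: i})
	}
	pool.close()
}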
If we decide to make concurrency configurable at runtime (which we might want to do based on load), I think this will create an unnecessary synchronizer if we have concurrency == 1 for any reason. What do you think of creating a struct with more planning state to make more informed decisions about what to plan? For example, store some notion of whether we have to plan a synchronizer. Maybe it's fine to also check len(phyPlans), but it seems to me like this state complexity will start to balloon. The struct I suggest might also be useful for more planning information (e.g. ordering column guarantees).
Originally posted by @asubiotto in #202 (comment)
Currently, ArcticDB only supports simple columns (no repeated or struct columns); it would be nice if it also supported more complex types.
We have a bunch of uses of unsafe atomic pointers. Go 1.19 shipped with a generic (type-safe) atomic pointer that we should now use.
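For reference, a minimal example of the type-safe replacement (the parts type is just a placeholder):

package main

import (
	"fmt"
	"sync/atomic"
)

// parts is a placeholder for whatever struct the unsafe pointer used to point at.
type parts struct{ n int }

func main() {
	// Before Go 1.19 this required unsafe.Pointer plus atomic.LoadPointer /
	// atomic.StorePointer and manual casts. atomic.Pointer[T] is type-safe:
	var p atomic.Pointer[parts]
	p.Store(&parts{n: 1})
	old := p.Swap(&parts{n: 2}) // atomically replace the value, returning the previous one
	fmt.Println(old.n, p.Load().n)
}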
Right now in frostdb, when we persist a table block, we first read the entirety of the table block into a set of merged row groups in memory, and then into a buffer. This causes our memory to explode every time we persist a block, as seen in the screenshot from Parca below.
It would be ideal if the TableBlock could instead implement the io.Reader interface that the objstore.Bucket interface requires, so we could write it directly to the bucket instead of causing these memory spikes.
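One possible shape for this (a sketch only: writeTo stands in for whatever serialization method the TableBlock would expose; only the objstore.Bucket.Upload signature is taken from the upstream library):

package sketch

import (
	"context"
	"io"

	"github.com/thanos-io/objstore"
)

// persist streams a block's serialized bytes into the bucket without first
// buffering the whole block in memory. writeTo is a placeholder for a
// TableBlock method that serializes the block to an io.Writer.
func persist(ctx context.Context, bkt objstore.Bucket, name string, writeTo func(io.Writer) error) error {
	r, w := io.Pipe()
	go func() {
		// Propagate any serialization error to the reader side.
		w.CloseWithError(writeTo(w))
	}()
	// Upload consumes the reader incrementally, so memory stays bounded by the
	// pipe instead of the full serialized block.
	return bkt.Upload(ctx, name, r)
}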
While trying to add back the write benchmark, I realized Sync doesn't guarantee that the last write is visible even after Sync has returned.
See #111.
I'm guessing this is a bug, but maybe that's not a guarantee that is actually given.
Hi all, while experimenting with the persistence issue (#35), I found a problem with the splitGranule() function defined in table.go.
To reproduce the problem, you can set the granule size to 4096 instead of 8192 inside Test_Table_Concurrency(). The test will fail with an output similar to the following:
=== RUN Test_Table_Concurrency
table_test.go:495:
Error Trace: table_test.go:495
Error: Not equal:
expected: 8000
actual : 7880
Test: Test_Table_Concurrency
--- FAIL: Test_Table_Concurrency (6.08s)
The issue is caused by the fact that splitGranule() is called several times and, because of the high number of concurrent transactions, the function is aborted, for example when reaching the following condition:
if n < t.table.db.columnStore.granuleSize { // It's possible to have a Granule marked for compaction but all the parts in it aren't completed tx's yet
t.abort(commit, granule)
return
}
Now, when splitGranule() is called, a "Compacting" sentinel node is immediately prepended to the part list of the granule:
parts := granule.parts.Sentinel(Compacting)
If the function is aborted, this sentinel node is not cleared and remains in the part list of the granule.
Because of this additional node being present, any subsequent iteration over the part list is stopped when it reaches the sentinel node, because of the following condition inside Iterate():
// Iterate accesses every node in the list.
func (l *PartList) Iterate(iterate func(*Part) bool) {
...
if node.part == nil && node.sentinel != l.listType { // if we've encountered a sentinel node from a different type of list we must exit
return
}
...
}
From my understanding of the code, stopping the iteration when node.sentinel != l.listType only makes sense when Iterate() is used inside the splitGranule() function. However, transactions that want to iterate over the full part list should skip this condition.
If my guess is correct, then we could pass a flag to the Iterate() function to skip the condition under certain circumstances.
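For illustration, one shape such a flag could take; the list internals here are a simplified model, not frostdb's actual PartList:

package sketch

// SentinelType, Part, Node and PartList are illustrative stand-ins for the
// real frostdb types, reduced to what the sketch needs.
type SentinelType int

type Part struct{}

type Node struct {
	next     *Node
	part     *Part        // nil for sentinel nodes
	sentinel SentinelType // meaningful only when part == nil
}

type PartList struct {
	head     *Node
	listType SentinelType
}

// Iterate accesses every node in the list. With skipForeignSentinels set,
// sentinel nodes from a different list type (e.g. the Compacting sentinel
// prepended by splitGranule) no longer terminate the iteration, so readers
// still see every part even if an aborted compaction left its sentinel behind.
func (l *PartList) Iterate(iterate func(*Part) bool, skipForeignSentinels bool) {
	for node := l.head; node != nil; node = node.next {
		if node.part == nil && node.sentinel != l.listType {
			if skipForeignSentinels {
				continue // readers skip foreign sentinels instead of stopping
			}
			return // compaction path keeps the original early-exit behavior
		}
		if node.part != nil && !iterate(node.part) {
			return
		}
	}
}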
This test seems to be flaky as it failed CI during an unrelated change to benchmark code.
=== RUN Test_Table_ReadIsolation
table_test.go:548:
Error Trace: table_test.go:548
table.go:637
table_test.go:529
Error: Not equal:
expected: 3
actual : 4
Test: Test_Table_ReadIsolation
In order to be able to "list all available label names" in Parca, we need to be able to iterate over all the dynamically created column names.
The way I imagine this will work is that a DynamicColumnScan returns RowGroups where each dynamic column is a repeated string column in plain encoding which contains the "list" of concrete column names.
I recently came across a weird failure mode where the race detector exceeded its threshold of allowed goroutines. I tracked this down to goroutines created in TxPool.cleaner:
--- FAIL: Test_Table_Concurrency (10.32s)
leaks.go:78: found unexpected goroutines:
[Goroutine 52 in state select, with github.com/polarsignals/frostdb.(*TxPool).cleaner on top of the stack:
goroutine 52 [select]:
github.com/polarsignals/frostdb.(*TxPool).cleaner(0x140003ed840, 0x140003ed730?)
/Users/asubiotto/Developer/github.com/polarsignals/frostdb/tx_list.go:79 +0xa0
created by github.com/polarsignals/frostdb.NewTxPool
/Users/asubiotto/Developer/github.com/polarsignals/frostdb/tx_list.go:26 +0xe8
To reproduce, simply run go test -run Test_Table_Concurrency with a leak detector:
diff --git a/table_test.go b/table_test.go
index e74a689..a430e5f 100644
--- a/table_test.go
+++ b/table_test.go
@@ -24,6 +24,7 @@ import (
"github.com/thanos-io/objstore/providers/filesystem"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
+ "go.uber.org/goleak"
"github.com/polarsignals/frostdb/dynparquet"
schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
@@ -490,6 +491,8 @@ func Test_Table_InsertLowest(t *testing.T) {
// This test issues concurrent writes to the database, and expects all of them to be recorded successfully.
func Test_Table_Concurrency(t *testing.T) {
+ defer goleak.VerifyNone(t)
+
tests := map[string]struct {
granuleSize int
}{
I couldn't find an obvious Close method that was missing, so I decided to record this in an issue. cc @thorfour
The upstream objstore dependency we use now implements an identical prefixing mechanism (unsurprisingly, since we contributed it), so we should get rid of the one in this repository:
https://github.com/polarsignals/frostdb/blob/42eb27a93c4bc694d9ec4848a30c0e5ecdd8efb5/bucket.go
[signal SIGSEGV: segmentation violation code=0x2 addr=0x0 pc=0x103359f08]
goroutine 25938 [running]:
github.com/polarsignals/frostdb.(*Granule).ColumnChunks(0x140258aaea0)
/Users/thor/go/src/github.com/polarsignals/frostdb/granule.go:378 +0x338
github.com/polarsignals/frostdb.(*ColumnRef).Column(0x1404e80bf30, {0x106cbed48, 0x140258aaea0})
/Users/thor/go/src/github.com/polarsignals/frostdb/binaryscalarexpr.go:108 +0x78
github.com/polarsignals/frostdb.BinaryScalarExpr.Eval({0x1404e80bf30, 0x1, {0x1401b743738, 0x6, 0xf9, 0x0, 0x0, 0x0}}, {0x106cbed48?, 0x140258aaea0?})
/Users/thor/go/src/github.com/polarsignals/frostdb/binaryscalarexpr.go:130 +0x54
github.com/polarsignals/frostdb.(*AndExpr).Eval(0x1404dd991e0, {0x106cbed48, 0x140258aaea0})
/Users/thor/go/src/github.com/polarsignals/frostdb/filter.go:132 +0x3c
github.com/polarsignals/frostdb.(*TableBlock).RowGroupIterator.func1({0x106cb0fe0?, 0x140258aaea0})
/Users/thor/go/src/github.com/polarsignals/frostdb/table.go:935 +0x70
github.com/google/btree.(*node[...]).iterate(0x1404f4a3600, 0x1, {{0x0, 0x0?}, 0xb8?}, {{0x0?, 0x0?}, 0xa8?}, 0x0?, 0x0, ...)
/Users/thor/go/pkg/mod/github.com/google/[email protected]/btree_generic.go:522 +0x318
github.com/google/btree.(*node[...]).iterate(0x1404f4a3240, 0x1, {{0x0, 0x0?}, 0x8?}, {{0x0?, 0x0?}, 0x18?}, 0x0?, 0x0, ...)
/Users/thor/go/pkg/mod/github.com/google/[email protected]/btree_generic.go:510 +0x1a4
github.com/google/btree.(*node[...]).iterate(0x1404f4a31c0, 0x1, {{0x0, 0x0?}, 0x20?}, {{0x0?, 0x0?}, 0x0?}, 0x0?, 0x0, ...)
/Users/thor/go/pkg/mod/github.com/google/[email protected]/btree_generic.go:510 +0x1a4
github.com/google/btree.(*node[...]).iterate(0x1404f4a3180, 0x1, {{0x0, 0x0?}, 0x68?}, {{0x0?, 0x0?}, 0x1?}, 0x0?, 0x0, ...)
/Users/thor/go/pkg/mod/github.com/google/[email protected]/btree_generic.go:510 +0x1a4
github.com/google/btree.(*BTreeG[...]).Ascend(0x0?, 0x0?)
/Users/thor/go/pkg/mod/github.com/google/[email protected]/btree_generic.go:779 +0x54
github.com/google/btree.(*BTree).Ascend(...)
/Users/thor/go/pkg/mod/github.com/google/[email protected]/btree_generic.go:1029
github.com/polarsignals/frostdb.(*TableBlock).RowGroupIterator(0x140002bc240?, {0x1404a34b620?, 0x1404a34bce0?}, 0x10505edae?, {0x106cb0f60?, 0x1404dd991e0?}, 0x1400855a028?)
/Users/thor/go/src/github.com/polarsignals/frostdb/table.go:931 +0x6c
github.com/polarsignals/frostdb.(*Table).collectRowGroups(0x140002bc240, {0x106cd4cb0?, 0x1404a34bce0?}, 0x10504e457?, {0x106cdc148, 0x1404a34b620})
/Users/thor/go/src/github.com/polarsignals/frostdb/table.go:1317 +0x204
Hit this after running for a couple hours
We see some unexpected memory spikes during compaction (this is a local demo instance scraping itself):
In the graph above, there is a ~1GB memory spike that is attributed to compaction. Matching this with the compaction that occurred during this time in the logs (debug level), we get:
level=debug db=parca name=parca ts=2022-10-25T06:57:11.229938Z caller=compaction.go:187 msg="compaction complete" duration=3.117003833s l0Before="[sz=21 MiB,cnt=483]" l1Before="[sz=4.3 MiB,cnt=1]" l1After="[sz=5.9 MiB,cnt=1]" overlaps=1 splits=0
So this ~1GB of allocations was the result of merging 483 parts that sum to 21 MiB plus one 4.3 MiB part. Given parquet decompression, we do expect memory usage to be higher than the l0+l1 size, but 1GB seems like too much. One theory of what is going on is that buffer pooling is not working as expected in parquet (segmentio/parquet-go#372).
I tried running the test again with the race detector enabled (on PR #84) and I get races detected:
gh pr checkout 84
HEAD is at a3e6195e7beef1e77eff933d569555eb474772f7
go test -race -run Test_Table_Concurrency
The logs can be found here: https://gist.github.com/MadhavJivrajani/fa69278be76cdf18143dcde533f0e193
This is probably unrelated to the bug being fixed by PR #84, though.
Arcticdb will need to support aborting write transactions.
Transactions either need to be rolled back or tombstoned such that reads don't contain aborted transactions.
While repeated types are now supported for writing and reading (#72), they lack the ability to be filtered in more expressive ways than exact array matches (==) and non-matches (!=). I don't think I have thought through the APIs enough yet, but one thought I've had is something like:
logicalplan.Col("values").Any().Eq(logicalplan.Literal(5))
In this example, the table has a column called values, which is an array of integers, and the filter would keep only those rows whose values contain 5.
I'm not sure I'm satisfied with the API, but I wanted to open this issue to at least drop my thoughts here.
Currently, arcticDB's query execution is not parallelized, but we want it to be. There are well-known techniques such as the Volcano model to generalize the parallelization of steps within a query. I think the Volcano model is a promising direction for arcticDB, but I'd be happy for us to explore other possibilities as well.
There were some historic reasons to introduce the separate ArrowSchema function call for TableScan/Execute.
Going forward we don't need this to be a separate function anymore; table.Iterator should implicitly read the schema too.
Basically, we need to remove this function call:
frostdb/query/physicalplan/physicalplan.go, lines 121 to 134 at 2f2f61f
Instead of maintaining a list of active transactions, since we only want to prevent read tearing, we should replace it with a high watermark. This will also obviate the need for a garbage collector on the active list.
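A minimal sketch of the watermark idea (names and types are assumptions, not the frostdb implementation):

package sketch

import "sync/atomic"

// txWatermark sketches replacing an active-transaction list with a single
// high watermark: readers only observe writes with a transaction id at or
// below the watermark, so no list of in-flight transactions (and no garbage
// collector for it) is needed.
type txWatermark struct {
	watermark atomic.Uint64 // highest tx id whose effects are safe to read
}

// snapshot is taken at the start of a read; anything written by a tx id above
// it is ignored, which prevents read tearing.
func (t *txWatermark) snapshot() uint64 { return t.watermark.Load() }

// advance is called once every transaction up to and including txn has
// completed, making those writes visible to new reads.
func (t *txWatermark) advance(txn uint64) {
	for {
		cur := t.watermark.Load()
		if txn <= cur || t.watermark.CompareAndSwap(cur, txn) {
			return
		}
	}
}

// visible reports whether a row written at writeTxn can be read under snapshot.
func visible(writeTxn, snapshot uint64) bool { return writeTxn <= snapshot }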
Instead of using a mutex, I would use a buffered channel here (maybe with concurrency slots) with a synchronizer goroutine reading from the channel and pushing. The nice thing about a buffered channel is that upstream goroutines can push data to be read even if the downstream goroutine is not ready to read it yet (e.g. so the upstream goroutine can perform more scan work concurrently), while still being able to apply backpressure if the downstream goroutine is super slow for some reason.
Originally posted by @asubiotto in #202 (comment)
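To make the suggestion concrete, a generic sketch of the buffered-channel pattern being described (illustrative only, not the actual operator code):

package sketch

import "sync"

// mergeResults shows the pattern: upstream scan goroutines push results into
// a buffered channel (sized by the configured concurrency) instead of taking
// a mutex, and a single synchronizer goroutine drains it. The buffer lets
// producers keep scanning while the consumer is busy; once it fills up, the
// channel blocks and applies backpressure.
func mergeResults[T any](concurrency int, produce []func(chan<- T), consume func(T)) {
	ch := make(chan T, concurrency)

	var wg sync.WaitGroup
	for _, p := range produce {
		wg.Add(1)
		go func(p func(chan<- T)) {
			defer wg.Done()
			p(ch) // upstream goroutines push results as they are ready
		}(p)
	}

	// Close the channel once all producers are done so the synchronizer exits.
	go func() {
		wg.Wait()
		close(ch)
	}()

	// The single synchronizer reads from the channel and pushes results onward.
	for v := range ch {
		consume(v)
	}
}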
Currently, if a query is sent to ArcticDB it will be completed no matter what, even if the client no longer cares about the result. We should allow passing a context and stop execution if it is canceled.
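A sketch of where the cancellation check could live (RowGroup here is a placeholder for whatever unit of work execution iterates over):

package sketch

import "context"

// RowGroup is a placeholder for whatever unit of work query execution iterates over.
type RowGroup struct{}

// iterateRowGroups sketches honoring cancellation: check ctx between units of
// work and stop early once the client has given up on the query.
func iterateRowGroups(ctx context.Context, rowGroups []RowGroup, callback func(RowGroup) error) error {
	for _, rg := range rowGroups {
		if err := ctx.Err(); err != nil {
			return err // context canceled or deadline exceeded: stop doing work
		}
		if err := callback(rg); err != nil {
			return err
		}
	}
	return nil
}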
https://github.com/polarsignals/arcticdb/blob/450025ff363cb7241d9a038dcaf74b8f2779f2ac/pqarrow/arrow.go#L379
https://github.com/polarsignals/arcticdb/blob/0c97de2410a804308b52574cdb1d5073d436ae64/query/logicalplan/expr.go#L127
There are two ParquetNodeToType functions, in pqarrow and in logicalplan, with the same name and the same functionality, so it would make sense to consolidate them into one function that is used everywhere.
However, there are perhaps two problems with this.
From its functionality, the pqarrow package would be the appropriate place (it means parquet-arrow, doesn't it?), but it imports logicalplan, which causes an import cycle.
To avoid using panic(), we would need to change the expr interface to DataType(*dynparquet.Schema) (arrow.DataType, error).
https://github.com/polarsignals/arcticdb/blob/1bd737630d2c68487961bd7d52c50503924e5ad9/query/logicalplan/builder.go#L82
Any thoughts?
Right now we have a small set of benchmarks that really only benchmark concurrent writers.
It would be good to get some benchmarks around the query path, as well as large database benchmarks etc.
It might make sense to update the interface to return the error too:
frostdb/query/logicalplan/logicalplan.go, line 133 at b2cc5b0
Or should this error really just be handled outside the interface? I'm opening this as a question to everyone else, as I'm not 100% sure right now.
Hi all, I noticed that if I increase the number of concurrent writers inside Test_Table_Concurrency(), I get the following error:
"failed to delete granule during split"
Thinking about it, this could be due to the following. After we reach the following statement inside splitGranule()
// set the newGranules pointer, so new writes will propogate into these new granules
granule.newGranules = granules
we propagate new inserts also to the child granules. Then, under high concurrent pressure, compaction could be triggered for some of the child granules even before they have been inserted into the index.
So, when control reaches this line (in the context of a child granule)
deleted := index.Delete(granule)
if deleted == nil {
level.Error(t.logger).Log("msg", "failed to delete granule during split")
}
we will not find it in the index. If we simply ignore the error log, we could end up in a situation where some granules are not deleted properly from the index. I think that a possible solution could be disabling compaction for newly created granules until we are sure we have inserted them into the index. So, something like:
// we disable compaction for new granules before allowing new insert to be propagated to them
for _, childGranule := range granules {
childGranule.metadata.pruned.Store(1)
}
// we restore the possibility to trigger compaction after we exited the function
defer func() {
for _, childGranule := range granules {
childGranule.metadata.pruned.Store(0)
}
}()
// set the newGranules pointer, so new writes will propogate into these new granules
granule.newGranules = granules
...
If this makes sense to you, I will submit a PR with the fix.
It would be great to pass the logger down into the database and automatically add the database name as a key-value pair:
Line 238 in b2cc5b0
Something like the following would be great:
log.WithPrefix(s.logger, "db", name)
While we already have the And expression, we cannot express Or expressions today.
$ sw_vers
ProductName: macOS
ProductVersion: 11.6.4
BuildVersion: 20G417
$ go version
go version go1.18 darwin/amd64
latest (ccf34f7bbb98fa1b6af19a0198b81f7b4cd1441e)
$ go test -count 300 -run Test_Table_Concurrency -timeout 20h
--- FAIL: Test_Table_Concurrency (27.61s)
--- FAIL: Test_Table_Concurrency/8192 (6.71s)
table_test.go:516:
Error Trace: table_test.go:516
Error: Not equal:
expected: 8000
actual : 7990
Test: Test_Table_Concurrency/8192
--- FAIL: Test_Table_Concurrency (26.70s)
--- FAIL: Test_Table_Concurrency/8192 (6.65s)
table_test.go:516:
Error Trace: table_test.go:516
Error: Not equal:
expected: 8000
actual : 7990
Test: Test_Table_Concurrency/8192
FAIL
exit status 1
FAIL github.com/polarsignals/arcticdb 8061.410s
This is a pretty specific query optimization, but one that happens to occur quite a bit: querying all available values of a column where the column is dictionary encoded. In the Parca case, that's listing the __name__ label values. In these cases, instead of returning the entire decoded column, we can just return the dictionary directly.
Using a literal in a filter expression whose type is different from the schema's column type causes a panic when the query executes.
For example, using an int literal to filter a string column:
func main() {
var err error
columnstore := arcticdb.New(nil, 10, 512*1024*1024).WithIndexDegree(3)
db, err := columnstore.DB("test_db")
if err != nil {
panic(err)
}
logger := log2.NewLogfmtLogger(log2.NewSyncWriter(os.Stderr))
logger = level.NewFilter(logger, level.AllowDebug())
schema := dynparquet.NewSchema("test_schema",
[]dynparquet.ColumnDefinition{
// HERE Column1 is defined as a string
{
Name: "Column1",
StorageLayout: parquet.Encoded(parquet.String(), &parquet.RLEDictionary),
Dynamic: false,
},
},
[]dynparquet.SortingColumn{
dynparquet.Ascending("Column1"),
})
tableConfig := arcticdb.NewTableConfig(schema)
table, err := db.Table("test_table", tableConfig, logger)
if err != nil {
panic(err)
}
buffer, err := schema.NewBuffer(map[string][]string{})
row := make([]parquet.Value, 0)
message := "hello"
row = append(row, parquet.ValueOf(message).Level(0, 0, 0))
err = buffer.WriteRow(row)
if err != nil {
panic(err)
}
_, err = table.InsertBuffer(buffer)
if err != nil {
panic(err)
}
table.Sync()
queryEngine := query.NewEngine(memory.DefaultAllocator, db.TableProvider())
query := queryEngine.ScanTable("test_table").
Filter(logicalplan.Col("Column1").LT(logicalplan.Literal(10))) // HERE add filter for non string literal
// Execute will panic
query.Execute(context.Background(), func(ar arrow.Record) error {
log.Printf("%v\n", ar)
return nil
})
}
Causes panic
panic: something terrible has happened, this should have errored previously during validation
goroutine 1 [running]:
github.com/polarsignals/arcticdb/query/physicalplan.BinaryScalarOperation({0x1b229b8, 0xc0000acde0}, {0x1b20c60, 0xc0000d27e0}, 0x4)
/Users/albert.lockett2/Development/arcticdb/query/physicalplan/binaryscalarexpr.go:107 +0x1490
github.com/polarsignals/arcticdb/query/physicalplan.BinaryScalarExpr.Eval({0xc0000a87c0, 0x4, {0x1b20c60, 0xc0000d27e0}}, {0x1b235e8, 0xc0000b51a0})
/Users/albert.lockett2/Development/arcticdb/query/physicalplan/binaryscalarexpr.go:56 +0x185
github.com/polarsignals/arcticdb/query/physicalplan.filter({0x1b1b040, 0x1f97408}, {0x1b17cb8, 0xc0000d2800}, {0x1b235e8, 0xc0000b51a0})
/Users/albert.lockett2/Development/arcticdb/query/physicalplan/filter.go:190 +0xad
github.com/polarsignals/arcticdb/query/physicalplan.(*PredicateFilter).Callback(0xc0000b5020, {0x1b235e8, 0xc0000b51a0})
/Users/albert.lockett2/Development/arcticdb/query/physicalplan/filter.go:177 +0xc5
github.com/polarsignals/arcticdb.(*Table).Iterator(0xc00032a080, {0x1b1cd30, 0xc000038048}, {0x1b1b040, 0x1f97408}, {0xc0000a8760, 0x1, 0x1}, {0x1b1e7e0, 0xc0000b4ff0}, ...)
/Users/albert.lockett2/Development/arcticdb/table.go:271 +0x523
github.com/polarsignals/arcticdb/query/physicalplan.(*TableScan).Execute(0xc0000d2820, {0x1b1cd30, 0xc000038048}, {0x1b1b040, 0x1f97408})
/Users/albert.lockett2/Development/arcticdb/query/physicalplan/physicalplan.go:72 +0x265
github.com/polarsignals/arcticdb/query/physicalplan.(*OutputPlan).Execute(0xc0000b06f0, {0x1b1cd30, 0xc000038048}, {0x1b1b040, 0x1f97408}, 0x1adef30)
/Users/albert.lockett2/Development/arcticdb/query/physicalplan/physicalplan.go:58 +0x93
github.com/polarsignals/arcticdb/query.LocalQueryBuilder.Execute({{0x1b1b040, 0x1f97408}, {0xc0000aad00}}, {0x1b1cd30, 0xc000038048}, 0x1adef30)
/Users/albert.lockett2/Development/arcticdb/query/engine.go:115 +0x330
main.main()
panic: runtime error: index out of range [0] with length 0
goroutine 51647 [running]:
github.com/polarsignals/arcticdb/query/physicalplan.(*HashAggregate).Callback(0x1400cd3e2a0, {0x1089fb7f8, 0x14019cbf980})
/Users/brancz/src/github.com/polarsignals/arcticdb/query/physicalplan/aggregate.go:220 +0x6e8
github.com/polarsignals/arcticdb/query/physicalplan.(*PredicateFilter).Callback(0x14019b9eb70, {0x1089fb7f8?, 0x14019cbf860?})
/Users/brancz/src/github.com/polarsignals/arcticdb/query/physicalplan/filter.go:181 +0xbc
github.com/polarsignals/arcticdb.(*Table).Iterator(0x1400a2ea200?, {0x1089dcaf0, 0x10a4c5600}, {0x1403bde2900, 0x8, 0x8}, {0x1089ec1e8, 0x14019b54a80}, {0x0, 0x0, ...}, ...)
/Users/brancz/src/github.com/polarsignals/arcticdb/table.go:231 +0x16c
github.com/polarsignals/arcticdb/query/physicalplan.(*TableScan).Execute(0x1400d9d0ba0, {0x1089dcaf0, 0x10a4c5600})
/Users/brancz/src/github.com/polarsignals/arcticdb/query/physicalplan/physicalplan.go:71 +0x100
github.com/polarsignals/arcticdb/query/physicalplan.(*OutputPlan).Execute(...)
/Users/brancz/src/github.com/polarsignals/arcticdb/query/physicalplan/physicalplan.go:57
github.com/polarsignals/arcticdb/query.LocalQueryBuilder.Execute({{0x1089dcaf0?, 0x10a4c5600?}, {0x1400a2ea240?}}, 0x14015e39870)
/Users/brancz/src/github.com/polarsignals/arcticdb/query/engine.go:113 +0x138
github.com/parca-dev/parca/pkg/query.(*ColumnQueryAPI).findSingle(0x140033d8320, {0x1089ea968?, 0x14019af0240?}, {0x1400d9d09e0?, 0x3, 0x4}, {0x1400a281e00?, 0x0?, 0x0?})
/Users/brancz/src/github.com/parca-dev/parca/pkg/query/columnquery.go:426 +0x964
github.com/parca-dev/parca/pkg/query.(*ColumnQueryAPI).selectSingle(0x140033d8320, {0x1089ea968, 0x14019af0240}, 0x1400a281e00)
/Users/brancz/src/github.com/parca-dev/parca/pkg/query/columnquery.go:383 +0x168
github.com/parca-dev/parca/pkg/query.(*ColumnQueryAPI).singleRequest(0x14008c7ae60?, {0x1089ea968, 0x14019af0240}, 0x6?, 0xb93338?)
/Users/brancz/src/github.com/parca-dev/parca/pkg/query/columnquery.go:368 +0x30
github.com/parca-dev/parca/pkg/query.(*ColumnQueryAPI).Query(0xa41?, {0x1089ea968, 0x14019af0240}, 0x14008c7ae60)
/Users/brancz/src/github.com/parca-dev/parca/pkg/query/columnquery.go:317 +0x80
github.com/parca-dev/parca/gen/proto/go/parca/query/v1alpha1._QueryService_Query_Handler.func1({0x1089ea968, 0x14019af0240}, {0x10885e580?, 0x14008c7ae60})
/Users/brancz/src/github.com/parca-dev/parca/gen/proto/go/parca/query/v1alpha1/query_vtproto.pb.go:183 +0x78
github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors.UnaryServerInterceptor.func1({0x1089ea968, 0x14019af0240}, {0x10885e580, 0x14008c7ae60}, 0x0?, 0x14014b76b88)
/Users/brancz/pkg/mod/github.com/grpc-ecosystem/go-grpc-middleware/[email protected]/interceptors/server.go:22 +0x158
github.com/grpc-ecosystem/go-grpc-middleware.ChainUnaryServer.func1.1.1({0x1089ea968?, 0x14019af0240?}, {0x10885e580?, 0x14008c7ae60?})
/Users/brancz/pkg/mod/github.com/grpc-ecosystem/[email protected]/chain.go:25 +0x40
github.com/grpc-ecosystem/go-grpc-prometheus.(*ServerMetrics).UnaryServerInterceptor.func1({0x1089ea968, 0x14019af0240}, {0x10885e580, 0x14008c7ae60}, 0x0?, 0x1400d9d0820)
/Users/brancz/pkg/mod/github.com/grpc-ecosystem/[email protected]/server_metrics.go:107 +0x78
github.com/grpc-ecosystem/go-grpc-middleware.ChainUnaryServer.func1.1.1({0x1089ea968?, 0x14019af0240?}, {0x10885e580?, 0x14008c7ae60?})
/Users/brancz/pkg/mod/github.com/grpc-ecosystem/[email protected]/chain.go:25 +0x40
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc.UnaryServerInterceptor.func1({0x1089ea968, 0x14019aa7b00}, {0x10885e580, 0x14008c7ae60}, 0x1400d9d0800, 0x1400d9d0840)
/Users/brancz/pkg/mod/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/[email protected]/interceptor.go:325 +0x4ec
github.com/grpc-ecosystem/go-grpc-middleware.ChainUnaryServer.func1.1.1({0x1089ea968?, 0x14019aa7b00?}, {0x10885e580?, 0x14008c7ae60?})
/Users/brancz/pkg/mod/github.com/grpc-ecosystem/[email protected]/chain.go:25 +0x40
github.com/grpc-ecosystem/go-grpc-middleware.ChainUnaryServer.func1({0x1089ea968, 0x14019aa7b00}, {0x10885e580, 0x14008c7ae60}, 0x14000377ad8?, 0x10544c198?)
/Users/brancz/pkg/mod/github.com/grpc-ecosystem/[email protected]/chain.go:34 +0xbc
github.com/parca-dev/parca/gen/proto/go/parca/query/v1alpha1._QueryService_Query_Handler({0x108806f40?, 0x140033d8320}, {0x1089ea968, 0x14019aa7b00}, 0x1400b83ba40, 0x140033c44b0)
/Users/brancz/src/github.com/parca-dev/parca/gen/proto/go/parca/query/v1alpha1/query_vtproto.pb.go:185 +0x13c
google.golang.org/grpc.(*Server).processUnaryRPC(0x14000e9dc00, {0x1089f8f10, 0x1400ec5a510}, 0x1400a28e360, 0x140033c4750, 0x10a4671d8, 0x0)
/Users/brancz/pkg/mod/google.golang.org/[email protected]/server.go:1282 +0xb3c
google.golang.org/grpc.(*Server).handleStream(0x14000e9dc00, {0x1089f8f10, 0x1400ec5a510}, 0x1400a28e360, 0x0)
/Users/brancz/pkg/mod/google.golang.org/[email protected]/server.go:1619 +0x840
google.golang.org/grpc.(*Server).serveStreams.func1.2()
/Users/brancz/pkg/mod/google.golang.org/[email protected]/server.go:921 +0x88
created by google.golang.org/grpc.(*Server).serveStreams.func1
/Users/brancz/pkg/mod/google.golang.org/[email protected]/server.go:919 +0x298
Right now when we query we iterate over the entire index.
This makes a query an O(N) operation, and renders the index pointless. We should instead leverage the sorted nature during queries to minimize the number of granules we have to look at.
Ideally we'd do this on insert as well.
There are two goals during Granule compaction; one of them concerns the Granule size (i.e. the sum of the serialized sizes of all the parquet files contained in the granule).
The current compaction strategy is to trigger a compaction on a granule during insertion if the data added to a Granule makes the Granule's size exceed a threshold. If this condition is met, a goroutine is spawned to compact this granule. All of that granule's Parts (essentially wrappers over a parquet.File) are deserialized and merge-sorted according to the sorting columns (using parquet.MergeRowGroups), then rewritten in serialized form to an in-memory buffer that becomes the single Part of a new granule. If the new granule is still above the target size (note that merging multiple Parts into one takes advantage of parquet compression, so this size should theoretically be lower on the first compaction of a Granule), the new Granule is split into two new Granules. This requires another cycle of deserialization and reserialization, because that is the only way to split a single Part.
This compaction strategy achieves our stated goals, but it is inefficient. We should mark a Part when it is the result of a compaction; this implies that the row groups contained within this Part are already the desired row-group size. We could then enhance the existing compaction strategy to not deserialize these parts, avoiding the memory usage this entails. For any new compaction of an already compacted Granule, only the uncompacted Parts would be deserialized and reserialized into a single Part. When splitting granules, we would only reassign compacted parts (ideally maintaining sorted order), which would also avoid the memory usage of deserializing the Part that needs to be split.
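For illustration, a rough sketch of what marking compacted Parts could look like (field and function names here are assumptions, not frostdb's actual types):

package sketch

// Part is a stand-in for frostdb's Part type; only the field relevant to this
// sketch is shown. compacted would be set when the Part is the output of a
// previous compaction, meaning its row groups already have the desired size.
type Part struct {
	compacted bool
}

// partitionForCompaction separates the parts that still need to be merged from
// the ones that can simply be reassigned to the resulting granule(s) without
// being deserialized again.
func partitionForCompaction(parts []*Part) (toMerge, keep []*Part) {
	for _, p := range parts {
		if p.compacted {
			keep = append(keep, p)
			continue
		}
		toMerge = append(toMerge, p)
	}
	return toMerge, keep
}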
I've tried to introduce memory pooling, unsuccessfully, for the last ~1 day, so I'm stopping here and just dumping what I've done so far. A copy of the Apache Arrow Go allocator with a bucketed slice pool:
package memory
import (
"errors"
"sync"
"unsafe"
)
type Allocator struct {
pool *BucketedBytes
}
func NewAllocator() *Allocator {
return &Allocator{
pool: MustNewBucketedBytes(1, 64*1024*1024, 2, 0),
}
}
const (
alignment = 64
)
func (a *Allocator) Allocate(size int) []byte {
b, err := a.pool.Get(size + alignment) // padding for 64-byte alignment
if err != nil {
panic(err)
}
buf := *b
//buf := make([]byte, size+alignment) // padding for 64-byte alignment
addr := int(addressOf(buf))
next := roundUpToMultipleOf64(addr)
if addr != next {
shift := next - addr
return buf[shift : size+shift]
}
return buf[:size]
}
func (a *Allocator) Reallocate(size int, b []byte) []byte {
if size == len(b) {
return b
}
if size < cap(b) {
return b[:size]
}
newBuf := a.Allocate(size)
copy(newBuf, b)
a.Free(b)
return newBuf
}
func (a *Allocator) Free(b []byte) {
a.pool.Put(&b)
}
func roundToPowerOf2(v, round int) int {
forceCarry := round - 1
truncateMask := ^forceCarry
return (v + forceCarry) & truncateMask
}
func roundUpToMultipleOf64(v int) int {
return roundToPowerOf2(v, 64)
}
func isMultipleOfPowerOf2(v int, d int) bool {
return (v & (d - 1)) == 0
}
func addressOf(b []byte) uintptr {
return uintptr(unsafe.Pointer(&b[0]))
}
// BucketedBytes is a bucketed pool for variably sized byte slices. It can be configured to not allow
// more than a maximum number of bytes being used at a given time.
// Every byte slice obtained from the pool must be returned.
type BucketedBytes struct {
buckets []sync.Pool
sizes []int
maxTotal uint64
usedTotal uint64
mtx sync.Mutex
new func(s int) *[]byte
}
func MustNewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) *BucketedBytes {
p, err := NewBucketedBytes(minSize, maxSize, factor, maxTotal)
if err != nil {
panic(err)
}
return p
}
// NewBucketedBytes returns a new Bytes with size buckets for minSize to maxSize
// increasing by the given factor and maximum number of used bytes.
// No more than maxTotal bytes can be used at any given time unless maxTotal is set to 0.
func NewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedBytes, error) {
if minSize < 1 {
return nil, errors.New("invalid minimum pool size")
}
if maxSize < 1 {
return nil, errors.New("invalid maximum pool size")
}
if factor < 1 {
return nil, errors.New("invalid factor")
}
var sizes []int
for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
}
p := &BucketedBytes{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
maxTotal: maxTotal,
new: func(sz int) *[]byte {
s := make([]byte, sz)
return &s
},
}
return p, nil
}
// ErrPoolExhausted is returned if a pool cannot provide the requested bytes.
var ErrPoolExhausted = errors.New("pool exhausted")
// Get returns a new byte slice that fits the given size.
func (p *BucketedBytes) Get(sz int) (*[]byte, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
if p.maxTotal > 0 && p.usedTotal+uint64(sz) > p.maxTotal {
return nil, ErrPoolExhausted
}
for i, bktSize := range p.sizes {
if sz > bktSize {
continue
}
b, ok := p.buckets[i+1].Get().(*[]byte)
if !ok {
b = p.new(bktSize)
}
*b = (*b)[:sz]
p.usedTotal += uint64(cap(*b))
return b, nil
}
// The requested size exceeds that of our highest bucket, allocate it directly.
p.usedTotal += uint64(sz)
return p.new(sz), nil
}
// Put returns a byte slice to the right bucket in the pool.
func (p *BucketedBytes) Put(b *[]byte) {
if b == nil {
return
}
sz := cap(*b)
for i, bktSize := range p.sizes {
if sz > bktSize {
continue
}
p.buckets[i].Put(b)
break
}
p.mtx.Lock()
defer p.mtx.Unlock()
// We could assume here that our users will not make the slices larger
// but lets be on the safe side to avoid an underflow of p.usedTotal.
if uint64(sz) >= p.usedTotal {
p.usedTotal = 0
} else {
p.usedTotal -= uint64(sz)
}
}
package memory
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestPool(t *testing.T) {
// put buffer of size 8256 into pool
// request buffer of size 8191
pool := MustNewBucketedBytes(1, 64*1024*1024, 2, 0)
b := make([]byte, 8256)
pool.Put(&b)
b2, err := pool.Get(8191)
require.NoError(t, err)
require.Greater(t, cap(*b2), 8191)
}
It'd be nice to be able to push arrow records directly into a query plan. I recently came across this need during testing, I think it would be nice for benchmarking operators with a speed-of-light scan layer, and I also believe it could be useful in other scenarios.
To do this, I think we're going to want to put an interface on top of frostdb.DBTableProvider, where one implementation would return a normal table through GetTable, and another would return a logicalplan.TableReader (already an interface) that would be passed into the DB somehow with a reference to the records to return when Iterator is called on it.
Instead of adding the final-stage boolean to the physical plan execution, I think this logic should be part of physical plan planning, i.e. when planning a multi-stage count, instead of planning a count(final=true) operator, you would just plan a sum() operator at the final stage.
Originally posted by @asubiotto in #202 (comment)
When a column value is missing from the granule and we're filtering using RE/NRE matchers, there's a chance we can already skip the granule.
For example, if we're using {foo=~"bar"} and the foo dynamic label doesn't exist: for regexp matchers we actually just need to run the regexp against the empty string.
See https://github.com/polarsignals/frostdb/blob/main/table.go#L1162-L1166
I suspect the code requires more refactoring, as this logic is spread across the codebase and it forces us to compile the regexp multiple times.
Maybe we could improve that interface to:
type Filter interface {
Eval(r arrow.Record) (*Bitmap, error)
Filter(rg dynparquet.DynamicRowGroup) (bool, error)
PreFilter(min *parquet.Value, max *parquet.Value, ok bool) (bool, error)
}
While we already attempt to optimize distinct queries by checking whether they are dictionary encoded and only contain a single value, that still causes us to read the page; using the ColumnIndex, however, we could avoid reading the page altogether.
Right now frostdb only has limited support for aggregation functions. For observability and analytical purposes, more aggregation functions should be supported to cover more use cases. For me, I have the requirement to support aggregations like count, average and quantile.
I opened this as an umbrella issue to track support for other aggregation functions. Feel free to add your use case for aggregation functions you'd like to see supported. For a good start, we can probably take a look at https://prometheus.io/docs/prometheus/latest/querying/operators/#aggregation-operators and https://duckdb.org/docs/sql/aggregates.
- count aggregate function. PR: #176
- quantile aggregate function
- average aggregate function
To allow us to test the performance of our query paths that utilize block storage without impacting the current read/write path, we should support read-only instances of frostDB that can be pointed at LTS and support queries directly.
Currently, a major amount of CPU time is spent in the ingestion path of Parca/arcticdb, and in garbage collection.
There are a couple of things we can and should try (a rough sketch of the writer-reuse idea follows this list):
- Set parquet.ColumnBufferCapacity of a parquet writer to the maximum amount that we may write to the buffer during ingestion. I suspect we are wildly over-allocating when splitting data.
- Do the same for parquet.PageBufferSize, for the same reason as above. We can probably have arcticDB auto-tune itself here based on previous write sizes.
- Reuse parquet.Writers. When done with a writer we can call .Reset on it and reuse it to write the next buffer. If the above two don't help, this is probably the last resort. It is also more complicated, as we can't reuse writers for different schemas, so we would need to keep a record of the schemas we see and create a sync.Pool per schema. I suspect this will eventually have to be done to get GC time under control (we'll need to measure whether the saving in GC CPU time is worth it, as we'd be spending time on resetting the writers).
per schema that we see. I suspect this will eventually have to be done to get GC time under control (we'll need to measure whether the saving in CPU time of GC is worth it, as we'd be spending time on resetting the writers).The projections that are created from the filters aren't correct when filtering by dynamic columns.
As shown in the screenshot below, the column is a label.
prefix column and in this case, a dynamic column should be returned. Not because of its name, but because of some attribute that needs to get propagated into the optimizer.
Currently, once the configured size of data is reached the active-append table-block is swapped out for a new one and the old one is thrown away. We of course want to persist data in some way. Since we already keep the data in parquet format in memory, it would be great to write that out and memory map it.
A datadriven logic testing framework was recently added in #211. Slowly but surely, it would be nice to move our operator tests to this logic testing framework to promote readability and conciseness. #255 does this for the distinct operator. This issue can be closed once the following tests have been moved over:
This set of column definitions:
[]dynparquet.ColumnDefinition{
{
Name: "timestamp",
StorageLayout: parquet.Int(64),
Dynamic: false,
},
{
Name: "state",
StorageLayout: parquet.String(),
Dynamic: false,
},
{
Name: "labels",
StorageLayout: parquet.Encoded(parquet.Optional(parquet.String()), &parquet.RLEDictionary),
Dynamic: true,
},
}
will actually have column indexes:
{labels: 0, state: 1, timestamp: 2}
So, when inserting fields, you must get the column indexes by searching the schema's column slice, rather than based on your own definition.
Documentation on functions and/or examples where this happens would be useful to new contributors.
Aside: what is the reason that column names are sorted on schema creation?
Originally posted by @asubiotto in #202 (comment)
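To make the pitfall concrete, a tiny illustration (this is not the dynparquet API, just the idea of looking the index up in the sorted name list):

package sketch

import "sort"

// columnIndex returns the index of a column in the schema's sorted column
// slice, which is what insertion code must use instead of the order in which
// the definitions were originally declared.
func columnIndex(sortedNames []string, name string) int {
	i := sort.SearchStrings(sortedNames, name)
	if i < len(sortedNames) && sortedNames[i] == name {
		return i
	}
	return -1
}

// For the definitions above: columnIndex([]string{"labels", "state", "timestamp"}, "timestamp") returns 2.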
Right now we won't filter on anything if the column we're filtering on isn't included in the projections.
So we're unable to perform queries equivalent to select a,b from table where c > 10 and instead require the query to be select a,b,c from table where c > 10.