cnes / zcollection

Python library for manipulating data split into a collection of groups stored in the Zarr format.

Home Page: https://zcollection.readthedocs.io/en/latest/

License: BSD 3-Clause "New" or "Revised" License

Language: Python 100.00%
Topics: zarr, dask, fsspec, s3fs, partitioning

zcollection's Introduction

ZCollection

This project is a Python library for manipulating data partitioned into a collection of Zarr groups.

A collection divides a dataset into several partitions to facilitate acquisitions or updates made from new products. Data can be partitioned by date (hour, day, month, etc.) or by sequence.

A collection partitioned by date, with a monthly resolution, may look like this on disk:

collection/
├── year=2022
│    ├── month=01/
│    │    ├── time/
│    │    │    ├── 0.0
│    │    │    ├── .zarray
│    │    │    └── .zattrs
│    │    ├── var1/
│    │    │    ├── 0.0
│    │    │    ├── .zarray
│    │    │    └── .zattrs
│    │    ├── .zattrs
│    │    ├── .zgroup
│    │    └── .zmetadata
│    └── month=02/
│         ├── time/
│         │    ├── 0.0
│         │    ├── .zarray
│         │    └── .zattrs
│         ├── var1/
│         │    ├── 0.0
│         │    ├── .zarray
│         │    └── .zattrs
│         ├── .zattrs
│         ├── .zgroup
│         └── .zmetadata
└── .zcollection

Partition updates can be configured to overwrite existing data with new data or to update it using different strategies.

The Dask library is used to distribute the processing so that it scales efficiently.

It is possible to create views on a reference collection in order to add and modify variables, while the reference collection itself remains read-only.

This library can store data on POSIX, S3, or any other file system supported by the Python library fsspec. Note, however, that only POSIX and S3 file systems have been tested.
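
For illustration, here is a minimal sketch of creating such a monthly-partitioned collection in an in-memory file system. It is adapted from the examples in the issues below; the test dataset helper comes from the project's test suite.

import dask.distributed
import fsspec

import zcollection
import zcollection.tests.data

# Test dataset shipped with the project's test suite.
ds = next(zcollection.tests.data.create_test_dataset_with_fillvalue())

# In-memory file system and a local Dask cluster to distribute the work.
fs = fsspec.filesystem('memory')
cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

# Partition by the 'time' variable with a monthly resolution.
partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')
collection = zcollection.create_collection('time',
                                           ds,
                                           partition_handler,
                                           '/my_collection',
                                           filesystem=fs)
collection.insert(ds)
print(collection.load())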

zcollection's People

Contributors

fbriol


zcollection's Issues

Incomplete overlaps with more than one worker

Hi !

It seems that the update() function with depth > 0 (with overlaps) behaves differently depending on whether the computation is parallelized over one worker or more than one.

To be more precise, some partitions that should have overlap values on both the left and right of the partitioning axis are missing one of the sides. I will try to illustrate this using a simple test case.

Test case

I am using the simple use case presented in the gallery. It creates a zcollection in memory with monthly partitioning.

### IMPORTS
from __future__ import annotations

import datetime
import pprint

import dask.distributed
import fsspec
import numpy

import zcollection
import zcollection.tests.data

### ZCOLLECTION CREATION
def create_dataset():
    """Create a dataset to record."""
    generator = zcollection.tests.data.create_test_dataset_with_fillvalue()
    return next(generator)

ds = create_dataset()
ds.to_xarray()

fs = fsspec.filesystem('memory')

cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')

collection = zcollection.create_collection('time',
                                           ds,
                                           partition_handler,
                                           '/my_collection',
                                           filesystem=fs)

collection.insert(ds)

I then define a callback that simply prints the details of the partition currently being updated (the partition_info argument) and the extent of the dataset available for the update.

### UPDATE
def callback(zds, partition_info=slice(None)):
    updated_partition = zds["time"].values[partition_info[1]].astype('<M8[D]')
    print(f'Update: [{updated_partition[0]}, {updated_partition[-1]}] - Overlap extent [{zds["time"].values[0]}, {zds["time"].values[-1]}]')
    return dict()

When using only one worker in the local cluster, all the partitions have the proper dataset extent available. Some overlaps are missing for the partitions covering the earliest and most recent periods respectively, but this is expected. Also note that the first partition appears twice because it is used by zcollection to infer which fields are updated by our callback.

### ONLY ONE WORKER
cluster.scale(1)
collection.update(callback, selected_variables=["time"], depth=1)
>> Update: [2000-01-01, 2000-01-31] - Overlap extent [2000-01-01T00:00:00.000000, 2000-01-31T00:00:00.000000]
>> Update: [2000-01-01, 2000-01-31] - Overlap extent [2000-01-01T00:00:00.000000, 2000-02-27T00:00:00.000000]
>> Update: [2000-02-03, 2000-02-27] - Overlap extent [2000-01-01T00:00:00.000000, 2000-03-31T00:00:00.000000]
>> Update: [2000-03-01, 2000-03-31] - Overlap extent [2000-02-03T00:00:00.000000, 2000-04-30T00:00:00.000000]
>> Update: [2000-04-03, 2000-04-30] - Overlap extent [2000-03-01T00:00:00.000000, 2000-05-30T00:00:00.000000]
>> Update: [2000-05-03, 2000-05-30] - Overlap extent [2000-04-03T00:00:00.000000, 2000-06-29T00:00:00.000000]
>> Update: [2000-06-02, 2000-06-29] - Overlap extent [2000-05-03T00:00:00.000000, 2000-06-29T00:00:00.000000]

However, when scaling the cluster to 2 workers, some central partitions are missing overlap data: for example, the March period should use February to April, but only uses February to March.

### 2 WORKERS
cluster.scale(2)
collection.update(callback, selected_variables=["time"], depth=1)
>> Update: [2000-01-01, 2000-01-31] - Overlap extent [2000-01-01T00:00:00.000000, 2000-01-31T00:00:00.000000]
>> Update: [2000-04-03, 2000-04-30] - Overlap extent [2000-04-03T00:00:00.000000, 2000-05-30T00:00:00.000000] --> WRONG
>> Update: [2000-01-01, 2000-01-31] - Overlap extent [2000-01-01T00:00:00.000000, 2000-02-27T00:00:00.000000]
>> Update: [2000-05-03, 2000-05-30] - Overlap extent [2000-04-03T00:00:00.000000, 2000-06-29T00:00:00.000000]
>> Update: [2000-02-03, 2000-02-27] - Overlap extent [2000-01-01T00:00:00.000000, 2000-03-31T00:00:00.000000] 
>> Update: [2000-06-02, 2000-06-29] - Overlap extent [2000-05-03T00:00:00.000000, 2000-06-29T00:00:00.000000]
>> Update: [2000-03-01, 2000-03-31] - Overlap extent [2000-02-03T00:00:00.000000, 2000-03-31T00:00:00.000000] --> WRONG

It feels like the entire update has been split between the workers and that they do not share their partitions. I did not test with more workers, but I expect more errors as the number of workers increases.

zcollection_version: 2023.3.2

collection.insert() does not work if a cluster is not instantiated

Hello,

While following the tutorial, I stumbled upon a strange behavior.

If we replay the tutorial but remove the cluster instantiation, the zcollection stays empty after the insert() step.

Code to reproduce

from __future__ import annotations

import datetime
import pprint

import dask.distributed
import fsspec
import numpy

import zcollection
import zcollection.tests.data

def create_dataset():
    """Create a dataset to record."""
    generator = zcollection.tests.data.create_test_dataset_with_fillvalue()
    return next(generator)


ds = create_dataset()
ds.to_xarray()

fs = fsspec.filesystem('memory')

# Here, contrary to the tutorial, we do not instantiate a client
# cluster = dask.distributed.LocalCluster(processes=False)
# client = dask.distributed.Client(cluster)

partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')

collection = zcollection.create_collection('time',
                                           ds,
                                           partition_handler,
                                           '/my_collection',
                                           filesystem=fs)

collection.insert(ds)
print(collection.load())
>> <Nothing>

Expected behavior

I expected the insert() step to be effective even if we do not instantiate a cluster.

...
collection.insert(ds)
print(collection.load())
>> <zcollection.dataset.Dataset>
>>  Dimensions: ('num_lines: 61', 'num_pixels: 25')
>>Data variables:
>>    time    (num_lines  datetime64[us]: dask.array<chunksize=(11,)>
>>    var1    (num_lines, num_pixels  float64: dask.array<chunksize=(11, 25)>
>>    var2    (num_lines, num_pixels  float64: dask.array<chunksize=(11, 25)>
>>  Attributes:
>>    attr   : 1

zcollection version
2023.3.2
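
A minimal workaround sketch, assuming the behavior reported above (insert() only becomes effective once a Dask client is active, as in the tutorial):

import dask.distributed

# Workaround: instantiate a local Dask cluster and client first, as in the
# tutorial; insert() then populates the collection as expected.
cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

collection.insert(ds)
print(collection.load())
# >> <zcollection.dataset.Dataset> ...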

Add a check over fill_value at variable creation

ZCollection relies on zarr for storage. Zarr does not handle invalid data in a numpy array, so zcollection replaces the invalid values with a given fill_value prior to storing the zarr array. When reading the zarr array back, it masks the invalid values using:
np.ma.masked_equal(data, fill_value)

This can trip up new users who are not familiar enough with how zarr and zcollection work: if the zarr array includes filters, some values might not be encoded exactly (e.g. large float32 values with a FixedScaleOffset filter).
The fill_value is usually chosen as the largest possible value to keep the widest dynamic range for valid data. However, if this fill_value is large, there is a high risk that it will not be encoded exactly, which results in a badly masked array when reading the data back.

During variable creation, I suggest that zcollection check whether the given fill value can be retrieved exactly:

import zarr
from numcodecs import Blosc
import numpy as np
for fill_value in [214748.3647, 214748.34]:
    array = zarr.array(np.array([fill_value], dtype=np.float32),
                       filters=[
                           zarr.codecs.FixedScaleOffset(scale=10000, offset=0, dtype='<f4', astype='<i4'),
                           zarr.codecs.Delta(dtype='i4')],
                       compressor=Blosc(cname='zstd', clevel=5, shuffle=Blosc.BITSHUFFLE),)
    if not np.all(array[...] == fill_value):
        print(f"Fill value {fill_value} KO")
    else:
        print(f"Fill value {fill_value} OK")

The user should be warned if they have badly configured their fill_values.
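
A minimal sketch of what such a check could look like, reusing the round-trip idea above. check_fill_value is a hypothetical helper, not part of the existing zcollection API:

import warnings

import numpy as np
import zarr


def check_fill_value(fill_value, dtype, filters=None, compressor=None):
    """Hypothetical helper: warn if fill_value does not survive a
    round-trip through the variable's filters and compressor."""
    # Encode then decode a one-element array holding only the fill value.
    array = zarr.array(np.array([fill_value], dtype=dtype),
                       filters=filters,
                       compressor=compressor)
    if not np.all(array[...] == fill_value):
        warnings.warn(
            f'fill_value {fill_value!r} cannot be encoded exactly with the '
            'given filters/compressor; masking on read may be incorrect.',
            UserWarning)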

conda-forge package

I'm working on the conda-forge packaging; here is some feedback:

  • There is a missing dependency in setup.cfg: pyarrow (used in zcollection.indexing.abc).
  • Using setuptools_scm to dynamically define the version from git tags forces us to use PyPI as the source. The package cannot be built from a GitHub archive, which does not contain .git information.
  • I think that not having the version number written in pyproject.toml forces setuptools to import zcollection during the packaging build step, which forces us to add all the run dependencies to the host part of the meta.yaml.

The conda-forge PR is here.
@fbriol: Could you post a comment on this PR so that I can check off the following item of the checklist:

  • GitHub users listed in the maintainer section have posted a comment confirming they are willing to be listed there.

selected_variables must include output fields of update() with depth > 0

Hi,

There is a discrepancy with the selected_variables parameter used in the update() function:

  • With depth not set (no overlap), the selected_variables parameter does not need to include the updated fields
  • With depth set to > 0 (overlap), the selected_variables parameter must include the updated fields. Not setting the output fields in the selected_variables will trigger a KeyError

I think it would be better to have the same behavior in both cases. The best case is not requiring the output fields in selected_variables, as it reduces the number of read operations on the zcollection.

Anyway, I have set up a simple case to illustrate the problem:

### IMPORTS
from __future__ import annotations

import datetime
import pprint

import dask.distributed
import fsspec
import numpy

import zcollection
import zcollection.tests.data

print(zcollection.__version__)
>> 2023.3.2

### ZCOLLECTION CREATION
def create_dataset():
    """Create a dataset to record."""
    generator = zcollection.tests.data.create_test_dataset_with_fillvalue()
    return next(generator)


ds = create_dataset()
ds.to_xarray()

fs = fsspec.filesystem('memory')

cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')

collection = zcollection.create_collection('time',
                                           ds,
                                           partition_handler,
                                           '/my_collection',
                                           filesystem=fs)

collection.insert(ds)

### UPDATE
def callback(zds):
    new = 2*zds["var2"].values
    return dict(var1=new)

print(collection.update(callback, selected_variables=["var2"]))
>> None

collection.update(callback, selected_variables=["var2"], depth=1)
>> 2023-05-03 09:57:11,122 - distributed.worker - WARNING - Compute Failed
>> Key:       callback-6791e733b811dfee47c879a54d34aad9
>> Function:  wrap_function
>> args:      (('/my_collection/year=2000/month=01', '/my_collection/year=2000/month=02', '/my_collection/year=2000/month=03', '/my_collection/year=2000/month=04', '/my_collection/year=2000/month=05', '/my_collection/year=2000/month=06'))
>> kwargs:    {}
>> Exception: "KeyError('var1')"

In the update function, the partition_size argument is not behaving as expected

According to the documentation:
[screenshot of the update() documentation describing partition_size]

A partition_size set to 1 means that the update function should be mapped over each partition individually. However, this does not seem to be the case. The following example uses a local cluster to parallelize a dummy function. Without the partition_size argument, the work is properly spread over 6 workers. However, when setting it to 1, it runs sequentially on only one worker:

from dask.distributed import LocalCluster
import time
import zcollection
import zcollection.tests.data
import fsspec

ds = next(zcollection.tests.data.create_test_dataset_with_fillvalue())
fs = fsspec.filesystem('memory')
cluster = LocalCluster(processes=False)
client = cluster.get_client()

partition_handler = zcollection.partitioning.Date(('time', ), resolution='M')
collection = zcollection.create_collection('time',
                                           ds,
                                           partition_handler,
                                           '/my_collection',
                                           filesystem=fs)
collection.insert(ds)

def dummy(zds):
    time.sleep(1)
    return dict()

# Without partition_size, the computation is properly parallelized
cluster.scale(6)
client.wait_for_workers(6)
n_partitions = len(list(collection.partitions()))
n_workers = len(cluster.workers)
print(f"Number of partitions: {n_partitions}")
print(f"Number of workers: {n_workers}")
%time collection.update(dummy, depth=1)

>> Number of partitions: 6
>> Number of workers: 6
>> CPU times: user 389 ms, sys: 65.3 ms, total: 455 ms
>> Wall time: 2.06 s

# With partition_size=1, the computation runs sequentially on a single worker
cluster.scale(6)
client.wait_for_workers(6)
n_partitions = len(list(collection.partitions()))
n_workers = len(cluster.workers)
print(f"Number of partitions: {n_partitions}")
print(f"Number of workers: {n_workers}")
%time collection.update(dummy, depth=1, partition_size=1)

>> Number of partitions: 6
>> Number of workers: 6
>> CPU times: user 1.29 s, sys: 159 ms, total: 1.45 s
>> Wall time: 7.11 s

It seems that partition_size is instead used as the number of batches rather than the number of partitions in each batch:
Link batch sequence

What is the intended use of the partition_size argument?
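
To make the distinction concrete, here is a small hypothetical illustration of the two interpretations of partition_size, assuming the 6 monthly partitions from the example above:

# Hypothetical illustration: the 6 monthly partitions from the example above.
partitions = [f'/my_collection/year=2000/month={m:02d}' for m in range(1, 7)]

def batches(items, size):
    """Split items into consecutive batches of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]

# Expected meaning: partition_size = partitions per batch
# -> 6 batches of 1 partition, i.e. one task per partition.
print(batches(partitions, 1))

# Observed behavior: partition_size treated as the number of batches
# -> with partition_size=1, a single batch of all 6 partitions on one worker.
print(batches(partitions, len(partitions)))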
