
merantix-momentum / squirrel-core


A Python library that enables ML teams to share, load, and transform data in a collaborative, flexible, and efficient way :chestnut:

Home Page: https://squirrel-core.readthedocs.io/

License: Apache License 2.0

Languages: Python 99.88%, Dockerfile 0.12%
Topics: python, ml, machine-learning, data-science, computer-vision, cv, nlp, natural-language-processing, ai, pytorch

squirrel-core's People

Contributors

ademfr, adrianloy, alirezasohofi, alparibal, axkoenig, jotterbach, kai-tub, lisacoiffard, maik-schmidt, maxschambach, mg515, pzdkn, sebs94, thomaswollmann, tiansuyu, winfried-ripken


squirrel-core's Issues

[FEATURE] Allow configuring compression mode in MessagepackSerializer

Hey,

Thank you for working on this library!
I think it has huge potential, especially for dataset creators who want to provide their datasets in an optimized deep-learning format that is well suited for distribution.
The performance of the MessagepackSerializer is amazing, and being able to distribute subsets of the dataset (shards) is something I never knew I wanted but really want to utilize in the future!

I have played around with some MessagepackSerializer configurations, and based on some internal benchmarks I think it would be helpful to allow users to configure the compression algorithm.

@staticmethod
def serialize_shard_to_file(
    shard: ShardType,
    fp: str,
    fs: t.Optional[AbstractFileSystem] = None,
    mode: str = "wb",
    **open_kwargs,
) -> None:
    """Writes a shard to a file by only writing and serializing the values of its samples.

    Args:
        shard (ShardType): Shard to serialize and write to the file.
        fp (str): Path to the file to write.
        fs (AbstractFileSystem, optional): Filesystem to use for opening the file. If not provided, `fsspec`
            will pick a filesystem suitable for `fp`. Defaults to None.
        mode (str): IO mode to use. Passed to :py:meth:`fs.open`. Defaults to "wb".
        **open_kwargs: Other keyword arguments passed to :py:meth:`fs.open`. `open_kwargs` will always have
            `compression="gzip"` set.
    """
    open_kwargs["mode"] = mode
    open_kwargs["compression"] = "gzip"

Currently, the compression mode is "locked" to gzip.
I assume the main reasons are the wide usage of gzip and keeping the code simple, since it lets the deserializer know that gzip compression was used:

def deserialize_shard_from_file(
    fp: str,
    fs: t.Optional[AbstractFileSystem] = None,
    mode: str = "rb",
    unpacker_kwargs: t.Optional[t.Dict] = None,
    **open_kwargs,
) -> t.Iterable[t.Any]:
    """Reads a shard from file and returns an iterable over its samples.

    Args:
        fp (str): Path to the file to write.
        fs (AbstractFileSystem, optional): Filesystem to use for opening the file. If not provided, `fsspec`
            will pick a filesystem suitable for `fp`. Defaults to None.
        mode (str): IO mode to use. Passed to :py:meth:`fs.open`. Defaults to "rb".
        unpacker_kwargs (Dict, optional): Kwargs to be passed to `msgpack.Unpacker()`.
            If `use_list` not given, it will be set to False.
        **open_kwargs: Other keyword arguments passed to :py:meth:`fs.open`. `open_kwargs` will always have
            `compression="gzip"` set.

    Yields:
        (Any) Values of the samples of the shard.
    """
    open_kwargs["mode"] = mode
    open_kwargs["compression"] = "gzip"

Here I would like to note that, given the file extension, fsspec (the default) could also infer the compression by inspecting the filename suffix. But I can see how this might cause problems if somebody wanted to swap out fsspec for something else (although I have no idea what that would be, or why :D).
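
For reference, a minimal sketch of the suffix-based inference fsspec already supports (standard fsspec API; `shard_0.gz` is just an example path):

import fsspec

# fsspec maps known suffixes to codecs, e.g. ".gz" -> "gzip".
print(fsspec.utils.infer_compression("shard_0.gz"))  # -> "gzip"

# compression="infer" lets fsspec pick the codec from the suffix when opening.
with fsspec.open("shard_0.gz", mode="rb", compression="infer") as f:
    data = f.read()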

Other spots within the codebase that are coupled to this compression assumption are the methods of the SquirrelStore:

class SquirrelStore(FilesystemStore):
    """FilesystemStore that persist samples (Dict objects) or shards (i.e. list of samples)."""

    def __init__(self, url: str, serializer: SquirrelSerializer, clean: bool = False, **storage_options) -> None:
        """Initializes SquirrelStore.

        Args:
            url (str): Path to the root directory. If this path does not exist, it will be created.
            serializer (SquirrelSerializer): Serializer that is used to serialize data before persisting (see
                :py:meth:`set`) and to deserialize data after reading (see :py:meth:`get`). If not specified,
                data will not be (de)serialized. Defaults to None.
            clean (bool): If true, all files in the store will be removed recursively
            **storage_options: Keyword arguments passed to filesystem initializer.
        """
        super().__init__(url=url, serializer=serializer, clean=clean, **storage_options)

    def get(self, key: str, **kwargs) -> t.Iterator[SampleType]:
        """Yields the item with the given key.

        If the store has a serializer, data read from the file will be deserialized.

        Args:
            key (str): Key corresponding to the item to retrieve.
            **kwargs: Keyword arguments forwarded to :py:meth:`self.serializer.deserialize_shard_from_file`.

        Yields:
            (Any) Item with the given key.
        """
        fp = f"{self.url}/{key}.gz"
        yield from self.serializer.deserialize_shard_from_file(fp, fs=self.fs, **kwargs)

    def set(self, value: t.Union[SampleType, ShardType], key: t.Optional[str] = None, **kwargs) -> None:
        """Persists a shard or sample with the given key.

        Data item will be serialized before writing to a file.

        Args:
            value (Any): Shard or sample to be persisted. If `value` is a sample (i.e. not a list), it will be
                wrapped around with a list before persisting.
            key (Optional[str]): Optional key corresponding to the item to persist.
            **kwargs: Keyword arguments forwarded to :py:meth:`self.serializer.serialize_shard_to_file`.
        """
        if not isinstance(value, t.List):
            value = [value]
        if key is None:
            key = get_random_key()
        fp = f"{self.url}/{key}.gz"
        self.serializer.serialize_shard_to_file(value, fp, fs=self.fs, **kwargs)

    def keys(self, nested: bool = False, **kwargs) -> t.Iterator[str]:
        """Yields all shard keys in the store."""
        for k in super().keys(nested=nested, **kwargs):
            # we only set .gz files, so we only read .gz files
            if k.endswith(".gz"):
                yield k.rsplit(".gz", 1)[0]

Or, to show just the significant parts (the hardcoded .gz handling from the methods above):
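fp = f"{self.url}/{key}.gz"          # in SquirrelStore.get()
fp = f"{self.url}/{key}.gz"          # in SquirrelStore.set()
if k.endswith(".gz"):                # in SquirrelStore.keys()
    yield k.rsplit(".gz", 1)[0]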

In my internal benchmarks, I was able to speed up data loading considerably simply by using no compression at all (None).
I am fully aware that the right compression mode heavily depends on the specific hardware and use case.
But even in a network-limited setting, I can see reasons to prefer xz instead, due to its better compression ratio and decompression speed reasonably close to gzip's.

IMHO, it should be fine not to store any suffix at all for the squirrel store.
If a user looks inside the squirrel store URL, it is not mandatory to show which compression algorithm was used.
The user could/should use the designated driver/metadata bundled with the dataset and let the driver handle the correct decompression.

Even if you don't agree, I still think the gz extension doesn't have to be hardcoded into these functions; this is actually something that confused me when I was looking at the internals of the code base.
Instead, we could use something like:

comp = kwargs.get("compression", "gzip")
comp_to_ext_dict[comp] # just to show the concept
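
To make the idea concrete, here is a minimal sketch (the names comp_to_ext and shard_path are hypothetical illustrations, not existing squirrel API):

import typing as t

# Hypothetical mapping from fsspec compression names to file suffixes.
comp_to_ext: t.Dict[t.Optional[str], str] = {"gzip": ".gz", "bz2": ".bz2", "xz": ".xz", "zstd": ".zst", None: ""}

def shard_path(url: str, key: str, compression: t.Optional[str] = "gzip") -> str:
    """Illustrative only: build the shard path for the chosen compression."""
    return f"{url}/{key}{comp_to_ext[compression]}"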

With these modifications, it should be possible to utilize different compression modes and make them easily configurable.
I would be very happy to create a PR and contribute to this project!

[BUG] Composable `shuffle` does not guarantee repeatability when using random.Random instances

I'm using squirrel main, directly from source. I'm trying to seed the shuffle buffer of a Composable using an instantiated random.Random instance and expect the same output across several repeated runs. However, this does not hold due to a bug in squirrel.iterstream.iterators.py::get_random_range: there, an instantiated rng instance is ignored if the seed is None.

Below I attach a test file which exercises the expected behavior. As of now, test_1 and test_4 fail, while test_2 and test_3 pass.

from squirrel.iterstream import IterableSource
import random

def get_itersource():
    data = list(range(100))
    source = IterableSource(data)
    return source


def test_1():

    iter_0 = get_itersource().shuffle(size=40, rng=random.Random(42)).collect()
    iter_1 = get_itersource().shuffle(size=40, rng=random.Random(42)).collect()

    assert iter_0 == iter_1, "test_1 failed"

    print("test_1 successful")

def test_2():

    iter_0 = get_itersource().shuffle(size=40, seed=42).collect()
    iter_1 = get_itersource().shuffle(size=40, seed=42).collect()

    assert iter_0 == iter_1, "test_2 failed"
    print("test_2 successful")


def test_3():

    iter_0 = get_itersource().shuffle(size=40, rng=random.Random(21), seed=42).collect()
    iter_1 = get_itersource().shuffle(size=40, rng=random.Random(21), seed=42).collect()

    assert iter_0 == iter_1, "test_3 failed"
    print("test_3 successful")


def test_4():

    iter_0 = get_itersource().shuffle(size=40, rng=random.Random(21)).collect()
    iter_1 = get_itersource().shuffle(size=40, seed=21).collect()

    assert iter_0 == iter_1, "test_4 failed"
    print("test_4 successful")

[BUG] When forwarding kwargs to `shuffle`, the `None` default breaks

Using squirrel main directly from GitHub.

When using shuffle inside a nested function/method, forwarding the shuffle defaults into the function does not work. According to the docstring, `size (int, optional): Buffer size for shuffling. Defaults to 1000. Skip the shuffle step if size < 2.`, so I would assume that passing None would fall back to the default or similar.
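
To illustrate the expectation, a minimal sketch of the kind of nested forwarding I mean, together with the guard that currently has to be added by hand (build_stream is a hypothetical wrapper, not squirrel code):

from squirrel.iterstream import IterableSource

def build_stream(data, size=None):
    # Hypothetical wrapper; without this guard, forwarding size=None breaks.
    size = 1000 if size is None else size
    return IterableSource(data).shuffle(size=size)

assert sorted(build_stream(list(range(100))).collect()) == list(range(100))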

Test file below.

import pytest
from squirrel.iterstream import IterableSource

def get_itersource():
    data = list(range(100))
    source = IterableSource(data)
    return source

@pytest.mark.parametrize('size', [None, 1, 200])
def test_1(size):

    iter = get_itersource().shuffle(size=size).collect()
    assert sorted(iter) == list(range(100))

Zip multiple iterables as a source

The use case: we have a store with samples, and 1..n other stores that each contain only features. These stores must have the same keys and the same number of samples per shard.

IterableZipSource makes it possible to zip items from several iterables and use that as a source. For instance:

it1 = MessagepackDriver(url1).get_iter()
it2 = MessagepackDriver(url2).get_iter()

it3 = IterableZipSource(iterables=[it1, it2]).collect()
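
Until such a source exists, a rough stand-in can be sketched with the existing IterableSource and Python's built-in zip (this is only an illustration of the idea, not the proposed API, and it does not check that the keys line up):

from squirrel.iterstream import IterableSource

# Pair up items from the two streams defined above; stops at the shorter one.
zipped = IterableSource(zip(it1, it2)).collect()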

[BUG] IterableSamplerSource is not picklable

When reporting a bug, please be sure to include the following:

  • A descriptive title
  • An isolated way to reproduce the behavior (example: GitHub repository with code isolated to the issue that anyone can clone to observe the problem)

Add the following test to test/test_iterstream/test_stream.py

@pytest.mark.parametrize("probs", [[0.4, 0.6], None])
def test_iterablesamplersource_is_picklable(probs: t.Optional[t.List[float]]) -> None:
    """Smoke test IterableSamplerSource"""
    res_1 = IterableSource([0, 1, 2, 3])
    res_2 = IterableSource([4, 5, 6])
    src = IterableSamplerSource([res_1, res_2], probs=probs)
    import pickle

    pkl = pickle.dumps(src)
    src_2 = pickle.loads(pkl)

    assert src.collect() == src_2.collect()

Run: poetry run pytest -s test/test_iterstream/test_stream.py::test_iterablesamplersource_is_picklable

  • What version of squirrel-datasets and squirrel you're using, and the platform(s) you're running it on
    commit-sha: 86f61c0

  • What packages or other dependencies you're using
    no additional packages

  • The behavior you expect to see and the actual behavior

Expected: Test passes
Actual: Fails with exception: TypeError: cannot pickle 'generator' object
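
For context, this is a general Python limitation rather than anything squirrel-specific: live generator objects cannot be pickled, so any source holding one will fail the same way.

import pickle

gen = (i for i in range(3))
try:
    pickle.dumps(gen)
except TypeError as err:
    print(err)  # cannot pickle 'generator' object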


[FEATURE] Make `get_iter` method documentation about `max_workers` more explicit

Hey, I've stumbled across a potentially easy-to-misunderstand part of the MapDriver.get_iter documentation:

def get_iter(
    self,
    keys_iterable: Optional[Iterable] = None,
    shuffle_key_buffer: int = 1,
    key_hooks: Optional[Iterable[Union[Callable, Type[Composable], functools.partial]]] = None,
    max_workers: Optional[int] = None,
    prefetch_buffer: int = 10,
    shuffle_item_buffer: int = 1,
    flatten: bool = False,
    keys_kwargs: Optional[Dict] = None,
    get_kwargs: Optional[Dict] = None,
    key_shuffle_kwargs: Optional[Dict] = None,
    item_shuffle_kwargs: Optional[Dict] = None,
) -> Composable:
    """Returns an iterable of items in the form of a :py:class:`squirrel.iterstream.Composable`, which allows
    various stream manipulation functionalities.

    Items are fetched using the :py:meth:`get` method. The returned :py:class:`Composable` iterates over the items
    in the order of the keys returned by the :py:meth:`keys` method.

    Args:
        keys_iterable (Iterable, optional): If provided, only the keys in `keys_iterable` will be used to fetch
            items. If not provided, all keys in the store are used.
        shuffle_key_buffer (int): Size of the buffer used to shuffle keys.
        key_hooks (Iterable[Union[Callable, Type[Composable], functools.partial]], optional): Hooks
            to apply to keys before fetching the items. It is an Iterable of any of these objects:

            1) subclass of :py:meth:`~squirrel.iterstream.Composable`: in this case, `.compose(hook, **kw)`
               will be applied to the stream
            2) A Callable: `.to(hook, **kw)` will be applied to the stream
            3) A partial function: the three attributes `args`, `keywords` and `func` will be retrieved, and
               depending on whether `func` is a subclass of :py:meth:`~squirrel.iterstream.Composable` or a
               `Callable`, one of the above cases will happen, with the only difference that arguments are
               passed too. This is useful for passing arguments.
        max_workers (int, optional): If larger than 1 or None, :py:meth:`~squirrel.iterstream.Composable.async_map`
            is called to fetch multiple items simultaneously and `max_workers` refers to the maximum number of
            workers in the ThreadPoolExecutor used by `async_map`.
            Otherwise, :py:meth:`~squirrel.iterstream.Composable.map` is called and `max_workers` is not used.
            Defaults to None.
        prefetch_buffer (int): Size of the buffer used for prefetching items if `async_map` is used. See
            `max_workers` for more details. Please be aware of the memory footprint when setting this parameter.
        shuffle_item_buffer (int): Size of the buffer used to shuffle items after being fetched. Please be aware
            of the memory footprint when setting this parameter.
        flatten (bool): Whether to flatten the returned iterable. Defaults to False.
        keys_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`keys` when getting the keys in the
            store. Not used if `keys_iterable` is provided. Defaults to None.
        get_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`get` when fetching items. Defaults to
            None.
        key_shuffle_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`shuffle` when shuffling keys.
            Defaults to None. Can be useful to e.g. set the seed etc.
        item_shuffle_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`shuffle` when shuffling items.
            Defaults to None. Can be useful to e.g. set the seed etc.

    Returns:
        (squirrel.iterstream.Composable) Iterable over the items in the store.
    """
    keys_kwargs = {} if keys_kwargs is None else keys_kwargs
    get_kwargs = {} if get_kwargs is None else get_kwargs
    key_shuffle_kwargs = {} if key_shuffle_kwargs is None else key_shuffle_kwargs
    item_shuffle_kwargs = {} if item_shuffle_kwargs is None else item_shuffle_kwargs
    keys_it = keys_iterable if keys_iterable is not None else self.keys(**keys_kwargs)
    it = IterableSource(keys_it).shuffle(size=shuffle_key_buffer, **key_shuffle_kwargs)
    if key_hooks:
        for hook in key_hooks:
            arg = []
            kwarg = {}
            f = hook
            if isinstance(hook, partial):
                arg = hook.args
                kwarg = hook.keywords
                f = hook.func
            if isclass(f) and issubclass(f, Composable):
                it = it.compose(f, *arg, **kwarg)
            elif isinstance(f, Callable):
                it = it.to(f, *arg, **kwarg)
            else:
                raise ValueError(
                    f"wrong argument for hook {hook}, it should be a Callable, partial function, or a subclass "
                    f"of Composable"
                )
    map_fn = partial(self.get, **get_kwargs)
    _map = (
        it.map(map_fn)
        if max_workers is not None and max_workers <= 1
        else it.async_map(map_fn, prefetch_buffer, max_workers)
    )

The documentation of max_workers states that None is used by default and also mentions that this causes async_map to be called, but I missed these parts of the documentation and was surprised to see how many threads were allocated.

I am/was not too familiar with the ThreadPoolExecutor interface and find it somewhat surprising that None translates to number_of_processors x 5 according to the ThreadPoolExecutor definition.
Maybe it would be helpful to explicitly state that by default a ThreadPoolExecutor will be used with that many threads?
The docstring also reads a bit unintuitively: it starts out saying that max_workers defines how many items are fetched simultaneously, and then continues to state that otherwise map is used.
From that perspective, max_workers=None doesn't sound like it should be using any threads at all.
Without knowing the default values of ThreadPoolExecutor, I would make it more explicit that to disable threading one has to set max_workers=0/1 and that by default many threads are used.
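
For illustration, based purely on the code above (driver stands for any MapDriver instance, which is an assumption of this sketch):

# Default: max_workers=None -> async_map with a ThreadPoolExecutor (many threads).
it_async = driver.get_iter()

# Explicit max_workers=1 (or 0) -> plain map, no thread pool is used.
it_sync = driver.get_iter(max_workers=1)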

I am happy to add a PR with my suggested doc-string update if you agree! :)

[BUG] TorchIterable raises `PyTorchSplittingException` unexpectedly when source is picklable IterableSamplerSource

When reporting a bug, please be sure to include the following:

  • A descriptive title
  • An isolated way to reproduce the behavior (example: GitHub repository with code isolated to the issue that anyone can clone to observe the problem)

This behavior is hard to reproduce with the current squirrel version and requires #163 to be merged. However, let me try to describe the issue:

  • Create two (or more) composables that contain split_by_worker_pytorch and/or split_by_rank_pytorch. Let's call them src_0 and src_1.
  • Construct an IterableSamplerSource samp_src (once it is picklable) using src_0 and src_1.
  • Construct a PyTorch dataloader dl.

Pseudocode would look like:

src_0 = IterableSource(samples_0).split_by_worker_pytorch()
src_1 = IterableSource(samples_1).split_by_worker_pytorch()

samp_src = IterableSamplerSource([src_0, src_1])

dl = torch.utils.data.DataLoader(samp_src.batched(batch_size).to_torch_iterable())

for b in dl:
    ...
  • What version of squirrel-datasets and squirrel you're using, and the platform(s) you're running it on
  • What packages or other dependencies you're using
    No additional packages
  • The behavior you expect to see and the actual behavior

Expected: The for loop finishes successfully
Actual: Entering the for loop raises a PyTorchSplittingException

Current hypothesis:

The TorchIterable recursively checks the Composable source. If the source is a List[Composable] or simply an iterator, the logic breaks and it will not find a split by rank/worker in the ancestors of the iterstream. Hence the exception is raised.

I tested this hypothesis locally on our internal tools. Disabling the _in_multi_rank_env and _in_multi_worker_env checks results in correct behavior of the iterstream.

[BUG] Rendering errors in docs

The mermaid figure on https://squirrel-core.readthedocs.io/en/latest/advanced/store.html raises a syntax error.


[FEATURE] Multiplexing functionality for dataset composables

The current functionality of IterableSamplerSource is limited to subsampling datasets. However, there are many cases where you want to both sub- and supersample a dataset mixture. A solution for this is a new sampler that allows multiplexing datasets of vastly different sizes.
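
As a rough illustration of the idea (plain Python, not a proposed squirrel API; multiplex is a hypothetical name): the datasets are cycled so that small ones can be supersampled according to the given weights instead of being exhausted early.

import itertools
import random

def multiplex(iterables, weights, rng=None, steps=1000):
    """Hypothetical sketch: draw from cycled datasets according to `weights`."""
    rng = rng or random.Random(0)
    cycles = [itertools.cycle(it) for it in iterables]
    for _ in range(steps):
        # Pick a dataset by weight, then take its next (possibly repeated) item.
        yield next(rng.choices(cycles, weights=weights, k=1)[0])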

