
dask-geopandas's Introduction


Parallel GeoPandas with Dask

Dask-GeoPandas is a project merging the geospatial capabilities of GeoPandas with the scalability of Dask. GeoPandas is an open source project designed to make working with geospatial data in Python easier. GeoPandas extends the datatypes used by pandas to allow spatial operations on geometric types. Dask provides advanced parallelism and distributed out-of-core computation with a dask.dataframe module designed to scale pandas. Since GeoPandas is an extension of the pandas DataFrame, the same approach Dask uses to scale pandas can also be applied to GeoPandas.

This project is a bridge between Dask and GeoPandas and offers the geospatial capabilities of GeoPandas backed by Dask.

Documentation

See the documentation at https://dask-geopandas.readthedocs.io/en/latest/

Installation

This package depends on Shapely, GeoPandas and Dask.

One way to install all required dependencies is to use the conda package manager to create a new environment:

conda create -n geo_env
conda activate geo_env
conda config --env --add channels conda-forge
conda config --env --set channel_priority strict
conda install dask-geopandas
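
dask-geopandas is also published on PyPI, so a pip-based install should work as well (Shapely, GeoPandas and Dask must be available in the environment):

pip install dask-geopandas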

Example

Given a GeoPandas dataframe

import geopandas
df = geopandas.read_file('...')

We can repartition it into a Dask-GeoPandas dataframe:

import dask_geopandas
ddf = dask_geopandas.from_geopandas(df, npartitions=4)

The familiar spatial attributes and methods of GeoPandas are also available and will be computed in parallel:

ddf.geometry.area.compute()
ddf.within(polygon)

dask-geopandas's People

Contributors

bernardpazio, dahnj, dependabot[bot], ebkurtz, fbunt, gadomski, ian-r-rose, j-bennet, jorisvandenbossche, jrbourbeau, jsignell, jtbaker, jtmiclat, martinfleis, norlandrhagen, raybellwaves, richardscottoz, slumnitz, tastatham, the-matt-morris, tomaugspurger, v2thegreat


dask-geopandas's Issues

Not loading entire dataset into memory to use, is it an option?

From reading the documentation it seems you have to convert a geodataframe object into the dask-geopandas object. If you would like to use a csv or other file, this would require loading the entire dataset into memory, correct? Is there a way to be more memory efficient when loading in datasets? I originally wanted to use dask for this benefit.
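
One pattern that avoids materializing everything up front is to read lazily with dask.dataframe and then convert, roughly as sketched below (the path and column names are placeholders, not from the original question):

import dask.dataframe as dd
import dask_geopandas

# lazy, partitioned read of the CSV(s); nothing is loaded into memory yet
df = dd.read_csv("data/*.csv")
gdf = dask_geopandas.from_dask_dataframe(df)
# build point geometries from the (hypothetical) lon/lat columns, partition by partition
gdf = gdf.set_geometry(dask_geopandas.points_from_xy(gdf, "lon", "lat"))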

ENH: Implement `spatial_shuffle`

We should have a top-level method on dask_geopandas.GeoDataFrame and dask_geopandas.GeoSeries that shuffles data into spatially coherent partitions. (xref #8)

Let's assume we have hilbert_distance and geohash functions based on which we can spatially repartition gdf. I would then like to have a single method (spatial_shuffle?) that can consume both.

# using hilbert
ddf = ddf.spatial_shuffle('hilbert')

# using geohash
ddf = ddf.spatial_shuffle('geohash')

This API can be extended by other methods without the necessity to have two methods for each algorithm (esp. since the repartition_ methods will be largely the same).

Originally posted by @martinfleis in #71 (comment)

note: moving to a separate specific issue to keep track of GSoC.
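
A rough sketch of what such a method could do internally, reusing dask's set_index shuffle (this assumes hilbert_distance/geohash return one sortable key per row; it is a hypothetical sketch, not an actual implementation):

def spatial_shuffle(ddf, method="hilbert", npartitions=None):
    # compute a spatial sorting key per row (hypothetical helper methods)
    if method == "hilbert":
        key = ddf.hilbert_distance()
    elif method == "geohash":
        key = ddf.geohash()
    else:
        raise ValueError(f"unknown method: {method}")
    # shuffling on the key groups nearby geometries into the same partitions
    return ddf.assign(_key=key).set_index(
        "_key", npartitions=npartitions or ddf.npartitions
    )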

ENH: add Parquet/Feather IO

Adding read_parquet/read_feather and to_parquet/to_feather functions, so we can work with large data from disk (and not just from existing geopandas.GeoDataFrames).

An initial version could be a relatively simple wrapper of the dask + geopandas functionalities, I think.

(and later we should look into spatial partitioned datasets, but that first requires spatial partitioned dataframes)
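
In terms of usage, the new functions would presumably mirror their dask counterparts, something like (paths are placeholders):

ddf = dask_geopandas.read_parquet("data/*.parquet")
ddf.geometry.area.compute()
ddf.to_parquet("out/")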

Announcement: workshop on scaling geospatial vector data during Dask Summit (Thursday May 20th)

(Mis)using our issue tracker here for a moment, but I thought this was the easiest way to notify some people who have been trying out dask-geopandas / reporting issues / doing fixes, as this could potentially be interesting for you: @RichardScottOZ, @gansanay, @bilelomrani1, @njanakiev, @Rhiyo, @daviddemeij, @Guts, @deeplook

During the Dask Summit, we have a 2-hour workshop scheduled about scaling geospatial vector data on Thursday May 20th at 11-13:00 UTC (https://summit.dask.org/schedule/presentation/22/scaling-geospatial-vector-data/). All are welcome to join!

See also geopandas/community#4 for more details.

points_from_xy only gets applied to first partition

In certain situations, the dask_geopandas.points_from_xy only gets applied to the first partition. I am not sure when this happens exactly as I also had cases when this error didn't seem to occur. But below I have a minimal example that causes this problem.

import pandas as pd
import dask_geopandas
import dask.dataframe as dd
import numpy as np

N = 9
df = pd.DataFrame({'x': np.random.randn(N), 'y': np.random.randn(N)})
ddf = dd.from_pandas(df, npartitions=3)
ddf["geometry"] = dask_geopandas.points_from_xy(ddf, "x", "y")
ddf = dask_geopandas.from_dask_dataframe(ddf)
ddf.compute()


Missing real example

I'm trying to evaluate this package by writing a little example, but I'm running into some errors. I tried to write a small example that reads a simple CSV file with lat/lon columns, adds a geometry column and turns this into a Dask GeoDataFrame, but I'm running into errors like these:

  • AttributeError: 'Series' object has no attribute 'map_partitions'
  • AttributeError: 'DataFrame' object has no attribute 'geometry'

import dask_geopandas
import dask.dataframe as dd
import pandas as pd
import geopandas as gd

df = pd.read_csv('airport_volume_airport_locations.csv')

# ok
gdf = gd.GeoDataFrame(
    df, geometry=gd.points_from_xy(df.Airport1Latitude, df.Airport1Longitude)
)

ddf = dask_geopandas.from_geopandas(df, npartitions=4)
# raises AttributeError: 'DataFrame' object has no attribute 'set_geometry'
ddf.set_geometry(
    dask_geopandas.points_from_xy(ddf, "Airport1Latitude", "Airport1Longitude")
)
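
For reference, a variant of this example that should work, following the workaround described in the set_geometry issue further below: convert to a dask-geopandas GeoDataFrame first and only then set the geometry (a sketch, not verified against this exact CSV):

import dask.dataframe as dd
import dask_geopandas

ddf = dd.read_csv('airport_volume_airport_locations.csv')
gddf = dask_geopandas.from_dask_dataframe(ddf)
# note: double-check the column order; points_from_xy expects x (longitude) then y (latitude)
gddf = gddf.set_geometry(
    dask_geopandas.points_from_xy(gddf, "Airport1Longitude", "Airport1Latitude")
)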

ENH: Implement `morton_distance`

@tastatham has an implementation of Morton distance mirroring the one we have for Hilbert distance (#70).

The idea is to push it and have it as another repartitioning method. It gives slightly worse results but can be significantly faster in some cases (@tastatham will also do the performance comparison).

ENH: preserve spatial partitioning information in more methods

We already preserve the spatial partitioning information (spatial_partitions attribute) in several places (eg when selecting a subset of the columns in __getitem__, in the boundary attribute, with a _propagate_spatial_partitions helper method).
But there are more places where it could either be preserved as is, or preserved in a slightly modified form.

Methods where it can be preserved as is:

  • representative_point
  • explode
  • cx

Methods where it might be relatively straightforward to preserve it in a slightly modified form:

  • buffer -> buffering the spatial partitions as well should be enough? (see the sketch after this list)
  • convex_hull -> also convex_hull of spatial partitions?
  • envelope -> also envelope of spatial partitions?
  • affine transformations?
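
For the buffer case, a minimal sketch of what propagating the partitions could look like (an assumption about a possible implementation, not the current code):

def buffer(self, distance, **kwargs):
    result = self.map_partitions(lambda part: part.buffer(distance, **kwargs))
    if self.spatial_partitions is not None:
        # buffering each partition's extent keeps it valid for the buffered geometries
        result.spatial_partitions = self.spatial_partitions.buffer(distance)
    return result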

How to use set_geometry from Dask DF and export as geopackage?

First of all, thanks for this work 👍.

Then, I'm trying to convert a Dask DataFrame into a dask-geopandas GeoDataFrame, setting the geometry from 2 columns (x, y), and I think there is a problem in the README instructions:

import dask.dataframe as dd
import dask_geopandas

df = dd.read_csv('...')

df = df.set_geometry(
    dask_geopandas.points_from_xy(df, 'latitude', 'longitude')
)

In that case, df is a dask dataframe and doesn't have an attribute/method like set_geometry, raising: AttributeError: 'DataFrame' object has no attribute 'set_geometry'.

Digging into the source code, I finally did:

geodf = dask_geopandas.from_dask_dataframe(df)

geodf = geodf.set_geometry(
    dask_geopandas.points_from_xy(
            df=geodf,
            x=geodf.x,
            y=geodf.y,
        )
)

But then, I'm unable to export it as a GeoPackage. Am I missing something?
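
As for the export: as far as I know there is no lazy GeoPackage writer in dask-geopandas yet, so one workaround is to bring the (small enough) result back into memory and use geopandas' own writer (the file and layer names below are placeholders):

gdf = geodf.compute()  # back to a plain geopandas.GeoDataFrame
gdf.to_file("output.gpkg", driver="GPKG", layer="points")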

ENH: repartition based on an attribute

Assuming you have an attribute in the GeoDataFrame that describes some form of spatial extent or entity (eg the country name, the zip code, district, ..., or a calculation such as a geohash), this column could also be used to repartition the dataframe spatially.

Dask already somewhat supports this with the set_index and repartition methods. But in practice they don't necessarily do exactly what we want, and a helper function that uses those methods under the hood will probably be useful.

In my sjoin demo notebook, I repartitioned a GeoDataFrame based on a geohash column, i.e. a column with unique values, and I wanted a new partition for each value in this column.
I ended up doing something like:

df_points_sorted = df_points.set_index("geohash1").sort_index()
n_partitions = df_points.index.nunique()
ddf_points_sorted = dask_geopandas.from_geopandas(df_points_sorted, npartitions=n_partitions)
ddf_points_sorted = ddf_points_sorted.repartition(
    divisions=('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'b', 'c', 'd', 'e', 'f', 'g', 'g')).persist()

So dask sorts the input on the index and determines partition divisions on that in from_(geo)pandas (or, if you already start with a dask dataframe, set_index will do the same). However, that tries to create more or less equal-size partitions, so to ensure there is actually one partition per unique value of the attribute, I repartitioned the result again with the partition bounds explicitly given to repartition(..) (these are the unique values in the "geohash1" column). This second step is relatively cheap since it's only changing some start/stop slices, and not actually shuffling data.

I suppose we could write a custom version (maybe based on how those methods in dask are implemented) that makes this a bit simpler (e.g. we also don't need to have the partitions sorted).
I think the main question is: do we need two passes over the data, first to determine all unique values in the column, and then the actual repartitioning in a second step?

Reading geoparquet but not using geometry returns ValueError

I've encountered behaviour which makes sense, but it is a bit confusing. Consider the following example (mirroring my real data).

df = geopandas.read_file(geopandas.datasets.get_path('naturalearth_lowres'))
df.iloc[:50].to_parquet('test/test_1.pq')
df.iloc[50:100].to_parquet('test/test_2.pq')
df.iloc[100:].to_parquet('test/test_3.pq')

ddf = dask_geopandas.read_parquet("test/test_*.pq")
ddf['pop_est'].max().compute()

This raises the ValueError below because dask is not reading the geometry column. In that case, geopandas complains and tells you to use pandas.read_parquet instead. I guess this is again something to be fixed in geopandas, but it does not happen when using geopandas alone, and if it does, the error is meaningful there, which is not the case for dask_geopandas.

The workaround is to read parquet using dask.dataframe, but dask_geopandas should be able to resolve this under the hood.

import dask.dataframe as dd

ddf = dd.read_parquet("test/test_*.pq")

The error from dask_geopandas:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-59-35ac7a0f8ffc> in <module>
----> 1 ddf['pop_est'].max().compute()

/opt/conda/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

/opt/conda/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    445         postcomputes.append(x.__dask_postcompute__())
    446 
--> 447     results = schedule(dsk, keys, **kwargs)
    448     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    449 

/opt/conda/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2686                     should_rejoin = False
   2687             try:
-> 2688                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2689             finally:
   2690                 for f in futures.values():

/opt/conda/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1986                 direct=direct,
   1987                 local_worker=local_worker,
-> 1988                 asynchronous=asynchronous,
   1989             )
   1990 

/opt/conda/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    831         else:
    832             return sync(
--> 833                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    834             )
    835 

/opt/conda/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

/opt/conda/lib/python3.7/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/opt/conda/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1845                             exc = CancelledError(key)
   1846                         else:
-> 1847                             raise exception.with_traceback(traceback)
   1848                         raise exc
   1849                     if errors == "skip":

/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet_part()
    271     This function is used by `read_parquet`."""
    272     if isinstance(part, list):
--> 273         dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
    274         df = concat(dfs, axis=0)
    275     else:

/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in <listcomp>()
    271     This function is used by `read_parquet`."""
    272     if isinstance(part, list):
--> 273         dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
    274         df = concat(dfs, axis=0)
    275     else:

/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py in read_partition()
    579 
    580         arrow_table = cls._parquet_piece_as_arrow(piece, columns, partitions, **kwargs)
--> 581         df = cls._arrow_table_to_pandas(arrow_table, categories, **kwargs)
    582 
    583         # Note that `to_pandas(ignore_metadata=False)` means

/opt/conda/lib/python3.7/site-packages/dask_geopandas/io/parquet.py in _arrow_table_to_pandas()
     40 
     41         # TODO support additional keywords
---> 42         return _arrow_to_geopandas(arrow_table)
     43 
     44     @classmethod

/opt/conda/lib/python3.7/site-packages/geopandas/io/arrow.py in _arrow_to_geopandas()
    338             """No geometry columns are included in the columns read from
    339             the Parquet/Feather file.  To read this file without geometry columns,
--> 340             use pandas.read_parquet/read_feather() instead."""
    341         )
    342 

ValueError: No geometry columns are included in the columns read from
            the Parquet/Feather file.  To read this file without geometry columns,
            use pandas.read_parquet/read_feather() instead.

ENH: change spatial_partitions from attribute into a property with getter/setter

Currently, spatial_partitions is a plain attribute, meaning you can assign anything to it, etc. We should probably turn it into a property, so we can control the actual getter/setter and store the value itself in _spatial_partitions. Something like:

@property
def spatial_partitions(self):
    return self._spatial_partitions

@spatial_partitions.setter
def spatial_partitions(self, value):
    # ...
    # ensure value is a GeoSeries, has the correct crs, has a default index, ...
    self._spatial_partitions = value

This would also easily allow storing the actual underlying value as something other than a GeoSeries (eg only bounding boxes, if we want that at some point), while keeping the public property returning a GeoSeries for convenience.

ENH: Follow ups for the hilbert_distance function

The hilbert_distance function will shortly be merged into Dask-GeoPandas.
There were several enhancements discussed as follow-up items, which include:

  • To pass the total_bounds argument to hilbert_distance function if this is already pre-computed: #145
  • Use the bounds of the spatial_partitions if present. -> #161
  • Calculating the mid-points for (Multi)Point geometries in _continuous_to_discrete_coords is not necessary:
    • To add a check to see whether the passed geometries are (Multi)Points
    • Only compute the mid-points for non-(Multi)Point geometries.
  • Currently the _continuous_to_discrete function clips discrete integer values using base Python because numpy.clip is not currently supported by Numba: (no longer using numba)
    • To add numpy.clip when it's supported by Numba - v0.54
  • Data type optimisation should be assessed
    • Total bit-width in _continuous_to_discrete - to use numpy.int32 instead of numpy.int64
  • Provide guidance on the p parameter that controls the precision of the resulting Hilbert curve; we need to help users understand how to set this themselves / when it can be useful to do so. (also check the hackmd notes)
  • To add a notebook

Note: Will add permalinks when merged.

BUG: GeoDataFrame.total_bounds should be a dask array with known shape

>>> df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_cities"))
>>> ddf = dask_geopandas.from_geopandas(df, npartitions=4)
>>> ddf.total_bounds
dask.array<from_pandas-09ff8835d3deb87b6565b111dcf696cd-total_bounds-agg, shape=(nan,), dtype=float64, chunksize=(nan,), chunktype=numpy.ndarray>

This results in a dask array with unknown shape and chunksize, while we know this is always a 1D array of length 4, so I suppose we should be able to tell dask this is the case.
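
Until the internals are fixed, a rough workaround sketch that produces the same four values with a fully known (4,) shape, by reducing per-partition bounds through dask.delayed (this is a user-level workaround, not the proposed fix):

import dask
import dask.array as da

def total_bounds_known_shape(ddf):
    # one delayed (4,) array of (minx, miny, maxx, maxy) per partition
    bounds = [
        da.from_delayed(
            dask.delayed(lambda part: part.total_bounds)(p), shape=(4,), dtype="float64"
        )
        for p in ddf.to_delayed()
    ]
    stacked = da.stack(bounds)  # shape (npartitions, 4), fully known
    # overall bounds: min of the mins, max of the maxs
    return da.concatenate([stacked[:, :2].min(axis=0), stacked[:, 2:].max(axis=0)])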

BUG: preserve spatial_partitions in GeoDataFrame.set_crs

Currently, using set_crs doesn't preserve the spatial partitions (similar to copy, see #56):

import geopandas
import dask_geopandas

gdf = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
# remove the crs for purpose of the reproducible example
gdf.crs = None

# create partitioned dataframe + calculate spatial partitions
ddf = dask_geopandas.from_geopandas(gdf, npartitions=4)
ddf.calculate_spatial_partitions()

>>> ddf.spatial_partitions
0    POLYGON ((-68.149 -55.612, -68.640 -55.580, -6...
1    POLYGON ((18.465 -29.045, 16.345 -28.577, -77....
2    POLYGON ((166.740 -22.400, -8.899 36.869, -9.5...
3    POLYGON ((-180.000 -90.000, -180.000 -84.713, ...
dtype: geometry
>>> ddf = ddf.set_crs("EPSG:4326")
>>> ddf.spatial_partitions is None
True
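
A possible workaround in the meantime, relying on spatial_partitions currently being a plain (assignable) attribute:

parts = ddf.spatial_partitions
ddf = ddf.set_crs("EPSG:4326")
# re-attach the partitions, applying the same CRS to them
ddf.spatial_partitions = parts.set_crs("EPSG:4326")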

Overlapping computations

If you want to measure anything which has to do with the spatial relationship of features, you eventually run into the issue of needing to go across chunk boundaries (assuming a spatially chunked ddf). A typical example is a spatial lag, i.e. the mean of values on neighbouring (touching) features.

The way raster analysis deals with it is using dask.array.map_overlap. That essentially copies a bit of neighbouring data to each chunk to create overlaps so you can do your spatial lag fully within a chunk. See https://docs.dask.org/en/latest/array-overlap.html

I believe that we need the analogy of map_overlap for vector data. It is naturally a significantly more complex issue since we do not know what the data look like (it is not a grid). But I believe it is doable and could be a massive game-changer. For example, I would be able to base 90% of momepy on dask-geopandas.

The trick is to define which features should be overlapping. For that, you need to know how far you have to go for each particular operation, but we can specify:

  • a distance threshold (everything within n meters from the chunk boundary)
  • topological threshold (everything within n steps of contiguity)

I have actually already tested this approach with topological threshold, with custom single-core functions and it works well.

We obviously first need spatial re-chunking and spatial indexing, but this is something I'd like to put on a roadmap (maybe for GSoC?).

dask.array implementation - https://docs.dask.org/en/latest/array-overlap.html?highlight=map_overlap#dask.array.map_overlap
dask.dataframe implementation - https://docs.dask.org/en/latest/dataframe-api.html?highlight=map_overlap#dask.dataframe.DataFrame.map_overlap
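
To make the idea concrete, here is a very rough sketch of a map_overlap-style helper for the distance-threshold case, built only on dask.delayed and spatial_partitions (this is hypothetical and not part of dask-geopandas; it assumes calculate_spatial_partitions() has been called, the index uniquely identifies rows across partitions, and func maps a GeoDataFrame to an object indexed like its input):

import dask
import dask.dataframe as dd
import pandas as pd

def map_overlap_sketch(ddf, func, distance, meta):
    parts = ddf.to_delayed()
    regions = ddf.spatial_partitions

    @dask.delayed
    def one(core, neighbours, halo):
        # copy neighbouring features that fall inside the halo around this chunk
        extra = [n[n.intersects(halo)] for n in neighbours]
        expanded = pd.concat([core, *extra])
        # compute on the expanded chunk, then keep results for the core rows only
        return func(expanded).loc[core.index]

    out = []
    for i, part in enumerate(parts):
        halo = regions.iloc[i].buffer(distance)
        neighbours = [
            parts[j]
            for j in range(len(parts))
            if j != i and regions.iloc[j].intersects(halo)
        ]
        out.append(one(part, neighbours, halo))
    return dd.from_delayed(out, meta=meta)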

BUG: Apply function to a GeoSeries

I may be doing something wrong, but it seems that there's no way to apply a function to a GeoSeries. In the following example, I am just using a lambda and area, but the situation is the same no matter which function you apply.

df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(df, npartitions=4)
ddf.geometry.apply(lambda geom: geom.area, meta='float').compute()

It always ends with TypeError: apply() takes from 2 to 3 positional arguments but 4 were given.

Full traceback:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-83-5841353d9afc> in <module>
----> 1 ddf.geometry.apply(lambda geom: geom.area, meta='float').compute()

/opt/conda/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

/opt/conda/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    445         postcomputes.append(x.__dask_postcompute__())
    446 
--> 447     results = schedule(dsk, keys, **kwargs)
    448     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    449 

/opt/conda/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2686                     should_rejoin = False
   2687             try:
-> 2688                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2689             finally:
   2690                 for f in futures.values():

/opt/conda/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1986                 direct=direct,
   1987                 local_worker=local_worker,
-> 1988                 asynchronous=asynchronous,
   1989             )
   1990 

/opt/conda/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    831         else:
    832             return sync(
--> 833                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    834             )
    835 

/opt/conda/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

/opt/conda/lib/python3.7/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/opt/conda/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1845                             exc = CancelledError(key)
   1846                         else:
-> 1847                             raise exception.with_traceback(traceback)
   1848                         raise exc
   1849                     if errors == "skip":

/opt/conda/lib/python3.7/site-packages/dask/optimization.py in __call__()
   1020         if not len(args) == len(self.inkeys):
   1021             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
-> 1022         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
   1023 
   1024     def __reduce__(self):

/opt/conda/lib/python3.7/site-packages/dask/core.py in get()
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

/opt/conda/lib/python3.7/site-packages/dask/core.py in _execute_task()
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/opt/conda/lib/python3.7/site-packages/dask/utils.py in apply()
     29 def apply(func, args, kwargs=None):
     30     if kwargs:
---> 31         return func(*args, **kwargs)
     32     else:
     33         return func(*args)

/opt/conda/lib/python3.7/site-packages/dask/dataframe/core.py in apply_and_enforce()
   5256     func = kwargs.pop("_func")
   5257     meta = kwargs.pop("_meta")
-> 5258     df = func(*args, **kwargs)
   5259     if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   5260         if not len(df):

/opt/conda/lib/python3.7/site-packages/dask/utils.py in __call__()
    893 
    894     def __call__(self, obj, *args, **kwargs):
--> 895         return getattr(obj, self.method)(*args, **kwargs)
    896 
    897     def __reduce__(self):

TypeError: apply() takes from 2 to 3 positional arguments but 4 were given
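
A possible workaround until apply is fixed is to push the function down to each partition with map_partitions (the meta tuple below is an assumption about the output name/dtype):

ddf.geometry.map_partitions(
    lambda s: s.apply(lambda geom: geom.area), meta=("geometry", "float64")
).compute()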

ENH: extending the basic spatial join

#54 added a basic spatial join function. It's a naive cross product of all partitions of both left and right (if no spatial partitioning information is available) or all partition combinations that intersect (based on the spatial partitioning information).

This works fine for an inner join and when your spatial partitions are already reasonable. There are however several aspects still to consider.

Supporting a left join

So currently we only allow how="inner", as for this case the naive version works nicely. However, a "left" join becomes more complicated.
Suppose we have a certain partition of the left frame that intersects with two partitions of the right frame. We perform a normal spatial join (using geopandas.sjoin) for each of those two combinations, resulting in two output partitions. When doing a left join, geopandas.sjoin preserves the rows of the left dataframe that don't match with a geometry of the right dataframe. If we do this multiple times, rows of the left dataframe can end up in multiple output partitions (instead of only in one of the output partitions, as is the case for an inner join). As a result, we would end up with a dask GeoDataFrame with duplicated rows.

Improving the output (spatial) partitions

Because the current implementation combines each partition of the left frame with each (intersecting) partition of the right dataframe, the result inherently has more and smaller partitions in the output.
Suppose again we have a certain partition of the left frame that intersects with two partitions of the right frame. Each of those two combinations is currently a task in the graph that ends up as a new partition in the output dataframe.

Overall (and depending on the exact (spatial) partitioning of the input), this can lead to a "fragmented" output dask GeoDataFrame with many smaller partitions, which could negatively impact (the performance of) further computations.
It might be interesting to look into "merging" chunks again (eg all output partitions coming from a certain partition of the left input could be combined (concatted) into a single output partition).

Repartition the input before performing a spatial join?

For pre-partitioned left and right datasets, the currently implemented approach (joining of geometries by partition resulting from the intersection of partitions) is probably fine. But there will also be cases where it could be interesting to repartition the input before performing the join.

One example case (copying from @brendan-ward mail conversation): for a target dataset (assume this has the greater number of features) that has already been partitioned, partition the query dataset to match the target (which, I guess, is the same as above, just that query dataset is a partition size of 1) - which may involve cutting the query dataset by the bounding boxes of the intersecting partitions.

ENH: use spatial partitioning information to optimize spatial predicates

For the spatial predicates (intersects, contains, within, etc), at the moment we simply perform the geopandas operation for each partition, using dask's elemwise as helper method (which eg will ensure that partitions are aligned if the argument is another dask dataframe).

If spatial partitioning information is available, this can be optimized: if we know that some partition does not intersect at all with the passed geometry, we can avoid doing the computation (as we know it will be all False).
So at least for the simple (and, I think, most prevalent) case where a single geometry is passed to the predicate method, such as:

gdf.intersects(polygon)

In such a case, we can first check whether that polygon intersects with gdf.spatial_partitions, and then only for the intersecting partitions perform the actual intersects(..) predicate operation, and for the others directly create a Series of False values.
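
A rough sketch of that optimisation, written on top of the public pieces (to_delayed and spatial_partitions) rather than as the actual internal implementation:

import dask
import dask.dataframe as dd
import pandas as pd

def intersects_optimised(gdf, polygon):
    # assumes gdf.spatial_partitions has been calculated
    hits = gdf.spatial_partitions.intersects(polygon)
    out = []
    for i, part in enumerate(gdf.to_delayed()):
        if hits.iloc[i]:
            out.append(dask.delayed(lambda p: p.intersects(polygon))(part))
        else:
            # the partition's extent doesn't touch the polygon: skip the geometry work
            out.append(dask.delayed(lambda p: pd.Series(False, index=p.index))(part))
    return dd.from_delayed(out, meta=pd.Series(dtype=bool))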

BUG: `gdf.geometry.total_bounds` reads all columns from Parquet instead of only geometry column

When doing a getitem operation after read_parquet, the column selection is pushed down. So for example, in the following cases

gdf = dask_geopandas.read_parquet(...)
# only the "attribute" column is read
gdf["attribute"].mean()

# only the geometry column is read (.geometry is equivalent to `gdf["geometry"]`)
gdf.geometry.x

But it seems that specifically for total_bounds this doesn't work for some reason, and even gdf.geometry.total_bounds.compute() loads all columns of the Parquet file instead of only the geometry column (which makes total_bounds considerably slower than it could be).

(the reason I was looking into this was the realization that gdf.total_bounds (so where the user doesn't explicitly call .geometry first) might load all columns unnecessarily (which is relevant for all GeoDataFrame methods/attributes that only require the geometry column, and something we could fix I suppose, need to open a separate issue for that), but then when comparing with gdf.geometry.total_bounds it didn't improve)
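
A possible workaround in the meantime is to make the column selection explicit at read time, assuming read_parquet forwards the columns keyword to dask's reader:

gdf = dask_geopandas.read_parquet("...", columns=["geometry"])
gdf.total_bounds.compute()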

ENH: read_file (GDAL based)

Not all, but many of the spatial file formats supported by GDAL/fiona allow efficiently reading chunks of the file (eg Shapefile, GeoPackage).

We could have a dask_geopandas.read_file variant of geopandas.read_file that constructs a partitioned dataframe from such files.

A small prototype:

import geopandas
import dask.dataframe as dd
import dask_geopandas
from dask.delayed import delayed
import fiona

def read_file(path, npartitions):
    
    with fiona.open(path) as collection:
        total_size = collection.session.get_length()
    
    # TODO smart inference for a good default partition size
    batch_size = (total_size // npartitions) + 1
    
    row_offset = 0
    dfs = []
    
    while row_offset < total_size:
        rows = slice(row_offset, min(row_offset + batch_size, total_size))
        df = delayed(geopandas.read_file)(path, rows=rows)
        dfs.append(df)
        row_offset += batch_size
    
    # TODO this could be inferred from fiona's collection.meta["schema"]
    meta = geopandas.read_file(path, rows=5)
    
    return dd.from_delayed(dfs, meta)

And a small experiment with it: https://nbviewer.jupyter.org/gist/jorisvandenbossche/c94bfc9626e622fda7285ed88a4d771a

Since reading those files (currently) doesn't release the GIL much, this is mostly useful in a multi-processing setting (and not with the default threading scheduler).

I think a function like the above would already be interesting anyway, but for further optimization, using pyogrio might be an interesting path: geopandas/pyogrio#1

ENH: read a list of GIS files into chunks

I have a list of GeoPackages, one per urban area, that I need to read into a dask GeoDataFrame. Since they are already essentially spatially partitioned, the optimal way would be to read each as a chunk directly. Now I have to read them one by one via GeoPandas, concatenate, and then create the dask GeoDataFrame from the geopandas.GeoDataFrame, which loses the spatial partitions.

For cases like this, it may be useful to have a dask_geopandas.read_files(list) function which would call geopandas.read_file for each chunk and create the chunked GeoDataFrame directly. It would be helpful to be able to pass both a list and a path to a folder (like we do with parquet), since in a list you can specify a path inside a zip, for example (my case).

This is the existing code I am using:

paths = ["foo/bar/one.zip!data/file.gpkg", "foo/bar/two.zip!data/file.gpkg"]
gdfs = []

for file in paths:
    gdf = gpd.read_file(file)
    gdfs.append(gdf)

gdf = pd.concat(gdfs)

ddf = dask_geopandas.from_geopandas(gdf, npartitions=2)  # non spatial chunks

And this would be optimal:

paths = ["foo/bar/one.zip!data/file.gpkg", "foo/bar/two.zip!data/file.gpkg"]

ddf = dask_geopandas.read_files(paths)  # one chunk per file
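
Until such a helper exists, an interim approach with dask.delayed could look like this (a sketch; it assumes each file fits in a worker's memory and that the first file is representative enough to serve as meta):

import geopandas
import dask.dataframe as dd
import dask_geopandas
from dask import delayed

paths = ["foo/bar/one.zip!data/file.gpkg", "foo/bar/two.zip!data/file.gpkg"]

parts = [delayed(geopandas.read_file)(path) for path in paths]
meta = geopandas.read_file(paths[0], rows=5)  # small read just to get the schema
ddf = dask_geopandas.from_dask_dataframe(dd.from_delayed(parts, meta=meta))  # one chunk per file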

FutureWarning: Assigning CRS to a GeoDataFrame without a geometry

Running dask-geopandas with geopandas master raises the following warning: /opt/conda/lib/python3.7/site-packages/dask_geopandas/backends.py:54: FutureWarning: Assigning CRS to a GeoDataFrame without a geometry column is now deprecated and will not be supported in the future.

It does that on every step.

ddf = dask_geopandas.from_geopandas(df, npartitions=14)
ddf['area'] = ddf.buildings.area

In GeoPandas, we have moved the CRS attribute to the GeometryArray level. Since we do not really want to have a GeoDataFrame without a geometry but with a CRS, we aim to deprecate that option. I guess that here it is better to also store the copy at the dask GeoDataFrame level, but we have to figure out if it is better to keep the existing support in geopandas or change the implementation here.

Dask GeoPandas

What would it look like to parallelize/distribute GeoPandas with Dask?

Dask has created parallel variants of NumPy arrays and Pandas Dataframes. I think it may be sensible to do the same with Geopandas. I would like to solicit thoughts from Geopandas developers and people familiar with Dask on this idea. Is there a significant need for this? How would we arrange the in-memory geopandas components? What operations would it need to support to be pragmatic? What is a minimal feature set necessary to attract early-adopters? What performance issues are we likely to come across?

I am familiar with Dask but my experience with GeoPandas is light.

Proposed structure

I suspect that one dask geopandas dataframe (dask-gdf) would be composed of many lazily computed geopandas dataframes (gdfs) with known bounding polygons, stored in another overarching geopandas dataframe (gdf). That is to say that we would partition the full dataset into localized regions, and would centrally maintain a mapping of those regions to pointers to gdfs to help guide computation. Dask-gdf operations would consult this mapping of regions before dispatching appropriate operations to the in-memory gdfs. For example a spatial join with an in-memory gdf would first join against the regions to find which regions have any interaction and then perform many in-memory joins with all of the relevant regions. A spatial join between two dask-gdfs would involve a spatial join on the region-mapping to find all intersections followed by many in-memory spatial joins on the gdfs.

I've convinced myself (perhaps foolishly) that this is sensible for spatial join operations and possible to accomplish lazily (which is important for larger-than-memory use). However I don't know if this metadata is sufficient to implement most other relevant operations for GeoPandas.

For background, here is how we design dask.arrays and dask.dataframes:

Almost all dask.array/dataframe operations are closed under this metadata (they know everything they need to do with this metadata and can generate the metadata of the output) without looking at the underlying values.

Performance issues

Often when parallelizing in-memory libraries we start becoming concerned with things like releasing the GIL and fast serialization. Are these an issue for GeoPandas?

An initial naive test shows that yes, serialization is an issue. We're only able to serialize GeoPandas dataframes at around 2MB/s (although this example may not be representative).

In [1]: import geopandas as gpd

In [2]: cities = gpd.read_file(gpd.datasets.get_path('naturalearth_cities'))

In [3]: import pickle

In [4]: %time len(pickle.dumps(cities))
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 5.59 ms
Out[4]: 11583

In [5]: 11583 / 0.0059 / 1e6  # MB/s
Out[5]: 1.9632203389830507

I haven't tested GIL-releasing friendliness yet.

Explicit questions

  1. Is this sensible?
  2. What operations or classes of operations are relevant to support?
  3. Are there better ways to lay out the metadata than the geopandas-of-regions approach described above?
  4. Are there better ways to serialize geopandas dataframes?
  5. Do GeoPandas dataframe operations typically release the GIL? If not then is this doable?
  6. How actively maintained is this project? Commits recently seem to be sparse.

cc @kjordahl @jorisvandenbossche @r-shekhar @kuanb @jalessio

CI: expand CI and move it to GHA

I think that we should expand CI to test on Windows and macOS and to include master versions of key packages. Since testing on Windows/macOS is not supported on Travis (maybe macOS is, if I remember correctly), I'd suggest following the geopandas model and switching to GitHub Actions.

Happy to do a PR.

Killed worker when using dask-geopandas with distributed client

Just locally on my laptop, but I can't get a distributed client to work, while the default dask threaded scheduler works, and a plain dask.dataframe also works with distributed.

What I am doing to test is:

import geopandas
import dask.dataframe as dd
import dask_geopandas

from distributed import Client
client = Client()
client

# loading a geopandas GeoDataFrame
df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))

# first creating a pure dask DataFrame (getting non-geometry columns)
ddf = dd.from_pandas(df[['pop_est', 'continent', 'name']], npartitions=4)

# this works fine
ddf.continent.value_counts().compute()

# now creating a dask_geopandas.GeoDataFrame
ddf = dask_geopandas.from_geopandas(df, npartitions=4)

# the same thing now fails (only getting the column as a dask.Series from the GeoDataFrame)
ddf.continent.value_counts().compute()

and this fails with:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-32-f1c611ca664d> in <module>
----> 1 ddf.continent.value_counts().compute()

~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    163         dask.base.compute
    164         """
--> 165         (result,) = compute(self, traverse=False, **kwargs)
    166         return result
    167 

~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436     results = schedule(dsk, keys, **kwargs)
    437     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    438 

~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2593                     should_rejoin = False
   2594             try:
-> 2595                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2596             finally:
   2597                 for f in futures.values():

~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1885             else:
   1886                 local_worker = None
-> 1887             return self.sync(
   1888                 self._gather,
   1889                 futures,

~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    777             return future
    778         else:
--> 779             return sync(
    780                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    781             )

~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1750                             exc = CancelledError(key)
   1751                         else:
-> 1752                             raise exception.with_traceback(traceback)
   1753                         raise exc
   1754                     if errors == "skip":

KilledWorker: ("('from_pandas-35d27145a3c0b4d3be6e6daa6537420f', 0)", <Worker 'tcp://127.0.0.1:46111', name: 2, memory: 0, processing: 4>)

Any ideas on where to look to debug this?

ENH: improve the repr/repr_html of GeoDataFrame / GeoSeries

Currently, for the GeoDataFrame, the notebook repr is the same as the dask one. It would be nice if it said "GeoDataFrame" instead of "DataFrame".

The dask Series repr also shows the divisions, name of the series, etc, while the GeoSeries repr does not.

ENH: expose spatial index and `query_bulk`

We should find a way of exposing the sindex attribute each partition has (and which e.g. sjoin uses) to allow custom queries without having to write a lot of code (i.e. without rewriting the indexing logic we have in sjoin).

Ideally, we should find a way to expose both query and query_bulk such that they can be mapped to the geometries within partitions.

One use case can be found in momepy. This code takes building footprints and measures how long the portion of each perimeter that is shared with another footprint is. The code could be optimised a bit, but it is good enough for illustration.

inp, res = gdf.sindex.query_bulk(gdf.geometry, predicate="intersects")
left = gdf.geometry.take(inp)
right = gdf.geometry.take(res)
intersections = left.intersection(right, align=False).length
results = intersections.groupby(inp).sum().reset_index(drop=True) - gdf.geometry.length.reset_index(drop=True)
  1. query the sindex. Since both the sindex and the input are the same geometry array, this returns at least one hit (itself) for each geometry, potentially more. Assuming polygons ABC adjacent A-B-C: for polygon A, it finds that it intersects [A, B], for polygon B [A, B, C] and for polygon C [B, C]. Both A-B and B-A are present (one in inp, the other in res and vice versa).

  2. create long arrays based on integer indices. Here we have a lot of duplicates which allows vectorised intersection later.

  3. intersection between all pairs of geometries

  4. length of an intersection

  5. groupby over lengths based on original input geometry int index to get sum of all shared perimeter sections per geometry.

  6. remove the length of itself (A-A is still there, but we don't want it in the result).

The A-A relationship could be eliminated to avoid the overhead it brings but I guess I was lazy when I wrote the code and it was good enough for the purpose :).

The same should be possible to do on top of dask_geopandas.GeoDataFrame. However, we need to figure out how to represent the output of sindex in a distributed manner.

edit: naturalearth_lowres will work well enough as a stand-in for footprints for test purposes.

Issue when reprojecting multiple CSV files

I have multiple CSV files opened with dask like this:

import dask.dataframe as dd
import dask_geopandas

df = dd.read_csv('csv/*_timeseries.csv')
gdf = dask_geopandas.from_dask_dataframe(df)
gdf = gdf.set_geometry(
    dask_geopandas.points_from_xy(gdf, x='Longitude', y='Latitude')
).set_crs('epsg:4326').to_crs('epsg:3395')

When invoking gdf.compute(), the following exception is raised:

------------------------------------------------------------------------
ProjError                              Traceback (most recent call last)
<ipython-input-251-f87bb3768545> in <module>
----> 1 gdf.compute()

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2723                     should_rejoin = False
   2724             try:
-> 2725                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2726             finally:
   2727                 for f in futures.values():

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1984             else:
   1985                 local_worker = None
-> 1986             return self.sync(
   1987                 self._gather,
   1988                 futures,

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    830             return future
    831         else:
--> 832             return sync(
    833                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    834             )

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/.local/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1849                             exc = CancelledError(key)
   1850                         else:
-> 1851                             raise exception.with_traceback(traceback)
   1852                         raise exc
   1853                     if errors == "skip":

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/optimization.py in __call__()
    959         if not len(args) == len(self.inkeys):
    960             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 961         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    962 
    963     def __reduce__(self):

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/core.py in get()
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/core.py in _execute_task()
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/utils.py in apply()
     27 def apply(func, args, kwargs=None):
     28     if kwargs:
---> 29         return func(*args, **kwargs)
     30     else:
     31         return func(*args)

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/dataframe/core.py in apply_and_enforce()
   5296     func = kwargs.pop("_func")
   5297     meta = kwargs.pop("_meta")
-> 5298     df = func(*args, **kwargs)
   5299     if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   5300         if not len(df):

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/utils.py in __call__()
    891 
    892     def __call__(self, obj, *args, **kwargs):
--> 893         return getattr(obj, self.method)(*args, **kwargs)
    894 
    895     def __reduce__(self):

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/geodataframe.py in to_crs()
    814         else:
    815             df = self.copy()
--> 816         geom = df.geometry.to_crs(crs=crs, epsg=epsg)
    817         df.geometry = geom
    818         df.crs = geom.crs

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/geoseries.py in to_crs()
    541         transformer = Transformer.from_crs(self.crs, crs, always_xy=True)
    542 
--> 543         new_data = vectorized.transform(self.values.data, transformer.transform)
    544         return GeoSeries(
    545             GeometryArray(new_data), crs=crs, index=self.index, name=self.name

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/_vectorized.py in transform()
    877     if compat.USE_PYGEOS:
    878         coords = pygeos.get_coordinates(data)
--> 879         new_coords = func(coords[:, 0], coords[:, 1])
    880         result = pygeos.set_coordinates(data.copy(), np.array(new_coords).T)
    881         return result

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/pyproj/transformer.py in transform()
    428             intime = None
    429         # call pj_transform.  inx,iny,inz buffers modified in place.
--> 430         self._transformer._transform(
    431             inx,
    432             iny,

pyproj/_transformer.pyx in pyproj._transformer._Transformer._transform()

ProjError: x, y, z, and time must be same size

The exception disappears when df is a single CSV file: df = dd.read_csv('csv/2019_timeseries.csv')

Copy `spatial_partitions` when copying `GeoDataFrame`

I would expect the spatial_partitions to be copied when using df.copy().

import geopandas
import dask
import dask_geopandas
dask.config.set(scheduler='single-threaded')

df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(df, npartitions=4)
ddf.calculate_spatial_partitions()
ddfc = ddf.copy()
ddfc.spatial_partitions  # is None

Is there a reason to not copy spatial partitions that I didn't think of?

I thought about how this could work and didn't think of anything better than overriding dask's copy method. Something like

def copy(self):
    self_copy = super().copy()
    self_copy.spatial_partitions = self.spatial_partitions
    return self_copy

It would be great if spatial_partitions could get copied automatically, as crs for example does. However, crs is actually a property of _meta, which does get automatically copied by dask. Not sure how spatial partitions could be attached to the _meta, or if there's another mechanism that could act similarly.

If I'm headed in the right direction, then I'm happy to unit test & PR.

ENH: Implement `geopandas.clip`

I needed geopandas.clip functionality the other day and I utilized map_partitions to that end successfully.

Reading #54 I learned how one might be able to perform clip in a "smarter" way, so I gave it a try.

import numpy as np
import pandas as pd
import geopandas as gpd
from dask.highlevelgraph import HighLevelGraph
import dask_geopandas

# Create data
df = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(pd.concat([df]*5), npartitions=40)
mask = df.loc[df['iso_a3'].eq('CZE')]
mask['geometry'] = mask.geometry.buffer(1)

# Create more localized spatial partitions
ddf = ddf.set_crs('epsg:4326').set_index('continent').reset_index().persist()
ddf.calculate_spatial_partitions()
ddf.spatial_partitions.set_crs('epsg:4326', inplace=True) 
# ddf.spatial_partitions.plot(color='none', edgecolor='black') 

# Naive version (~2.75s on my machine)
result_naive = ddf.map_partitions(lambda x: gpd.clip(x, mask)).compute()

# Smarter version (~1.37s on my machine)
new_spatial_partitions = gpd.clip(ddf.spatial_partitions.to_frame('geometry'), mask)
intersecting_partitions = np.asarray(new_spatial_partitions.index)

name = 'clip-test'
dsk = {(name, i): (gpd.clip, (ddf._name, l), mask) for i, l in enumerate(intersecting_partitions)}
divisions = [None] * (len(dsk) + 1)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[ddf])
result = dask_geopandas.core.GeoDataFrame(graph, name, ddf._meta, tuple(divisions))
result.spatial_partitions = new_spatial_partitions

result_smart = result.compute()

pd.testing.assert_frame_equal(result_naive, result_smart)

Does this look like a reasonable approach? If so, then I'm happy to test & PR.

ENH: support roundtripping full spatial partitioning instead of only the bbox in Parquet IO

From #28 (review). That PR added the ability to load the spatial_partitions from the Parquet metadata in read_parquet. But currently that only loads bounding boxes (as that is what is stored in the metadata), and thus cannot roundtrip the exact (potentially more detailed) geometries in the spatial partitioning information.

We currently fully rely on the metadata saved in the Parquet file as done by GeoPandas / defined in https://github.com/geopandas/geo-arrow-spec, and this currently only has a bbox (and not a generic extent). So we could start a discussion there to expand this, or in the short term add some custom metadata for dask_geopandas in to_parquet (this should be easy to do, but of course custom to dask-geopandas).

This also depends on the discussion in #8 on how we want to store partitions.
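
To illustrate the short-term option, a rough sketch of stashing the partition geometries as custom key-value metadata with pyarrow (the metadata key and helper name are made up, not an agreed spec):

import json
import pyarrow.parquet as pq

def write_partitions_metadata(path, spatial_partitions):
    # hedged sketch: serialize each partition geometry to WKB hex and store it
    # under a custom key in the parquet schema metadata
    table = pq.read_table(path)
    custom = {
        b"dask_geopandas_spatial_partitions": json.dumps(
            [geom.wkb_hex for geom in spatial_partitions]
        ).encode()
    }
    new_metadata = {**(table.schema.metadata or {}), **custom}
    pq.write_table(table.replace_schema_metadata(new_metadata), path)

# reading back would reverse this with shapely.wkb.loads(bytes.fromhex(...))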

ENH: spatial partitioning of the GeoDataFrame

For making spatial joins or overlays, spatial predicates, reading from spatially partitioned datasets, etc. more efficient, we can have spatially partitioned dataframes: the bounds of each partition are known, and thus it can be checked based on those bounds whether an operation needs to involve that partition or not.
And then geodataframes can also be re-partitioned to optimize the bounds (minimize the overlap) as much as possible (an initially costly shuffle operation, but one that can pay off later).

This complicates the implementation (we need to keep track of the spatial partitioning, the partitions can change during spatial operations, ..), but I think it will also be critical for improving performance on large datasets.


How can we add this?

In the previous iteration at https://github.com/mrocklin/dask-geopandas, the dataframes had an additional _regions attribute, which was a geopandas.GeoSeries with the "regions" of each partition (so len(regions) == npartitions).

See https://github.com/mrocklin/dask-geopandas/blob/8133969bf03d158f51faf85d020641e86c9a7e28/dask_geopandas/core.py#L50

I think one advantage of using a GeoSeries is that this makes it easy to work with (eg it is easy to check which partitions would intersect with a given geometry).

In spatialpandas (https://github.com/holoviz/spatialpandas), there is a combo of partition_bounds and partition_sindex.
The partition_bounds is basically the total_bounds of each partition (so you could see it as the _regions but limited to a rectangular box and stored as the 4 (minx, miny, maxx, maxy) numbers). And then partition_sindex is a spatial index built on the partition_bounds.

See https://github.com/holoviz/spatialpandas/blob/master/spatialpandas/dask.py


I suppose starting with a basic "partition bounds" should be fine, and allows expanding it later with a spatial index or with more fine-grained shapes.
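
For example, with the partitioning information kept as a GeoSeries, deciding which partitions an operation has to touch could look roughly like this (hedged sketch, not a final API):

import geopandas

def intersecting_partitions(regions: geopandas.GeoSeries, geom):
    # hedged sketch: `regions` holds one (bounding) geometry per partition;
    # return the indices of the partitions that a query geometry touches
    mask = regions.intersects(geom)
    return list(regions.index[mask])

# a spatial predicate or join would then only be scheduled on those partitions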

Alpha release

I know that there's still a lot to do before a first proper release but I was thinking that it would be helpful to do a tagged alpha release of the current state. Something like 0.0.1alpha1. We are already using dask-geopandas in some of our research code and it would be helpful to pin a tagged version for reproducibility rather than install from git@master/commit.

Would that be okay with you? I am happy to prepare a GitHub Action to automate releases and a conda recipe to have it on conda-forge as well.

Unable to use map_partitions on dask_geopandas.core.GeoDataFrame

As a workaround for spatial joins, I wanted to use map_partitions to run the spatial join (geopandas.tools.sjoin) on each partition and aggregate later down the line. However, each partition produces an invalid GeoDataFrame and the geometry becomes a LINESTRING.

Versions used (using conda-forge):

dask 2021.04.0
pandas 1.2.4
geopandas 0.9.0
dask_geopandas v0.1.0a2+1.g69a4073
pygeos 0.9

Here is a minimal reproducible example:

import dask.dataframe as dd
import dask_geopandas

ddf = dd.read_csv("data/random_points.csv")

ddf = dask_geopandas.from_dask_dataframe(ddf)
ddf = ddf.set_geometry(
    dask_geopandas.points_from_xy(ddf, 'lon', 'lat')
)

def spatial_join(gdf):
    print(gdf.head())
    # spatial join omitted for clarity
    return gdf

ddf.map_partitions(spatial_join)

Which prints the following output:

   lon  lat label                                       geometry
0  1.0  1.0   foo  LINESTRING (0.00000 0.00000, 0.00000 1.00000)
1  1.0  1.0   foo  LINESTRING (1.00000 1.00000, 1.00000 2.00000)

Data was generated like this:

import numpy as np
import pandas as pd
import string

n = 100000
X = np.random.random((n, 2))
X[:, 0] = 360 * X[:, 0] - 180
X[:, 1] = 170 * X[:, 1] - 85
df = pd.DataFrame(X, columns=['lon', 'lat'])
df['label'] = np.random.choice(list(string.ascii_letters), n)

df.to_csv("data/random_points.csv", index=False)

Use case: load a large shapefile / geographical filter / handle with geopandas

Hi,

My use case is to study only the density distribution of population in the neighborhood of a given point. The source data is the global shapefile for a 200x200m grid of France: https://www.insee.fr/fr/statistiques/4176290?sommaire=4176305

What I was doing with a smaller shapefile (1000x1000m grid) was:

  • load the entire shapefile
  • project it to a planar (projected) CRS
  • get the centroids for each polygon
  • reproject to a geodetic CRS
  • keep polygons with the nearest centroids (maybe 12-16 of them)

With the more precise grid, it doesn't fit in memory and I can't finish the first step. I would like to do those first loading / reducing steps in dask-geopandas before continuing with geopandas.
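
Something along these lines is what I have in mind; a rough sketch where dask_geopandas.read_file, the file name, the CRS and the point of interest are all placeholders/assumptions:

import dask_geopandas
from shapely.geometry import Point

# hedged sketch with placeholder inputs
grid = dask_geopandas.read_file("carreaux_200m.shp", npartitions=32)  # lazy, chunked read
poi = Point(651000, 6862000)  # point of interest, in the grid's projected CRS (placeholder)

# keep only cells whose centroid lies within 2 km of the point, then collect
centroids = grid.geometry.centroid
nearby = grid[centroids.distance(poi) < 2000]
gdf = nearby.compute()  # small enough now to continue in plain geopandas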

Let me know if you already see a smart solution or how I can help!

Guillaume

ENH: add read_postgis

Once there is a spatial partitioning backend in place, we could read chunks from PostGIS via SQL (ST_Intersects or something).

We may also be able to link it to the chunksize parameter and read into partitions directly (I think that adapting dask.dataframe.read_sql_table to handle geometries would do it), but spatially constrained reading would be better.
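
Until then, a per-partition workaround with dask.delayed and a spatially constrained query could look roughly like this (hedged sketch; `engine`, `bboxes`, the table and the geometry column name are all assumed):

import dask
import dask.dataframe as dd
import geopandas

@dask.delayed
def read_chunk(xmin, ymin, xmax, ymax):
    # one partition per bounding box, filtered server-side with ST_Intersects;
    # `engine` is an existing SQLAlchemy engine (assumed)
    sql = (
        "SELECT * FROM parcels WHERE ST_Intersects("
        f"geom, ST_MakeEnvelope({xmin}, {ymin}, {xmax}, {ymax}, 4326))"
    )
    return geopandas.read_postgis(sql, engine, geom_col="geom")

parts = [read_chunk(*bbox) for bbox in bboxes]
meta = read_chunk(*bboxes[0]).compute().head(0)  # eager schema inference from one chunk
ddf = dd.from_delayed(parts, meta=meta)  # dispatches to dask_geopandas when it is imported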

ENH: repartition based on Geohash

Current spatial partitioning in Dask-GeoPandas is limited to computing convex hulls for a set of n arbitrarily defined partitions.
Whilst a Dask-GeoDataFrame can be re-partitioned based on an attribute (a column contained within the data) before computing the convex hull, this is dependent on:

  1. There being a suitable attribute
  2. Data quality - attributes assigned to the wrong geographic location result in large convex hulls.
  3. Defining the number of partitions

When dealing with Point data, an alternative approach is to encode each Point using the Geohash - which is a string that is both sortable and searchable based on prefixing.

Advantages

  • Fast to index
  • Easy to interpret
  • Useful for nearest-neighbour searches

Disadvantages

In the next couple of weeks I will create a pull request with these features, as a potential partitioning technique.
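
To sketch the idea (hedged; assumes the pygeohash package and point geometries, and relies on a subsequent set_index to do the shuffle):

import pygeohash

def add_geohash(gdf, precision=6):
    # hedged sketch: encode every point so that the sortable geohash string
    # can drive the repartitioning
    gdf = gdf.copy()
    gdf["geohash"] = [
        pygeohash.encode(y, x, precision=precision)
        for x, y in zip(gdf.geometry.x, gdf.geometry.y)
    ]
    return gdf

ddf = ddf.map_partitions(add_geohash)
ddf = ddf.set_index("geohash")  # nearby points end up in the same partition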

BUG: set_index results in invalid dask GeoDataFrame (partitions are DataFrames)

Using the dask set_index method results in an "invalid" dask_geopandas.GeoDataFrame, where the partitions are no longer GeoDataFrames but DataFrames (which then results in errors when computing spatial operations)

import geopandas
import dask_geopandas

gdf = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(gdf, npartitions=4)

>>> type(ddf.partitions[0].compute())
geopandas.geodataframe.GeoDataFrame

>>> ddf2 = ddf.set_index("continent")
# still a dask_geopandas GeoDataFrame
>>> type(ddf2)   
dask_geopandas.core.GeoDataFrame
# but partitions under the hood no longer a geopandas.GeoDataFrame
>>> type(ddf2.partitions[0].compute())   
pandas.core.frame.DataFrame
# which results in errors when doing actual computations
>>> ddf2.area.compute()
....
AttributeError: 'DataFrame' object has no attribute 'area'

import dask_geopandas resulted “cannot import name '_nonempty_index' from 'dask.dataframe.utils' ”

While attempting to import dask_geopandas, I encountered the following error:

>>> import dask_geopandas
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\user\Miniconda3\envs\envname\lib\site-packages\dask_geopandas\__init__.py", line 3, in <module>
    from . import backends
  File "C:\Users\user\Miniconda3\envs\envname\lib\site-packages\dask_geopandas\backends.py", line 4, in <module>
    from dask.dataframe.utils import (
ImportError: cannot import name '_nonempty_index' from 'dask.dataframe.utils' (C:\Users\user\Miniconda3\envs\envname\lib\site-packages\dask\dataframe\utils.py)

Apparently this name has moved from dask.dataframe.utils to dask.dataframe.backends. The initial underscore suggests it was not meant for use by other packages.
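
A version-tolerant import along these lines would presumably address it (hedged sketch):

try:
    from dask.dataframe.utils import _nonempty_index  # older dask
except ImportError:
    from dask.dataframe.backends import _nonempty_index  # newer dask moved it here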

thank you

ENH: repartition based on predefined spatial extents

A method to repartition a GeoDataFrame along new partitions, i.e. to conform the geodataframe to given spatial partitions.

There is a basic implementation for this at mrocklin/dask-geopandas, which includes both a version for repartitioning a geopandas.GeoDataFrame (which is done eagerly, but could in principle also be extended to do the actual selection lazily) and one for repartitioning a dask_geopandas.GeoDataFrame.

I updated the version for a geopandas.GeoDataFrame already a bit in my sjoin demo notebook:

import geopandas
import dask_geopandas
from dask.base import tokenize


def repartition_pandas(gdf, partitions):
    partitions = geopandas.GeoDataFrame({'geometry': partitions.geometry},
                                        crs=partitions.crs)
    joined = geopandas.sjoin(partitions, gdf, how='inner', op='intersects').index_right
    name = 'from-geopandas-' + tokenize(gdf, partitions)
    dsk = {}
    new_partitions = []

    # TODO warn if certain rows are dropped (not intersecting with any of the partitions)

    j = 0
    for i, partition in enumerate(partitions.geometry):
        try:
            ind = joined.loc[i]
        except KeyError:
            # no rows intersect this partition
            continue
        subset = gdf.loc[ind]
        # keep a row only if its representative point falls in the partition,
        # so rows intersecting multiple partitions are not duplicated
        subset2 = subset[subset.geometry.representative_point().intersects(partition)]
        dsk[(name, j)] = subset2
        j += 1
        if (subset2.geometry.type == 'Point').all():
            new_partitions.append(partition)
        else:
            new_partitions.append(subset2.geometry.convex_hull.unary_union.convex_hull)

    divisions = tuple([None] * (j + 1))
    # last argument: the spatial partitions of the result
    result = dask_geopandas.GeoDataFrame(dsk, name, gdf.head(0), divisions, new_partitions)
    return result

I think there are generally 2 cases to look out for:

  • A geometry doesn't fall in any of the specified spatial partitions
    • -> drop, but warn if that happens?
  • A geometry intersects with multiple of the specified spatial partitions:
    • If the new spatial partitions are not overlapping, this can be handled relatively easily. The above version avoids duplicates by checking that the "representative point" intersects with the partition (although there might still be a corner case of a point lying on the boundary of two partitions?)
    • If the new spatial partitions do overlap, this will be more tricky.

(It will probably be useful to start with an initial version (like the above) that only supports non-overlapping spatial partitions as specified by the user; the resulting partitions of the output can still overlap.)

BUG: `shuffle` results in dask.DataFrame instead of GeoDataFrame

Using shuffle on dask_geopandas.GeoDataFrame doesn't preserve the class and returns dask.DataFrame instead.

gdf = geopandas.read_file(geopandas.datasets.get_path('naturalearth_lowres'))
ddf = dask_geopandas.from_geopandas(gdf, npartitions=4)
ddf.shuffle("continent")


ENH: optimize GeoDataFrame methods that only require the geometry column

From #78: some IO methods in dask (eg read_parquet) support "column projection pushdown", meaning that if you select a column (a "getitem" operation) after reading in a file, then it will actually only read that single column instead of the full file.

So for example, in the following cases

gdf = dask_geopandas.read_parquet(...)
# only the "attribute" column is read
gdf["attribute"].mean()

# only the geometry column is read (.geometry is equivalent to gdf["geometry"])
gdf.geometry.x

But, in GeoPandas, most of the spatial methods/attributes are supported on both GeoSeries and GeoDataFrame, and in the GeoDataFrame case they often only need the geometries. So for example gdf.geometry.unary_union and gdf.unary_union give the same result. But with dask, the first would only read the geometry column while the second would (unnecessarily) read all columns from the file.

We should try to update those methods to automatically include a "getitem" operation in the dask task graph for the GeoDataFrame case.
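
Concretely, such a method could simply delegate to the geometry column so that the getitem lands in the task graph (hedged sketch, not the actual implementation):

# hedged sketch of a GeoDataFrame-level method inside dask-geopandas
def unary_union(self):
    # the .geometry getitem lets read_parquet prune all non-geometry columns
    return self.geometry.unary_union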
