Comments (22)

jorisvandenbossche commented on June 19, 2024

> What do you think about potentially partitioning by attribute?

I think that's something that is already supported by dask (but @mrocklin can correct me if I am wrong). See e.g. the doc page on shuffling at https://docs.dask.org/en/latest/dataframe-groupby.html
You can for example do a set_index(..) on a certain attribute column, which will repartition the dataframe based on that attribute, making joins on this new index more efficient.
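
A minimal sketch of that pattern with plain dask.dataframe (the file names and the "state" column are hypothetical):

```python
import dask.dataframe as dd

# hypothetical inputs sharing a "state" attribute column
cities = dd.read_csv("cities.csv")
counties = dd.read_csv("counties.csv")

# set_index shuffles the data so that each partition holds a
# contiguous range of states, recorded in .divisions
cities = cities.set_index("state")
counties = counties.set_index("state")

# a join on the shared index can now match partitions pairwise
# instead of shuffling everything again
joined = cities.join(counties, rsuffix="_county")
```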

The spatial partitioning will then mostly be useful for cases where you don't have such an attribute that can be used as an index (or where the regional extent of the attribute is not exactly the same across datasets).

Of course, if you know you have an attribute that describes certain regions, we could make it possible to specify this as input to the "spatial re-partitioning" function, as a basis for how to repartition (instead of using some other logic to divide the space into regions).
(this would be similar to doing a set_index() and then calculating the bounds for the partitions as they are)

jorisvandenbossche commented on June 19, 2024

We are speaking about "spatial indexing / partitioning" a lot here, but I think there are two distinct aspects:

  1. How to store the "spatial partitioning" information, and how to query it to retrieve the subset of partitions you need for a certain operation.
    Currently this is the spatial_partitions attribute, which stores a polygon for each partition (a polygon that is expected to cover all geometries in the partition) as a geopandas.GeoSeries on the dask_geopandas.GeoDataFrame. But we could discuss whether we want to use some spatial index at this level as well.
  2. How to repartition a dataframe into efficient spatial partitions (or how to determine optimal spatial partitions before reading in the actual data, e.g. by first reading in only the bounding boxes).
    For this repartition step there are some "easy" options (a rectangular grid, or basing it on known regions or an attribute), or it can be done with more advanced spatial indexing algorithms to find a good structure in the space of geometries.

I think several of the spatial indexing methods mentioned above are rather about the second aspect. I would propose to open a separate issue for the repartitioning goal, and to focus in this issue on the actual spatial partitioning concept and mechanism of the GeoDataFrame (the first aspect).
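
As a toy illustration of the first aspect, assuming spatial_partitions is the GeoSeries of covering polygons described above:

```python
import geopandas
from shapely.geometry import box

# one covering polygon per partition; they may overlap
spatial_partitions = geopandas.GeoSeries(
    [box(0, 0, 10, 10), box(8, 0, 20, 12), box(0, 8, 12, 20)]
)

query = box(15, 5, 18, 8)

# retrieve the subset of partitions needed for an operation
needed = spatial_partitions.index[spatial_partitions.intersects(query)]
# -> only partition 1; partitions 0 and 2 can be skipped entirely
```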

borao commented on June 19, 2024

What do you think about potentially partitioning by attribute?

For example, if I have a set of points (maybe cities) in the US with the state as a feature, and I want to spatially join it with a set of counties (polygons) in the US that also has the state as a feature, the computation could be significantly optimized by not having to worry about overlap and other geometric nuances at the borders, and most computation would stay within a partition.

mrocklin commented on June 19, 2024

FWIW I found that spatial partitioning with a GeoPandas series felt pretty clean. Many of the operations, like spatial joins, were relatively straightforward to write. Ideally some of the implementation in the previous version could be reused.

Things do become odd when you start considering geometries that can cross partition boundaries, though (this will be a problem regardless of which partitioning scheme you use). I think there are ways to handle it, but it requires some thinking. For point-wise geometries, though, everything is pretty straightforward.

jsignell commented on June 19, 2024

I think copying Matt's original implementation of using a geoseries to represent the divisions makes sense. Is there a cost associated with that which makes the spatialpandas path more compelling?

jorisvandenbossche commented on June 19, 2024

Boxes alone are in principle a bit simpler, but on the other hand, storing the partitions as a GeoSeries might make working with them easier. It is also more general (and actually gives you a spatial index on those partition bounds directly, since the GeoSeries has one).
We can also always simplify the regions of the partitions in the GeoSeries to boxes / envelopes to avoid the partition bounds becoming too detailed (a plain union of all geometries inside a partition might give a very detailed polygon).

So certainly on board with starting with such an approach.

Regarding naming, I actually like "partition_bounds", though "bounds" typically refers to a bounding box.

jsignell commented on June 19, 2024

Can we just call them divisions like dask does?

jorisvandenbossche commented on June 19, 2024

I am not sure that will play nicely with dask? E.g. does dask expect it to be a tuple?

Also, these are not divisions for the index, but a different concept, which might be confusing?

caspervdw commented on June 19, 2024

> Boxes alone are in principle a bit simpler, but on the other hand, storing the partitions as a GeoSeries might make working with them easier. It is also more general (and actually gives you a spatial index on those partition bounds directly, since the GeoSeries has one).
> We can also always simplify the regions of the partitions in the GeoSeries to boxes / envelopes to avoid the partition bounds becoming too detailed (a plain union of all geometries inside a partition might give a very detailed polygon).
>
> So certainly on board with starting with such an approach.
>
> Regarding naming, I actually like "partition_bounds", though "bounds" typically refers to a bounding box.

I might be looking at this issue too much from one side, but I am really drawn towards the bounding-box-only approach. Such a partition looks more like an array chunk. For vector-raster analyses, this match will potentially allow more optimizations.

Maybe this would also allow borrowing some (battle-tested) logic from dask.array?

jorisvandenbossche commented on June 19, 2024

In the analogy with dask.array, I think some problems are that 1) we don't have a regular grid of rectangles (in dask the chunk size can vary in each dimension, but the chunks are still all rectangles in a grid, I think), and 2) we will typically have overlapping bounding boxes / spatial partitions (with polygons of all shapes, and with each polygon having to fit completely in one of the bounding boxes, I think it is unavoidable that the boxes will typically overlap slightly).

(that's not to say that a simpler bounding-box-only scheme couldn't be interesting, just pointing out that it's quite a bit more complicated than dask.array's chunking scheme)

caspervdw commented on June 19, 2024
> 1. we don't have a regular grid of rectangles (in dask the chunk size can vary in each dimension, but the chunks are still all rectangles in a grid, I think)

You are correct that dask.array works with rectangular grids. I'd propose to make that the default option for the spatial partitioning of geometries as well. There are roughly two options: 1) a partition consists of all geometries that are intersecting with it, or 2) a partition consists of all geometries that have some deterministically assigned point (e.g. centroid) in it. The first gives duplicates, but the second doesn't allow pruning partitions in some spatial operations, like rasterizing or overlay. Whether the duplicates are acceptable depends on your typical geometry size; in many cases they are, and you still get a big performance increase because of the parallelization.
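
A small hypothetical geopandas example of the difference between the two options:

```python
import geopandas
from shapely.geometry import box

cells = geopandas.GeoSeries([box(0, 0, 10, 10), box(10, 0, 20, 10)])
geom = box(8, 2, 13, 4)  # straddles the cell edge at x = 10

# option 1: assign to every intersecting cell -> the geometry is duplicated
list(cells.index[cells.intersects(geom)])                       # [0, 1]

# option 2: assign by a deterministic point -> unique, but no safe pruning
list(cells.index[cells.contains(geom.representative_point())])  # [1]
```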

So I'd propose the first one (intersections, with duplicates), but keep the door open for developing more advanced schemes (carrying the convex hull of the partition along, quadtrees, space-filling curves, ...). Advantages of this approach (in my personal experience) are:

  • The partitioning does not depend on the actual data, so you don't have to go through the data and pay the IO / CPU cost before even starting to build the dask collection.
  • The partitions can be defined with only 6 numbers (x0, y0, dx, dy, nx, ny), as opposed to N geometries (see the sketch after this list). This allows non-materialized layers (as in high-level graphs) and efficient high-level optimizations, which is especially useful if you are working with lots of partitions.
  • With the correct tiling, the output could go into a tile grid, which many services (like vector tiles) expect.
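
A sketch of that lazy, data-independent definition (the helper name is hypothetical):

```python
from shapely.geometry import box

def grid_cells(x0, y0, dx, dy, nx, ny):
    """Hypothetical helper: yield the nx * ny cells of a regular grid
    lazily, so that no N geometries need to be materialized up front."""
    for j in range(ny):
        for i in range(nx):
            yield box(x0 + i * dx, y0 + j * dy,
                      x0 + (i + 1) * dx, y0 + (j + 1) * dy)

# e.g. a 4 x 3 grid of 10 x 10 cells anchored at the origin,
# fully described by the 6 numbers (0, 0, 10, 10, 4, 3)
cells = list(grid_cells(0, 0, 10, 10, 4, 3))
```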

jorisvandenbossche commented on June 19, 2024

Interesting! That seems to point to a fundamental difference in thinking about "what is a spatial partition (bounds)", because in my mind it's still something else than the two options you list.

In your two options, you seem to start from the spatial extent, and then determine which geometries belong to that spatial partition (either all geometries intersecting with it, or having a representative point lying within it).

In my mind, and thinking about how a dask.dataframe works, another option (and what is currently done in the dummy implementation) is to start from the actual physical data: a dask dataframe is split into different partitions (sub-dataframes), and so the spatial partitioning is determined by the spatial extent (total bounding box for simplicity, or a more complex polygon) of the geometries that are stored in a certain partition. So the spatial bounds of a partition are a bounding box (or polygon) that at least fully covers all geometries of its partition.
(and then it of course becomes a matter of ensuring that the geometries are sorted over the different partitions in such a way that those spatial extents are efficient and do not overlap too much -> that's a whole separate topic of how to smartly "re"partition)

In that mindset, for non-point geometries and when working with simple bounding boxes, you will by definition have overlapping bounding boxes for the spatial partitions (unless all the polygon geometries are rectangular as well, but let's not consider such a special case for the general design ;))
For point geometries, it's of course possible to have more efficient, non-overlapping bounding boxes (like the resulting rectangles of a quadtree spatial index).


I want to get back to the two options you listed above as well, and discuss some potential problems (or rather explain the "properties" that I think are important for a spatial partitioning system):

> 1) a partition consists of all geometries that are intersecting with it.

As you mention, this will give duplicates. However, I am not sure it is actually practical with dask.dataframe to have duplicates. Either you need to accept that things like len(df) or df["attribute"].mean().compute() will give wrong answers (which I don't think is acceptable), or such simple calculations as the length or the mean of a column will each time have to "deduplicate" the rows first (which I think is not that easy, will impact performance for non-spatial operations, and will also lose a lot of the benefits of using dask.dataframe).

> 2) a partition consists of all geometries that have some deterministically assigned point (e.g. centroid) in it.

For this one, you mention that it doesn't allow pruning partitions in some spatial operations. And I think an essential property of the spatial partitioning is indeed that the bounding box (or polygon) representing a given partition should fully cover the actual geometries (and not only contain a representative point, or only intersect with the polygon).
For example, if I want to check whether a certain polygon intersects with any of the polygons in my dataframe (a gdf.intersects(polygon) type of operation), I think we need to be able to rely on the spatial partitioning information to know which partitions to check, based on whether the given polygon intersects with any of the bounding boxes of the partitions. If the bounding boxes are not guaranteed to fully cover their geometries, we don't have reliable information about a potential intersection with the given polygon. That means we would basically always need to run the intersects operation against all partitions, without being able to skip any, thus losing the benefit of having spatial partitions in the first place.
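
A tiny shapely example of this failure mode:

```python
from shapely.geometry import box

geom = box(0, 0, 10, 10)       # an actual geometry in some partition
point_bound = box(4, 4, 6, 6)  # a "bound" covering only a representative point
full_bound = geom.envelope     # a bound that fully covers the geometry

query = box(9, 9, 12, 12)

point_bound.intersects(query)  # False -> the partition would be wrongly skipped
full_bound.intersects(query)   # True  -> the partition is correctly checked
geom.intersects(query)         # True  -> a row in it really does match
```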

caspervdw commented on June 19, 2024

Interesting discussion indeed!

> In your two options, you seem to start from the spatial extent, and then determine which geometries belong to that spatial partition (either all geometries intersecting with it, or having a representative point lying within it).
>
> In my mind, and thinking about how a dask.dataframe works, another option (and what is currently done in the dummy implementation) is to start from the actual physical data: a dask dataframe is split into different partitions (sub-dataframes), and so the spatial partitioning is determined by the spatial extent (total bounding box for simplicity, or a more complex polygon) of the geometries that are stored in a certain partition. So the spatial bounds of a partition are a bounding box (or polygon) that at least fully covers all geometries of its partition.
> (and then it of course becomes a matter of ensuring that the geometries are sorted over the different partitions in such a way that those spatial extents are efficient and do not overlap too much -> that's a whole separate topic of how to smartly "re"partition)

I agree that there is this difference of having (or not having) the data determine the partitions. Having to duplicate all of the input vector data into a partitioned form before an analysis can be done can be quite a show-stopper. IO is expensive, especially if the data is accessed over a network (say, an SQL database or a web API). At the same time, data that is accessible over the network is ideally suited for distributed computation. In my ideal world, you would be able to construct a computation on your laptop and send it to a cluster without ever having all the IO flow through your laptop.

I think this is a really important point to decide on: does dask-geopandas want to aim for this kind of usage?

> 1. a partition consists of all geometries that are intersecting with it.

> As you mention, this will give duplicates. However, I am not sure it is actually practical with dask.dataframe to have duplicates. Either you need to accept that things like len(df) or df["attribute"].mean().compute() will give wrong answers (which I don't think is acceptable), or such simple calculations as the length or the mean of a column will each time have to "deduplicate" the rows first (which I think is not that easy, will impact performance for non-spatial operations, and will also lose a lot of the benefits of using dask.dataframe).

So this is a real problem indeed. Maybe dask-geopandas can keep track of which features have their representative point inside the partition? In that way it is very efficient to do aggregations like len or mean over unique features. And it also directly solves #40 by accepting that any partitioned vector dataset has overlaps.
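
A sketch of what that bookkeeping could look like inside one partition (the cell, the frame, and the "owned" column are hypothetical):

```python
import geopandas
from shapely.geometry import box

cell = box(0, 0, 10, 10)  # this partition's (non-overlapping) grid cell
part = geopandas.GeoDataFrame(
    {"value": [1.0, 2.0]},
    geometry=[box(1, 1, 3, 3), box(9, 9, 12, 12)],  # the second spills over
)

# a row is "owned" here iff its representative point falls inside the cell;
# its duplicate in a neighbouring partition would have owned == False there
part["owned"] = part.representative_point().within(cell)

n = int(part["owned"].sum())                    # len over unique features
mean = part.loc[part["owned"], "value"].mean()  # mean over unique features
```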

> 2. a partition consists of all geometries that have some deterministically assigned point (e.g. centroid) in it.

> For this one, you mention that it doesn't allow pruning partitions in some spatial operations. And I think an essential property of the spatial partitioning is indeed that the bounding box (or polygon) representing a given partition should fully cover the actual geometries (and not only contain a representative point, or only intersect with the polygon).

This approach is indeed not suitable for spatial operations. But it does have use cases: for example, when you want to do a polygon-raster operation in which the raster is averaged inside each polygon. Both datasets are partitioned. You don't want duplicates in the polygons, but you do want the geometries in a partition to be spatially close so that you can prune the raster partitions.


Summarizing: we could define non-overlapping partitions (e.g. a rectangular grid). Each partition contains at least all geometries that intersect with it, with a flag indicating whether each geometry is in or out of bounds. Each partition also has two bounding boxes as metadata: the partition box and the box that contains all of its geometries (including the out-of-bounds ones).
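
Sketched as a data structure (purely hypothetical names):

```python
from dataclasses import dataclass

import geopandas

@dataclass
class SpatialPartition:
    cell: tuple                    # (xmin, ymin, xmax, ymax) of the non-overlapping grid cell
    extent: tuple                  # bbox of all contained geometries, incl. out-of-bounds ones
    frame: geopandas.GeoDataFrame  # the rows, plus the in/out-of-bounds flag column
```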

One more thought that popped up while writing this: I think the problem here is not so much the fact that we are working in 2D, but mainly the fact that the partitioning column consists of "range" objects instead of scalar objects. When partitioning a dask.dataframe, you take a column of scalars (typically time) and divide it into blocks. For geometries this can't be done easily, because e.g. a polygon covers ranges of x and y. With dask.dataframes you might have this issue too, if your data consists of "events" with a start and end time. How would you partition that by time? Are there examples out there? Maybe somebody has already dealt with the problem that partitions of events overlap in time?

brendan-ward commented on June 19, 2024

One approach that looks promising, at least in theory, is bounding interval hierarchy. A bit more on the approach here and here.

It uses a recursive splitting algorithm like a kd-tree (for points), but extends this to ranges and explicitly handles geometries that overlap the split line, so that once complete, those geometries are still assigned uniquely to a partition. It looks like it might take a bit of figuring to work out how to handle range queries against this data structure, since most of the examples are about single-ray casting.
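
A toy 1-D sketch of the core idea as I understand it (real implementations split recursively and per axis): each interval is assigned uniquely to one side, and the node stores two bounding values so that range queries stay sound despite the overlap.

```python
def bih_split(intervals, mid):
    """Assign (lo, hi) intervals uniquely to a left or right child by which
    side of `mid` their midpoint falls on, and record the two bounding
    values ("planes") that keep queries against the children sound."""
    left = [iv for iv in intervals if (iv[0] + iv[1]) / 2 <= mid]
    right = [iv for iv in intervals if (iv[0] + iv[1]) / 2 > mid]
    left_max = max((hi for _, hi in left), default=None)    # left child's clip plane
    right_min = min((lo for lo, _ in right), default=None)  # right child's clip plane
    return left, right, left_max, right_min

# a range query (qlo, qhi) must visit the left child only if qlo <= left_max,
# and the right child only if qhi >= right_min
```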

caspervdw commented on June 19, 2024

Two options I encountered:

martinfleis commented on June 19, 2024

Spatial partitioning in Apache Sedona seems to be based on KDB-Tree, Quad-Tree and R-Tree.

https://sedona.apache.org/tutorial/core-python/#use-spatial-partitioning

Their example reminds me that we should have a method that repartitions a dataframe A based on partitions of a dataframe B.

mrocklin commented on June 19, 2024

I'll jump in here again and promote using polygons. Writing the code the first time around felt really natural. Just to be clear, they don't necessarily have to fully partition the data; as someone said above, you don't want to force a model where the data must be organized, because in many cases it won't be, and it's expensive to implement. Instead, you want possibly overlapping bounding polygons for each partition, with the infinity polygon being the default option.

In the original dask-geopandas I intentionally tried to solve some of the harder problems to make sure that the approach would work well. Spatial joins and whatever-the-groupby-thing-is called (please forgive my memory) were both interesting to write, but also both clean in the end. It felt very much like the right abstraction. We got to reuse a lot of the logic in geopandas. A parallel spatial join between two parallel geodataframes turned into a spatial join of the high level regions of each, followed by mapping spatial-join across those intersections. We got to leverage the power of the already written geopandas code. It was great.

Trees are also good, but I personally would start with a tree of depth 1, and only expand that depth once computation/indexing proved troublesome. I wouldn't expect this to happen until you were well beyond thousands of partitions, which I think already covers the vast majority of use cases.

> Their example reminds me that we should have a method that repartitions a dataframe A based on partitions of a dataframe B.

Operations like this are already written in the original version. If people haven't already I encourage them to look through it.

jorisvandenbossche commented on June 19, 2024

Thanks Matthew for chiming in. I agree with having the spatial partitions as a GeoSeries; it makes writing code very natural (I updated your original spatial join implementation last week -> #54, mostly an update for dask API changes for now).

I also want to emphasize again that it is really not straightforward to have "duplicated" rows (to support non-overlapping spatial partitions). That will certainly be an interesting model for certain applications, but that also means that you basically have to implement that model with dask from scratch, instead of basing it on dask.dataframe (as we currently do). I personally think that for our current efforts in this repo, reusing the logic of dask.dataframe is the best option.

It might still be useful to have readers that are using a spatial intersection to query data from some source, but then the "deduplication" step (eg based on containment of representative point) will need to happen during IO, so that once the data is read and materialized as a geopandas.GeoDataFrame inside a partition, it can be used as is.

> I wouldn't expect this to happen until you were well beyond thousands of partitions, which I think already covers the vast majority of use cases.

Indeed, for a number of partitions in the thousands, doing a plain gdf.spatial_partitions.intersects(..) predicate (or a spatial join with the partitions of another dataframe) with geopandas is not going to be significantly slower (and may even be faster) than some spatial index query.

> > Their example reminds me that we should have a method that repartitions a dataframe A based on partitions of a dataframe B.
>
> Operations like this are already written in the original version. If people haven't already I encourage them to look through it.

Yes, we indeed should. As mentioned above, I already ported the sjoin algorithm, and for the demo notebook (https://gist.github.com/jorisvandenbossche/20d3bd91b2d0db2522148e5c2c84d206) I also needed the "repartition given known regions", so I need to open a PR with the ported version of that as well. And we should see if we can reuse a bunch of the tests as well.

caspervdw commented on June 19, 2024

> I also want to emphasize again that it is really not straightforward to have "duplicated" rows (to support non-overlapping spatial partitions). That will certainly be an interesting model for certain applications, but that also means that you basically have to implement that model with dask from scratch, instead of basing it on dask.dataframe (as we currently do). I personally think that for our current efforts in this repo, reusing the logic of dask.dataframe is the best option.

Agreed. I really liked the idea at first, but you're right. I'll stop talking about this.

As @mrocklin said, partitioning data will be expensive for many data sources. We could allow some Partitioner object to fetch partitions from another data store, or to compute partitions on the spot. Either way, some work has to be done before someone can start building a dask-geopandas computation, or you can't take advantage of spatial pruning. Isn't there a way around this? Can we make the object representing the layer some sort of lazy object that maybe only executes once the computation is scheduled?

There seems to be a lot of progress in this area (e.g. https://coiled.io/dask-under-the-hood-scheduler-refactor/) and I wonder if we can make this such that the partitions evaluate lazily, as an iterator. I think this question is in scope here, because choosing e.g. a GeoSeries to implement the partitions may limit our options.

Another question that someone else may be able to answer: does the scheduler have access to the same packages (GEOS, PyGEOS, geopandas) as the workers?

Coming back to the discussion about bboxes vs. polygons. Initially, I had the feeling that packing partitions inside a GeoSeries (which contains an ndarray of objects, each with a pointer to an opaque GEOS object) seemed a sluggish way to do it, if you could also do it with an (N, 4) ndarray of floats. But I did some timings, and pygeos is actually 'only' 35% slower than intersecting bounding boxes in pure numpy, which is pretty neat.
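
For reference, a sketch of the kind of comparison described here (not the actual benchmark that produced the 35% figure):

```python
import numpy as np
import pygeos

# (N, 4) array of bounding boxes as (xmin, ymin, xmax, ymax)
rng = np.random.default_rng(0)
boxes = rng.random((100_000, 4))
boxes[:, 2:] += boxes[:, :2]  # ensure xmax >= xmin and ymax >= ymin
q = np.array([0.2, 0.2, 0.4, 0.4])

# pure-numpy bbox overlap test
hits_np = ~((boxes[:, 2] < q[0]) | (boxes[:, 0] > q[2]) |
            (boxes[:, 3] < q[1]) | (boxes[:, 1] > q[3]))

# the pygeos equivalent on materialized geometries
geoms = pygeos.box(boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3])
hits_pg = pygeos.intersects(geoms, pygeos.box(*q))
```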

I am still slightly worried about the polygons, though. I have encountered some cases of severe "oversampling" in source data, e.g. an arc that gets a point every centimeter, giving single polygons of ~100 megabytes. When computing the convex hull of that, these terrible things might get into the partitions. Using bounding boxes, you just can't have that issue.

So we could leave this option open and require the partitions to be an object exposing part of the GeoSeries interface. Then the implementation advantages @mrocklin describes still hold. I wonder what part of the GeoSeries interface was required for implementing dask-geopandas?

jorisvandenbossche commented on June 19, 2024

Yes, I think it will indeed be interesting to investigate how we can leverage the work around high-level graphs to optimize this.

> Another question that someone else may be able to answer: does the scheduler have access to the same packages (GEOS, PyGEOS, geopandas) as the workers?

We can assume that, yes, but it is something that you, as the user (or the dev ops providing the client/scheduler environments), need to ensure.

> I wonder what part of the GeoSeries interface was required for implementing dask-geopandas?

At the moment just the intersects method, and the fact that it can be turned into a GeoDataFrame to pass to sjoin (and I also used the plot method to inspect it).
But I think this is still something that can rather easily be changed/optimized later in case we want to make a specialized object instead of reusing GeoSeries, if we run into issues with it.

TomAugspurger commented on June 19, 2024

What's the plan for dask_geopandas.GeoDataFrame.set_index("<geometry_column>")?

dask.dataframe's set_index differs from pandas, since it sorts / shuffles the data by the column being assigned to the index and then partitions the output dataframe. This divergence from pandas enables fast random access to index labels. Does dask-geopandas want something similar for set_index? (I believe that currently set_index raises while trying to compute the min / max to do the partitioning.)
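
To illustrate the dask.dataframe behaviour being referred to:

```python
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(
    {"t": pd.date_range("2024-01-01", periods=100, freq="h"), "v": range(100)}
)
ddf = dd.from_pandas(df, npartitions=4).set_index("t")

ddf.divisions          # sorted partition boundaries along the new index
ddf.loc["2024-01-02"]  # fast random access: only one partition is touched
```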

martinfleis commented on June 19, 2024

> What's the plan for dask_geopandas.GeoDataFrame.set_index("<geometry_column>")?

I don't think there are any. GeometryArray currently doesn't support sorting. I'd say that the best solution, for now, would be to raise NotImplementedError.
