Comments (6)
Thanks for trying it out, and for the feedback!
In that case, `df` is a dask DataFrame and doesn't have an attribute/method like `set_geometry`, raising: `AttributeError: 'DataFrame' object has no attribute 'set_geometry'`.
Hmm, yes, it seems that we didn't yet add this `set_geometry` method to the dask DataFrame class. What GeoPandas does is "monkeypatch" the normal pandas DataFrame to add a `set_geometry` method that returns a GeoDataFrame (so the above example would work with plain pandas/geopandas). But we didn't do that here yet. We should probably do it for consistency (although monkeypatching is also not the nicest solution).
But in any case, we should update the example, as it is not working right now. What you did instead is the correct solution at the moment.
I'm not able to export it as a GeoPackage. Am I missing something?
No, that's not yet implemented. Only writing to Parquet files is currently implemented.
I have a POC for reading GIS file formats (https://github.com/jsignell/dask-geopandas/issues/11 / https://nbviewer.jupyter.org/gist/jorisvandenbossche/c94bfc9626e622fda7285ed88a4d771a), but I didn't yet try writing files. The main thing I am unsure about (and would need to research/experiment with) is how to let the different partitions write to a single file without concurrency/locking issues (as I suppose you will want a single GeoPackage file and not a directory of GeoPackage files?).
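The single-file concern above is essentially a serialization problem: two partitions must not append at the same time. As a minimal stdlib sketch (a plain text file and `threading.Lock` standing in for the GeoPackage and its writer; `append_partition`/`write_partitions` are made-up names, not dask-geopandas API), the appends can be serialized like this:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# A lock shared by all writers: only one partition may append at a time,
# so records from different partitions never interleave mid-write.
write_lock = threading.Lock()

def append_partition(path, rows):
    # Hypothetical stand-in for one partition appending itself to the shared file
    with write_lock:
        with open(path, "a") as f:
            for row in rows:
                f.write(row + "\n")

def write_partitions(path, partitions):
    # Submit all partitions concurrently; the lock serializes the actual appends
    with ThreadPoolExecutor(max_workers=4) as pool:
        for part in partitions:
            pool.submit(append_partition, path, part)
    with open(path) as f:
        return sorted(line.strip() for line in f)
```

With dask's distributed scheduler, the analogous primitive would be `dask.distributed.Lock`; note that row order across partitions is then no longer guaranteed.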
from dask-geopandas.
Thanks for your quick reply.
Thanks for your explanation. Until now, I used to chunk huge files (mostly CSV) and stream them into files (mode="a") or a database.
GPKG being a SQLite wrapper, if I were to implement a complete workflow, I would try to come up with something like this:
```python
import dask.dataframe as dd
import dask_geopandas

chunked_df = dd.read_csv(
    "really_huge_geocsv.csv",
    usecols=list(columns_definition.keys()),
    dtype=columns_definition,
)
chunked_geodf = dask_geopandas.from_dask_df(chunked_df)

for chunk in chunked_geodf:
    chunk.to_gpkg(
        layername="Table",
        if_exists="append",
        geometry_type="point",
        x=chunked_df.long,
        y=chunked_df.lat,
        crs=32640,
    )
```
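Since a GeoPackage is indeed a SQLite database under the hood, the append-per-chunk idea above can be sketched with nothing but the stdlib `sqlite3` module (the `points` table and its `x`/`y` columns are made up for illustration; a real GPKG additionally needs its metadata tables and geometry blob encoding):

```python
import sqlite3

def append_chunk(conn, rows):
    # Each chunk of (x, y) points is appended to the same table,
    # mirroring the if_exists='append' idea above
    conn.executemany("INSERT INTO points (x, y) VALUES (?, ?)", rows)
    conn.commit()

def write_chunks(path, chunks):
    # Open once, create the table if needed, then stream chunks in
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS points (x REAL, y REAL)")
    for chunk in chunks:
        append_chunk(conn, chunk)
    count = conn.execute("SELECT COUNT(*) FROM points").fetchone()[0]
    conn.close()
    return count
```

A single writer connection sidesteps SQLite's file-level locking entirely, which is why the sequential version of any GPKG writer is the easy case.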
Still too new to these tools to really help you. Maybe using PyGEOS' abilities? There is a small GPKG writer using it: https://github.com/brendan-ward/pgpkg, but I haven't tried it myself yet.
Thanks for that link to gpkg! Wasn't aware of that.
Regarding pygeos, I certainly recommend installing it (if installed, geopandas / dask_geopandas will use it automatically under the hood), as it makes things much faster and parallelizable.
I was just experimenting with a `to_file` for dask_geopandas:
```python
import dask
from dask.delayed import delayed, tokenize


@delayed
def _extra_deps(func, *args, extras=None, **kwargs):
    return func(*args, **kwargs)


def to_file(df, path, driver="GPKG", parallel=False, compute=True, **kwargs):
    """
    Write to a single file.

    Parameters
    ----------
    df : dask_geopandas.GeoDataFrame
    path : str
        Filename.
    parallel : bool, default False
        When true, have each partition append itself to the file concurrently.
        This can result in rows being in a different order than in the source
        DataFrame. When false, append each partition in sequence.
    compute : bool, default True
        When true, call dask.compute and perform the write; otherwise, return
        a Dask object (or a list of per-partition objects when parallel=True).
    """
    # based on dask.dataframe's to_sql
    def make_meta(meta):
        return meta.to_file(path, driver=driver, mode="w", **kwargs)

    make_meta = delayed(make_meta)
    meta_task = make_meta(df._meta)

    # Partitions should always append to the empty file created from `meta` above
    worker_kwargs = dict(kwargs, driver=driver, mode="a")

    if parallel:
        # Perform the meta write, then one task per partition, all appending concurrently
        result = [
            _extra_deps(
                d.to_file,
                path,
                extras=meta_task,
                **worker_kwargs,
                dask_key_name="to_file-%s" % tokenize(d, **worker_kwargs),
            )
            for d in df.to_delayed()
        ]
    else:
        # Chain the "meta" write and each partition's append
        result = []
        last = meta_task
        for d in df.to_delayed():
            result.append(
                _extra_deps(
                    d.to_file,
                    path,
                    extras=last,
                    **worker_kwargs,
                    dask_key_name="to_file-%s" % tokenize(d, **worker_kwargs),
                )
            )
            last = result[-1]

    result = delayed(result)
    if compute:
        dask.compute(result, scheduler="processes")
    else:
        return result
```
And then you can use it like this:
```python
to_file(gdf, "test.gpkg")
```
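The key trick in `to_file` above is the ordering: one task first creates the file from the empty `_meta` frame (mode "w"), and every partition then appends (mode "a"), either chained in sequence or fanned out in parallel. Stripped of dask, the sequential branch reduces to this stdlib sketch (a plain text file and a "schema" header standing in for the OGR write; all names are illustrative):

```python
def write_meta(path, header):
    # the "meta" task: create/truncate the file, like mode="w" above
    with open(path, "w") as f:
        f.write(header + "\n")

def append_block(path, rows):
    # each partition appends, like mode="a" above
    with open(path, "a") as f:
        for row in rows:
            f.write(row + "\n")

def to_file_sequential(path, header, blocks):
    # the non-parallel branch: meta first, then each block in order
    write_meta(path, header)
    for block in blocks:
        append_block(path, block)
    with open(path) as f:
        return [line.strip() for line in f]
```

In the dask version, the same ordering is enforced not by a loop but by the `extras=` argument of `_extra_deps`, which makes each append task depend on the previous one.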
Ah, and what I forgot to mention is that you need to change this one line in your fiona install: https://github.com/Toblerity/Fiona/pull/858/files (to enable "append" mode for GPKG), because that fix is not yet released
> Thanks for that link to gpkg! Wasn't aware of that.
You're welcome!
> Regarding pygeos, I certainly recommend installing it (if installed, geopandas / dask_geopandas will use it automatically under the hood), as it makes things much faster and parallelizable.
Yes it's well documented on geopandas, great tip!
> I was just experimenting with a `to_file` for dask_geopandas:
> `to_file(gdf, "test.gpkg")`
Nice! I'll give it a try if you'd like.
> Ah, and what I forgot to mention is that you need to change this one line in your fiona install: https://github.com/Toblerity/Fiona/pull/858/files (to enable "append" mode for GPKG), because that fix is not yet released
Too bad a new version hasn't been released; it's not always possible to install from GitHub in a professional context.
```python
import dask
from dask.delayed import delayed, tokenize


@delayed
def _extra_deps(func, *args, extras=None, **kwargs):
    return func(*args, **kwargs)


def to_file(df, path, driver="GPKG", parallel=False, compute=True, **kwargs):  # replaced "GPKG" with "ESRI Shapefile"
    """
    Write to a single file.

    Parameters
    ----------
    df : dask_geopandas.GeoDataFrame
    path : str
        Filename.
    parallel : bool, default False
        When true, have each partition append itself to the file concurrently.
        This can result in rows being in a different order than in the source
        DataFrame. When false, append each partition in sequence.
    compute : bool, default True
        When true, call dask.compute and perform the write; otherwise, return
        a Dask object (or a list of per-partition objects when parallel=True).
    """
    # based on dask.dataframe's to_sql
    def make_meta(meta):
        return meta.to_file(path, driver=driver, mode="w", **kwargs)

    make_meta = delayed(make_meta)
    meta_task = make_meta(df._meta)

    # Partitions should always append to the empty file created from `meta` above
    worker_kwargs = dict(kwargs, driver=driver, mode="a")

    if parallel:
        # Perform the meta write, then one task per partition, all appending concurrently
        result = [
            _extra_deps(
                d.to_file,
                path,
                extras=meta_task,
                **worker_kwargs,
                dask_key_name="to_file-%s" % tokenize(d, **worker_kwargs),
            )
            for d in df.to_delayed()
        ]
    else:
        # Chain the "meta" write and each partition's append
        result = []
        last = meta_task
        for d in df.to_delayed():
            result.append(
                _extra_deps(
                    d.to_file,
                    path,
                    extras=last,
                    **worker_kwargs,
                    dask_key_name="to_file-%s" % tokenize(d, **worker_kwargs),
                )
            )
            last = result[-1]

    result = delayed(result)
    if compute:
        dask.compute(result, scheduler="processes")
    else:
        return result
```
And then you can use it like this:
```python
to_file(gdf, "test.gpkg")  # replaced .gpkg with .shp
```
I understand this code is most definitely still experimental, so I tried to modify it slightly to work with ESRI Shapefiles (please see the comments in the above code), but I got these errors for both the gpkg and shp versions of the above code:
```
---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
~/../some_dir/test.py in <module>
    183
    184
--> 185 to_file(gdf, 'test.gpkg')
    186 # to_file(gdf, 'test.shp')

~/../some_dir/test.py in to_file(df, path, driver, parallel, compute, **kwargs)
    178
    179     if compute:
--> 180         dask.compute(result, scheduler="processes")
    181     else:
    182         return result

~/opt/anaconda3/envs/geo-tools/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570

~/opt/anaconda3/envs/geo-tools/lib/python3.7/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, chunksize, **kwargs)
    228             raise_exception=reraise,
    229             chunksize=chunksize,
--> 230             **kwargs
    231         )
    232     finally:

~/opt/anaconda3/envs/geo-tools/lib/python3.7/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    501     while state["waiting"] or state["ready"] or state["running"]:
    502         fire_tasks(chunksize)
--> 503         for key, res_info, failed in queue_get(queue).result():
    504             if failed:
    505                 exc, tb = loads(res_info)

~/opt/anaconda3/envs/geo-tools/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    426                 raise CancelledError()
    427             elif self._state == FINISHED:
--> 428                 return self.__get_result()
    429
    430             self._condition.wait(timeout)

~/opt/anaconda3/envs/geo-tools/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
```
I just started looking into Dask recently. It's still new to me, so I don't really understand this error message, and I'd like to know if there's an "easy" fix/workaround to make the `to_file` function work with ESRI Shapefiles?
Please let me know.
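For what it's worth, `BrokenProcessPool` only means that a worker process died without reporting back; with GDAL/Fiona that usually points at a crash in C code or a pickling problem rather than at the function's own logic, and rerunning with `scheduler="single-threaded"` instead of `"processes"` will often surface the real traceback. The error itself can be reproduced with the stdlib alone (the `crash` helper is made up; `os._exit` stands in for a segfaulting C extension):

```python
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def crash():
    # Die without any cleanup, the way a segfault in a C extension would
    os._exit(1)

def run_and_classify():
    # fork context so `crash` needs no re-import in the child process
    ctx = multiprocessing.get_context("fork")
    try:
        with ProcessPoolExecutor(max_workers=1, mp_context=ctx) as pool:
            pool.submit(crash).result()
        return "ok"
    except BrokenProcessPool:
        return "worker died abruptly"
```

The pool cannot tell *why* the worker vanished, which is why the message is so generic.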