stac-utils / stac-task
Provides a class interface for running custom algorithms on STAC ItemCollections
License: Apache License 2.0
Currently the handler does some post-processing on the Items (it adds in the task name and version). This should be factored out into its own function that can be extended by users subclassing Task.
Also see #22.
The Processing Extension states that its intention is to describe software that was used in the processing of the data, rather than metadata. Not all tasks process the data, so processing:software shouldn't be added in those cases. There should be a flag in the Task that indicates whether the field should be added.
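A minimal sketch of what such a flag could look like (the attribute name add_software_version and the handler hook are assumptions, not existing stac-task API):

# Hedged sketch: subclasses opt out of processing:software via a class
# attribute that the handler's post-processing step would consult.
from typing import Any

from stactask import Task


class MetadataOnlyTask(Task):
    name = "metadata-only-task"
    version = "0.1.0"
    add_software_version = False  # hypothetical flag: skip processing:software

    def process(self, **kwargs: Any) -> list[dict[str, Any]]:
        # This task only rearranges metadata, so it opts out above.
        return self.items_as_dicts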
Task.upload_item_assets_to_s3 has type Dict for the item argument (https://github.com/stac-utils/stac-task/blob/4ff3c9e51e83fd88ed7aea639a70c80dae555a95/stactask/task.py#L192C9-L192C33), but that function (for the most part) just forwards the call to asset_io.upload_item_assets_to_s3 (stac-task/stactask/asset_io.py, line 88 in 4ff3c9e), which takes an Item for item. Looking at the asset_io code, it should be type Item.
https://jsonpath.herokuapp.com, which is linked in the README and in utils.py, is dead (it returns an "Application Error" page).
In the middle of download_items_assets execution I'm getting a DownloadError with no message:
---------------------------------------------------------------------------
DownloadError Traceback (most recent call last)
Cell In [4], line 19
8 results = catalog.search(
9 filter={
10 "op": "and",
(...)
15 }
16 )
17 items = results.items()
---> 19 await download_items_assets(items, path_template="input_chunks/${id}")
File .env/lib/python3.10/site-packages/stactask/asset_io.py:40, in download_items_assets(items, path_template, config, keep_non_downloaded)
34 async def download_items_assets(
35 items: Iterable[Item],
36 path_template: str = "${collection}/${id}",
37 config: Optional[DownloadConfig] = None,
38 keep_non_downloaded: bool = True,
39 ) -> list[Item]:
---> 40 return await asyncio.gather(
41 *[
42 asyncio.create_task(
43 download_item_assets(
44 item=item,
45 path_template=path_template,
46 config=config,
47 keep_non_downloaded=keep_non_downloaded,
48 )
49 )
50 for item in items
51 ]
52 )
File .env/lib/python3.10/site-packages/stactask/asset_io.py:25, in download_item_assets(item, path_template, config, keep_non_downloaded)
19 async def download_item_assets(
20 item: Item,
21 path_template: str = "${collection}/${id}",
22 config: Optional[DownloadConfig] = None,
23 keep_non_downloaded: bool = True,
24 ) -> Item:
---> 25 return await stac_asset.download_item(
26 item=item.clone(),
27 directory=LayoutTemplate(path_template).substitute(item),
28 file_name="item.json",
29 config=config,
30 keep_non_downloaded=keep_non_downloaded,
31 )
File .env/lib/python3.10/site-packages/stac_asset/_functions.py:234, in download_item(item, directory, file_name, infer_file_name, config, messages, clients, keep_non_downloaded)
229 async with Downloads(
230 config=config or Config(),
231 clients=clients,
232 ) as downloads:
233 await downloads.add(item, Path(directory), file_name, keep_non_downloaded)
--> 234 await downloads.download(messages)
236 self_href = item.get_self_href()
237 if self_href:
File .env/lib/python3.10/site-packages/stac_asset/_functions.py:172, in Downloads.download(self, messages)
170 exceptions.append(result.error)
171 if exceptions:
--> 172 raise DownloadError(exceptions)
DownloadError:
Making stactask.task.Task.process be an identity function would cause stac-task to function as a sufficient downloader of ItemCollections. Also seems like a useful facility for integration/system tests of stactask.
Originally posted by @ircwaves in #18 (comment)
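A minimal sketch of such an identity task (the class name is illustrative; assumes the Task API exposes items_as_dicts):

# Hedged sketch: a task whose process() returns its input unchanged, so a
# run amounts to downloading assets with no transformation applied.
from typing import Any

from stactask import Task


class Identity(Task):
    name = "identity"
    version = "0.1.0"

    def process(self, **kwargs: Any) -> list[dict[str, Any]]:
        return self.items_as_dicts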
In task.py, this code:

# validate input payload...or not
if not skip_validation:
    if not self.validate(payload):
        raise FailedValidation()

raises when validation fails, but doesn't give a reason why. A reason would make this more useful.
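A minimal sketch of the change (the message text is illustrative, and assumes FailedValidation accepts an argument like any other exception):

# Hedged sketch: attach a reason so callers can see why validation failed.
if not skip_validation:
    if not self.validate(payload):
        raise FailedValidation(f"payload failed validation for task {self.name!r}")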
While we'd like to think that we're always running code that's been tagged with a version, in reality we're sometimes (often?!) running an unreleased version. It'd be useful to provide a switch to turn on commit SHA version information (line 171 in 6818b9c):

{
    "processing:software": {
        "my-awesome-task": "0.1.0 @ abcdef0123456789"
    }
}
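A minimal sketch of how such a switch might derive that string (the helper name and the git invocation are assumptions, not stac-task code):

# Hedged sketch: build a "version @ sha" string for processing:software
# when running from an untagged checkout; fall back to the plain version.
import subprocess


def software_version(base_version: str) -> str:
    try:
        sha = subprocess.check_output(
            ["git", "rev-parse", "--short=16", "HEAD"], text=True
        ).strip()
        return f"{base_version} @ {sha}"
    except (subprocess.CalledProcessError, FileNotFoundError):
        return base_version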
Looking for some way (a CLI seems the most obvious) to bootstrap a new task repository.
I've been finding a great deal of variation in how various task repos are laid out, so some guidance is needed for sure.
In @gadomski's PR #42 several types of tasks are defined:

class Task(BaseModel, ABC, Generic[Input, Output]):
    """A generic task."""

class PassthroughTask(Task[Anything, Anything]):
    """A simple task that doesn't modify the items at all."""

class StacOutputTask(Task[Input, Item], ABC):
    """Anything in, STAC out task."""

class ItemTask(StacOutputTask[Item], ABC):
    """STAC in, STAC out task."""

class HrefTask(StacOutputTask[Href], ABC):
    """Href in, STAC out task."""
I really like this way to define the input and output for different types of tasks, especially if it gives us JSON Schema!
Want to review these two Tasks:
StacOutputTask - Anything in, STAC out task
HrefTask - Href in, STAC out task
These tasks capture the need to create STAC Items from scratch. In the current payload structure you pass parameters to the task in the process definition; you don't hand them in as part of the Task input (which would normally be a FeatureCollection). So the href (or multiple hrefs), along with other parameters, would be provided in the process.tasks.taskname.parameter field. I think that should be the preferred model, and Input/Output is always going to be STAC Items, or nothing.
Next is ItemTask, which is defined over a single Item, but stac-task currently operates on ItemCollections. A STAC task can take in 1 or more STAC Items as input and returns 1 or more STAC Items. Note that this is not 1:1; a task doesn't process each item independently to create an array of output items (although you could write a task to do that). A task might take in one Item and create two derived Items from it, or it might take in an Item of data and a few other Items of auxiliary data used in the processing to create a single output Item.
Each task would have requirements on the number of input Items.
So I'd propose:
StacOutputTask - Nothing in, STAC out
ItemCollectionTask - ItemCollection in, ItemCollection out
I suppose we could also have an ItemTask for single-Item input and output (the most common scenario), but I'm not sure I see the advantage over using an ItemCollection with 1 Item.
At the moment the STAC item file name is hardcoded:
stac-task/stactask/asset_io.py, line 28 in 7ee8533
During post-processing of STAC Items (currently in the handler, when the task name and version are added), the stac_extensions list should be sorted. This ensures consistent output files that don't differ merely in the order in which extensions were added.
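A one-line sketch of that step, operating on the item dictionary (deduplicating via set() is an extra assumption beyond the issue text):

# Hedged sketch: normalize extension order (and duplicates) in post-processing.
item_dict["stac_extensions"] = sorted(set(item_dict.get("stac_extensions", [])))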
I have code like:

class MyDatasetToStac(Task):

that returns this error from mypy:

task.py:30: error: Class cannot subclass "Task" (has type "Any")  [misc]

The fix is to add:

class MyDatasetToStac(Task):  # type: ignore

However, it would be nice if stactask provided the correct type information so that this was not necessary.
A description of this general issue is here: https://stackoverflow.com/questions/49888155/class-cannot-subclass-qobject-has-type-any-using-mypy
It would be helpful to highlight --save-workdir as a flag in the README. I had forgotten about it; the main problem is that forgetting it causes developers to do unintended working-directory things.
We may also want to discuss the separation of data from "your code": why the workdir is a tmp dir and that it gets deleted by default.
The name stac-task may be misleading due to the use of "tasking" with regard to ordering future collects of data from a satellite or remote platform; see https://github.com/Element84/stat-api-spec. In this context a "task" is a processing function that takes in 0 or more STAC Items and returns 1 or more STAC Items; the term "workflow", where used, refers to a series of tasks chained together.
Suggest renaming the repo to stac-process to better reflect what it does.
See https://readthedocs.org/projects/stac-task/builds/23322950/. Fix the build, and add a readthedocs check to CI so we catch this sooner.
These utility methods have been useful in Task implementations and should be added to the base Task class:

def is_local_asset(self, asset: Asset) -> bool:
    return bool(asset.href.startswith(str(self._workdir)))

def get_local_asset_keys(self, item: Item) -> list[str]:
    return [key for key, asset in item.assets.items() if self.is_local_asset(asset)]

def find_collection(self, item_dict: dict[str, Any]) -> Optional[str]:
    return next(
        (
            c
            for c, expr in self.upload_options.get("collections", {}).items()
            if stac_jsonpath_match(item_dict, expr)
        ),
        None,
    )
Currently stactask will download assets and name each file as the <asset_key> plus the original extension, thereby keeping the extension but possibly (probably) renaming the file. Sometimes a subprocess will expect the file to be named the same as it was originally; present this as an option.
We've had a bit of a disconnect between this lib and cirrus for some time. Cirrus supports an array of process definitions in payloads under the process key, whereas stac-task only supports a single process definition object. The reason for a process array is to support workflow chaining.
This disconnect is only increasing, with cirrus v1 supporting only process arrays, for consistency and simplicity (only one right way to do things). We also chose to only support array values for process in swoop (and we had to add workarounds in each task we implemented because of this). Without a change to support process array values, stac-task-based tasks are not compatible with swoop and will no longer be compatible with cirrus out-of-the-box.
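For illustration, a hedged sketch of the two payload shapes (contents elided; the inner keys shown are just the ones discussed elsewhere in these issues). The single-object form stac-task accepts today:

{
    "type": "FeatureCollection",
    "features": [],
    "process": {"tasks": {}, "upload_options": {}}
}

versus the array form cirrus expects, where subsequent entries support workflow chaining:

{
    "type": "FeatureCollection",
    "features": [],
    "process": [
        {"tasks": {}, "upload_options": {}},
        {"tasks": {}}
    ]
}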
Currently the handler takes the array of Items returned by process, puts them in the features field of the input payload, and returns that. This means the output payload will have any additional fields from the input payload, which may not be what is intended. The output of a Task should be a valid ItemCollection with an id and a process definition, whereas the input could be any JSON at all, as long as it has a process definition and optionally an id.
Instead, construct a new ItemCollection output to return, populating its features field with the output of Task.process and copying over the process definition.
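A minimal sketch of that construction (the function name is illustrative; the key names follow the issue text):

# Hedged sketch: build a fresh ItemCollection-shaped payload rather than
# mutating and returning the input payload.
from typing import Any


def build_output_payload(
    input_payload: dict[str, Any], processed_items: list[dict[str, Any]]
) -> dict[str, Any]:
    output: dict[str, Any] = {
        "type": "FeatureCollection",
        "features": processed_items,
        "process": input_payload["process"],
    }
    if "id" in input_payload:
        output["id"] = input_payload["id"]
    return output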
Per the documentation in the README:
The collections dictionary provides a collection ID and JSONPath pattern for matching against STAC Items. At the end of processing, before the final STAC Items are returned, the Task class can be used to assign all of the Items to specific collection IDs. For each Item the JSONPath pattern for all collections will be compared. The first match will cause the Item's Collection ID to be set to the provided value.
This sounds fine, except that dictionaries are maps in JSON and do not have any guarantees about order preservation, i.e., maps in the JSON spec are considered unordered.
Best practices typically suggest the use of arrays where ordering is meaningful, and maps where uniqueness is required. In this case ordering is meaningful, and mandating collection-name uniqueness could be problematic for some use cases (think of cases where multiple patterns might be used to check for and assign collection membership to a single collection). So it seems like collections should be an array of collection-matching objects (CollectionMatchers?).
I'd propose this "CollectionMatcher" object at minimum contain a type and a collection_name property. The type would be used to resolve a matcher from a discriminated union of supported matchers. To start we'd support only one type, jsonpath, which also requires a pattern property. With this idea the example from the README becomes:

"collections": [
    {
        "type": "jsonpath",
        "pattern": "$[?(@.id =~ 'LC08.*')]",
        "collection_name": "landsat-c2l2"
    }
]
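A minimal sketch of how the matcher model could be expressed (using pydantic is an assumption; stac-task does not necessarily depend on it):

# Hedged sketch: one matcher type today, with "type" as the field a
# discriminated union of future matcher variants would key off of.
from typing import Literal

from pydantic import BaseModel


class JsonPathMatcher(BaseModel):
    type: Literal["jsonpath"]
    pattern: str
    collection_name: str


# With more matcher types this would become a discriminated union, e.g.
# Annotated[Union[JsonPathMatcher, OtherMatcher], Field(discriminator="type")].
CollectionMatcher = JsonPathMatcher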
Running the tests yields a deprecation warning:
tests/test_task_download.py::test_download_nosuch_asset
/Users/philvarner/code/stac-task/stactask/task.py:238: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
Description of the issue is here: https://stackoverflow.com/questions/73361664/asyncio-get-event-loop-deprecationwarning-there-is-no-current-event-loop
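A minimal sketch of the usual remedy from that answer (whether it fits task.py's call sites is an assumption):

# Hedged sketch: hand the coroutine to asyncio.run() instead of calling
# asyncio.get_event_loop() on a thread with no running loop.
import asyncio


async def do_download() -> str:  # placeholder for the task's async work
    return "done"


result = asyncio.run(do_download())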
In several locations, the README indicates the collections field should be at the top level of the ProcessDefinition object. However, task.py accesses collections from the UploadOptions object within the ProcessDefinition object. The example process definition given in task.py shows the correct location of the collections field.
This repo is very interesting. It looks like only S3 storage is supported at the moment. Does the roadmap include adding support for other cloud providers, like Azure Blob?
When I run this code with Python 3.9:
import asyncio
from stactask.asset_io import download_items_assets
from pystac_client import Client
catalog = Client.open(...)
results = catalog.search(...)
items = results.items()
loop = asyncio.get_event_loop()
loop.run_until_complete(download_items_assets(items, path_template="input_chunks/${id}"))
I'm getting this error:
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[20], line 1
----> 1 loop.run_until_complete(download_items_assets(items, path_template="input_chunks/${id}"))
File ~/.pyenv/versions/3.9.0/lib/python3.9/asyncio/base_events.py:618, in BaseEventLoop.run_until_complete(self, future)
607 """Run until the Future is done.
608
609 If the argument is a coroutine, it is wrapped in a Task.
(...)
615 Return the Future's result, or raise its exception.
616 """
617 self._check_closed()
--> 618 self._check_running()
620 new_task = not futures.isfuture(future)
621 future = tasks.ensure_future(future, loop=self)
File ~/.pyenv/versions/3.9.0/lib/python3.9/asyncio/base_events.py:578, in BaseEventLoop._check_running(self)
576 def _check_running(self):
577 if self.is_running():
--> 578 raise RuntimeError('This event loop is already running')
579 if events._get_running_loop() is not None:
580 raise RuntimeError(
581 'Cannot run the event loop while another loop is running')
RuntimeError: This event loop is already running
stactask==0.5.0
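A hedged note on the likely cause: the "Cell In[20]" frame suggests this is running in Jupyter, where an event loop is already running, so run_until_complete() on that loop raises. In a notebook, awaiting the coroutine directly avoids the nested-loop problem (IPython supports top-level await):

# Hedged sketch for notebook use; assumes the same catalog/search setup.
items = list(results.items())
downloaded = await download_items_assets(items, path_template="input_chunks/${id}")
# In a plain script, asyncio.run(download_items_assets(...)) is the equivalent.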
For downloading assets, stac-task should use stac-asset instead of fsspec, which allows for better support across some data providers.
stac-asset already supports #6 via https://stac-asset.readthedocs.io/en/stable/api.html#stac_asset.FileNameStrategy
Currently, Task.validate is marked as a class method, but there are scenarios where having access to the task instance within the validate method would be beneficial. For example, if a STAC item property name is passed as a task parameter and we need to validate that all input items possess that property, access to self.parameters would be useful. What do you think about changing validate to a regular instance method?
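A minimal sketch of what the proposed instance method could look like on a subclass (this is the proposed API, not the current one; the parameter name required_property is illustrative):

# Hedged sketch: validate() as an instance method that can consult
# self.parameters from the process definition.
from typing import Any

from stactask import Task


class PropertyCheckingTask(Task):
    name = "property-checking-task"
    version = "0.1.0"

    def validate(self, payload: dict[str, Any]) -> bool:
        prop = self.parameters.get("required_property")  # hypothetical parameter
        if prop is None:
            return True
        return all(
            prop in feature.get("properties", {})
            for feature in payload.get("features", [])
        )

    def process(self, **kwargs: Any) -> list[dict[str, Any]]:
        return self.items_as_dicts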