pangeo-forge-runner
Commandline tool to manage pangeo-forge feedstocks
See documentation at pangeo-forge-runner.readthedocs.io for more information.
Run pangeo-forge recipes on Apache Beam
License: Apache License 2.0
Based on pangeo-forge/pangeo-forge-orchestrator#115 (comment) and pangeo-forge/pangeo-forge-orchestrator#115 (comment), I think we need to split this project into two.

Part 1 should be responsible for reading feedstock metadata and setting up the execution environment. Most importantly, there should be no arbitrary code execution here: it can read meta.yaml (carefully hehe) but must not exec any .py files. This is what the orchestrator will call. It will also have no ties to the version of pangeo-forge-recipes needed by the appropriate feedstock.

Part 2 should be responsible for actually executing arbitrary user code (in the recipe.py file). It will run in the environment created by part 1, and can be tied to a specific version of pangeo-forge-recipes. This part will be a separate Python package, and should be installed in the environment created for it by part 1.
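Part 1's "read but don't execute" constraint is straightforward to honor in practice. A minimal sketch (not actual project code) of reading feedstock metadata without arbitrary code execution:

```python
# Sketch: part 1 can parse meta.yaml with yaml.safe_load, which never
# constructs arbitrary Python objects or executes code -- unlike exec()ing
# the feedstock's recipe.py, which is reserved for part 2.
import yaml

with open("feedstock/meta.yaml") as f:
    meta = yaml.safe_load(f)

print(meta.get("recipes", []))
```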
LocalDirectBakery currently specifies "multiprocessing" as the Beam direct_running_mode. However, Beam expects a value of "multi_processing":
```python
parser.add_argument(
    '--direct_running_mode',
    default='in_memory',
    choices=['in_memory', 'multi_threading', 'multi_processing'],
    help='Workers running environment.')
```

```python
elif running_mode == 'multi_processing':
    command_string = '%s -m apache_beam.runners.worker.sdk_worker_main' \
        % sys.executable
    self._default_environment = (
        environments.SubprocessSDKEnvironment.from_command_string(
            command_string=command_string))
```
I'll create a PR with the typo fix.
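For reference, a hedged sketch of what the corrected option looks like when constructing DirectRunner options (option names are Beam's; the exact LocalDirectBakery code may differ):

```python
from apache_beam.options.pipeline_options import PipelineOptions

# "multi_processing" (with underscore) is the value Beam's choices list
# accepts; "multiprocessing" is rejected by the argument parser shown above.
options = PipelineOptions(
    runner="DirectRunner",
    direct_num_workers=4,  # illustrative worker count
    direct_running_mode="multi_processing",
)
```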
@cisaacstern and I just debugged a job submission over here. It turns out that the job fails if it writes to a non-persistent bucket:
The bucket in question was gs://leap-scratch (set up by 2i2c for the leap-stc org) and the error was:

```
Workflow failed. Causes: Unable to create directory: gs://leap-scratch/data-library/temp/gh-leap-stc-data-management-da1b838-1683838917.1683838945.611305/dax-tmp-2023-05-11_14_02_30-7266315841491564283-S05-0-dcd150a5967231a.
```
Changing both temp_storage_location and the cache location to the persistent bucket fixed this issue.
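For anyone hitting the same error, a sketch of the shape of the fix in traitlets config terms (bucket name is a placeholder, and DataflowBakery.temp_gcs_location is my assumption for the temp-location trait):

```python
# Point Dataflow's temp location and the input cache at a persistent
# bucket, not a scratch bucket like gs://leap-scratch:
c.DataflowBakery.temp_gcs_location = "gs://some-persistent-bucket/temp"  # assumed trait name
c.InputCacheStorage.fsspec_class = "gcsfs.GCSFileSystem"
c.InputCacheStorage.root_path = "gs://some-persistent-bucket/cache/"
```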
As we do for unit testing, we should parametrize the Dataflow integration test with beam-refactor as well, so that integration tests ensure compatibility with that branch.
This is motivated by the discussion about potentially using pangeo-forge-runner as a CLI for validating recipes during development, prior to staged-recipes contribution.
Looking through the code, a temporary directory is created to contain the specified repo, which is then fetched by one of the content providers:
```python
# Create a temporary directory where we fetch the feedstock repo and perform all operations
# FIXME: Support running this on an already existing repository, so users can run it
# as they develop their feedstock
with tempfile.TemporaryDirectory() as d:
    self.fetch(d)
```
The local content provider is first in the list of providers used during fetching:
```python
# Content providers from repo2docker are *solely* used to check out a repo
# and get their contents locally, so we can work on them.
content_providers = List(
    None,
    [
        contentproviders.Local,
        contentproviders.Zenodo,
        contentproviders.Figshare,
        contentproviders.Dataverse,
        contentproviders.Hydroshare,
        contentproviders.Swhid,
        contentproviders.Mercurial,
        contentproviders.Git,
    ],
    ...
```

```python
for ContentProvider in self.content_providers:
    cp = ContentProvider()
    spec = cp.detect(self.repo, ref=self.ref)
    if spec is not None:
        picked_content_provider = cp
        ...

for log_line in picked_content_provider.fetch(
    spec, target_path, yield_output=True
):
    self.log.info(log_line, extra=dict(status="fetching"))
```
However, contentproviders.Local.fetch() expects output_dir == spec["path"]:
```python
def fetch(self, spec, output_dir, yield_output=False):
    # nothing to be done if your content is already in the output directory
    msg = f'Local content provider assumes {spec["path"]} == {output_dir}'
    assert output_dir == spec["path"], msg
    yield f'Using local repo {spec["path"]}.\n'
```
As output_dir will be the Bake temp dir, the local repo can't be used.
I was originally thinking that perhaps a quick check to see if self.repo exists prior to creating the temp dir might work. However, reading the following discussions suggests that fetching may be performed separately in the future, so I'm not sure what the best approach is, or whether this issue will still be relevant:
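For what it's worth, a minimal sketch of that "use the local repo directly" idea (illustrative only; the fetch/bake wiring is not shown):

```python
import os
import tempfile
from contextlib import contextmanager

@contextmanager
def feedstock_checkout(repo: str):
    """Yield a directory containing the feedstock: the repo itself when it is
    already a local directory, otherwise a temp dir for a provider to fetch into."""
    if os.path.isdir(repo):
        # Local checkout: skip fetching entirely, avoiding the
        # output_dir == spec["path"] assertion in contentproviders.Local
        yield repo
    else:
        with tempfile.TemporaryDirectory() as d:
            yield d
```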
Currently, the Bake command requires MetadataCacheStorage in the associated config, e.g. in pangeo-forge-orchestrator:

```python
bakery_config.MetadataCacheStorage.root_path = (
    bakery_config.MetadataCacheStorage.root_path.format(subpath=subpath)
)
```
pangeo-forge-runner/tests/test_bake.py
Line 44 in 36380ff
If a config is used that doesn't require a metadata cache (e.g. the CCMP recipe doesn't require it, and appeared to run faster without it, at least locally), it fails as follows:
```
In [41]: bconfig
Out[41]:
{'Bake': {'bakery_class': 'pangeo_forge_runner.bakery.local.LocalDirectBakery',
  'recipe_id': 'eooffshore_ics_ccmp_v02_1_nrt_wind',
  'feedstock_subdir': 'recipes/eooffshore_ics_ccmp_v02_1_nrt_wind',
  'repo': 'https://github.com/eooffshore/staged-recipes',
  'ref': '663f30c95c406b9efe012b9bae66fa1f386b539b',
  'job_name': 'CCMP'},
 'LocalDirectBakery': {'num_workers': 1},
 'TargetStorage': {'fsspec_class': 'fsspec.implementations.local.LocalFileSystem',
  'root_path': './ccmp.zarr'},
 'InputCacheStorage': {'fsspec_class': 'fsspec.implementations.local.LocalFileSystem',
  'root_path': './input-cache/'},
 'prune': True}

In [42]: Bake(config=Config(bconfig)).start()
[Bake] Target Storage is FSSpecTarget(LocalFileSystem(, root_path="./ccmp.zarr")
[Bake] Input Cache Storage is CacheFSSpecTarget(LocalFileSystem(, root_path="./input-cache/")
[Bake] Metadata Cache Storage is MetadataTarget(AbstractFileSystem(, root_path="")
[Bake] Picked Git content provider.
[Bake] Cloning into '/tmp/tmplbp6ohs8'...
[Bake] HEAD is now at 663f30c Added datetime import to ics_wind_speed_direction()
[Bake] Parsing recipes...
[Bake] Baking only recipe_id='eooffshore_ics_ccmp_v02_1_nrt_wind'
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
Input In [42], in <cell line: 1>()
----> 1 Bake(config=Config(bconfig)).start()

File ~/gitrepos/pangeo-forge/pangeo-forge-runner/pangeo_forge_runner/commands/bake.py:145, in Bake.start(self)
    139 else:
    140     job_name = self.job_name
    142 recipe.storage_config = StorageConfig(
    143     target_storage.get_forge_target(job_name=job_name),
    144     input_cache_storage.get_forge_target(job_name=job_name),
--> 145     metadata_cache_storage.get_forge_target(job_name=job_name),
    146 )
    148 pipeline_options = bakery.get_pipeline_options(
    149     job_name=job_name,
    150     # FIXME: Bring this in from meta.yaml?
    151     container_image=self.container_image,
    152 )
    154 # Set argv explicitly to empty so Apache Beam doesn't try to parse the commandline
    155 # for pipeline options - we have traitlets doing that for us.

File ~/gitrepos/pangeo-forge/pangeo-forge-runner/pangeo_forge_runner/storage.py:54, in StorageTargetConfig.get_forge_target(self, job_name)
     48 def get_forge_target(self, job_name: str):
     49     """
     50     Return correct pangeo-forge-recipes Target
     51
     52     If {job_name} is present in `root_path`, it is expanded with the given job_name
     53     """
---> 54     return self.pangeo_forge_target_class(
     55         self.fsspec_class(**self.fsspec_args),
     56         root_path=self.root_path.format(job_id=job_name),
     57     )

File <string>:5, in __init__(self, fs, root_path)

File /data/anaconda/anaconda3/envs/forgerunner/lib/python3.9/site-packages/pangeo_forge_recipes/storage.py:130, in FSSpecTarget.__post_init__(self)
    129 def __post_init__(self):
--> 130     if not self.fs.isdir(self.root_path):
    131         self.fs.mkdir(self.root_path)

File /data/anaconda/anaconda3/envs/forgerunner/lib/python3.9/site-packages/fsspec/spec.py:641, in AbstractFileSystem.isdir(self, path)
    639 """Is this entry directory-like?"""
    640 try:
--> 641     return self.info(path)["type"] == "directory"
    642 except IOError:
    643     return False

File /data/anaconda/anaconda3/envs/forgerunner/lib/python3.9/site-packages/fsspec/spec.py:601, in AbstractFileSystem.info(self, path, **kwargs)
    584 """Give details of entry at path
    585
    586 Returns a single dictionary, with exactly the same information as ``ls``
    (...)
    598 directory, or something else) and other FS-specific keys.
    599 """
    600 path = self._strip_protocol(path)
--> 601 out = self.ls(self._parent(path), detail=True, **kwargs)
    602 out = [o for o in out if o["name"].rstrip("/") == path]
    603 if out:

File /data/anaconda/anaconda3/envs/forgerunner/lib/python3.9/site-packages/fsspec/spec.py:339, in AbstractFileSystem.ls(self, path, detail, **kwargs)
    300 def ls(self, path, detail=True, **kwargs):
    301     """List objects at path.
    302
    303     This should include subdirectories and files at that location. The
    (...)
    337     dicts if detail is True.
    338     """
--> 339     raise NotImplementedError

NotImplementedError:
```
Here's where it's used in Bake:

Is a metadata cache always required, or can it be configurable, e.g. via the recipe or meta.yaml?
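If the answer is "configurable", here is a sketch of one way Bake.start() could skip the metadata cache when it isn't configured (based on the bake.py excerpt in the traceback above; not actual project code):

```python
# Only build a metadata target if a root_path was actually configured;
# StorageConfig's metadata argument is assumed to tolerate None here.
metadata_target = (
    metadata_cache_storage.get_forge_target(job_name=job_name)
    if metadata_cache_storage.root_path
    else None
)
recipe.storage_config = StorageConfig(
    target_storage.get_forge_target(job_name=job_name),
    input_cache_storage.get_forge_target(job_name=job_name),
    metadata_target,
)
```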
The word Runner is already used by Beam for a very specific concept, and we shouldn't overload it by using it here again. It's early enough that this project can be renamed, so we should try to make that happen!

Ideally, we would keep the name of the project, the Python package, and the commandline executable the same. And if we can keep it within the forge theme it'd be great! This package performs actions (expand-meta, bake, eventually get-logs, check-for-completeness, etc.) - so 'something that does something in a forge' would be awesome.
Current suggestions are:
- pangeo-forge-smith
- pangeo-smith
- pangeo-forge-cli, with the commandline tool being just pangeo-forge (similar to the awscli commandline tool being aws)

Makes this more configurable + overrideable
Recent experience with failed Dataflow deployments reveals the importance of a Dataflow integration test. I can work on one this week.
I am curious whether it is possible to pass Pipeline Options to pangeo-forge-runner directly? Specifically, I want to limit the maximum number of workers with max_num_workers.
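In Beam terms this is just the --max_num_workers pipeline option; the open question is whether pangeo-forge-runner exposes it. A sketch of the underlying Beam call (not pangeo-forge-runner API):

```python
from apache_beam.options.pipeline_options import PipelineOptions

# max_num_workers caps Dataflow autoscaling; it's a standard Beam
# WorkerOptions flag, so a bakery could forward it through its
# get_pipeline_options() hook.
options = PipelineOptions(
    runner="DataflowRunner",
    max_num_workers=10,
)
```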
Reposting @sharkinsspatial's comment from https://github.com/pangeo-forge/registrar/issues/53#issuecomment-1232274752 here, since I believe this will be the best place to discuss this topic going forward:

@cisaacstern I had put together an initial POC with option 2 from above, but based on our discussions the other day I'm going to take a slightly different track that is more aligned with the pangeo-forge-runner approach. Next steps include:
- A PR for pangeo-forge-runner which extends https://github.com/yuvipanda/pangeo-forge-runner/blob/main/pangeo_forge_runner/bakery/dataflow.py to allow requests against a bakery with a job_id and a logs filter pattern, and returns a log object.
- Extending pangeo-forge-orchestrator with an endpoint which wraps this call.

A few concerns I have with this approach:
- The potentially large response body depending upon the filter pattern which is applied.
- The potential for logs to leak secrets and the lack of security around the pangeo-forge-orchestrator endpoints.

@yuvipanda Does this approach seem structurally ok from the pangeo-forge-runner perspective, or should I build out some alternative classes rather than including this directly in the bakery dataflow subclass?
Runner currently expects/uses a job_name parameter, e.g. as specified by the orchestrator. If the corresponding run config storage root_path instances contain a job_name placeholder, they'll be expanded to use the specified job_name. However, although job_name is typically used everywhere, this operation tries to expand a job_id placeholder:
pangeo-forge-runner/pangeo_forge_runner/storage.py
Lines 52 to 57 in 480d238
job_id should probably be renamed to job_name for consistency. I can create a PR with a test for this.
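A sketch of the proposed one-line rename in StorageTargetConfig.get_forge_target (the current code is shown in the traceback above):

```python
def get_forge_target(self, job_name: str):
    """
    Return correct pangeo-forge-recipes Target

    If {job_name} is present in `root_path`, it is expanded with the given job_name
    """
    return self.pangeo_forge_target_class(
        self.fsspec_class(**self.fsspec_args),
        root_path=self.root_path.format(job_name=job_name),  # was job_id
    )
```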
@sharkinsspatial previously led the effort to create https://github.com/pangeo-forge/dataflow-status-monitoring, which once deployed sends a webhook to a pre-configured url as a notification of Dataflow job completion. (And I should add, it works great!)
As I've been working on pangeo-forge/pangeo-forge-orchestrator#80, I realize a limitation of this approach (at least as it's currently implemented), which is that the webhook url is hardcoded. In developing the GitHub App, a key requirement is the need to be very flexible regarding deployment url, because the deployment scenario is a moving target (could be proxying to localhost, could be an ephemeral review app, etc.), so the hardcoded url will not work.
I've chosen to open this issue here, rather than on https://github.com/pangeo-forge/dataflow-status-monitoring, in hopes that we can have a broader conversation about the best way to do this, without the assumption that any particular infrastructure (such as in https://github.com/pangeo-forge/dataflow-status-monitoring) is necessarily required. Certainly, if there's a way to ascertain when a Dataflow job has completed without standing up additional infrastructure to do so, that'd be great.
If I remember correctly, @yuvipanda previously floated the idea of polling Dataflow from within the FastAPI application. Perhaps we could leverage pangeo-forge-runner to do this somehow? Just putting up this issue while I think of it, as a place to continue the discussion.
Currently we use https://github.com/pangeo-forge/gpcp-feedstock/ for our unit tests, run with --prune. @rabernat suggested this might not be the best idea, and we should try something else. Opening this to track whether we need to replace it, and if so, with what.
@jbusecke recently saw some silent failures deploying to Dataflow. The following logs cued me into the fact that the config.json did not exist:

```
Target Storage is FSSpecTarget(AbstractFileSystem(, root_path="")
Input Cache Storage is CacheFSSpecTarget(AbstractFileSystem(, root_path="")
Metadata Cache Storage is MetadataTarget(AbstractFileSystem(, root_path="")
```
but this was definitely non-obvious. Should we raise an error (or at least a warning) if the config file is missing?
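A sketch of the kind of guard that would have surfaced this (config-loading details are assumed, not actual pangeo-forge-runner code):

```python
import json
import os

def load_config_file(path: str) -> dict:
    # Fail loudly instead of silently proceeding with empty storage root_paths
    if not os.path.exists(path):
        raise FileNotFoundError(
            f"Config file {path} does not exist; storage targets would "
            "otherwise silently default to root_path=''"
        )
    with open(path) as f:
        return json.load(f)
```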
We wanna run on AWS! Specifically, I'm going to try to demo this running on AWS at a NASA meeting on September 25, so that's the use case :)

On GCP, we have a managed runner with Dataflow, and this solves our problem. nbd.

On AWS, I investigated possible managed Beam runner options. The two were:

(1) is only kinda semi-managed - we'll still need to run some additional unmanaged infrastructure to run the Beam jobserver, and that sucks.

(2) is a bit of a mystery, from the inscrutable name (is it only meant for use with Kinesis?) to the complete lack of information on the internet about running Beam pipelines with Python on top of it. From what I can gather, it can only run Java pipelines, and there isn't space for the portable runner that would allow us to run Python.
The other option is to run Apache Flink on Kubernetes, with one of these two Flink operators (https://github.com/apache/flink-kubernetes-operator or https://github.com/GoogleCloudPlatform/flink-on-k8s-operator).
A few questions we need to answer before choosing:
@yuvipanda, after #12 goes in, can we release 0.4 to get the subdir feature on pip?
As noted in pangeo-forge/pangeo-forge-orchestrator#132, the lack of a "status" field in
causes problems for orchestrator error handling. (This omission was my oversight in #14.)

We should add {"status": "running"} here. And maybe a test/check to make sure log lines always have a "status"?
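A sketch of a test helper for that second part (assumes log lines are emitted as JSON, one per line; names are illustrative):

```python
import json

def assert_status_on_every_line(stdout: str):
    """Fail if any emitted JSON log line lacks a "status" field."""
    for line in stdout.splitlines():
        record = json.loads(line)
        assert "status" in record, f"log line missing 'status': {record}"
```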
https://github.com/pangeo-forge/staged-recipes is a special-case feedstock repo which breaks this assumption. For recipes in staged-recipes, meta.yaml and the recipe module are organized as described in item 3 of the list here: https://pangeo-forge.readthedocs.io/en/latest/pangeo_forge_cloud/recipe_contribution.html#making-a-pr

I'm not sure of the best way to support this special case, but we definitely do want to support running recipes from staged-recipes PRs, so that we can run tests of these recipes before merging the PRs. One possible direction is sketched below.
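One possible direction, reusing config that already exists: point repo at staged-recipes and use the feedstock_subdir trait (shown in the CCMP config earlier in this document) to select a single recipe directory. Whether this matches staged-recipes' actual layout is exactly the open question:

```python
c.Bake.repo = "https://github.com/pangeo-forge/staged-recipes"
c.Bake.ref = "<pr-head-sha>"                       # placeholder
c.Bake.feedstock_subdir = "recipes/<recipe-name>"  # placeholder recipe dir
```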
Add a command that, when given a repo, will launch its Beam pipeline on a configured Dataflow runner.
Preprocessor functions in feedstocks can cause serialization issues, e.g. pangeo-forge/staged-recipes#183 (comment).
We should add testing against a feedstock that uses a preprocessor function, to catch these issues early.
To do this, I suggest we create a branch on our test feedstock https://github.com/pforgetest/gpcp-from-gcs-feedstock that implements a basic (even no-op) preprocessor, and then parametrize test_bake with that branch here:
pangeo-forge-runner/tests/test_bake.py
Lines 63 to 65 in 36380ff
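A sketch of what that parametrization might look like (the branch name is the proposed, not-yet-existing one):

```python
import pytest

@pytest.mark.parametrize(
    "ref",
    [
        "main",                # existing test ref (illustrative)
        "basic-preprocessor",  # proposed branch with a no-op preprocessor
    ],
)
def test_bake(ref):
    ...  # existing test body, fetching gpcp-from-gcs-feedstock at `ref`
```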