pangeo-forge-runner's Issues

Split this project into two

Based on pangeo-forge/pangeo-forge-orchestrator#115 (comment) and pangeo-forge/pangeo-forge-orchestrator#115 (comment), I think we need to split this project into two.

Part 1

This should be responsible for:

  1. Fetching the appropriate feedstock from wherever it lives (GitHub, Zenodo, etc.) onto the local filesystem
  2. Creating an appropriate environment for the recipe to be parsed and run. This can be via conda or via docker, and must be pluggable

Most importantly, there should be no arbitrary code execution here. So it can read meta.yaml (carefully hehe) but not exec any .py files. This is what the orchestrator will call.

It will also have no ties to the version of pangeo-forge-recipes needed by the feedstock in question.

Part 2

This should be responsible for actually executing arbitrary user code (in recipe.py file). This will be run in the environment created by part 1, and can be tied to a specific version of pangeo-forge-recipes. This part will be a separate python package, and should be installed in the environment created for it by part 1.

Open questions

  • The orchestrator (and end user) talks to part 1 directly, but how will part 1 talk to part 2? My current suggestion is JSON over stdout from part 2 -> part 1, and traitlets config from part 1 -> part 2 (see the sketch after this list).
  • Creating environments will be a messy and difficult task to get right, particularly because we want dataflow / flink to run in the same custom environment as the one we use for parsing. This is doable with docker (but requires pushing to a registry), but how do we do that for mamba / conda? This is going to end up being pretty complicated IMO.
  • Part 2 becomes responsible for actually submitting the job to Beam, so it will need access to credentials for both storage and the bakery.
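
A minimal sketch of what that boundary could look like, assuming hypothetical entry points for both parts (none of these names exist in the codebase yet):

    import json
    import subprocess
    import sys

    # --- Part 2 (runs inside the recipe environment): emit results as JSON on stdout
    def emit(status: str, **payload):
        """Write a single JSON message for part 1 to consume."""
        print(json.dumps({"status": status, **payload}))
        sys.stdout.flush()

    # --- Part 1 (orchestrator-facing): invoke part 2 and parse its stdout
    def run_part2(env_python: str, traitlets_config_path: str) -> list:
        # "part2_executor" is a hypothetical module installed in the environment
        # that part 1 created; config flows in via a traitlets file.
        result = subprocess.run(
            [env_python, "-m", "part2_executor", f"--config={traitlets_config_path}"],
            capture_output=True, text=True, check=True,
        )
        return [json.loads(line) for line in result.stdout.splitlines() if line.strip()]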

Beam "multi_processing" direct_running_mode typo in LocalDirectBakery

LocalDirectBakery currently specifies "multiprocessing" as the Beam direct_running_mode:

direct_running_mode="multiprocessing",

However, Beam expects a value of "multi_processing":

https://github.com/apache/beam/blob/1ec1945ec5c1d29dbc5efe574712733922ced07d/sdks/python/apache_beam/options/pipeline_options.py#L634

    parser.add_argument(
        '--direct_running_mode',
        default='in_memory',
        choices=['in_memory', 'multi_threading', 'multi_processing'],
        help='Workers running environment.')

https://github.com/apache/beam/blob/4788b07601b3bb2a7805ff844d1e78c14938dd83/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L180

    elif running_mode == 'multi_processing':
      command_string = '%s -m apache_beam.runners.worker.sdk_worker_main' \
                    % sys.executable
      self._default_environment = (
          environments.SubprocessSDKEnvironment.from_command_string(
              command_string=command_string))

I'll create a PR with the typo fix.
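
For reference, a hedged sketch of the corrected option; only the direct_running_mode value is the actual fix, the surrounding arguments are illustrative:

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(
        runner="DirectRunner",
        direct_num_workers=4,
        direct_running_mode="multi_processing",  # was: "multiprocessing"
    )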

Dataflow jobs failing when temp_storage_location is on bucket with non-persistent retention

@cisaacstern and I just debugged a job submission over here. It turns out that the job would fail if it writes to a non-persistent bucket:

The bucket in question was gs://leap-scratch (set up by 2i2c for the leap-stc org) and the error was:

Workflow failed. Causes: Unable to create directory: gs://leap-scratch/data-library/temp/gh-leap-stc-data-management-da1b838-1683838917.1683838945.611305/dax-tmp-2023-05-11_14_02_30-7266315841491564283-S05-0-dcd150a5967231a.

Changing both temp_storage_location and the cache location to the persistent bucket fixed this issue.
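
A minimal sketch of the fix in config form, using the Python-dict config style seen elsewhere in this tracker; the persistent bucket name and the section the temp-location trait lives under are illustrative:

    bakery_config = {
        # Move the Dataflow temp location off the non-persistent scratch bucket
        "DataflowBakery": {
            "temp_storage_location": "gs://leap-persistent/data-library/temp/",
        },
        # ...and point the input cache at the same persistent bucket
        "InputCacheStorage": {
            "fsspec_class": "gcsfs.GCSFileSystem",
            "root_path": "gs://leap-persistent/data-library/cache/",
        },
    }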

Weird things 🤪

  • The service account used to deploy the job from GitHub Actions is the same one used on the workers, yet the deployment writes to the bucket successfully while the runtime write fails.
  • @cisaacstern said that in CI testing of pangeo-forge-runner they use a non-persistent bucket for the temp_storage_location without issues -> is something specific in the retention policies messing things up?

Add support for running local recipes during development

This is motivated by the discussion about potentially using pangeo-forge-runner as a CLI for validating recipes during development, prior to staged-recipes contribution.

Looking through the code, a temporary directory is created to contain the specified repo, which is then fetched by one of the content providers:

        # Create a temporary directory where we fetch the feedstock repo and perform all operations
        # FIXME: Support running this on an already existing repository, so users can run it
        # as they develop their feedstock
        with tempfile.TemporaryDirectory() as d:
            self.fetch(d)

The local content provider is first in the list of providers used during fetching:

    # Content providers from repo2docker are *solely* used to check out a repo
    # and get their contents locally, so we can work on them.
    content_providers = List(
        None,
        [
            contentproviders.Local,
            contentproviders.Zenodo,
            contentproviders.Figshare,
            contentproviders.Dataverse,
            contentproviders.Hydroshare,
            contentproviders.Swhid,
            contentproviders.Mercurial,
            contentproviders.Git,
        ],

...

        for ContentProvider in self.content_providers:
            cp = ContentProvider()
            spec = cp.detect(self.repo, ref=self.ref)
            if spec is not None:
                picked_content_provider = cp

...

        for log_line in picked_content_provider.fetch(
            spec, target_path, yield_output=True
        ):
            self.log.info(log_line, extra=dict(status="fetching"))

However, contentproviders.Local.fetch() expects output_dir == spec["path"]:

    def fetch(self, spec, output_dir, yield_output=False):
        # nothing to be done if your content is already in the output directory
        msg = f'Local content provider assumes {spec["path"]} == {output_dir}'
        assert output_dir == spec["path"], msg
        yield f'Using local repo {spec["path"]}.\n'

As output_dir will be the Bake temp dir, the local repo can't be used.

I was originally thinking that a quick check for whether self.repo exists prior to creating the temp dir might work. However, related discussion suggests that fetching may be performed separately in the future, so I'm not sure what the best approach is, or whether this issue will still be relevant.
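
For what it's worth, a minimal sketch of the quick-check idea (the helper name is hypothetical, and `fetch` stands in for the existing content-provider fetch):

    import os
    import tempfile
    from contextlib import contextmanager

    @contextmanager
    def feedstock_checkout(repo, fetch):
        """Yield a directory containing the feedstock: the path itself when
        `repo` is an existing local directory, else a temp dir we fetch into."""
        if os.path.isdir(repo):
            # Local development: use the existing checkout directly, sidestepping
            # the Local content provider's output_dir == spec["path"] assertion
            yield repo
        else:
            with tempfile.TemporaryDirectory() as d:
                fetch(d)
                yield d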

`Bake` fails without `MetadataCacheStorage` in config

Currently, the Bake command requires MetadataCacheStorage in the associated config. E.g.:

    bakery_config.MetadataCacheStorage.root_path = (
        bakery_config.MetadataCacheStorage.root_path.format(subpath=subpath)
    )

If a config is used that doesn't require a metadata cache (e.g. the CCMP recipe doesn't require it, and appeared to run faster without it, at least locally), it fails as follows:

In [41]: bconfig
Out[41]: 
{'Bake': {'bakery_class': 'pangeo_forge_runner.bakery.local.LocalDirectBakery',
  'recipe_id': 'eooffshore_ics_ccmp_v02_1_nrt_wind',
  'feedstock_subdir': 'recipes/eooffshore_ics_ccmp_v02_1_nrt_wind',
  'repo': 'https://github.com/eooffshore/staged-recipes',
  'ref': '663f30c95c406b9efe012b9bae66fa1f386b539b',
  'job_name': 'CCMP'},
 'LocalDirectBakery': {'num_workers': 1},
 'TargetStorage': {'fsspec_class': 'fsspec.implementations.local.LocalFileSystem',
  'root_path': './ccmp.zarr'},
 'InputCacheStorage': {'fsspec_class': 'fsspec.implementations.local.LocalFileSystem',
  'root_path': './input-cache/'},
 'prune': True}

In [42]: Bake(config=Config(bconfig)).start()
[Bake] Target Storage is FSSpecTarget(LocalFileSystem(, root_path="./ccmp.zarr")

[Bake] Input Cache Storage is CacheFSSpecTarget(LocalFileSystem(, root_path="./input-cache/")

[Bake] Metadata Cache Storage is MetadataTarget(AbstractFileSystem(, root_path="")

[Bake] Picked Git content provider.

[Bake] Cloning into '/tmp/tmplbp6ohs8'...

[Bake] HEAD is now at 663f30c Added datetime import to ics_wind_speed_direction()

[Bake] Parsing recipes...
[Bake] Baking only recipe_id='eooffshore_ics_ccmp_v02_1_nrt_wind'
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
Input In [42], in <cell line: 1>()
----> 1 Bake(config=Config(bconfig)).start()

File ~/gitrepos/pangeo-forge/pangeo-forge-runner/pangeo_forge_runner/commands/bake.py:145, in Bake.start(self)
    139 else:
    140     job_name = self.job_name
    142 recipe.storage_config = StorageConfig(
    143     target_storage.get_forge_target(job_name=job_name),
    144     input_cache_storage.get_forge_target(job_name=job_name),
--> 145     metadata_cache_storage.get_forge_target(job_name=job_name),
    146 )
    148 pipeline_options = bakery.get_pipeline_options(
    149     job_name=job_name,
    150     # FIXME: Bring this in from meta.yaml?
    151     container_image=self.container_image,
    152 )
    154 # Set argv explicitly to empty so Apache Beam doesn't try to parse the commandline
    155 # for pipeline options - we have traitlets doing that for us.

File ~/gitrepos/pangeo-forge/pangeo-forge-runner/pangeo_forge_runner/storage.py:54, in StorageTargetConfig.get_forge_target(self, job_name)
     48 def get_forge_target(self, job_name: str):
     49     """
     50     Return correct pangeo-forge-recipes Target
     51 
     52     If {job_name} is present in `root_path`, it is expanded with the given job_name
     53     """
---> 54     return self.pangeo_forge_target_class(
     55         self.fsspec_class(**self.fsspec_args),
     56         root_path=self.root_path.format(job_id=job_name),
     57     )

File <string>:5, in __init__(self, fs, root_path)

File /data/anaconda/anaconda3/envs/forgerunner/lib/python3.9/site-packages/pangeo_forge_recipes/storage.py:130, in FSSpecTarget.__post_init__(self)
    129 def __post_init__(self):
--> 130     if not self.fs.isdir(self.root_path):
    131         self.fs.mkdir(self.root_path)

File /data/anaconda/anaconda3/envs/forgerunner/lib/python3.9/site-packages/fsspec/spec.py:641, in AbstractFileSystem.isdir(self, path)
    639 """Is this entry directory-like?"""
    640 try:
--> 641     return self.info(path)["type"] == "directory"
    642 except IOError:
    643     return False

File /data/anaconda/anaconda3/envs/forgerunner/lib/python3.9/site-packages/fsspec/spec.py:601, in AbstractFileSystem.info(self, path, **kwargs)
    584 """Give details of entry at path
    585 
    586 Returns a single dictionary, with exactly the same information as ``ls``
   (...)
    598 directory, or something else) and other FS-specific keys.
    599 """
    600 path = self._strip_protocol(path)
--> 601 out = self.ls(self._parent(path), detail=True, **kwargs)
    602 out = [o for o in out if o["name"].rstrip("/") == path]
    603 if out:

File /data/anaconda/anaconda3/envs/forgerunner/lib/python3.9/site-packages/fsspec/spec.py:339, in AbstractFileSystem.ls(self, path, detail, **kwargs)
    300 def ls(self, path, detail=True, **kwargs):
    301     """List objects at path.
    302 
    303     This should include subdirectories and files at that location. The
   (...)
    337     dicts if detail is True.
    338     """
--> 339     raise NotImplementedError

NotImplementedError: 

Here's where it's used in Bake:

metadata_cache_storage = MetadataCacheStorage(parent=self)

Is a metadata cache always required, or can it be configurable e.g. via the recipe or meta.yaml?
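
One possible shape for making it optional, treating an empty root_path as meaning "no metadata cache"; this conditional is a sketch, not current behavior:

    # In Bake.start(), only build a metadata target when one is configured
    metadata_cache_storage = MetadataCacheStorage(parent=self)
    metadata_target = (
        metadata_cache_storage.get_forge_target(job_name=job_name)
        if metadata_cache_storage.root_path
        else None
    )
    recipe.storage_config = StorageConfig(
        target_storage.get_forge_target(job_name=job_name),
        input_cache_storage.get_forge_target(job_name=job_name),
        metadata_target,
    )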

Rename this project

The word Runner is already used by Beam for a very specific concept, and we shouldn't overload it by using it here again. It's early enough that this project can be renamed, so we should try to make that happen!

Ideally, we would keep the name of the project, the python package, and the commandline executable the same. And if we can keep it within the forge theme, it'd be great! This package performs actions (expand-meta, bake, eventually get logs, check-for-completeness, etc.), so 'something that does something in a forge' would be awesome.

Current suggestions are:

  1. pangeo-forge-smith
  2. pangeo-smith
  3. pangeo-forge-cli, with the commandline tool being just pangeo-forge (similar to awscli commandline tool being aws)
  4. ???

Dataflow integration tests

Recent experience with failed Dataflow deployments reveals the importance of a Dataflow integration test. I can work on one this week.

Log fetching

Reposting @sharkinsspatial's comment from https://github.com/pangeo-forge/registrar/issues/53#issuecomment-1232274752 here, since I believe this will be the best place to discuss this topic going forward:

@cisaacstern I had put together an initial POC with option 2 from above, but based on our discussions the other day I'm going to take a slightly different track that is more aligned with the pangeo-forge-runner approach. Next steps include:

  1. A PR for pangeo-forge-runner which extends https://github.com/yuvipanda/pangeo-forge-runner/blob/main/pangeo_forge_runner/bakery/dataflow.py to allow requests against a bakery with a job_id and a logs filter pattern and returns a log object.
  2. Extend pangeo-forge-orchestrator with an endpoint which wraps this call.

A few concerns I have with this approach:

  1. The potentially large response body depending upon the filter pattern which is applied.
  2. The potential for logs to leak secrets and the lack of security around the pangeo-forge-orchestrator endpoints.

@yuvipanda Does this approach seem structurally ok from the pangeo-forge-runner perspective or should I build out some alternative classes rather than including this directly in the bakery dataflow subclass?
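
As a starting point, a rough sketch of the log fetch using the google-cloud-logging client; the function name is hypothetical, not an existing pangeo-forge-runner API:

    from google.cloud import logging as gcp_logging

    def fetch_dataflow_logs(project: str, job_id: str, filter_pattern: str = "") -> list:
        """Return log payloads for a Dataflow job, optionally narrowed by a filter."""
        client = gcp_logging.Client(project=project)
        log_filter = f'resource.type="dataflow_step" AND resource.labels.job_id="{job_id}"'
        if filter_pattern:
            log_filter += f" AND {filter_pattern}"
        # Note concern 1 above: depending on the filter, this response can be huge.
        return [entry.payload for entry in client.list_entries(filter_=log_filter)]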

Rename expected storage target `root_path` `job_id` placeholder to `job_name`

Runner currently expects/uses a job_name parameter, e.g. as specified by the orchestrator. If a storage root_path in the run config contains a job_name placeholder, it will be expanded with the specified job_name. However, although job_name is used everywhere else, this operation tries to expand a job_id placeholder:

    If {job_name} is present in `root_path`, it is expanded with the given job_name
    """
    return self.pangeo_forge_target_class(
        self.fsspec_class(**self.fsspec_args),
        root_path=self.root_path.format(job_id=job_name),
    )

job_id should probably be renamed to job_name for consistency. I can create a PR with a test for this.
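
Applied to the snippet quoted above, the rename would be:

    return self.pangeo_forge_target_class(
        self.fsspec_class(**self.fsspec_args),
        root_path=self.root_path.format(job_name=job_name),  # was: job_id=job_name
    )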

Delete v0.5 release from GitHub Releases?

The v0.5 release is the only one I made, and I realized it's also the only one with an associated GitHub Release:

[Screenshot: the repo's GitHub Releases page (2022-10-03), showing v0.5 as the only release]

Should we delete this? Or alternatively, should we make GitHub Releases for the other releases?

How to know when a bake is complete

@sharkinsspatial previously led the effort to create https://github.com/pangeo-forge/dataflow-status-monitoring, which, once deployed, sends a webhook to a pre-configured url as a notification of Dataflow job completion. (And I should add, it works great!)

As I've been working on pangeo-forge/pangeo-forge-orchestrator#80, I realize a limitation of this approach (at least as it's currently implemented), which is that the webhook url is hardcoded. In developing the GitHub App, a key requirement is the need to be very flexible regarding deployment url, because the deployment scenario is a moving target (could be proxying to localhost, could be an ephemeral review app, etc.), so the hardcoded url will not work.

I've chosen to open this issue here, rather than on https://github.com/pangeo-forge/dataflow-status-monitoring, in hopes that we can have a broader conversation about the best way to do this, without the assumption that any particular infrastructure (such as in https://github.com/pangeo-forge/dataflow-status-monitoring) is necessarily required. Certainly, if there's a way to ascertain when a Dataflow job has completed without standing up additional infrastructure to do so, that'd be great.

If I remember correctly, @yuvipanda previously floated the idea of polling Dataflow from within the FastAPI application. Perhaps we could leverage pangeo-forge-runner to do this somehow? Just putting up this issue while I think of it, as a place to continue the discussion.
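
If polling wins out, a minimal sketch against the public Dataflow REST API (v1b3); the function name and values are illustrative:

    from googleapiclient.discovery import build

    def get_dataflow_job_state(project: str, region: str, job_id: str) -> str:
        """Return the job's current state, e.g. JOB_STATE_RUNNING,
        JOB_STATE_DONE, or JOB_STATE_FAILED."""
        dataflow = build("dataflow", "v1b3")
        job = (
            dataflow.projects()
            .locations()
            .jobs()
            .get(projectId=project, location=region, jobId=job_id)
            .execute()
        )
        return job["currentState"]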

cc @rabernat @andersy005

Raise error if config file doesn't exist?

@jbusecke recently saw some silent failures deploying to Dataflow. The following logs cued me into the fact that the config.json did not exist:

Target Storage is FSSpecTarget(AbstractFileSystem(, root_path="")

Input Cache Storage is CacheFSSpecTarget(AbstractFileSystem(, root_path="")

Metadata Cache Storage is MetadataTarget(AbstractFileSystem(, root_path="")

but this was definitely non-obvious. Should we raise an error (or at least a warning) if the config file is missing?
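
A minimal sketch of such a guard, assuming it runs wherever the application resolves its config path (names are illustrative):

    from pathlib import Path

    def load_config(config_path: str) -> Path:
        """Refuse to proceed with an implicitly-empty config."""
        path = Path(config_path)
        if not path.exists():
            # A missing config yields AbstractFileSystem targets with root_path="",
            # which fail much later and far less legibly than this does
            raise FileNotFoundError(f"Config file {path} does not exist")
        return path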

Support running on AWS

We wanna run on AWS! Specifically, I'm going to try to demo this running on AWS at a NASA meeting on September 25, so that's the use case :)

On GCP, we have a managed runner with Dataflow, and this solves our problem. nbd.

On AWS, I investigated possible managed beam runner options. The two were:

  1. Spark Runner with AWS EMR
  2. Kinesis Data Analytics with their managed Apache Flink

(1) is only kinda semi-managed - we'll still need to run some additional unmanaged infrastructure to run the Beam jobserver, and that sucks.

(2) is a bit of a mystery, from the inscrutable name (is it only meant for use with Kinesis?) to the complete lack of information on the internet about running beam pipelines with Python on top of this. From what I can gather, it can only run Java pipelines, and there isn't space for the portable runner that'll allow us to run Python.

The other option is to run Apache Flink on Kubernetes, with one of these two Flink operators (https://github.com/apache/flink-kubernetes-operator or https://github.com/GoogleCloudPlatform/flink-on-k8s-operator).

A few questions we need to answer before choosing:

  1. How mature is the Spark Runner? Everyone I see seems to be mostly running on Flink if they aren't using Dataflow. I don't want us to use a runner others aren't using, especially one involving the hadoop / JVM / Spark ecosystem if I can avoid it. This is a huge -1 for the EMR + Spark based runner (along with needing to run our job server somewhere)
  2. What's the deal with Kinesis Data Analytics? How tied is it to Kinesis (which we don't use at all)? Can it actually be configured to run Beam with Python? The docs seem to suggest otherwise.
  3. If using k8s, why are there two operators? Which one do we use?

Support staged-recipes path format

https://github.com/pangeo-forge/staged-recipes is a special-case feedstock repo which breaks this assumption:

https://github.com/yuvipanda/pangeo-forge-runner/blob/677ad109348e50d4aa67f1009b5f184ab43beade/pangeo_forge_runner/feedstock.py#L18

For recipes in staged-recipes, meta.yaml and the recipe module are organized as described in item 3 of the list here: https://pangeo-forge.readthedocs.io/en/latest/pangeo_forge_cloud/recipe_contribution.html#making-a-pr

I'm not sure the best way to support this special case, but we definitely do want to support running recipes from staged-recipes PRs, so that we can run tests of these recipes before merging the PRs.
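
One hedged option is a lookup that falls back to the staged-recipes layout described in the linked docs (recipes/<name>/meta.yaml); the function name is hypothetical:

    from pathlib import Path

    def find_meta_yaml(checkout_dir: Path, feedstock_subdir: str = "feedstock") -> Path:
        """Find meta.yaml in a regular feedstock, falling back to the
        staged-recipes layout of one directory per submitted recipe."""
        default = checkout_dir / feedstock_subdir / "meta.yaml"
        if default.exists():
            return default
        candidates = sorted(checkout_dir.glob("recipes/*/meta.yaml"))
        if candidates:
            return candidates[0]
        raise FileNotFoundError(f"No meta.yaml found under {checkout_dir}")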

Add testing for feedstocks with preprocessor functions

Preprocessor functions in feedstocks can cause serialization issues, e.g. pangeo-forge/staged-recipes#183 (comment).

We should add testing against a feedstock that uses a preprocessor function, to catch these issues early.

To do this, I suggest we create a branch on our test feedstock https://github.com/pforgetest/gpcp-from-gcs-feedstock that implements a basic (even no-op) preprocessor, and then parametrize test_bake with that branch here:

"https://github.com/pforgetest/gpcp-from-gcs-feedstock.git",
"--ref",
"4f41e02512b2078c8bdb286368a1a9d878b5cec2",
