jobmon's Introduction

JobMon

Introduction

JobMon is a Python package developed by IHME's Scientific Computing team, designed to simplify and standardize the process of job monitoring and workflow management in computational projects. It facilitates the tracking of job statuses, manages dependencies, and streamlines the execution of complex workflows across various computing environments.

Description

The tool aims to enhance productivity and ensure computational tasks are efficiently managed and executed, offering a robust solution for handling large-scale, data-driven analyses in research and development projects.

Features

  • Workflow Management: Easily define and manage workflows with multiple interdependent tasks.
  • Status Tracking: Real-time tracking of job statuses to monitor the progress of computational tasks.
  • Error Handling: Automatically detect and report errors in jobs, supporting swift resolution and rerun capabilities.
  • Compatibility: Designed to work seamlessly across different computing environments, including HPC clusters and cloud platforms.

Installation

To install JobMon, use the following pip command:

pip install jobmon_client[server]

Usage

Refer to the quickstart to get started with a sample workflow.
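
For orientation, here is a minimal sketch of a two-task workflow, modeled on the client API used in the repro code further down this page; the cluster name, queue, and resource values are placeholders and should be adapted to your environment.

from jobmon.client.tool import Tool

# Minimal sketch of a two-task workflow; cluster, queue, and resource values
# are placeholders.
tool = Tool("quickstart-example")
template = tool.get_task_template(
    template_name="sleep_task",
    command_template="sleep {seconds}",
    node_args=["seconds"],
    task_args=[],
    op_args=[],
)
resources = {"cores": 1, "memory": "1G", "runtime": "5m", "queue": "all.q"}
task_a = template.create_task(name="task_a", seconds=1, compute_resources=resources)
task_b = template.create_task(name="task_b", seconds=2, compute_resources=resources)
task_a.add_downstream(task_b)  # task_b waits for task_a to finish

workflow = tool.create_workflow(
    name="quickstart_workflow",
    default_cluster_name="slurm",
    workflow_args="quickstart_demo",
)
workflow.add_tasks([task_a, task_b])
workflow.run()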

Requirements

  • Python 3.8+

Documentation

For comprehensive documentation, visit readthedocs.

Contributing

We encourage contributions from the community. If you're interested in improving JobMon or adding new features, please refer to our developer guide for Python client contributions or the GUI README.md for visualization contributions.

Branching Strategy

This project utilizes a branching strategy that emphasizes release branches and semantic versioning, facilitating orderly development, feature addition, bug fixes, and updates.

Overview

  • Main Branch: The main branch maintains the latest stable release of the project. It represents the culmination of all development efforts into a stable version ready for production use.
  • Release Branches: These branches, named according to semantic versioning as release/X.Y, host development for upcoming minor or major releases. When starting a new major or minor version, it is branched off from the latest stable version in main.
  • Feature and Bug Fix Branches: Development of new features and bug fixes happens in branches derived from the appropriate release/X.Y branches. Once development is complete, reviewed, and tested, these changes are merged back into their respective release/X.Y branch.

Semantic Versioning

We adopt semantic versioning for organizing our releases:

  • Major Version (X): Incremented for significant changes or incompatible API modifications.
  • Minor Version (Y): Incremented for backward-compatible enhancements.
  • Patch Version (Z): Incremented for backward-compatible bug fixes.

Development and Release Process

  1. Starting New Features and Fixes: Branch off from the corresponding release/X.Y branch for developing new features or addressing bugs. Ensure your branch name clearly reflects the purpose of the changes.
  2. Applying Bug Fixes: If a bug fix applies to a release branch that has diverged from main, apply it first to the most current release branch where relevant before merging it into the specific release/X.Y branch.
  3. Pull Request (PR): Submit a PR against the release/X.Y branch from which you branched out. The PR must summarize the changes and include any pertinent information for the reviewers.
  4. Creating Tags and Merging to Main:
    • Upon completion of a release cycle, a version tag following the X.Y.Z format is created for the release/X.Y branch.
    • This tagged release/X.Y branch is then merged into main, signifying the release of a new version.

Changelog

For a detailed history of changes and version updates, please refer to the CHANGELOG.md file within this repository.

License

This project is licensed under the JobMon Non-commercial License, developed at the Institute for Health Metrics and Evaluation (IHME), University of Washington. The license allows for redistribution and use in source and binary forms, with or without modification, under the conditions that:

  • The software is used solely for non-commercial purposes. Commercial use, including indirect commercial use such as content on a website that accepts advertising money, is not permitted under this license. However, use by a for-profit company in its research is considered non-commercial use.
  • All redistributions of source code must retain the copyright notice, this list of conditions, and the following disclaimer.
  • Redistributions in binary form must reproduce the copyright notice, this list of conditions, and the following disclaimer in the documentation and/or other materials provided with the distribution.
  • Neither the name of the University of Washington nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

For commercial use rights, contact the University of Washington, CoMotion, at [email protected] or call 206-543-3970, and inquire about this project.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

For a full copy of the license, see the LICENSE file in this repository.

jobmon's Issues

Handle CLI resume if a downstream node is marked D

A slight edge case: the self-service CLI command update_task_status allows a user to set an arbitrary task to the D or G state. If a task is set to D (i.e., marked "don't run") and has an upstream dependency, and the user then invokes a CLI resume, the resume will fail with a traceback like:

  File "/mnt/share/homes/dhs2018/repos/OneMod/src/onemod/main.py", line 164, in resume_pipeline
    resume_workflow_from_id(
  File "/ihme/homes/dhs2018/miniconda3/envs/onemod/lib/python3.11/site-packages/jobmon/client/status_commands.py", line 638, in resume_workflow_from_id
    swarm.from_workflow_id(workflow_id)
  File "/ihme/homes/dhs2018/miniconda3/envs/onemod/lib/python3.11/site-packages/jobmon/client/swarm/workflow_run.py", line 242, in from_workflow_id
    self.set_downstreams_from_db(chunk_size=edge_chunk_size)
  File "/ihme/homes/dhs2018/miniconda3/envs/onemod/lib/python3.11/site-packages/jobmon/client/swarm/workflow_run.py", line 431, in set_downstreams_from_db
    downstream_task_id = task_node_id_map[downstream_node_id]
                         ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^
KeyError: 77687100

The reason is that, for efficiency, the CLI resume command reconstructs the DAG from only the nodes that are not in D state, under the assumption that a task in D state can have no upstreams. Under normal operation this is correct, since a task can only be marked D once its upstreams are D as well, but the self-service CLI breaks this paradigm.

This is not a problem for workflow_args resume (i.e. build the same DAG with the same workflow args), since the entire DAG is built instead of a partial one.

Handle this edge case in the swarm from_workflow_id method.
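
One possible direction, sketched with hypothetical names taken from the traceback above: tolerate downstream node ids that were filtered out of the partially reconstructed DAG instead of letting the lookup raise a KeyError.

def resolve_downstream_task_ids(
    task_node_id_map: dict[int, int], downstream_node_ids: list[int]
) -> list[int]:
    """Map downstream node ids to task ids, skipping nodes excluded from the DAG.

    Hypothetical helper illustrating the guard: nodes manually set to D via
    update_task_status are absent from task_node_id_map and are skipped rather
    than raising KeyError.
    """
    resolved = []
    for node_id in downstream_node_ids:
        task_id = task_node_id_map.get(node_id)
        if task_id is None:
            continue  # node was pruned from the partial DAG (e.g. marked D)
        resolved.append(task_id)
    return resolved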

Allow `create_tasks` to return single task in case no node_args are provided

# Create tasks on the array
tasks = array.create_tasks(
    upstream_tasks=upstream_tasks,
    max_attempts=max_attempts,
    resource_scales=resource_scales,
    **node_args,
)
return tasks

A valid use case of a task template is to create a single task with no parallelization, so node_args aren't strictly necessary. However, if TaskTemplate.create_tasks is called without node_args, an empty list is returned. This method should return a length-1 list instead.
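
A minimal illustration of the expected behavior (the tool, template, and command names are made up for the example):

from jobmon.client.tool import Tool

# Expected behavior sketch: with no node_args there is no fan-out, so
# create_tasks should yield exactly one task rather than an empty list.
tool = Tool("create-tasks-example")
template = tool.get_task_template(
    template_name="no_parallelization",
    command_template="echo hello",
    node_args=[],
    task_args=[],
    op_args=[],
)
tasks = template.create_tasks()
assert len(tasks) == 1  # currently returns [], which this issue proposes to change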

Drop OperationalError handler when database infrastructure moves to cloud

When the Jobmon database moves from on-premise to cloud, we will no longer need replication and therefore not need to account for drops in service when the virtual IP changes.

The following handler can be deleted then:

from typing import Any

from MySQLdb import OperationalError


@app.errorhandler(OperationalError)
def handle_mysql_gone_away(error: OperationalError) -> Any:
    if "2013, 'Lost connection to MySQL server during query'" in str(error):
        engine = SessionLocal().get_bind()
        # A new connection pool is created immediately after the old one has been disposed
        engine.dispose()
This would remove the dependency on mysqlclient, a package that is not pip installable and that complicates the build process.

Make setting resources from yaml more flexible

When a YAML file path is passed to Tool.get_task_template, Jobmon currently requires the YAML to contain task template resources and resource scales. These fields should be optional: if present, use them to update the task template resources; otherwise, fall back to the tool-level resources.
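
A sketch of the proposed fallback, assuming a hypothetical helper and YAML layout (the actual field names may differ):

import yaml

def load_template_resources(
    yaml_path: str, template_name: str, tool_resources: dict
) -> dict:
    """Return the task template's resources from the YAML if present,
    otherwise fall back to the tool-level resources."""
    with open(yaml_path) as f:
        config = yaml.safe_load(f) or {}
    # Both resources and resource scales should be optional, not required.
    template_config = config.get("task_template_resources", {}).get(template_name)
    return template_config if template_config else tool_resources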

Wrap asyncio.read() in a wait_for coroutine

In certain applications that invoke multiprocessing at the worker node level, OOM kill events can cause asyncio.StreamReader.read() to hang indefinitely. We should be able to wrap this read call in asyncio.wait_for() to forcibly time out stale connections.

The current theory is that if the awaited process is deadlocking, the stream will deadlock indefinitely and thus prevent heartbeats from executing.
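
A minimal sketch of the proposed wrapper (the timeout value is arbitrary and would need tuning):

import asyncio

async def read_with_timeout(
    reader: asyncio.StreamReader, n: int = -1, timeout: float = 60.0
) -> bytes:
    """Read from the stream, but give up after `timeout` seconds so a
    deadlocked child process cannot block heartbeats indefinitely."""
    # asyncio.wait_for cancels the read and raises a timeout error on expiry.
    return await asyncio.wait_for(reader.read(n), timeout=timeout)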

For super extra brownie points (and a resume boost), consider contributing to CPython instead: add a timeout parameter to StreamReader.read:
python/cpython#67425
https://github.com/python/cpython/blob/3e56ff0/Lib/asyncio/streams.py#L587-L635C20

DAG Circularity Validations

Presently there are no validations which assert that a set of tasks in a workflow are not circular.

Either at workflow bind-time or at launch-time, Jobmon should validate that the tasks provided are in fact acyclic.
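
One possible check, sketched as a depth-first traversal; it assumes each task exposes a downstream_tasks set, as in the repro below.

def assert_acyclic(tasks) -> None:
    """Raise ValueError if the task graph contains a cycle.

    Sketch only: recursive DFS over downstream_tasks; a very deep DAG would
    need an iterative version to avoid the recursion limit.
    """
    visiting: set[int] = set()
    done: set[int] = set()

    def visit(task) -> None:
        if id(task) in done:
            return
        if id(task) in visiting:
            raise ValueError("Cycle detected in workflow DAG")
        visiting.add(id(task))
        for child in task.downstream_tasks:
            visit(child)
        visiting.remove(id(task))
        done.add(id(task))

    for task in tasks:
        visit(task)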

Python version: 3.10.14

Jobmon versions:

# Name                    Version                   Build  Channel
jobmon                    3.2.4                    pypi_0    pypi
jobmon-core               3.2.5                    pypi_0    pypi
jobmon-installer-ihme     10.6.5                   pypi_0    pypi
jobmon-slurm              1.5.1                    pypi_0    pypi

Repro

import random

import pytest
from jobmon.client.task import Task
from jobmon.client.tool import Tool
from jobmon.client.workflow import Workflow

TOOL = Tool("dag-cycle-test")

"""Functions to create tasks and workflows for testing."""


def create_task(task_number: int, compute_resources: dict) -> Task:
    return TOOL.get_task_template(
        template_name="dummy_task",
        command_template="sleep {task_number}",
        node_args=["task_number"],
        op_args=[],
        task_args=[],
    ).create_task(
        name=f"task_{task_number}",
        compute_resources=compute_resources,
        task_number=task_number,
    )


def create_tasks(num_tasks: int) -> list[Task]:
    return [
        create_task(
            task_number=i,
            compute_resources={
                "memory": "1G",
                "cores": 1,
                "runtime": "1m",
                "queue": "all.q",
                "project": "proj_fhseng",
                "stderr": "/tmp/errors",
                "stdout": "/tmp/output",
            },
        )
        for i in range(num_tasks)
    ]


def create_workflow() -> Workflow:
    return TOOL.create_workflow(
        name="test_workflow",
        default_cluster_name="slurm",
        workflow_args=f"dummy_wf_{random.randint(0, 100_000_000)}",
    )


"""Test functions."""


def set_and_verify_dependencies(t0: Task, t1: Task, t2: Task) -> None:
    """Create a cyclic set of dependencies between the tasks and verify that the cycle was
    created.
    """
    # Create a cycle:
    # t0 -> t1 -> t2
    #  ^----------'
    t0.add_downstream(t1)
    t1.add_downstream(t2)
    t2.add_downstream(t0)

    # t2 <- t0 -> t1
    assert t0.upstream_tasks == {t2}
    assert t0.downstream_tasks == {t1}

    # t0 <- t1 -> t2
    assert t1.upstream_tasks == {t0}
    assert t1.downstream_tasks == {t2}

    # t1 <- t2 -> t0
    assert t2.upstream_tasks == {t1}
    assert t2.downstream_tasks == {t0}


@pytest.mark.unit
def test_create_cycle_before_adding_tasks_to_workflow_then_bind() -> None:
    """Exercise a cycle created in the DAG **before** adding the tasks to the workflow.

    Expect workflow.bind() to raise an exception.
    """
    # Create 3 tasks and the workflow
    t0, t1, t2 = create_tasks(3)
    wf = create_workflow()

    # Set the dependencies between the tasks
    set_and_verify_dependencies(t0=t0, t1=t1, t2=t2)

    # Add the tasks to the workflow
    wf.add_tasks([t0, t1, t2])

    # Call bind, expect it to raise an Exception
    with pytest.raises(Exception):
        wf.bind()


@pytest.mark.unit
def test_create_cycle_after_adding_tasks_to_workflow_then_bind() -> None:
    """Exercise a cycle created in the DAG **after** adding the tasks to the workflow.

    Expect workflow.bind() to raise an exception.
    """
    # Create 3 tasks and the workflow
    t0, t1, t2 = create_tasks(3)
    wf = create_workflow()

    # Add the tasks to the workflow
    wf.add_tasks([t0, t1, t2])

    # Set the dependencies between the tasks
    set_and_verify_dependencies(t0=t0, t1=t1, t2=t2)

    # Call bind, expect it to raise an Exception
    with pytest.raises(Exception):
        wf.bind()


@pytest.mark.unit
def test_create_cycle_before_adding_tasks_to_workflow_then_run() -> None:
    """Exercise a cycle created in the DAG **before** adding the tasks to the workflow.

    Expect workflow.run() to raise an exception.
    """
    # Create 3 tasks and the workflow
    t0, t1, t2 = create_tasks(3)
    wf = create_workflow()

    # Set the dependencies between the tasks
    set_and_verify_dependencies(t0=t0, t1=t1, t2=t2)

    # Add the tasks to the workflow
    wf.add_tasks([t0, t1, t2])

    # Call run, expect it to raise an Exception
    with pytest.raises(Exception):
        wf.run()


@pytest.mark.unit
def test_create_cycle_after_adding_tasks_to_workflow_then_run() -> None:
    """Exercise a cycle created in the DAG **after** adding the tasks to the workflow.

    Expect workflow.run() to raise an exception.
    """
    # Create 3 tasks and the workflow
    t0, t1, t2 = create_tasks(3)
    wf = create_workflow()

    # Add the tasks to the workflow
    wf.add_tasks([t0, t1, t2])

    # Set the dependencies between the tasks
    set_and_verify_dependencies(t0=t0, t1=t1, t2=t2)

    # Call run, expect it to raise an Exception
    with pytest.raises(Exception):
        wf.run()
