
saturn's Introduction

saturn

Saturn is a job scheduling and data processing system developed for web crawling needs at Flare Systems.

Documentation

Installation

Development

Install nox and poetry.

  • To run all tests: nox
  • To format code: nox -rs format

You can also work from the shell with:

$ # Install the project locally.
$ poetry install --all-extras
$ poetry shell
$ # Run the utilities.
$ py.test tests -xsvv
$ mypy src tests

saturn's Issues

Sentry errors are tagged with unrelated data

There are many instances of Sentry error events with tags that seem to come from unrelated events.

For example, an error whose backtrace clearly shows it comes from pipeline foobar has the pipeline tag set to bizbuz.

MultiItems doesn't merge so well

The current design doesn't handle merging multiple items into one very well. For example, there is no way for two sub-inventories to set a hook on the same event.

Only add started actor to the ActorPool

In the Ray executor, when we restart an actor we should "ping" it and ensure it's schedulable before adding it to the pool. Otherwise we might add an unschedulable actor to the pool, and a message sent to it would block forever. If that message holds locked resources, the whole workload ends up blocked.
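A minimal sketch of the idea, written with plain asyncio rather than Ray so that it runs on its own. `FakeActor`, `ping` and `add_if_schedulable` are hypothetical names, not part of Saturn or Ray; in the Ray executor the ping would presumably be a remote method call awaited with a timeout.

```python
import asyncio
from typing import List


class FakeActor:
    """Stand-in for a restarted actor (hypothetical, not the Ray API)."""

    def __init__(self, schedulable: bool) -> None:
        self.schedulable = schedulable

    async def ping(self) -> str:
        if not self.schedulable:
            # Simulate an actor that never gets scheduled: never answer.
            await asyncio.Event().wait()
        return "pong"


async def add_if_schedulable(
    pool: List[FakeActor], actor: FakeActor, timeout: float
) -> bool:
    """Add the actor to the pool only if it answers a ping in time."""
    try:
        await asyncio.wait_for(actor.ping(), timeout=timeout)
    except asyncio.TimeoutError:
        return False
    pool.append(actor)
    return True


async def main() -> List[FakeActor]:
    pool: List[FakeActor] = []
    await add_if_schedulable(pool, FakeActor(schedulable=True), timeout=0.1)
    await add_if_schedulable(pool, FakeActor(schedulable=False), timeout=0.1)
    return pool


pool = asyncio.run(main())
```

Only the actor that answered the ping ends up in the pool; the other one is left out and could be restarted again or reported.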

Set component class based on config

Broker should initialize the broker and the work manager based on a loaded configuration file. Right now everything is passed as Broker.__init__ parameters.
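One possible shape for this, as a sketch only: the configuration names a class by its dotted path and the options to pass to it. The `class` / `options` keys and `build_component` are assumptions made for illustration, and `queue.Queue` merely stands in for a real Saturn component class.

```python
import importlib
from typing import Any, Dict


def build_component(spec: Dict[str, Any]) -> Any:
    """Instantiate the class named by spec["class"] with spec["options"]."""
    module_name, _, class_name = spec["class"].rpartition(".")
    component_class = getattr(importlib.import_module(module_name), class_name)
    return component_class(**spec.get("options", {}))


# Hypothetical configuration; queue.Queue is only a stand-in component.
config = {
    "work_manager": {"class": "queue.Queue", "options": {"maxsize": 10}},
}

work_manager = build_component(config["work_manager"])
```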

Coroutines run in a different context

While working on tracing, when I stop a worker I get this error during cleanup:

2022-10-26 14:58:43 [error    ] Failed to detach context       [opentelemetry.context]
Traceback (most recent call last):
  File "/Users/israelhalle/Library/Caches/pypoetry/virtualenvs/saturn-engine-WvplgCTT-py3.9/lib/python3.9/site-packages/opentelemetry/trace/__init__.py", line 573, in use_span
    yield span
  File "/Users/israelhalle/Library/Caches/pypoetry/virtualenvs/saturn-engine-WvplgCTT-py3.9/lib/python3.9/site-packages/opentelemetry/sdk/trace/__init__.py", line 1033, in start_as_current_span
    yield span_context
  File "/Users/israelhalle/devel/saturn/src/saturn_engine/worker/services/tracing/tracer.py", line 37, in on_message_executed
    results = yield
GeneratorExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/israelhalle/Library/Caches/pypoetry/virtualenvs/saturn-engine-WvplgCTT-py3.9/lib/python3.9/site-packages/opentelemetry/context/__init__.py", line 157, in detach
    _RUNTIME_CONTEXT.detach(token)  # type: ignore
  File "/Users/israelhalle/Library/Caches/pypoetry/virtualenvs/saturn-engine-WvplgCTT-py3.9/lib/python3.9/site-packages/opentelemetry/context/contextvars_context.py", line 50, in detach
    self._current_context.reset(token)  # type: ignore
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x1037375e0> at 0x10371b040> was created in a different Context

It seems like the coroutine is being switched between contexts / tasks. Perhaps a bug in TaskGroup or the Scheduler? It would be nice to reproduce this in a unit test and ensure that coroutines remain in the same context / task during their whole lifecycle.

Note that this only happens during cleanup.
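The ValueError itself can be reproduced with the standard library alone, without OpenTelemetry or Saturn. The sketch below does not show where the context switch happens in Saturn; it only illustrates the mechanism: a token created by `ContextVar.set` in one Context cannot be used to reset the variable from another. The `attach` helper is hypothetical.

```python
import contextvars

current_context = contextvars.ContextVar("current_context", default=None)


def attach() -> contextvars.Token:
    # Runs inside a copied Context, so the token belongs to that copy.
    return current_context.set({"span": "example"})


token = contextvars.copy_context().run(attach)

try:
    # Resetting from the outer Context fails, like detach() in the traceback.
    current_context.reset(token)
    error = None
except ValueError as exc:
    error = str(exc)
```

A unit test along these lines could record the context in which a hook generator starts and compare it with the one in which it is closed.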

Type check topology

I won't work on this in the coming weeks, but it's something I would love to see in saturn eventually.

An inventory yields items of a specific type. Inventories should effectively become generic through TopicMessage[T]. This type could then be checked against the pipeline being connected to ensure that T matches the pipeline signature, raising a runtime error if that's not the case.

The end goal is that once a topology is loaded and mypy finds no errors, an inventory can never yield an unexpected type to a pipeline.
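A rough sketch of what the load-time check could look like, assuming a generic `TopicMessage[T]` dataclass and a hypothetical `check_connection` helper. Neither reflects Saturn's actual classes, and the comparison is deliberately strict (exact type match only).

```python
import dataclasses
import typing
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


@dataclasses.dataclass
class TopicMessage(Generic[T]):
    args: T


def check_connection(message_type: type, pipeline: Callable, param: str) -> None:
    """Raise TypeError if the pipeline parameter does not match T."""
    (item_type,) = typing.get_args(message_type)
    expected = typing.get_type_hints(pipeline).get(param)
    if expected is not item_type:
        raise TypeError(
            f"{pipeline.__name__}({param}: {expected}) cannot receive {item_type}"
        )


def count_words(text: str) -> int:
    return len(text.split())


# The inventory yields TopicMessage[str], which matches count_words(text: str).
check_connection(TopicMessage[str], count_words, "text")

try:
    # An inventory yielding TopicMessage[int] is rejected when connected.
    check_connection(TopicMessage[int], count_words, "text")
    mismatch = None
except TypeError as exc:
    mismatch = str(exc)
```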

Create a pipeline schema

Right now we parse each individual parameter, check if it's a dataclass and then create a schema for it. It would be simpler and more robust to create a single schema for all the pipeline parameters and load it with the message arguments at once. This would avoid workarounds to support extra types at the parameter level (see https://github.com/Flared/saturn/pull/139/files).

i.e.:

@classmethod
def instancify_args(cls, args: dict[str, object], pipeline: Callable) -> dict[str, object]:
  # Build one schema covering every pipeline parameter, then load all arguments at once.
  pipeline_schema = cls.schema_for_pipeline(pipeline)
  return dataclasses.asdict(pipeline_schema.load(args))

This way we let desert deal with optionals, dict, etc.

Call services initializer from `BootstrapPipeline`

Right now, if a service needs to run initialization code in the executor worker, it's somewhat difficult or hackish.

We added hooks.on_executor_initialized, but many executors (ARQ, Ray) don't support it.

Right now, we actually rely on the executor to call the services' initializers, i.e.:

initialize_hook.register(logger.on_executor_initialized)

It would be better to extend BootstrapPipeline to take care of this. All executors have to instantiate it to execute messages, so we could rely on it to ensure that the worker has initialized the services. It might require the executor worker to load the Saturn config and pass it to the bootstrapper, letting the services define an open_in_executor and removing the partly supported on_executor_initialized.

Instant shutdown of Saturn worker

We added some ways to clean up running tasks and services in the Saturn worker. However, it seems a few components just hang and tasks don't get cancelled.

It would be nicer for devs and testers if C-c did a very quick cleanup of everything and stopped the worker right away.

Use descriptive rate limit instead of delay in resource

Replace default_delay with rate_limit, expressed as a rate per window. This way we could support APIs that allow a given rate per month, and burst through that quota in a few days instead of spreading the load over the whole month. We might want to support multiple rate limits at once, so we can set both 1000 per month and 1 per 5 seconds.
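As an illustration, here is a small sliding-window limiter that enforces several limits at once. `RateLimit`, `RateLimiter` and `try_acquire` are hypothetical names, and the numbers are scaled down (3 per 60 seconds and 1 per 5 seconds) so the behaviour is easy to follow; this is a sketch of the concept, not a proposal for the resource implementation.

```python
import collections
import dataclasses
from typing import Deque, List


@dataclasses.dataclass
class RateLimit:
    limit: int  # Maximum number of uses...
    window: float  # ...within this many seconds.


class RateLimiter:
    def __init__(self, rate_limits: List[RateLimit]) -> None:
        self.rate_limits = rate_limits
        self.history: Deque[float] = collections.deque()

    def try_acquire(self, now: float) -> bool:
        """Record a use at time `now` unless one of the limits is reached."""
        longest = max(rate.window for rate in self.rate_limits)
        # Forget uses that are older than the longest window.
        while self.history and self.history[0] <= now - longest:
            self.history.popleft()
        for rate in self.rate_limits:
            used = sum(1 for t in self.history if t > now - rate.window)
            if used >= rate.limit:
                return False
        self.history.append(now)
        return True


limiter = RateLimiter([RateLimit(limit=3, window=60.0), RateLimit(limit=1, window=5.0)])
results = [limiter.try_acquire(t) for t in (0, 1, 5, 10, 15, 61)]
# results == [True, False, True, True, False, True]
```

The attempt at t=1 is refused by the 5-second limit, the attempt at t=15 by the 60-second limit, and the one at t=61 succeeds because the first use has left the longer window.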

cc @mlefebvre

Remodel work component

Rethink and redesign the work units: JobDefinition, Job, Queue, Topic, Inventory, ExecutableQueue, Schedulable, etc.

How do we deal with failures, retries, etc. at all these levels? Right now a job cannot be retried after it fails.

Add Topic Fan-in

Create a topic that takes other topics as parameters to fan in. This way a single job definition could read from a single topic instead of having to define multiple topics and job definitions, e.g.:

apiVersion: saturn.flared.io/v1alpha1
kind: SaturnTopic
metadata:
  name: fanin-topic
spec:
  type: FaninTopic
  options:
    topics:
      - type: PeriodicTopic
        name: periodic-topic
        options:
          interval: "* * * * * */5"
      - type: RabbitMQTopic
        name: rabbitmq-topic
        options:
          queue_name: foobar
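The merging itself could be sketched as follows, treating each sub-topic as an async iterator. `fan_in` and `numbers` are hypothetical helpers for illustration; Saturn's actual topic interface may differ.

```python
import asyncio
from typing import AsyncIterator, List


async def fan_in(topics: List[AsyncIterator[str]]) -> AsyncIterator[str]:
    """Yield messages from all topics as soon as they become available."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # Sentinel marking the end of one topic.

    async def forward(topic: AsyncIterator[str]) -> None:
        try:
            async for message in topic:
                await queue.put(message)
        finally:
            queue.put_nowait(done)

    tasks = [asyncio.create_task(forward(topic)) for topic in topics]
    remaining = len(tasks)
    try:
        while remaining:
            message = await queue.get()
            if message is done:
                remaining -= 1
            else:
                yield message
    finally:
        # Stop the forwarders if the consumer stops early.
        for task in tasks:
            task.cancel()


async def numbers(name: str, count: int) -> AsyncIterator[str]:
    for i in range(count):
        await asyncio.sleep(0)
        yield f"{name}-{i}"


async def main() -> List[str]:
    topics = [numbers("periodic", 2), numbers("rabbitmq", 3)]
    return [message async for message in fan_in(topics)]


messages = asyncio.run(main())
```

The order in which messages from different topics interleave is not guaranteed; only the order within each topic is preserved.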

worker: ray client disconnected

Sometimes this happens and all pipelines start failing.

  File "/opt/venv/lib/python3.9/site-packages/saturn_engine/worker/services/loggers/logger.py", line 78, in on_message_executed
    result = yield
  File "/opt/venv/lib/python3.9/site-packages/saturn_engine/worker/services/metrics/base.py", line 42, in on_message_executed
    results = yield
  File "/opt/venv/lib/python3.9/site-packages/saturn_engine/worker/services/extras/sentry.py", line 109, in on_message_executed
    yield
  File "/opt/venv/lib/python3.9/site-packages/saturn_engine/utils/hooks.py", line 156, in __call__
    result = await self.scope(arg)
  File "/opt/venv/lib/python3.9/site-packages/saturn_engine/worker/executors/__init__.py", line 72, in scope
    return await self.executor.process_message(message)
  File "/opt/venv/lib/python3.9/site-packages/saturn_engine/worker/executors/ray.py", line 52, in process_message
    return await actor.process_message.remote(message)
  File "/opt/venv/lib/python3.9/site-packages/ray/util/client/common.py", line 346, in remote
    return return_refs(ray.call_remote(self, *args, **kwargs))
  File "/opt/venv/lib/python3.9/site-packages/ray/util/client/__init__.py", line 252, in __getattr__
    return self.get_context().__getattr__(name)
  File "/opt/venv/lib/python3.9/site-packages/ray/util/client/__init__.py", line 157, in __getattr__
    raise Exception("Ray Client is not connected. "
Exception: Ray Client is not connected. Please connect by calling `ray.init`.

Race condition error on delayed resource release

In some cases we can see logs like:

Traceback (most recent call last):
  File "/opt/venv/lib/python3.9/site-packages/saturn_engine/worker/executors/queue.py", line 67, in run_queue
    processable.update_resources_used(output.resources)
  File "/opt/venv/lib/python3.9/site-packages/saturn_engine/worker/executors/executable.py", line 57, in update_resources_used
    self.resources[resource_used.type].release_later(resource_used.release_at)
KeyError: 'SomeResourceType'

My guess is a race condition where a resource has already been released. It doesn't seem to lead to critical failures, only to a message failure every once in a while.

Cursor states too large

Since we save state with the cursor as the key, PostgreSQL fails when the key is too large.

We should hash the key when storing it in the PostgreSQL database so we don't have any limit.

However, it would be nice to keep things backward compatible, so the migration path would be:

  1. Add a fetch_state_modes option to JobState (a list of either RAW or SHA256).
  2. Change the JobState API client to fetch cursors based on fetch_state_modes. The same cursor might request multiple keys (for example, cursor 1 also fetches sha256:6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b).
  3. Add a save_state_modes option to JobState (either RAW or SHA256).
  4. Change the JobState service to transform cursors depending on save_state_modes.
  5. Deploy the new saturn worker with save_state_modes=SHA256 and fetch_state_modes=[RAW, SHA256].
  6. Manually migrate the database (update job_state_cursors set cursor=sha256(cursor) where cursor not like 'sha256:%').
  7. Deploy the saturn worker with fetch_state_modes=[SHA256].
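The key transformation in steps 2 and 4 could look like the sketch below. The `transform_cursor` and `keys_to_fetch` helpers are hypothetical, and the `sha256:` prefix with a hex digest follows the example given in step 2.

```python
import hashlib
from typing import List

RAW = "RAW"
SHA256 = "SHA256"


def transform_cursor(cursor: str, mode: str) -> str:
    """Return the database key for a cursor under the given mode."""
    if mode == RAW:
        return cursor
    if mode == SHA256:
        digest = hashlib.sha256(cursor.encode("utf-8")).hexdigest()
        return f"sha256:{digest}"
    raise ValueError(f"Unknown state mode: {mode}")


def keys_to_fetch(cursor: str, fetch_state_modes: List[str]) -> List[str]:
    """During the migration, one cursor may map to several keys."""
    return [transform_cursor(cursor, mode) for mode in fetch_state_modes]


keys = keys_to_fetch("1", [RAW, SHA256])
# keys == ["1", "sha256:6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b"]
```

Because the hashed key has a fixed length, the size of the original cursor no longer matters once fetch_state_modes is reduced to SHA256.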

Inventory Tester

Write a test bench that runs an arbitrary Inventory from a topology definition.

This tester validates a number of things, such as:

  • The inventory runs from start to finish
  • No two items have the same cursor
  • Resuming from a cursor gives similar results (at least in the same order)
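The checks listed above can be sketched as follows, assuming an inventory is an async generator function that yields `(cursor, payload)` pairs and accepts an `after` cursor to resume from. This interface, `example_inventory` and `check_inventory` are assumptions for illustration, not Saturn's actual Inventory API, and the resume check here is stricter than "similar" since it requires identical items.

```python
import asyncio
from typing import AsyncIterator, Callable, Optional, Tuple

Item = Tuple[str, str]  # (cursor, payload); hypothetical item shape.


async def example_inventory(after: Optional[str] = None) -> AsyncIterator[Item]:
    for i in range(5):
        if after is not None and i <= int(after):
            continue  # Skip everything up to and including the cursor.
        yield str(i), f"item-{i}"


async def check_inventory(inventory: Callable[..., AsyncIterator[Item]]) -> None:
    # 1. The inventory runs from start to finish.
    items = [item async for item in inventory()]
    # 2. No two items have the same cursor.
    cursors = [cursor for cursor, _ in items]
    assert len(cursors) == len(set(cursors)), "duplicate cursor"
    # 3. Resuming from any cursor yields the remaining items in order.
    for index, (cursor, _) in enumerate(items):
        resumed = [item async for item in inventory(after=cursor)]
        assert resumed == items[index + 1:], f"resume from {cursor} differs"


asyncio.run(check_inventory(example_inventory))
checked = True
```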
