workiva / furious
Fast and modular async task library for Google App Engine.
License: Apache License 2.0
From the following snippet in _insert_tasks(), it doesn't appear that a single task would re-raise the error or attempt a re-insert on a TransientError:
try:
    taskqueue.Queue(name=queue).add(tasks, transactional=transactional)
    return len(tasks)
except (taskqueue.BadTaskStateError,
        taskqueue.TaskAlreadyExistsError,
        taskqueue.TombstonedTaskError,
        taskqueue.TransientError):
    count = len(tasks)
    if count <= 1:
        return 0
I don't think we want to recursively call _insert_tasks() here - we could keep recursing until we reach our limit. Maybe just re-raise so the task retries?
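A minimal, self-contained sketch of the re-raise idea, with a stand-in for taskqueue.TransientError so the logic is runnable outside GAE (the function name and add callback are illustrative, not furious's actual API):

```python
class TransientError(Exception):
    """Stand-in for taskqueue.TransientError."""


def insert_tasks(tasks, add):
    """Insert a batch via add(tasks); split and retry halves on failure.

    For a single task that hits a TransientError, re-raise instead of
    silently returning 0, so the enclosing task retries.
    """
    try:
        add(tasks)
        return len(tasks)
    except TransientError:
        count = len(tasks)
        if count <= 1:
            raise  # Single task: let the task queue retry it.
        # Batch: split in half and retry each half independently.
        return (insert_tasks(tasks[:count // 2], add) +
                insert_tasks(tasks[count // 2:], add))
```

With this shape, a batch failure degrades into smaller inserts, while a lone failing task surfaces the error to the retry machinery rather than being dropped.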
I had the idea that we could encode the persistence engine
selection in the id of the Context.
Want it to store in ndb? Postfix the id with a
delimiter such as |.
5949025a659a4829947de944c8cc328f|ndb
store with memcache?
5949025a659a4829947de944c8cc328f|mem
with redis?
5949025a659a4829947de944c8cc328f|rds
children would be:
5949025a659a4829947de944c8cc328f|ndb,Bx0
5949025a659a4829947de944c8cc328f|ndb,Fn1
This scheme would allow a context to define its persistence
engine differently from the system-wide setting, and allow
the processor and other components to know how to load
markers from the persistence layer.
The following should immediately cease execution of the currently executing task, without emitting an error-level log and without taking any subsequent actions:
raise async.Abort()
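A toy sketch of how a processor could honor that contract, with a stand-in Abort class and a hypothetical run_job wrapper (not furious's actual implementation):

```python
class Abort(Exception):
    """Raised inside an Async to stop execution without error."""


def run_job(job):
    """Run the job; on Abort, stop silently with no callbacks or retry."""
    try:
        return job()
    except Abort:
        # Cease execution: no error log, no result handling, return
        # cleanly so the task queue does not retry.
        return None
```

The key point is that Abort is caught before the generic error-handling path, so nothing downstream treats it as a failure.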
On the context, when inserting the tasks, add properties for tracking the tasks that were inserted, and possibly also the ones that failed.
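One possible shape for that tracking, using the Task.was_enqueued property from the taskqueue docs; the attribute names inserted_tasks and failed_tasks are assumptions for illustration:

```python
class Context(object):
    """Sketch: a context that records insert outcomes per task."""

    def __init__(self):
        self.inserted_tasks = []
        self.failed_tasks = []

    def _record_insert_results(self, tasks):
        # Task.was_enqueued tells us which tasks actually made it in.
        for task in tasks:
            if task.was_enqueued:
                self.inserted_tasks.append(task)
            else:
                self.failed_tasks.append(task)
```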
When attempting to set the .result of an Async, if async.executing is already True, setting .executing = True raises an exception. That exception is caught in the final except clause, which attempts to set the result to the exception, which raises another exception because the Async isn't executing.
The relevant code starts here:
https://github.com/Workiva/furious/blob/master/furious/processors.py#L56
The solution would probably involve explicitly catching the NotExecutingError and doing something other than attempting to write it to Async.result.
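A self-contained sketch of that fix, with stand-in Async and NotExecutingError classes mirroring the behavior described above (the real code lives in furious/async.py and furious/processors.py):

```python
class NotExecutingError(Exception):
    pass


class Async(object):
    """Stand-in: result can only be set while executing."""

    def __init__(self):
        self.executing = False
        self._result = None

    @property
    def result(self):
        return self._result

    @result.setter
    def result(self, value):
        if not self.executing:
            raise NotExecutingError(
                'The Async must be executing to set its result.')
        self._result = value


def run_job(async_task, job):
    """Proposed shape: catch NotExecutingError explicitly instead of
    trying to write it back to Async.result (which would re-raise)."""
    try:
        async_task.result = job()
    except NotExecutingError:
        # Log and drop rather than raising from inside the except clause.
        pass
```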
from https://jira.atl.workiva.net/browse/DS-3936
Traceback (most recent call last):
  File "third_party/webapp2-2.5.2/webapp2.py", line 1535, in __call__
    rv = self.handle_exception(request, response, e)
  File "third_party/webapp2-2.5.2/webapp2.py", line 1529, in __call__
    rv = self.router.dispatch(request, response)
  File "third_party/webapp2-2.5.2/webapp2.py", line 1278, in default_dispatcher
    return route.handler_adapter(request, response)
  File "third_party/webapp2-2.5.2/webapp2.py", line 1102, in __call__
    return handler.dispatch()
  File "third_party/webapp2-2.5.2/webapp2.py", line 572, in dispatch
    return self.handle_exception(e, self.app.debug)
  File "third_party/webapp2-2.5.2/webapp2.py", line 570, in dispatch
    return method(*args, **kwargs)
  File "external_libs/furious/handlers/webapp.py", line 28, in post
    self._handle_task()
  File "external_libs/furious/handlers/webapp.py", line 37, in _handle_task
    headers, self.request.body)
  File "external_libs/furious/handlers/__init__.py", line 38, in process_async_task
    run_job()
  File "external_libs/furious/processors.py", line 72, in run_job
    status=AsyncResult.ERROR)
  File "external_libs/furious/async.py", line 160, in result
    'The Async must be executing to set its result.')
NotExecutingError: The Async must be executing to set its result.
Calling async.clone()
should create and insert another copy of the currently executing async. Note that this does not need to support result capture in the first version.
Calling async.respawn()
should allow the currently executing async to finish, but reinsert a copy of itself. The completion callback should not be run until the task completes without calling respawn.
So, if a task respawns itself ten (10) times, the completion callback will only be called once after the tenth iteration.
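A toy model of those respawn semantics, using direct recursion as a stand-in for task re-insertion (class and callback names are illustrative):

```python
class RespawningTask(object):
    """Sketch: completion fires only after a run that did not respawn."""

    def __init__(self, work, on_complete):
        self.work = work            # returns True to respawn, False when done
        self.on_complete = on_complete
        self.runs = 0

    def execute(self):
        self.runs += 1
        if self.work():
            self.execute()          # stand-in for re-inserting the task
        else:
            self.on_complete()      # called exactly once, after the last run
```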
When calling get_current_async() from a completion handler, the base Async class is returned, not the derived class.
So in the following snippet,
with context.new() as ctx:
    ctx.set_event_handler('complete', CustomAsync(target=success_handler))
    ctx.add(CustomAsync(target=task_to_run))
when calling get_current_async() from within success_handler, I would expect a CustomAsync to be returned. In the current version (1.0), a base Async is returned.
Hello,
I've been using furious for a few projects, both personal and professional. When an exception is thrown from inside an Async, the entries in the GAE logs are quite tricky to read, and the stack trace is not useful for troubleshooting. Typically you can see the last error message, but no meta information about the exception. Ideally, furious should log stack information from the actual exception.
The queue that the cleanup tasks run in should be configurable, so they do not have to run in the default queue.
When a furious task fails, the exception message is correct, but the stack trace always points to the location in furious where the exception was re-raised.
Example:
list index out of range
Traceback (most recent call last):
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 1535, in __call__
    rv = self.handle_exception(request, response, e)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 1529, in __call__
    rv = self.router.dispatch(request, response)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 1278, in default_dispatcher
    return route.handler_adapter(request, response)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 1102, in __call__
    return handler.dispatch()
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 572, in dispatch
    return self.handle_exception(e, self.app.debug)
  File "/base/data/home/runtimes/python27/python27_lib/versions/third_party/webapp2-2.5.2/webapp2.py", line 570, in dispatch
    return method(*args, **kwargs)
  File "/base/data/home/apps/s~wf-sherpa/128.377408199394560075/external_libs/furious/handlers/webapp.py", line 28, in post
    self._handle_task()
  File "/base/data/home/apps/s~wf-sherpa/128.377408199394560075/external_libs/furious/handlers/webapp.py", line 37, in _handle_task
    headers, self.request.body)
  File "/base/data/home/apps/s~wf-sherpa/128.377408199394560075/external_libs/furious/handlers/__init__.py", line 36, in process_async_task
    run_job()
  File "/base/data/home/apps/s~wf-sherpa/128.377408199394560075/external_libs/furious/processors.py", line 74, in run_job
    _handle_results(async_options)
  File "/base/data/home/apps/s~wf-sherpa/128.377408199394560075/external_libs/furious/processors.py", line 84, in _handle_results
    processor_result = results_processor()
  File "/base/data/home/apps/s~wf-sherpa/128.377408199394560075/external_libs/furious/processors.py", line 126, in _process_results
    async.result.payload.traceback)
IndexError: list index out of range
No matter the exception or task, the stack trace is always the same.
Add the ability to have a group of queues that a task can ask to be run in.
Then we can either use a simple random process to assign it to one of the queues in the group, or leverage the queue API to try to figure out which queue is the most optimal to run in.
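The simple random strategy could look like this sketch; the group name, queue names, and function name are placeholders:

```python
import random

# Hypothetical mapping from a group name to its member queues.
QUEUE_GROUPS = {'workers': ['worker-1', 'worker-2', 'worker-3']}


def pick_queue(group):
    """Pick a queue from the group at insert time, uniformly at random."""
    return random.choice(QUEUE_GROUPS[group])
```

The queue-API-based variant would replace random.choice with a selection weighted by each queue's current depth or execution rate.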
This is more of an annoyance, but when using an AutoContext to add tasks in batch, you may get undesired results if you don't set the event handler up front.
So the following may not work, as only your last batch would get a completion checker attached:
with context.new(batch_size=10) as ctx:
    for item in items:
        ctx.add(target=my_func)
    ctx.set_event_handler('complete', completion_handler)
As a workaround, always set the event handler first:
with context.new(batch_size=10) as ctx:
    ctx.set_event_handler('complete', completion_handler)
    for item in items:
        ctx.add(target=my_func)
The following construct:
raise async.AbortAndRestart()
should cause the currently executing Async to stop immediately, then a clone of the task to be reinserted.
When doing the completion checks, save the markers we've already loaded to avoid having to check them again.
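A minimal sketch of that caching, assuming a fetch callable that loads a marker from the persistence layer (names are illustrative):

```python
# Cache of markers already loaded during this completion check.
_loaded_markers = {}


def load_marker(marker_id, fetch):
    """Fetch a marker at most once; reuse the cached copy afterwards."""
    if marker_id not in _loaded_markers:
        _loaded_markers[marker_id] = fetch(marker_id)
    return _loaded_markers[marker_id]
```

In practice the cache would be scoped to a single completion-check pass rather than module-level, so stale markers are not reused across checks.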
In order to prevent unintentional infinite loops, add special header/option values called max_recursion_depth and current_depth that get automatically set and incremented when tasks are inserted. Inserting an async task from within an async should result in current_depth being incremented by one. There should also be a mechanism to set max_recursion_depth and to reset current_depth, so that users can override the defaults if needed. If current_depth reaches max_recursion_depth, the task should automatically abort (see enhancement #38).
Let's get this sucker pypi installable :)
https://gist.github.com/johnlockwood-wf/3128d1882963ba15f905
This is an intermittent error that happens only when the examples are run on appspot.com.
You can see it in the logs when running the /grep and /context examples.
The completion checks should inherit the queue that the context runs in, rather than running in the default queue.
BulkAdd() error on the complete handler results in the handler not getting called on retry.
If we get a BulkAdd() error when kicking off the 'complete' callback (in exec_event_handler's handler.start()), the error propagates and the task retries, but the Context has already been marked as complete, so my handler never gets called.
Need to update the travis reference to the GAE SDK.
Would be nice to find a way to always use latest.
Are you listening Google?
Installation is installing the examples directory as well.
Switch to using the get method so that this won't blow up in unit tests. It should be fine to default to an empty string or even None.
Allow the ability to pass the default async URL(s) to trigger in the queue test handler.
Also allow custom handlers for Furious and non-Furious tasks.
As agreed with Robert, I've added the id property to Async, but I think there is a need for a job_id as well - specifically for asyncs sending signals such as errors and warnings, so they can be collected by the job runner, and for asyncs receiving an abort.
...instead of trying to re-insert all tasks. From Queue.add's docstring:
If a list of more than one Tasks is given, a raised exception does not
guarantee that no tasks were added to the queue (unless transactional is set
to True). To determine which tasks were successfully added when an exception
is raised, check the Task.was_enqueued property.
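Following that docstring, a retry could be narrowed to only the tasks that failed to enqueue; a minimal sketch (the helper name is illustrative):

```python
def failed_tasks(tasks):
    """Return only the tasks that were not enqueued, per Task.was_enqueued."""
    return [task for task in tasks if not task.was_enqueued]
```

After a non-transactional bulk add raises, re-inserting failed_tasks(tasks) avoids TombstonedTaskError/TaskAlreadyExistsError noise from re-adding tasks that actually succeeded.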