orlandos-nl / mongoqueue Goto Github PK
View Code? Open in Web Editor NEWMongoDB Job Queue in Swift
License: MIT License
MongoDB Job Queue in Swift
License: MIT License
Often there are scenarios where a task is queued but due to external changes (user changing their notification settings for example.) the task execution date needs to be changed.
There a 2 ways of handling this:
I opt for option 2 since it's more efficient and can be done in a single query (findAndModify
) vs a delete and insert query.
Currently there is no good API for handling this so we have to resort to using code like this
let document = try await context.mongodb["queue"].find(...) // find a scheduled Task
if let metadata = document["metadata"] as? Document,
let reminderDate = metadata["reminder_date"] as? Date,
let id = document["_id"] as? ObjectId {
let newDate = reminderDate.addingTimeInterval(....)
_ = try await context.mongodb["queue"].findAndModify(where: ["_id": id],
update: ["$set": ["configuration.scheduledDate": newDate] as Document]).execute()
}
Also there is some slight discrepancy/redundancy for execution dates. In the screenshot attached there are 3 separate date fields that represent when a task should be executed.
But the one that really matters is the configuration.scheduledDate
. There is a caveat that the metadata.taskExecutionDate
is derived from conforming to the ScheduledTask
but that value is actually never read when it's time to execute but it's rather used to create the configuration.scheduledDate
. Maybe we don't have to save it to take up extra space but decoding would fail since we have to specify it anyway another problem for another day.
Also maxTaskDuration
is also duplicated when it doesn't necessarily have to be.
As far as a better API maybe there could be a function on the context that can be
func updateTaskExecutionDate(date: Date, filter: Document) async throws
Often at times I find myself wanting to retry a failed task.
While the option to set a maximum retry using the retry api
public func onExecutionFailure(failureContext: QueuedTaskFailure<Context>) async throws -> TaskExecutionFailureAction {
.retry(maxAttempts: 5)
}
Even then if something fails 5 times in this example the task is just deleted with no record of the payload etc. and one would have t hopefully rely on logging to pinpoint the reason for a failure.
A better alternative would be to keep the task in the queue but have it marked as failed
with a new status or make use of the existing suspended
status
This gives us the flexibility to run the task locally while debugging to use breakpoints and a debugger to pinpoint the reasons for a failure. This also allows us to potentially build a frontend (queue dashboard ๐) to have insight into tasks failures and even for monitoring queue health etc.
I imagine an API for this could be something like
/// configure globally for all tasks
let queue = MongoQueue(collection: mongoDatabase["queue"], keepFailedTasksInQueue: true)
/// or maybe per individual tasks you care to keep
queue.registerTask(PushNotificationTask.self, context: ..., keepFailedTaskInQueue: true)
/// with this approach we can have access to the `KnownType` and make the decision when catching the error after execution.
Currently the attempts is increased after a task's completion - successful or not. That means that a crashed task will not increment the attempt count.
I'm running into a use case where it'd be extremely helpful to manually "flush the queue" possibly with a timeout.
Main use case for now is just unit tests.
I can imagine an API like this.
try await queue.flush(timeout: 300)
And this will attempt to pickup and execute all jobs and returns after the queue is empty.
This would supplement/replace the call try await queue.run/inBackground()
in unit tests. This way we don't have to use this flow
queue.registerTask(MyTask.self, context: app)
try await queue.queueTask(Mytask())
try queue.runInBackground()
// Sleep to allow queue to pick up the tasks.
try await Task.sleep(nanoseconds: 5000000000) // 5 seconds
But instead we could do this
queue.registerTask(MyTask.self, context: app)
try await queue.queueTask(Mytask())
try await queue.flush(timeout: 300)
XCTAssertTrue(...)
It would be nice if there was a way to prevent a task from being queued multiple times, especially if given the exact same context/configuration. This would throw an alreadyQueued
error when queued.
This would be optional, and off by default to prevent backwards compatibility issues.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.