calpaterson / missive Goto Github PK
View Code? Open in Web Editor NEWPython framework for writing message processors
Home Page: https://missive.readthedocs.io
Python framework for writing message processors
Home Page: https://missive.readthedocs.io
drain_events
doesn't timeout when there is a constant stream of messages, the way that we check for shutdown needs to be changed.
Missive proc exits gracefully at some point
Missive proc soldiers wrongly on
Probably best to check for shutdown at the end (not the start!) of each message handling and clean up (but allow existing messages to finish)
In the RabbitMQAdapter
, double acking currently raises a Kombu-level exception
Not sure? Warning? Missive-level exception? Nothing?
Kombu level exception
Got this monstrosity while testing qu:
2020-10-19 10:48:22,986 ERROR quarchive.crawler - indexing error
Traceback (most recent call last):
File "/home/cal/src/missive/missive/missive.py", line 188, in handle
sole_matching_handler(message, self)
File "/home/cal/src/quarchive/src/server/quarchive/bg_worker.py", line 96, in on_crawl_requested
ctx.ack(message)
File "/home/cal/src/missive/missive/missive.py", line 136, in ack
self.adapter.ack(message)
File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 37, in ack
self._current_kombu_message.ack()
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/message.py", line 122, in ack
self))
kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 91, in run
conn.drain_events(timeout=drain_timeout)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/connection.py", line 318, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/transport/pyamqp.py", line 101, in drain_events
return connection.drain_events(**kwargs)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 514, in drain_events
while not self.blocking_read(timeout):
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 520, in blocking_read
return self.on_inbound_frame(frame)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/method_framing.py", line 77, in on_frame
callback(channel, msg.frame_method, msg.frame_args, msg)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 527, in on_inbound_method
method_sig, payload, content,
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/abstract_channel.py", line 143, in dispatch_method
listener(*args)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/channel.py", line 1613, in _on_basic_deliver
fun(msg)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 620, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 586, in receive
[callback(body, message) for callback in callbacks]
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 586, in <listcomp>
[callback(body, message) for callback in callbacks]
File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 79, in callback
ctx.handle(message)
File "/home/cal/src/missive/missive/missive.py", line 200, in handle
self.ack(message)
File "/home/cal/src/missive/missive/missive.py", line 136, in ack
self.adapter.ack(message)
File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 37, in ack
self._current_kombu_message.ack()
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/message.py", line 122, in ack
self))
kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/cal/src/quarchive/src/server/quarchive/crawler.py", line 148, in add_to_fulltext_index
ensure_fulltext(session, crawl_uuid)
File "/home/cal/src/quarchive/src/server/quarchive/crawler.py", line 171, in ensure_fulltext
crawl_metadata = get_crawl_metadata(session, crawl_uuid)
File "/home/cal/src/quarchive/src/server/quarchive/data/functions.py", line 571, in get_crawl_metadata
.filter(CrawlResponse.crawl_uuid == crawl_uuid)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3325, in one
ret = self.one_or_none()
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3294, in one_or_none
ret = list(self)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3367, in __iter__
return self._execute_and_instances(context)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3389, in _execute_and_instances
querycontext, self._connection_from_session, close_with_result=True
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3404, in _get_bind_args
mapper=self._bind_mapper(), clause=querycontext.statement, **kw
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3382, in _connection_from_session
conn = self.session.connection(**kw)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1133, in connection
execution_options=execution_options,
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1139, in _connection_for_bind
engine, execution_options
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 408, in _connection_for_bind
self._assert_active()
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 295, in _assert_active
code="7s2a",
sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "full_text_pkey"
DETAIL: Key (url_uuid)=(57425b9c-9562-506b-b588-0f25bbdbaddc) already exists.
[SQL: INSERT INTO full_text (url_uuid, crawl_uuid, inserted, full_text, tsvector) VALUES (%(url_uuid)s, %(crawl_uuid)s, %(inserted)s, %(full_text)s, to_tsvector(%(to_tsvector_1)s))]
[parameters: {'url_uuid': UUID('57425b9c-9562-506b-b588-0f25bbdbaddc'), 'crawl_uuid': UUID('9e53b2e8-dc84-4e32-b4db-e6cfe6901a40'), 'inserted': datetime.datetime(2020, 10, 19, 9, 48, 22, 925691, tzinfo=datetime.timezone.utc), 'full_text': 'One of the easiest, least expensive ways to make great coffee. How To Make French Press CoffeeOpenOpen the Pocket mobile menuPocketDiscoverMy ListLog ... (35896 characters truncated) ... 0507d7b56b2dcc535be590c73","assetPrefix":"https://assets.getpocket.com/web-discover","isFallback":false,"customServer":true,"gip":true,"appGip":true}', 'to_tsvector_1': 'One of the easiest, least expensive ways to make great coffee. How To Make French Press CoffeeOpenOpen the Pocket mobile menuPocketDiscoverMy ListLog ... (35896 characters truncated) ... 0507d7b56b2dcc535be590c73","assetPrefix":"https://assets.getpocket.com/web-discover","isFallback":false,"customServer":true,"gip":true,"appGip":true}'}]
(Background on this error at: http://sqlalche.me/e/gkpj) (Background on this error at: http://sqlalche.me/e/7s2a)
Traceback (most recent call last):
File "/home/cal/src/missive/missive/missive.py", line 188, in handle
sole_matching_handler(message, self)
File "/home/cal/src/quarchive/src/server/quarchive/bg_worker.py", line 96, in on_crawl_requested
ctx.ack(message)
File "/home/cal/src/missive/missive/missive.py", line 136, in ack
self.adapter.ack(message)
File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 37, in ack
self._current_kombu_message.ack()
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/message.py", line 122, in ack
self))
kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/cal/.virtualenvs/quarchive/bin/quarchive-bg-worker", line 11, in <module>
load_entry_point('quarchive', 'console_scripts', 'quarchive-bg-worker')()
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/click/core.py", line 764, in __call__
return self.main(*args, **kwargs)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/click/core.py", line 717, in main
rv = self.invoke(ctx)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/click/core.py", line 956, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/click/core.py", line 555, in invoke
return callback(*args, **kwargs)
File "/home/cal/src/quarchive/src/server/quarchive/bg_worker.py", line 121, in bg_worker
adapted_processor.run()
File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 91, in run
conn.drain_events(timeout=drain_timeout)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/connection.py", line 318, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/transport/pyamqp.py", line 101, in drain_events
return connection.drain_events(**kwargs)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 514, in drain_events
while not self.blocking_read(timeout):
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 520, in blocking_read
return self.on_inbound_frame(frame)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/method_framing.py", line 77, in on_frame
callback(channel, msg.frame_method, msg.frame_args, msg)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 527, in on_inbound_method
method_sig, payload, content,
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/abstract_channel.py", line 143, in dispatch_method
listener(*args)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/channel.py", line 1613, in _on_basic_deliver
fun(msg)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 620, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 586, in receive
[callback(body, message) for callback in callbacks]
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 586, in <listcomp>
[callback(body, message) for callback in callbacks]
File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 79, in callback
ctx.handle(message)
File "/home/cal/src/missive/missive/missive.py", line 200, in handle
self.ack(message)
File "/home/cal/src/missive/missive/missive.py", line 136, in ack
self.adapter.ack(message)
File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 37, in ack
self._current_kombu_message.ack()
File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/message.py", line 122, in ack
self))
kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK
We want a way for people to scope certain things to
In order to do this:
HandlingContext
to ProcessingContext
to indicate that it's per-processorHandlingContext
for each message@proc.before_process(proc_ctx)
@proc.after_process(proc_ctx)
@proc.before_handle(proc_ctx, handle_ctx)
and @proc.after_handle(proc_ctx, handle_ctx)
If a DLQ raises an exception, that isn't handled and the whole processor crashes
Not sure.
Option A: Message acked, problem logged at error level but processor continues
Option B: Processor exits in an orderly fashion, with particular status code
Tricky choice to make
Disorderly exit
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.