faust-streaming / faust
Python Stream Processing. A Faust fork
Home Page: https://faust-streaming.github.io/faust/
License: Other
I have verified that the issue exists against the master branch of Faust.
Install faust together with a current version of jaeger-client (which requires opentracing>=2.1).
No version conflict should arise.
faust claims to be unable to work with opentracing>=2.0.0:
ERROR: jaeger-client 4.3.0 has requirement opentracing<3.0,>=2.1, but you'll have opentracing 1.3.0 which is incompatible.
According to this issue in the original project, just bumping the version number here should be enough to enable opentracing>2.0.0.
The reason it was not upgraded in the original project was a possible incompatibility with Robinhood's closed-source internal tracing framework.
I tried it locally and all tests work.
Faust-streaming should now use faust-streaming-rocksdb. Faust now uses close() from this library instead of relying on gc.collect().
Hey there guys, great job on the fork.
The Faust docs do reference tools like Circus and Supervisord for use when running in production. If we wish to fire off multiple Faust workers from a single Circus config, we encounter a problem.
Let's take an example Circus config:
[watcher:processor]
cmd = faust -A boo --datadir=/data/worker$(circus.wid) worker -l info --web-transport=unix://...
numprocesses = 2
copy_env = True
use_sockets = True
[socket:web]
host = 127.0.0.1
port = 8888
Circus makes Unix sockets available to child applications by opening the socket and presenting the file descriptor (an integer) to be used. However, Faust only seems to support specifying the full path to a Unix socket file, which won't work.
Other web servers like Uvicorn allow passing an fd like so:
[watcher:web]
cmd = uvicorn --fd $(circus.sockets.web) example:App
numprocesses = 4
use_sockets = True
copy_env = True
[socket:web]
host = 0.0.0.0
port = 8888
Is it possible to support use of a file descriptor in Faust or alternatively, is there another way that this can be made to work with Circus or Supervisor when running multiple workers?
Thanks a lot
Fotis
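For what it's worth, rebuilding a listening socket from an inherited file descriptor is a one-liner in Python, so fd support could plausibly look like the sketch below (the function and the idea of an `--fd`-style option are hypothetical, not an existing Faust feature):

```python
import socket

def socket_from_fd(fd: int) -> socket.socket:
    """Recreate a socket object from a file descriptor inherited from a
    process manager such as Circus (which passes the fd as an integer).

    Passing fileno= adopts the existing descriptor instead of creating a
    new socket; family/type/proto are auto-detected from the fd.
    """
    return socket.socket(fileno=fd)
```

A web server would then listen on the returned socket instead of binding a path or port itself.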
Just opening an issue for documentation purposes, to note that the documentation has this snippet of text talking about future plans for Faust. Given that this is a fork under new ownership, it seems worth noting so it can be allocated to either "actual future plans of the fork" or "things to remove when updating the documentation".
https://faust.readthedocs.io/en/latest/userguide/tasks.html#http-verbs-get-post-put-delete
Then under "Exposing Tables":
"A frequent requirement is the ability to expose table values in a web view, and while this is likely to be built-in to Faust in the future, you will have to implement this manually for now."
Pretty much any time you restart a worker, it will replay the last message it received. So if a worker processed messages [0, 1, 2, 3, 4] and then restarted, it will re-process item 4. This will mess up any analytics that are based on stateful counts. With a trivial case of incrementing a counter in a table, this can be consistently reproduced by simply restarting a worker and finding the last id continue to increment even though there were no new messages on the underlying topic.
If using the group_by functionality to re-partition a stream, I am finding that it will replay ALL of the messages, resulting in many more duplicates than simply +1 to the counts.
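Until the underlying commit behaviour is fixed, a common mitigation is to make the counting idempotent by remembering the highest offset applied per partition. A minimal sketch in plain Python (illustrative, not faust internals; in practice the offset bookkeeping must be persisted atomically with the counts, e.g. in the same table):

```python
class IdempotentCounter:
    """Apply each (partition, offset) at most once, so a replayed
    last message after a restart does not double-increment."""

    def __init__(self):
        self.counts = {}
        self.last_applied = {}  # partition -> highest offset applied

    def apply(self, partition, offset, key):
        if offset <= self.last_applied.get(partition, -1):
            return False  # duplicate delivery after restart: skip
        self.last_applied[partition] = offset
        self.counts[key] = self.counts.get(key, 0) + 1
        return True
```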
I have a consumer group where 15 of the partitions are lagged by approximately 16M records each. The faust workers fail on startup due to an OOM condition.
faust-streaming[uvloop,fast,redis,rocksdb]==0.4.1
This issue originated in the faust codebase and is still present in the 0.4.1 faust-streaming release.
robinhood/faust#453
The _add_gap function in faust/transport/consumer.py is being called with a VERY large offset_from / offset_to delta. In the screenshot below a list is populated from 1 to 2,288,752,002 which results in python running out of memory and triggering the failure. Note that in the screenshot I added an exception handler to see if I could trap the error (I could not - python fails silently).
I have observed faust intermittently failing silently on startup with the 1.8.x series and have been able to trace it back to PR #403, commit c0daac1.
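The failure mode is easy to see in isolation: building the gap eagerly materializes billions of ints, while a lazy range has the same length and membership semantics at constant memory. A sketch of the idea (not the actual patch):

```python
import sys

def gap_eager(offset_from, offset_to):
    # What the report describes: a real list of every missing offset,
    # which for a delta of ~2.3 billion exhausts memory.
    return list(range(offset_from, offset_to + 1))

def gap_lazy(offset_from, offset_to):
    # Same len() and "in" semantics, constant-size object.
    return range(offset_from, offset_to + 1)
```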
faust-streaming 0.3.0
aiokafka 0.7.0
In this example, partitions 1, 5 and 10 were originally assigned but after the rebalance partition 1 is no longer assigned. The code tries to seek to the committed offset for partition 1 and crashes:
[2020-12-06 15:31:12,106] [11] [WARNING] Heartbeat failed for group spe-5 because it is rebalancing
[2020-12-06 15:31:12,106] [11] [INFO] Revoking previously assigned partitions
+Topic Partition Set----------+
| topic | partitions |
+----------------+------------+
| spe_requests_5 | {1, 5, 10} |
+----------------+------------+ for group spe-5
[2020-12-06 15:31:12,108] [11] [INFO] (Re-)joining group spe-5
[2020-12-06 15:31:12,108] [11] [INFO] [^---Recovery]: Resuming flow...
[2020-12-06 15:31:12,109] [11] [INFO] [^---Recovery]: Seek stream partitions to committed offsets.
[2020-12-06 15:31:12,121] [11] [INFO] Joined group 'spe-5' (generation 968) with member_id faust-0.3.0-9126a240-a759-4e9f-bca9-cc0ad3f20f7e
[2020-12-06 15:31:12,128] [11] [INFO] Successfully synced group spe-5 with generation 968
[2020-12-06 15:31:12,129] [11] [INFO] Setting newly assigned partitions
+Topic Partition Set----------+
| topic | partitions |
+----------------+------------+
| spe_requests_5 | {5, 10} |
+----------------+------------+ for group spe-5
[2020-12-06 15:31:12,131] [11] [ERROR] [^---Recovery]: Crashed reason=IllegalStateError("No current assignment for partition TopicPartition(topic='spe_requests_5', partition=1)")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 802, in _execute_task
await task
File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 349, in _restart_recovery
await T(self._resume_streams)()
File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 297, in _resume_streams
consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 556, in _wait
wait_result = await self.wait_first(coro, signal, timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 715, in wait_first
f.result() # propagate exceptions
File "/usr/local/lib/python3.7/site-packages/faust/transport/consumer.py", line 534, in perform_seek
_committed_offsets = await self.seek_to_committed()
File "/usr/local/lib/python3.7/site-packages/faust/transport/consumer.py", line 1315, in seek_to_committed
return await self._thread.seek_to_committed()
File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 509, in seek_to_committed
{"version": "1.0", "type": "log", "log": {"message": "[^---Recovery]: Crashed reason=IllegalStateError(\"No current assignment for partition TopicPartition(topic='spe_requests_5', partition=1)\")"}, "time": "2020-12-06T15:31:12.136Z", "level": "err"}
return await self.call_thread(self._ensure_consumer().seek_to_committed)
File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 436, in call_thread
result = await promise
File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 383, in _process_enqueued
result = await maybe_async(method(*args, **kwargs))
File "/usr/local/lib/python3.7/site-packages/mode/utils/futures.py", line 134, in maybe_async
return await res
File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 868, in seek_to_committed
self._fetcher.seek_to(tp, offset)
File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/fetcher.py", line 1162, in seek_to
self._subscriptions.seek(tp, offset)
File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/subscription_state.py", line 239, in seek
self._assigned_state(tp).seek(offset)
File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/subscription_state.py", line 120, in _assigned_state
"No current assignment for partition {}".format(tp))
kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition TopicPartition(topic='spe_requests_5', partition=1)
Should not attempt to seek on a partition that has been removed. Maybe catch the exception and restart the fetcher?
Process crashes on exception
On rebalance, the worker should not try to reopen rocksdb from old partitions.
It should close rocksdb and not crash.
The worker keeps trying to open rocksdb and eventually times out and crashes.
Run faust-streaming in a pod in a Kubernetes cluster watching Azure Eventhubs through their Kafka interface.
After a period of several days aiokafka will raise StaleLeaderEpochCodeError several times. As this error is not caught anywhere, the Faust application stops processing streams but does not crash or restart.
N.B. I didn't see this behaviour when using robinhood/faust, so I suspect the change to aiokafka might be part of it.
Something that my intuition suggests is related: Eventhubs only retain messages in the queue for at most 7 days, so the ****-__assignor-__leader topic will eventually drain and appear empty from Faust's point of view.
I understand that working with Azure Eventhubs rather than proper Kafka is unsupported and appreciate any advice you can give. Even if there isn't a way to stop the exception happening (given my non-standard usage), if there was a way to catch the exception so I could cause the pod to restart that would be fine too.
I have to say that Faust is much better than any of the native Microsoft libraries for dealing with Eventhub message streams.
Faust pods continue to process Eventhub message streams without interruption.
The StaleLeaderEpochCodeError exception occurs somewhat randomly after several days of operation, but the app does not crash and so the Kubernetes pod does not restart.
Traceback (most recent call last):
File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/local/.venv/lib/python3.8/site-packages/mode/threads.py", line 66, in run
self.service._start_thread()
File "/usr/local/.venv/lib/python3.8/site-packages/mode/threads.py", line 211, in _start_thread
self.thread_loop.run_until_complete(self._serve())
> File "/usr/local/.venv/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 523, in _commit
await consumer.commit(aiokafka_offsets)
File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/consumer.py", line 550, in commit
await self._coordinator.commit_offsets(assignment, offsets)
File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 964, in commit_offsets
raise err
File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 953, in commit_offsets
await asyncio.shield(
File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 1066, in _do_commit_offsets
raise first_error
kafka.errors.StaleLeaderEpochCodeError: [Error 13] StaleLeaderEpochCodeError
3.8.6
faust-streaming 0.4.0
Debian Slim in Python Docker image
Azure Eventhubs
librocksdb5.17
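Absent a fix in Faust itself, one blunt workaround is to escalate any unhandled loop exception so the process dies and Kubernetes restarts the pod. A sketch against a plain asyncio loop (whether Faust's mode loop exposes this hook cleanly is an assumption):

```python
import asyncio

def install_fail_fast(loop, on_fatal):
    """Route unhandled loop exceptions to on_fatal instead of only logging,
    so a supervisor can react (in production on_fatal might log the error
    and call os._exit(1) to force a pod restart)."""
    def handler(loop, context):
        on_fatal(context.get("exception"))
    loop.set_exception_handler(handler)
```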
Run unit tests using the scripts/tests script.
All unit tests should run.
75% of the unit tests do not run because they are class-based and the class names are test_Foo instead of Test_Foo, which pytest's default collection pattern does not match.
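pytest only collects classes matching its python_classes patterns, which default to Test*, so lowercase test_Foo classes are silently skipped. Renaming the classes is the clean fix; widening the pattern would also make them collectable, e.g. in setup.cfg:

```ini
[tool:pytest]
python_classes = Test* test_*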
I have the same single-module app that works correctly with the robinhood version of faust, but does not work with this fork. Namely, the exception quoted in the title, on_rebalance() takes 5 positional arguments but 6 were given, is thrown on app startup with faust-streaming but not faust. Relevant versions used are 0.4.8 (466dbf2, git master at time of writing) and 1.10.4. The single-module app and exception are as follows:
from typing import Any

import faust


class UserUpdate(faust.Record):
    user_id: int
    event_type: str
    event_data: Any


app = faust.App("faust-demo", broker="kafka://kafka-1:9092")
user_updates = app.topic("user_updates", value_type=UserUpdate, partitions=8)
users = app.Table("user", partitions=8)


@app.agent(user_updates)
async def process_user_updates(updates: faust.StreamT[UserUpdate]):
    async for update in updates:
        user = users.get(update.user_id)
        if user:
            user[update.event_type] = update.event_data
            users[update.user_id] = user
        else:
            users[update.user_id] = {update.event_type: update.event_data}


@app.page("/users")
async def get_users(web, request):
    return web.json(users)
Exception is not thrown
Exception is thrown
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/faust/app/base.py", line 1720, in _on_partitions_assigned
await T(self.tables.on_rebalance)(
File "/usr/local/lib/python3.9/site-packages/faust/utils/tracing.py", line 133, in corowrapped
await_ret = await ret
File "/usr/local/lib/python3.9/site-packages/faust/tables/manager.py", line 194, in on_rebalance
await T(table.on_rebalance)(
File "/usr/local/lib/python3.9/site-packages/faust/utils/tracing.py", line 133, in corowrapped
await_ret = await ret
File "/usr/local/lib/python3.9/site-packages/faust/tables/base.py", line 571, in on_rebalance
await self.data.on_rebalance(
TypeError: on_rebalance() takes 5 positional arguments but 6 were given
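The mismatch is mechanical: the fork's table layer passes one more positional argument to the store's on_rebalance than upstream faust did, so stores written against the old signature blow up. A stripped-down reproduction of the failure mode (illustrative class and argument names, not the real faust signatures):

```python
import asyncio

class UpstreamStyleStore:
    # Old-style signature: self plus four positionals,
    # i.e. "takes 5 positional arguments".
    async def on_rebalance(self, table, assigned, revoked, newly_assigned):
        return "rebalanced"

async def call_like_fork(store):
    try:
        # The fork passes an extra positional argument, hence "6 were given".
        return await store.on_rebalance("table", set(), set(), set(), 123)
    except TypeError as exc:
        return str(exc)

result = asyncio.run(call_like_fork(UpstreamStyleStore()))
```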
I would like to update the documentation and make it clearer. I would like to use Markdown and publish the documentation on GitHub Pages. For example: https://marcosschroh.github.io/dataclasses-avroschema/
We need our own PyPI account to publish the package.
On repartition, if the rocksdb instance is owned by another worker, the requesting worker should wait and not resume its streams.
It should wait until the rocksdb instance is available.
The worker started to process and crashed when it could not access the rocksdb instance.
When the Consumer has requested a commit on one or more partitions and a rebalance is triggered before the aiokafka consumer can perform the commit, it is possible that one or more of the partitions being committed is revoked by the rebalance and not re-assigned. This causes an IllegalStateError exception from the kafka-python client code, due to the attempt to commit a partition that isn't owned by the consumer.
The fix is to filter the commit partitions again in the aiokafka consumer commit code, to ensure that none were revoked while the commit was pending.
Commits after a rebalance should not cause an IllegalStateError exception
Intermittent IllegalStateError exceptions after a rebalance
[2021-01-21 14:59:16,201] [9] [ERROR] [^---AIOKafkaConsumerThread]: Got exception: IllegalStateError("Partition TopicPartition(topic='spe_kxe_5', partition=40) is not assigned")
Current assignment:
+Topic Partition Set-------+
| topic     | partitions   |
+-----------+--------------+
| spe_kxe_5 | {25, 52, 67} |
+-----------+--------------+
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 523, in _commit
await consumer.commit(aiokafka_offsets)
File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 548, in commit
"Partition {} is not assigned".format(tp))
kafka.errors.IllegalStateError: IllegalStateError: Partition TopicPartition(topic='spe_kxe_5', partition=40) is not assigned
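The described fix amounts to re-checking the assignment at commit time. A sketch over plain data structures (not the actual aiokafka driver code):

```python
def filter_committable(offsets, assignment):
    """Drop offsets for partitions revoked while the commit was pending,
    so the broker commit never touches an unassigned partition.

    offsets:    mapping of partition -> offset to commit
    assignment: set of currently assigned partitions
    """
    return {tp: off for tp, off in offsets.items() if tp in assignment}
```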
I am not very familiar with Kafka; however, when used with single-instance Kafka and Redpanda in my development environment, RPC requests (agent.ask()) cause this error:
[^Worker]: Error: InvalidReplicationFactorError('Cannot create topic: f-reply-664f489d-06e6-4f66-b8ff-14fbb3cd8b67 (38): Replication factor must be greater than 0')
The same error occurs if app.conf.reply_create_topic is set to True, even with topic_replication_factor set to 1.
Line 498 in 466dbf2
When I change the default value from 0 to 1, RPC works as intended.
I believe I am missing something here? Will this change break things in production deployments with Kafka clusters?
I have a branch with an update ready to push, but will need to be added as a contributor first.
Tell us what you did to cause something to happen.
I ran tests via ./scripts/tests
.
Tell us what you expected to happen.
I would expect to see few (ideally, 0) deprecation warnings while running tests.
Tell us what happened instead.
Running tests reveals multiple deprecation warnings, including the following:
PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
See the pytest documentation for the deprecation note: https://docs.pytest.org/en/stable/yieldfixture.html
====================================================================== warnings summary ======================================================================
tests/conftest.py:84
/Users/kristenfoster-marks/Desktop/faust/tests/conftest.py:84: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
tests/conftest.py:199
/Users/kristenfoster-marks/Desktop/faust/tests/conftest.py:199: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture(autouse=True)
tests/functional/conftest.py:30
/Users/kristenfoster-marks/Desktop/faust/tests/functional/conftest.py:30: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
tests/functional/conftest.py:64
/Users/kristenfoster-marks/Desktop/faust/tests/functional/conftest.py:64: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
venv/lib/python3.7/site-packages/colorclass/codes.py:4
/Users/kristenfoster-marks/Desktop/faust/venv/lib/python3.7/site-packages/colorclass/codes.py:4: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
from collections import Mapping
tests/unit/livecheck/conftest.py:46
/Users/kristenfoster-marks/Desktop/faust/tests/unit/livecheck/conftest.py:46: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
tests/unit/livecheck/conftest.py:53
/Users/kristenfoster-marks/Desktop/faust/tests/unit/livecheck/conftest.py:53: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
tests/functional/web/conftest.py:7
/Users/kristenfoster-marks/Desktop/faust/tests/functional/web/conftest.py:7: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
faust/livecheck/exceptions.py:43
/Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:43: PytestCollectionWarning: cannot collect test class 'TestFailed' because it has a __init__ constructor (from: tests/unit/livecheck/test_app.py)
class TestFailed(LiveCheckError):
faust/livecheck/models.py:49
/Users/kristenfoster-marks/Desktop/faust/faust/livecheck/models.py:49: PytestCollectionWarning: cannot collect test class 'TestExecution' because it has a __init__ constructor (from: tests/unit/livecheck/test_app.py)
class TestExecution(Record, isodates=True):
faust/livecheck/models.py:132
/Users/kristenfoster-marks/Desktop/faust/faust/livecheck/models.py:132: PytestCollectionWarning: cannot collect test class 'TestReport' because it has a __init__ constructor (from: tests/unit/livecheck/test_app.py)
class TestReport(Record):
tests/unit/livecheck/test_case.py:159
/Users/kristenfoster-marks/Desktop/faust/tests/unit/livecheck/test_case.py:159: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
faust/livecheck/models.py:132
/Users/kristenfoster-marks/Desktop/faust/faust/livecheck/models.py:132: PytestCollectionWarning: cannot collect test class 'TestReport' because it has a __init__ constructor (from: tests/unit/livecheck/test_case.py)
class TestReport(Record):
faust/livecheck/exceptions.py:39
/Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:39: PytestCollectionWarning: cannot collect test class 'TestSkipped' because it has a __init__ constructor (from: tests/unit/livecheck/test_runners.py)
class TestSkipped(LiveCheckError):
faust/livecheck/exceptions.py:43
/Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:43: PytestCollectionWarning: cannot collect test class 'TestFailed' because it has a __init__ constructor (from: tests/unit/livecheck/test_runners.py)
class TestFailed(LiveCheckError):
faust/livecheck/exceptions.py:47
/Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:47: PytestCollectionWarning: cannot collect test class 'TestRaised' because it has a __init__ constructor (from: tests/unit/livecheck/test_runners.py)
class TestRaised(LiveCheckError):
faust/livecheck/exceptions.py:51
/Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:51: PytestCollectionWarning: cannot collect test class 'TestTimeout' because it has a __init__ constructor (from: tests/unit/livecheck/test_runners.py)
class TestTimeout(LiveCheckError):
faust/livecheck/exceptions.py:51
/Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:51: PytestCollectionWarning: cannot collect test class 'TestTimeout' because it has a __init__ constructor (from: tests/unit/livecheck/test_signals.py)
class TestTimeout(LiveCheckError):
tests/unit/stores/test_rocksdb.py:77
/Users/kristenfoster-marks/Desktop/faust/tests/unit/stores/test_rocksdb.py:77: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
tests/unit/stores/test_rocksdb.py:82
/Users/kristenfoster-marks/Desktop/faust/tests/unit/stores/test_rocksdb.py:82: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
tests/unit/stores/test_rocksdb.py:212
/Users/kristenfoster-marks/Desktop/faust/tests/unit/stores/test_rocksdb.py:212: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
tests/unit/tables/test_objects.py:27
/Users/kristenfoster-marks/Desktop/faust/tests/unit/tables/test_objects.py:27: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
tests/unit/tables/test_wrappers.py:62
/Users/kristenfoster-marks/Desktop/faust/tests/unit/tables/test_wrappers.py:62: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
Use @pytest.fixture instead; they are the same.
@pytest.yield_fixture()
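The mechanical fix for the bulk of these warnings is a decorator swap: since pytest 3.0, plain @pytest.fixture supports yield-style teardown, so @pytest.yield_fixture can be replaced one-for-one (placeholder fixture body below for illustration):

```python
import pytest

# Before: @pytest.yield_fixture()  -- deprecated, identical behaviour.
@pytest.fixture()
def resource():
    res = {"open": True}   # set-up (placeholder resource)
    yield res              # value injected into the test
    res["open"] = False    # teardown runs after the test finishes
```

The PytestCollectionWarning entries are a separate problem: livecheck's TestFailed/TestExecution/etc. classes match pytest's Test* collection pattern and would need __test__ = False (or renaming) to silence the warnings.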
Sorry for asking this question here since it is probably not the right place, but how do I set the log level when running a faust CLI command?
This is a duplicate of:
robinhood/faust#703
Running the following code will only process some of the messages generated by the task. The problem is that the stream buffer fills up and removes the topic from the active partitions; when Consumer.getmany() iterates over the list of already-received messages and sees that the topic is no longer in the list of active partitions, it drops the remaining messages.
The fix is to include self._buffered_partitions in the check in Consumer.getmany(), so that it continues to process the already-received messages even when the Consumer is no longer actively fetching new messages from the topic because the buffer is full. When the buffer-full condition clears, new messages will be processed as usual.
import faust

app = faust.App(
    'hello-world3',
    broker='kafka://localhost:29092',
    value_serializer='raw',
    # stream_buffer_maxsize=1000000,
)

greetings_topic = app.topic('greetings')


@app.agent(greetings_topic)
async def greet(greetings):
    count = 0
    async for greeting in greetings:
        count += 1
        print(count)


@app.task()
async def say_hello():
    count = 0
    for i in range(0, 40000):
        count += 1
        await greetings_topic.send(key='key', value=f'hello{count}')
The agent should continue to receive messages when the Consumer buffer is considered "full" and backpressure has been applied
The Consumer stops sending the already-received messages to the agents/Stream when backpressure is being applied to the topic
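A minimal model of the proposed check (illustrative names, not the faust internals): a partition's already-fetched records stay deliverable if it is either actively fetched or merely paused by backpressure.

```python
def drainable_records(records_by_partition, active, buffered):
    """Keep fetched records for partitions that are actively fetched OR only
    paused because the stream buffer is full; drop records only for
    partitions that were truly revoked."""
    drainable = set(active) | set(buffered)
    return {tp: recs for tp, recs in records_by_partition.items()
            if tp in drainable}
```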
Faust does not support sharing the rocksdb data directory amongst workers. This drastically reduces the application's performance when it has to scale out, as on scale-out every worker has to rebuild state by replaying partition events from Kafka.
If the stream buffer gets full while the worker is still in recovery, there is a chance of it getting stuck in the recovery loop forever. This keeps going on forever:
+Remaining for active recovery--------------+-----------+-------------+-------------+-----------+
| topic | partition | need offset | have offset | remaining |
+-------------------------------------------+-----------+-------------+-------------+-----------+
| ifos0.5-correletable_keys_table-changelog | 0 | 4574 | 4256 | 318 |
| ifos0.5-kv_table-changelog | 2 | 26857 | 26617 | 240 |
| ifos0.5-triggers_table-changelog | 0 | 3691 | 1347 | 2344 |
+-------------------------------------------+-----------+-------------+-------------+-----------+
To monitor the threaded producer, add methods to the sensor to report the size of the buffer.
Because we are forking the original Faust, we should rename the project. My suggestion is faust-streaming, but we can debate about the name.
Create a faust app and set the reply_create_topic=True flag.

import faust

app = faust.App(
    'hello-world',
    broker='kafka://kafka:9092',
    reply_create_topic=True,
)

greetings_topic = app.topic('greetings', value_type=str)


@app.agent(greetings_topic)
async def print_greetings(greetings):
    async for greeting in greetings:
        print(greeting)
        yield 'resp ' + greeting


@app.timer(5)
async def produce():
    for i in range(100):
        resp = await print_greetings.ask(value=f'hello {i}')
        print(resp)


if __name__ == '__main__':
    app.main()
Reply topic is created.
InvalidReplicationFactorError is raised.
hello-word_1 | [2021-01-19 18:25:19,690] [8] [INFO] [^--Producer]: Creating topic 'f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9'
hello-word_1 | [2021-01-19 18:25:19,726] [8] [ERROR] [^-App]: Crashed reason=InvalidReplicationFactorError('Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.')
hello-word_1 | Traceback (most recent call last):
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task
hello-word_1 | await task
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 966, in _wrapped
hello-word_1 | return await task()
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 1019, in around_timer
hello-word_1 | await fun(*args)
hello-word_1 | File "/main.py", line 31, in produce
hello-word_1 | resp = await print_greetings.ask(value=f'hello {i}')
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 797, in ask
hello-word_1 | await app._reply_consumer.add(p.correlation_id, p)
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 167, in add
hello-word_1 | await self._start_fetcher(reply_topic)
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 176, in _start_fetcher
hello-word_1 | await topic.maybe_declare()
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
hello-word_1 | result = await self.fun(*self.args, **self.kwargs)
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 476, in maybe_declare
hello-word_1 | await self.declare()
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 491, in declare
hello-word_1 | await producer.create_topic(
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1072, in create_topic
hello-word_1 | await cast(Transport, self.transport)._create_topic(
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1284, in _create_topic
hello-word_1 | await wrap()
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
hello-word_1 | result = await self.fun(*self.args, **self.kwargs)
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1371, in _really_create_topic
hello-word_1 | raise for_code(code)(f"Cannot create topic: {topic} ({code}): {reason}")
hello-word_1 | kafka.errors.InvalidReplicationFactorError: [Error 38] InvalidReplicationFactorError: Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.
import faust

class MyTopic(faust.Topic):
    # Work around reply topics being declared with replicas=0.
    def __init__(self, *args, **kwargs):
        if kwargs.get('replicas') == 0:
            kwargs['replicas'] = 1
        super().__init__(*args, **kwargs)

app = faust.App(
    'hello-world',
    broker='kafka://kafka:9092',
    reply_create_topic=True,
    Topic=MyTopic,
)
I think changing this line https://github.com/faust-streaming/faust/blob/master/faust/agents/replies.py#L190 to replicas=1 or replicas=None would solve the problem.
root@0d7a8b87eab1:/# cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 10 (buster)"
NAME="Debian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=debian
I have no name!@9e91917f6f27:/$ kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)
master branch of Faust.
Check out faust-streaming 40237be, then
./scripts/install
./scripts/lint
./scripts/tests
<runs a lot of tests>
scripts/coverage: 11: codecov: not found
I'm not familiar with codecov, but I guess it should be run only if the environment variable CODECOV_TOKEN is set?
Issue can be reproduced on the latest stable commit in master - 38d61a78c76a096597421a8e9db2878d4381dd6a
pip install git+https://github.com/robinhood/faust@38d61a78c76a096597421a8e9db2878d4381dd6a
import faust

app = faust.App(
    'hello-world',
    broker='kafka://localhost:9092',
    value_serializer='raw',
)
greetings_topic = app.topic('greetings')

@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(greeting)
faust -A hello_world worker -l warn
kafka-console-producer --broker-list localhost:9092 --topic greetings
>hello
kafka-consumer-groups --bootstrap-server localhost:9092 --group hello-world --reset-offsets --to-latest --execute --all-topics
faust -A hello_world worker -l warn
kafka-console-producer --broker-list localhost:9092 --topic greetings
>hello 1, will be ignored
>hello 2, will be printed
┌ƒaµS† v1.11.0a1────────────────────────────────────────────────────────┐
│ id │ hello-world │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ web │ http://maxims-macbook-pro.local:6066 │
│ log │ -stderr- (warn) │
│ pid │ 49793 │
│ hostname │ Maxims-MacBook-Pro.local │
│ platform │ CPython 3.8.0 (Darwin x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /Users/shamanu4/projects/faust_test/hello-world-data │
│ appdir │ /Users/shamanu4/projects/faust_test/hello-world-data/v1 │
└─────────────┴─────────────────────────────────────────────────────────┘
starting➢ 😊
[2020-07-21 12:51:38,541] [49793] [WARNING] b'hello 1, will be ignored'
[2020-07-21 12:51:39,399] [49793] [WARNING] b'hello 2, will be printed'
┌ƒaµS† v1.11.0a1────────────────────────────────────────────────────────┐
│ id │ hello-world │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ web │ http://maxims-macbook-pro.local:6066 │
│ log │ -stderr- (warn) │
│ pid │ 49793 │
│ hostname │ Maxims-MacBook-Pro.local │
│ platform │ CPython 3.8.0 (Darwin x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /Users/shamanu4/projects/faust_test/hello-world-data │
│ appdir │ /Users/shamanu4/projects/faust_test/hello-world-data/v1 │
└─────────────┴─────────────────────────────────────────────────────────┘
starting➢ 😊
[2020-07-21 12:51:39,399] [49793] [WARNING] b'hello 2, will be printed'
master branch of Faust.
I have a typical filter/group_by setup. Simplified to the maximum, it goes along these lines:
import faust

class Value(faust.Record):
    ...

app = faust.App(
    "app",
    broker=...,
    broker_credentials=...,
    consumer_auto_offset_reset="latest",
    topic_partitions=3,
    topic_replication_factor=3,
)
source_topic = app.topic("source-topic", value_type=Value)

@app.agent(source_topic)
async def my_agent(stream):
    async for value in stream.filter(
        lambda v: v > 1000
    ).group_by(
        lambda v: v.some_other_attr,
        name="custom-key",
    ):
        pass
This automatically creates the topic app.my_agent-source-topic-custom-key-repartition
(I might be slightly wrong on the name here; it doesn't matter).
I have stripped the functionality one piece at a time down to a bare-bones Faust app. If I remove the filter and leave only group_by, the problem is gone: offset commits are tight, back where they should be. The thing is that we really need this filter, as only about 30% of the traffic is eligible for repartitioning.
Hey there guys, I'm sorry I didn't fill in the template, as this is more of a question.
Our application must read some data from a database before it starts processing streams, as the loaded data is correlated with certain fields of arriving messages.
At present, I don't see an elegant way of doing this other than having a global bool and making the agent sleep until the bool flips to indicate loading has completed.
Is there any other way to accomplish this?
Huge thanks
Fotis
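One pattern that avoids the bool-plus-sleep loop is an asyncio.Event: a startup task loads the data and sets the event, and each agent awaits it once before consuming. A runnable sketch of the pattern in plain asyncio (in Faust, warm_up would be an @app.task and the ready.wait() would sit at the top of the agent; the load logic and data here are stand-ins):

```python
import asyncio

reference_data = {}

async def warm_up(ready: asyncio.Event) -> None:
    # Stand-in for the database query; in Faust this would be an @app.task.
    await asyncio.sleep(0)  # simulate I/O
    reference_data.update({"user-1": "segment-a"})
    ready.set()

async def process(messages, ready: asyncio.Event):
    # In Faust this wait would sit at the top of the agent,
    # before the `async for event in stream` loop.
    await ready.wait()  # block until warm_up has finished loading
    return [reference_data.get(m) for m in messages]

async def main():
    ready = asyncio.Event()
    results, _ = await asyncio.gather(
        process(["user-1"], ready),
        warm_up(ready),
    )
    return results

print(asyncio.run(main()))  # ['segment-a']
```

The agent never busy-waits: it suspends on the event and is woken exactly once, when loading completes.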
master branch of Faust.
Run a faust worker that waits for messages on a topic with no or very little traffic.
Worker should sit and do nothing.
I get ERROR level messages as follows:
[2020-11-17 16:19:03,695] [708969] [ERROR] Exception in callback Fetcher._create_fetch_waiter.<locals>.<lambda>(<Future cancelled>) at /home/forsberg/.virtualenvs/ferrostream/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py:446
handle: <Handle Fetcher._create_fetch_waiter.<locals>.<lambda>(<Future cancelled>) at /home/forsberg/.virtualenvs/ferrostream/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py:446>
Traceback (most recent call last):
File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
self._context.run(self._callback, *self._args)
File "/home/forsberg/.virtualenvs/ferrostream/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 446, in <lambda>
lambda f, waiters=self._fetch_waiters: waiters.remove(f))
KeyError: <Future cancelled>
Problem goes away if I revert 361b09d (releasing all fetch waiters).
Problem also seems to appear during a rebalance for busy workers, i.e. workers whose input topic is busy.
master branch of Faust.
When the number of partitions for an agent topic is not evenly divisible by the number of workers, the PartitionAssignor can leave one or more workers with no partitions assigned. This wastes CPU and memory, causes other workers to carry a heavier load, and potentially reduces throughput.
The initial deployment distributes the partitions across all workers. For example, a topic with 100 partitions spread across 40 workers gives 20 workers 3 partitions and 20 workers 2 partitions. The CopartitionedAssignor calculates the (maximum) capacity for each worker to be 3 partitions, using the formula ceil(num_partitions / num_workers).
Now if one worker gets rebooted or leaves the group for any reason, a rebalance is triggered and the 2 or 3 partitions for that worker move to workers that currently have 2 partitions, leaving 22 workers with 3 partitions and 17 workers with 2 partitions (100 partitions across 39 workers).
When the rebooted worker recovers and rejoins the group, it will probably not receive any partitions because there are no "extra" partitions on any of the workers. The maximum capacity is still 3, no worker has more than 3 partitions, so no partitions are "available" for assignment. Rebooting the worker has no impact for the same reason.
The worker with no partitions will leave the consumer group after 5 minutes as the aiokafka Fetcher has been idle due to no assignment, which means that future rebalances of the group will NOT include this consumer/worker, and it will be idle forever, or until the group is redeployed.
Partitions should be assigned to all available workers, as balanced as possible.
Workers can receive no partition assignment and leave the group.
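The stuck state follows directly from the arithmetic described above; a small sketch of the reported numbers (not the actual CopartitionedAssignor code):

```python
import math

def capacity(num_partitions: int, num_workers: int) -> int:
    # Maximum number of partitions the assignor allows any single worker.
    return math.ceil(num_partitions / num_workers)

# Initial deployment: 100 partitions over 40 workers.
print(capacity(100, 40))  # 3 -> 20 workers hold 3 partitions, 20 hold 2

# One worker leaves; its partitions move to peers currently holding 2.
print(capacity(100, 39))  # still 3 -> 22 workers at 3, 17 at 2

# The worker rejoins: capacity is still 3 and no peer exceeds it,
# so nothing is "available" for reassignment and the worker stays idle.
print(capacity(100, 40))  # 3
```

Because the ceiling never rises above 3, no peer is ever over capacity after the rejoin, so the assignor has no partition to hand back.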
master branch of Faust.
Have an agent that processes slowly. When the number of events on its queue exceeds stream_buffer_maxsize,
it causes starvation of other stream processors (agents) handling other topics.
Stop fetching data from the slow topic/partition when the buffer reaches a threshold, and resume it after the pressure drops.
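The suggested pause/resume does not exist yet; the only related knob is the stream_buffer_maxsize setting mentioned above, which bounds how many events a stream buffers but, as far as I can tell, does not itself pause the fetcher. A minimal config sketch (broker URL is a placeholder):

```python
import faust

# stream_buffer_maxsize caps the per-stream event buffer; it bounds memory
# for slow agents but does not pause fetching from the topic.
app = faust.App(
    'backpressure-example',
    broker='kafka://localhost:9092',
    stream_buffer_maxsize=1000,
)
```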
Created a topic specifying pattern= with a valid regex that should have matched multiple topics on the kafka cluster. Running the worker with -l info
shows everything starting up, but the only topic subscription is to the base assignor topic.
This is a known, existing issue dating back to at least July 25, 2019, as detailed in robinhood/faust#390.
I suspect, but have not verified, that the issue occurs in subscribe in faust/transport/drivers/aiokafka.py, given the comment "XXX pattern does not work :/" left by a prior dev.
Still occurs on master
No traceback - silently fails to do anything
import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('faust-dev', broker='kafka://broker:29092')
topic = app.topic('MyGreatTopic-1', value_type=Greeting)
topic2 = app.topic('MyGreatTopic-2', value_type=Greeting)
topic3 = app.topic('MyGreatTopic-3', value_type=Greeting)

@app.task
async def create_topics():
    await topic.maybe_declare()
    await topic2.maybe_declare()
    await topic3.maybe_declare()

regex_topic = app.topic(pattern="^MyGreatTopic-.*$", value_type=Greeting)

@app.agent(regex_topic)
async def hello(greetings):
    async for event in greetings.events():
        greeting = event.value
        print(f'{event.message.topic} says: Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await topic.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )
    await topic2.send(value=Greeting(from_name='Faust 2', to_name='you'))
    await topic3.send(value=Greeting(from_name='Faust 3', to_name='you'))

if __name__ == '__main__':
    app.main()
Which provider shall we use? Our options are Travis, GitLab, or Circle CI.
Hey all, the default Faust package wouldn't let my app connect to Confluent Cloud, I'm guessing because it was using an older aiokafka driver. Since this fork uses the updated version, can I use the aiokafka URI to connect to my Confluent Cloud instance? If anyone can provide an example, that would be greatly appreciated.
Many thanks!
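For what it's worth, Confluent Cloud typically requires SASL_SSL with an API key/secret, and faust exposes this via the broker_credentials setting with faust.SASLCredentials. A hedged sketch (the bootstrap host, key, and secret are placeholders):

```python
import ssl
import faust

# Hypothetical Confluent Cloud bootstrap server and API key/secret.
app = faust.App(
    'ccloud-example',
    broker='kafka://pkc-xxxxx.us-east-1.aws.confluent.cloud:9092',
    broker_credentials=faust.SASLCredentials(
        username='<api-key>',
        password='<api-secret>',
        ssl_context=ssl.create_default_context(),
    ),
)
```

SASLCredentials defaults to the PLAIN mechanism, which is what Confluent Cloud API keys use.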
master branch of Faust.
Define a SetGlobalTable and start a faust worker.
my_table = app.SetGlobalTable('some_name', start_manager=True)
The worker runs without any warnings.
A warning appears every few seconds:
faust_1 | [2021-01-20 14:49:11,275] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0038752279942855)
faust_1 | [2021-01-20 14:49:19,277] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.003296997048892)
faust_1 | [2021-01-20 14:49:27,277] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0008818010101095)
faust_1 | [2021-01-20 14:49:35,279] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0001070860307664)
faust_1 | [2021-01-20 14:49:39,280] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.000795218977146)
faust_1 | [2021-01-20 14:49:43,282] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0005710140103474)
faust_1 | [2021-01-20 14:49:47,283] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.000350568909198)
faust_1 | [2021-01-20 14:49:51,285] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0012008629273623)
faust_1 | [2021-01-20 14:49:55,286] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.00038573902566)
faust_1 | [2021-01-20 14:49:59,289] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.001224332023412)
faust_1 | [2021-01-20 14:50:03,290] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.000285888905637)
faust_1 | [2021-01-20 14:50:07,292] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0006646789843217)
faust_1 | [2021-01-20 14:50:15,295] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0010148769943044)
faust_1 | [2021-01-20 14:50:19,296] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.000219620997086)
Is the sleep timer here necessary?
Line 145 in 63853b1
That sleep should already be handled by itertimer, right?
master branch of Faust.
Recovery crashes on a rebalance
no crash
[2020-11-20 20:25:29,427] [22] [ERROR] [^---Recovery]: Crashed reason=IllegalStateError("No current assignment for partition TopicPartition(topic='p999105-enriched-ad', partition=10)")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/mode/services.py", line 802, in _execute_task
await task
File "/usr/local/lib/python3.7/dist-packages/faust/tables/recovery.py", line 349, in _restart_recovery
await T(self._resume_streams)()
File "/usr/local/lib/python3.7/dist-packages/faust/tables/recovery.py", line 297, in _resume_streams
consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
File "/usr/local/lib/python3.7/dist-packages/faust/tables/recovery.py", line 556, in _wait
wait_result = await self.wait_first(coro, signal, timeout=timeout)
File "/usr/local/lib/python3.7/dist-packages/mode/services.py", line 715, in wait_first
f.result() # propagate exceptions
File "/usr/local/lib/python3.7/dist-packages/faust/transport/consumer.py", line 534, in perform_seek
_committed_offsets = await self.seek_to_committed()
File "/usr/local/lib/python3.7/dist-packages/faust/transport/consumer.py", line 1315, in seek_to_committed
return await self._thread.seek_to_committed()
File "/usr/local/lib/python3.7/dist-packages/faust/transport/drivers/aiokafka.py", line 509, in seek_to_committed
return await self.call_thread(self._ensure_consumer().seek_to_committed)
File "/usr/local/lib/python3.7/dist-packages/mode/threads.py", line 436, in call_thread
result = await promise
File "/usr/local/lib/python3.7/dist-packages/mode/threads.py", line 383, in _process_enqueued
result = await maybe_async(method(*args, **kwargs))
File "/usr/local/lib/python3.7/dist-packages/mode/utils/futures.py", line 134, in maybe_async
return await res
File "/usr/local/lib/python3.7/dist-packages/aiokafka/consumer/consumer.py", line 868, in seek_to_committed
self._fetcher.seek_to(tp, offset)
File "/usr/local/lib/python3.7/dist-packages/aiokafka/consumer/fetcher.py", line 1162, in seek_to
self._subscriptions.seek(tp, offset)
File "/usr/local/lib/python3.7/dist-packages/aiokafka/consumer/subscription_state.py", line 239, in seek
self._assigned_state(tp).seek(offset)
File "/usr/local/lib/python3.7/dist-packages/aiokafka/consumer/subscription_state.py", line 120, in _assigned_state
"No current assignment for partition {}".format(tp))
kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition TopicPartition(topic='p999105-enriched-ad', partition=10)
[2020-11-20 20:25:29,436] [22] [WARNING] [^-App]: Missing sensor state for rebalance #13
[2020-11-20 20:25:29,439] [22] [INFO] [^Worker]: Stopping...
[2020-11-20 20:25:29,440] [22] [INFO] [^-App]: Stopping...
[2020-11-20 20:25:29,440] [22] [INFO] [^---Fetcher]: Stopping...
[2020-11-20 20:25:29,441] [22] [INFO] [^--Consumer]: Consumer shutting down for user cancel.
[2020-11-20 20:25:29,441] [22] [INFO] [^-App]: Flush producer buffer...
[2020-11-20 20:25:29,442] [22] [INFO] [^--TableManager]: Stopping...
master branch of Faust.
I struggle to reproduce this consistently, as I suspect it is the result of a very tight race condition. I can, however, say that it tends to occur around rebalances, and generally with agents which return naturally.
The problem stems from the wait_empty method on the consumer, which waits for streams to be consumed before exiting (assuming stream_wait_empty is not overridden).
This method logs human tracebacks of the agents, which in turn relies on mode to produce those tracebacks, raising the error there.
The error is raised if the agent coroutine has coro.cr_frame == None, which is a valid state for a closed coroutine (i.e. one which has finished processing and returned). It could presumably be raised for agen.ag_frame etc. too.
I think it is valid that mode raises here (although a custom exception would be useful), because the traceback cannot be found.
My current workaround for this is as follows:
from typing import List
from faust import App
from faust.agents import Agent
class CustomAgent(Agent):
    def actor_tracebacks(self) -> List[str]:
        tracebacks: List[str] = []
        for actor in self._actors:
            try:
                tracebacks.append(actor.traceback())
            except RuntimeError as exc:
                if "cannot find stack of coroutine" in str(exc):
                    tracebacks.append(f"Could not find stack of coroutine for actor: {actor}")
                    continue
                raise
        return tracebacks

app = App(
    ...,
    Agent=CustomAgent,
)
No exception to be raised when collecting actor traces for the purpose of logging.
A RuntimeError("cannot find stack of coroutine")
is raised.
File "/usr/local/lib/python3.8/site-packages/mode/utils/tracebacks.py", line 52, in print_coro_stack
tb = Traceback.from_coroutine(coro, limit=limit)
| | | -> 125
| | -> <coroutine object my_agent at 0x7f6d09d361c0>
| -> <classmethod object at 0x7f6d0d48ce20>
-> <class 'mode.utils.tracebacks.Traceback'>
File "/usr/local/lib/python3.8/site-packages/mode/utils/tracebacks.py", line 231, in from_coroutine
raise RuntimeError('cannot find stack of coroutine')
RuntimeError: cannot find stack of coroutine
confluentinc/cp-kafka:5.5.1
master branch of Faust.
We have noticed that on a restart the recovery hangs indefinitely, with the only log message coming from the fetcher.
Recovery should not hang
Hangs with the following log message
[^----Fetcher]: Starting...
master branch of Faust.
Setting up a normal streaming application with RocksDB and an internal timer to delete older rows in RocksDB.
I expected the script to continue working normally.
Randomly, in the log I found:
[2020-12-21 15:36:51,857] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:34:04,708] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:37:01,560] [60692] [WARNING] Heartbeat failed: local member_id was not recognized; resetting and re-joining group
[2020-12-21 15:37:03,509] [60692] [WARNING] Timer commit is overlapping (interval=2.8 runtime=178.82933128997684)
[2020-12-21 15:38:05,538] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:38:14,381] [60692] [WARNING] Timer commit is overlapping (interval=2.8 runtime=17.595341868989635)
[2020-12-21 15:38:22,749] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:38:31,374] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:38:41,520] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:38:59,004] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:39:18,977] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:39:38,909] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:39:50,086] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:40:22,655] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:40:41,770] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:40:52,244] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:41:00,917] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:41:19,617] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:41:29,388] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:41:36,996] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:41:48,920] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:42:16,437] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:42:30,207] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:42:38,045] [60692] [WARNING] Timer commit is overlapping (interval=2.8 runtime=117.13050575199304)
[2020-12-21 15:42:47,174] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:43:11,633] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:43:19,542] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
simulate-rt-odd.service: Main process exited, code=killed, status=9/KILL
simulate-rt-odd.service: Failed with result 'signal'.
So basically the application starts to raise these errors and is eventually killed (by systemd, at least I suppose).
For the full traceback, see the previous step.
Currently the metadata that we have for faust-streaming on PyPI points to the original faust. We need to update setup.py and probably MANIFEST.in.
master branch of Faust.
It looks like the regression was added in faust/agents/agent.py, specifically this chunk of code:
if cur_version >= req_version:
    # Python >= 3.8 supports the name= argument to asyncio.Task.
    task = asyncio.Task(
        self._execute_actor(coro, aref),
        loop=self.loop,
        name=f"{str(aref)}-{self.channel.get_topic_name()}",
    )
else:
    task = asyncio.Task(
        self._execute_actor(coro, aref),
        loop=self.loop,
    )
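One possible guard is to fall back to the actor reference alone when the channel cannot name a topic; a sketch with stand-in classes, not the actual faust code:

```python
class UnnamedChannel:
    # Stand-in mimicking faust.channels.Channel, whose get_topic_name()
    # raises because plain channels have no topic name.
    def get_topic_name(self) -> str:
        raise NotImplementedError("Channels are unnamed topics")

class NamedTopic:
    # Stand-in for a real topic with a name.
    def __init__(self, name: str) -> None:
        self._name = name

    def get_topic_name(self) -> str:
        return self._name

def task_name(aref: str, channel) -> str:
    # Fall back to the actor reference alone when the channel is unnamed,
    # instead of letting NotImplementedError escape during actor startup.
    try:
        return f"{aref}-{channel.get_topic_name()}"
    except NotImplementedError:
        return str(aref)

print(task_name("Agent*:0", NamedTopic("greetings")))  # Agent*:0-greetings
print(task_name("Agent*:0", UnnamedChannel()))         # Agent*:0
```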
Technically nothing - yesterday the pytest code I was writing ran fine; today that same code errored out on every test, even previously passing ones. I pulled it out and created a minimal set of code to reproduce the error here (agent.test_context() returns a Channel, which is how I noticed it):
import faust

app = faust.App('faust-bug-reproduction', broker='kafka://broker:29092')
topic = app.channel()
topic.maybe_declare()

@app.agent(topic)
async def split_sentence(data):
    async for key, value in data.items():
        print(key)
        print(value)

app.k = 1
app.v = 1

# Sends a message to the topic on a timer
@app.timer(interval=2.0)
async def example_sender(app):
    await topic.send(
        value=app.v,
        key=app.k,
    )
    app.k *= 2
    app.v *= 3

# When run standalone, start it up
if __name__ == '__main__':
    app.main()
Just a bunch of repeating messages of the form:
[2021-01-21 17:38:42,774] [1] [WARNING] 1
[2021-01-21 17:38:42,774] [1] [WARNING] 1
[2021-01-21 17:38:44,774] [1] [WARNING] 2
[2021-01-21 17:38:44,774] [1] [WARNING] 3
[2021-01-21 17:38:46,774] [1] [WARNING] 4
[2021-01-21 17:38:46,774] [1] [WARNING] 9
Crash on name=f"{str(aref)}-{self.channel.get_topic_name()}". Error: NotImplementedError('Channels are unnamed topics')
name=f"{str(aref)}-{self.channel.get_topic_name()}",
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
[2021-01-21 17:34:54,688] [1] [ERROR] [^Worker]: Error: NotImplementedError('Channels are unnamed topics')
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/mode/worker.py", line 276, in execute_from_commandline
self.loop.run_until_complete(self._starting_fut)
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 759, in start
await self._default_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 766, in _default_start
await self._actually_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 790, in _actually_start
await child.maybe_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 818, in maybe_start
await self.start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 759, in start
await self._default_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 766, in _default_start
await self._actually_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 790, in _actually_start
await child.maybe_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 818, in maybe_start
await self.start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 759, in start
await self._default_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 766, in _default_start
await self._actually_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 783, in _actually_start
await self.on_start()
File "/usr/local/lib/python3.8/site-packages/faust/agents/manager.py", line 57, in on_start
await agent.maybe_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 818, in maybe_start
await self.start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 759, in start
await self._default_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 766, in _default_start
await self._actually_start()
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 783, in _actually_start
await self.on_start()
File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 284, in on_start
await self._on_start_supervisor()
File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 315, in _on_start_supervisor
res = await self._start_one(
File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 253, in _start_one
return await self._start_task(
File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 648, in _start_task
return await self._prepare_actor(
File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 668, in _prepare_actor
name=f"{str(aref)}-{self.channel.get_topic_name()}",
File "/usr/local/lib/python3.8/site-packages/faust/channels.py", line 197, in get_topic_name
raise NotImplementedError("Channels are unnamed topics")
NotImplementedError: Channels are unnamed topics
master branch of Faust.
The application is configured with processing_guarantee="exactly_once".
publish 5 messages to a topic, keyed by id
repartition the topic using group_by(new_id)
increment count on table with keys that are new_id
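The steps above can be sketched roughly as follows (app id, topic, record, and table names are hypothetical, loosely modeled on the changelog name in the logs below):

```python
import faust

app = faust.App(
    'meteor',  # hypothetical app id
    broker='kafka://localhost:9092',
    store='rocksdb://',
    processing_guarantee='exactly_once',
)

class Submission(faust.Record):
    id: str
    new_id: str

submissions = app.topic('submissions', value_type=Submission)
counts = app.Table('submission-count-by-workflow', default=int)

@app.agent(submissions)
async def count_by_new_id(stream):
    # Repartition by new_id, then increment the count per new key.
    async for sub in stream.group_by(Submission.new_id):
        counts[sub.new_id] += 1
```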
Initially, the worker is up, processes the messages and stores the correct data in the changelog topic.
Then I send SIGTERM to stop the worker
When restarting the worker, it gets stuck on recovering per the logs below.
Possibly an issue with the transactional producer: a transaction getting aborted may leave the worker unable to recover.
Worker recovers and is able to process events.
Worker hangs on recovery.
Log showing this behavior.
[2020-11-29 15:55:45,969] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.04 minute ago)
[2020-11-29 15:55:50,974] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.13 minute ago)
[2020-11-29 15:55:55,970] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.21 minute ago)
[2020-11-29 15:56:00,976] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.29 minute ago)
[2020-11-29 15:56:05,972] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.38 minute ago)
[2020-11-29 15:56:10,977] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.46 minute ago)
[2020-11-29 15:56:15,974] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.54 minute ago)
[2020-11-29 15:56:20,979] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.63 minute ago)
[2020-11-29 15:56:25,975] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.71 minute ago)
[2020-11-29 15:56:30,980] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.79 minute ago)
[2020-11-29 15:56:35,977] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.88 minute ago)
[2020-11-29 15:56:40,982] [114] [WARNING] [^---Recovery]: Recovery has not flushed buffers in the last 120.0 seconds (last flush was 2.00 minutes ago). Current total buffer size: 5
master branch of Faust.
We created a simple test for Faust streaming and ran two or more parallel workers, each using its own RocksDB.
Under high traffic, stopping one worker sometimes triggers crashes in the other one.
Workers can be stopped and restarted, and even in the unfortunate event that they crash, it should not impact other workers operating in parallel.
Other workers may crash.
[^---Recovery]: Crashed reason=KeyError(TP(topic='<our-topic>', partition=1))
Traceback (most recent call last):
File "<our venv>/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task
await task
File "<our venv>/lib/python3.8/site-packages/faust/tables/recovery.py", line 375, in _restart_recovery
await self._wait(
File "<our venv>/lib/python3.8/site-packages/faust/tables/recovery.py", line 562, in _wait
wait_result = await self.wait_first(coro, signal, timeout=timeout)
File "<our venv>/lib/python3.8/site-packages/mode/services.py", line 715, in wait_first
f.result() # propagate exceptions
File "<our venv>/lib/python3.8/site-packages/faust/tables/recovery.py", line 666, in _build_offsets
new_value = earliest[tp]
KeyError: TP(topic='<our topic>', partition=1)
master branch of Faust.
MWE:
import faust

app = faust.App('myapp')

if __name__ == "__main__":
    app.main()
python test.py worker
no warning
starting➢ 😊
[2021-01-22 23:50:16,167] [414000] [WARNING] [^-App]: Missing sensor state for rebalance #1
[2021-01-22 23:50:16,181] [414000] [WARNING] [^-App]: Missing sensor state for rebalance #1
The same warning also appeared in #44.
This was introduced in 0.2.0; it does not happen in 0.1.1. The signal at
faust/faust/tables/recovery.py
Line 241 in 5cce145
was changed from reset to start in 0.2.0, which somehow causes
Lines 1611 to 1612 in fda7e52
to clear the sensor state.
Hello, just wanted to say that we really enjoy Faust; many thanks to everyone who helps develop and maintain the project. I've not run into any issues with Faust, but I have a small question about how to more efficiently send messages (e.g. as a batch) to a topic.
I have an agent which processes messages from a topic, whose goal it is to split a single message into many messages and send the new messages to the next topic. The code looks something like this:
@app.agent(input_topic)
async def split_messages(stream):
    async for event in stream:
        # each event contains a list of readings
        for reading in event.readings:
            # reading is an object like {value: 1, id: 34}
            individual_event = IndividualEvent(
                id=reading.id,
                value=reading.value,
            )
            await output_topic.send(key=individual_event.id, value=individual_event)
Is it possible to send the new messages more efficiently? I want to do something like this instead:
@app.agent(input_topic)
async def split_messages(stream):
    async for event in stream:
        messages_to_publish = [
            IndividualEvent(id=reading.id, value=reading.value)
            for reading in event.readings
        ]
        await output_topic.send_batch(messages_to_publish)
Is there a preferred way to handle this case, and is it possible to do this with the map/reduce functions?
Thank you for the help!
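One common workaround for this kind of fan-out, since a topic's send() returns an awaitable, is to create all the send coroutines and await them together with asyncio.gather instead of awaiting each one serially inside the loop. This is a sketch under assumptions, not an official Faust API: FakeTopic below is a stand-in for a real topic, and the actual throughput gain depends on the producer buffering sends to Kafka.

```python
import asyncio

class FakeTopic:
    """Stand-in for a topic whose send() is awaitable, like Faust's."""
    def __init__(self):
        self.sent = []

    async def send(self, *, key, value):
        # A real producer would buffer this and flush to Kafka.
        await asyncio.sleep(0)
        self.sent.append((key, value))

async def split_and_publish(readings, output_topic):
    # Fan out: build every send coroutine first, then await them
    # concurrently rather than one at a time.
    await asyncio.gather(
        *(output_topic.send(key=r["id"], value=r["value"]) for r in readings)
    )

topic = FakeTopic()
readings = [{"id": 34, "value": 1}, {"id": 35, "value": 2}]
asyncio.run(split_and_publish(readings, topic))
print(sorted(topic.sent))  # → [(34, 1), (35, 2)]
```

Note that gather gives concurrency, not a true Kafka batch; the producer's own batching settings (linger, batch size) determine what actually goes over the wire together.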
master branch of Faust.
The issue: after recovery is complete, the standby partition views should be updated in batches, but they are not, due to the check in _may_signal_recovery_end().
Hi,
Thank you for your work.
Do you plan to also fork mode?
It is another project formerly maintained by a Robinhood employee, and today it looks abandoned.
It is an important dependency of Faust, so it could be valuable to keep it updated in parallel with your Faust fork.
If there is anything I can do to help you in this process, just ask.
Thank you very much.