
ydf0509 / funboost

635 stars · 13 watchers · 125 forks · 71.11 MB

pip install funboost: a full-featured Python distributed function scheduling framework. It supports every Python concurrency mode and all well-known message-queue middleware, and can even wrap whole frameworks such as celery and dramatiq as funboost brokers. It works as a Python function accelerator, and virtually every control feature a user could want is built in. It unifies the programming model and covers roughly 50% of Python business scenarios, so its range of application is very wide. A single line of code is enough to run any Python function in a distributed way; 99% of pythoners who have used funboost describe it as simple, convenient, and powerful, and wish they had found it sooner.
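A minimal usage sketch of the one-decorator workflow described above; the queue name and parameters are illustrative, and the same pattern appears in the example code further down this page:

from funboost import boost, BrokerEnum

@boost('demo_queue', qps=5, broker_kind=BrokerEnum.PERSISTQUEUE)  # illustrative queue and broker choice
def add(x, y):
    print(x + y)

if __name__ == '__main__':
    add.push(1, y=2)   # publish a task to the queue
    add.consume()      # start the consumer loop that executes the tasks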

License: Apache License 2.0

Python 97.02% CSS 1.30% JavaScript 0.08% HTML 1.58% Batchfile 0.02%
celery kafka nsq rabbitmq redis rocketmq sqlite tcp zeromq asyncio

funboost's Introduction

nb_time pip install nb_time
NbTime is an object-oriented time class that makes time conversion and timezone handling easy and supports unlimited chained calls; it is far more pleasant to use than raw datetime or the third-party arrow package,
and much more convenient than a procedural time_utils.py holding hundreds of isolated time-conversion functions.
db_libs pip install db_libs
Wrappers for various databases. Each wrapper only creates the connection and adds very few new methods; you mostly keep calling the native client's own methods.
async_pool_executor pip install async_pool_executor
Its API is like concurrent.futures; it makes asyncio concurrent programming about 10x simpler.
flexible_thread_pool pip install flexible_thread_pool
flexible_thread_pool runs both synchronous functions and async def coroutine functions concurrently. The pool can grow and automatically shrink; it is implemented more simply than threadpool_executor_shrink_able and outperforms concurrent.futures.ThreadPoolExecutor by 200%.
sync2asyncio pip install sync2asyncio
A quick, universal way to convert synchronous Python code to asyncio.
object_pool_proj pip install universal_object_pool
A universal object pool that can pool objects of any custom class, useful for quickly building any kind of pool (except thread pools).
nb_http_client pip install nb_http_client
Powered by object_pool_proj.
nb_http_client is the highest-performance HTTP client in Python's history, many times faster than any other request package.
celery_demo demonstrates using celery from a complex, deeply nested directory layout that follows none of the usual conventions.
funboost_support_celery_demo demonstrates, from a similarly complex and unconventional directory layout, using funboost to configure and drive celery automatically, with drastically simplified code.
nb_filelock pip install nb_filelock
Uses a file on disk as the medium to implement a cross-process, cross-interpreter distributed lock on a single machine.
tps_threadpool_executor pip install tps_threadpool_executor
A rate-controlled thread pool that lets you specify exactly how many times per second a function runs, rather than how many threads in the pool run concurrently.
auto_run_on_remote pip install auto_run_on_remote
Click-run a Python script locally and have it automatically executed on a remote Linux machine.
Far more convenient than PyCharm Professional's remote Linux interpreter.
auto_restart pip install auto_restart
A cold-deploy auto-restart tool: when it detects that the git contents have changed, it restarts the service automatically, so releases need no manual restart.
base_decorator pip install base_decorator
A general-purpose decorator base class that makes writing decorators easier.
decorator_libs pip install decorator_libs
A collection of commonly used, everyday general-purpose decorators.
fastapi_use_funboost demo of FastAPI using the distributed function scheduling framework funboost for background consumption
uwsgi_flask_funboost demo of uwsgi-deployed Flask + funboost for background consumption
django_use_funboost demo of Django + funboost for background consumption
funboost_django_orm_demo demo of Django + funboost where the task function uses the ORM, for background consumption
funboost_vs_celery_benchmark a rigorous, controlled-variable performance comparison of the distributed function scheduling framework funboost against celery
pysnooper_click_able pip install pysnooper_click_able A next-level debugging decorator (implementation difficulty: five stars). A debug tool that needs no breakpoints and no scattered print calls: it shows a precise, clickable trace of the code that actually ran, and can count exactly how many lines Python interpreted behind a single function call, so you know how much CPU the function really costs.
pythonpathdemo a dedicated project explaining why mastering PYTHONPATH matters: the difference between temporary per-session environment variables and permanent ones, the benefits of PYTHONPATH, and its clever uses. Once you learn PYTHONPATH, reusing shared code across dozens of projects becomes effortless.
kuai_log pip install kuai_log The fastest Python logger; simpler than nb_log, with no third-party dependencies and no config file required.

funboost's People

Contributors

leec20, t880216t, ydf0509


funboost's Issues

Really impressive

It really is impressive, but it pulls in a lot of packages the project doesn't need: flask, kafka, mongodb…

How do I use pulsar?

At a quick glance it seems pulsar is only defined in the constants; I can't find any related code in the config, factories, or anywhere else. Do I need to extend it myself, or did I miss something?

Using it inside a Django project blocks the Django process (user's own problem)

Hi, when I use it inside a Django project it blocks the Django process (maybe I'm using it wrong?).
I don't think it should block, and I'm not sure how to fix it.

  1. Define a task
from funboost import boost, BrokerEnum

@boost('run_queue', broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE, is_using_rpc_mode=True, log_level=50)
def backup_task(device_info):
    backup = BackupNetConfTask(device_info)
    return "success"
  2. Call the task in a view function
from tasks import backup_task
    ... ...
    @action(methods=['post'], detail=False)
    def run_backup_task(self, request, *args, **kwargs):
        """
        api/v1/devices/run_backup_task/
        :param request: 请求对象
        :param args: 元组参数
        :param kwargs: 字典参数
        :return:
        """
        device_infos = get_device_infos() 
        for device_info in device_infos:
            backup_task.pub(dict(device_info=device_info))  # <-- the program blocks here
        return CustomResponse(status=status.HTTP_200_OK, msg="run backup task success")
  3. Start the consumer
from tasks import backup_task
backup_task.consume()
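A hedged sketch of one way to avoid the in-process queue pitfall: BrokerEnum.LOCAL_PYTHON_QUEUE is a queue inside a single Python process, so a consumer started as a separate script can never see what the Django process publishes. One possible arrangement (assuming a Redis broker is available; the broker choice and other arguments are illustrative, not the issue author's setup) is to publish to a cross-process broker from Django and run the consumer as its own process:

# tasks.py  (sketch, not the original code)
from funboost import boost, BrokerEnum

@boost('run_queue', broker_kind=BrokerEnum.REDIS_STREAM)  # a broker that works across processes
def backup_task(device_info):
    ...
    return "success"

# In the Django view, only publish; do not wait for results:
#     backup_task.pub(dict(device_info=device_info))
# In a separate process (e.g. python consumer.py), start the consumer:
#     from tasks import backup_task
#     backup_task.consume()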

A question about consumers/base_consumer.py

In schedulal_task_with_no_block() in consumers/base_consumer.py, line 1097 reads:
elif self._concurrent_mode == ConcurrentModeEnum.GEVENT:
Shouldn't it be:
elif self._concurrent_mode == ConcurrentModeEnum.EVENTLET:

Severe lock contention across multiple processes

  • Message
"C:\Python38\lib\site-packages\funboost\consumers\confirm_mixin.py:70" this line failed to acquire the redis lock fsdf_lock__requeue_tasks_which_unconfirmed:class_sch_group_check
  • Situation
  1. Started five instances (processes)
  2. Each process uses qps=1, concurrent_num=1 (because the task is CPU-bound)
  3. Pushed a lot of tasks, but only about 2-4 processes run at the same time
  4. Once the lock contention kicks in, only 2 processes stay effective, and the processes fighting over the lock also seem to max out their CPU

Installing gevent on Windows 11 with Python 3.10.0 fails, asking for a C++ build environment

The installation errors out; here are a few key parts of the output. I don't know how to resolve it.
Building wheels for collected packages: gevent
Building wheel for gevent (pyproject.toml) ... error
error: subprocess-exited-with-error

× Building wheel for gevent (pyproject.toml) did not run successfully.
│ exit code: 1
╰─> [289 lines of output]
.................
error: Microsoft Visual C++ 14.0 or greater is required. Get it with "Microsoft C++ Build Tools": https://visualstudio.microsoft.com/visual-cpp-build-tools/
[end of output]

note: This error originates from a subprocess, and is likely not a problem with pip.
ERROR: Failed building wheel for gevent
Failed to build gevent
ERROR: Could not build wheels for gevent, which is required to install pyproject.toml-based projects

Running the sample program raises an error? (user's own problem)

import time
from funboost import boost, BrokerEnum,get_consumer,run_consumer_with_multi_process

def f(a, b):
    return a+b

consumer = get_consumer('queue_test_f01', consuming_function=f,qps=0.2, broker_kind=2)

if __name__ == "__main__":
    # The values of consuming_function's parameters must be specified manually.
    for i in range(10, 2000):
        consumer.publisher_of_same_queue.publish(dict(a=i, b=i * 2))
        
    consumer.start_consuming_message()
    run_consumer_with_multi_process(consumer,8)
    

After running it, the error is:

Traceback (most recent call last):
  File "main.py", line 16, in <module>
    run_consumer_with_multi_process(consumer,8)
  File "/home/project/test_mul_thread/venv/lib/python3.8/site-packages/funboost/helpers.py", line 45, in run_consumer_with_multi_process
    if not getattr(task_fun, 'is_decorated_as_consume_function'):
AttributeError: 'RedisConsumer' object has no attribute 'is_decorated_as_consume_function'

It doesn't affect execution though; the program keeps running normally. Is there a way to avoid this error?
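The traceback shows that run_consumer_with_multi_process looks up the is_decorated_as_consume_function attribute, which appears to be set only on @boost-decorated functions, not on the consumer object returned by get_consumer. A hedged sketch of the decorator-based form (the broker choice is illustrative; argument names should be checked against the installed funboost version):

import time
from funboost import boost, BrokerEnum, run_consumer_with_multi_process

@boost('queue_test_f01', qps=0.2, broker_kind=BrokerEnum.REDIS_STREAM)  # decorate instead of get_consumer(...)
def f(a, b):
    return a + b

if __name__ == "__main__":
    for i in range(10, 2000):
        f.push(a=i, b=i * 2)                # publish through the decorated function
    run_consumer_with_multi_process(f, 8)   # pass the @boost-decorated function, not a consumer object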

A suggestion

After installation it is over 100 MB. Could it be slimmed down by removing the non-essential libraries and keeping only the required ones? Just a suggestion; hoping this home-grown library keeps getting better!

Using it together with tornado

Hi,
how can funboost be combined with tornado so everything stays async and non-blocking? I want tornado to publish a message to the queue through funboost, fetch the execution result asynchronously, and then finish the response.
Thanks
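A hedged sketch of one approach, assuming the task is declared with is_using_rpc_mode=True (so push returns an async result whose .result property blocks until the consumer finishes, as the tracebacks elsewhere on this page show). The blocking wait is moved onto a worker thread via tornado's run_in_executor so the IOLoop stays responsive; the handler and task names are illustrative:

import tornado.web
from tornado.ioloop import IOLoop
from tasks import backup_task  # assumed to be a @boost(..., is_using_rpc_mode=True) task

class RunTaskHandler(tornado.web.RequestHandler):
    async def post(self):
        async_result = backup_task.push(self.get_argument('device'))  # publish to the queue
        # .result blocks until the consumer returns, so await it in a thread pool
        result = await IOLoop.current().run_in_executor(None, lambda: async_result.result)
        self.write({'result': result})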

Pushing a task and then blocking on its result raises the error below? (RPC mode was not enabled)

Hi, I created two tasks: the first downloads, the second processes. The processing task depends on the first, and since there is a lot of IO I gave the download task 2 threads and the processing task only 1. In the function that submits the tasks I try to wait for the download's result via the returned result object, but I get the error below. Simplified code:

from tasks import download_task, compress_task

def request(url, tempfn):
    '''
    url is the download URL
    tempfn is the local path for the downloaded file
    '''
    download_status = download_task.push(url)
    if download_status.result:
        compress_task.push(tempfn)

The error:

Traceback (most recent call last):

  File "/root/anaconda3/lib/python3.9/threading.py", line 930, in _bootstrap
    self._bootstrap_inner()
    │    └ <function Thread._bootstrap_inner at 0x7f3b3c808af0>
    └ <DummyProcess(Thread-8, started daemon 139891555944192)>
  File "/root/anaconda3/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
    │    └ <function Thread.run at 0x7f3b3c808820>
    └ <DummyProcess(Thread-8, started daemon 139891555944192)>
  File "/root/anaconda3/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <DummyProcess(Thread-8, started daemon 139891555944192)>
    │    │        │    └ (<_queue.SimpleQueue object at 0x7f3b2179af40>, <_queue.SimpleQueue object at 0x7f3b2179aa40>, None, (), None, False)
    │    │        └ <DummyProcess(Thread-8, started daemon 139891555944192)>
    │    └ <function worker at 0x7f3b217c71f0>
    └ <DummyProcess(Thread-8, started daemon 139891555944192)>
  File "/root/anaconda3/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    │     │       └ {}
                    │     └ (('reanalysis-era5-pressure-levels', {'variable': 'geopotential', 'year': '2020', 'month': '4', 'format': 'netcdf', 'area': '...
                    └ <function only_request at 0x7f3b23864d30>
> File "/root/anaconda3/lib/python3.9/site-packages/era5/cli.py", line 111, in only_request
    if downloas_status.result: # to block the thread
       │               └ <property object at 0x7f3b24a51450>
       └ <funboost.publishers.base_publisher.AsyncResult object at 0x7f3b2078e2b0>
  File "/root/anaconda3/lib/python3.9/site-packages/funboost/publishers/base_publisher.py", line 68, in result
    return self.get()
           │    └ <function AsyncResult.get at 0x7f3b24a52280>
           └ <funboost.publishers.base_publisher.AsyncResult object at 0x7f3b2078e2b0>
  File "/root/anaconda3/lib/python3.9/site-packages/funboost/publishers/base_publisher.py", line 64, in get
    raise HasNotAsyncResult
          └ <class 'funboost.publishers.base_publisher.HasNotAsyncResult'>
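The HasNotAsyncResult exception at the bottom of the traceback is what gets raised when .result is read but RPC mode was not enabled on the task (as the issue title notes). A hedged sketch of the adjustment, reusing the is_using_rpc_mode flag that appears in other examples on this page; queue names and broker choice are illustrative:

from funboost import boost, BrokerEnum

# Enable RPC mode so that push(...).result can wait for the consumer's return value.
@boost('download_queue', broker_kind=BrokerEnum.REDIS_STREAM, is_using_rpc_mode=True)
def download_task(url):
    ...
    return True

@boost('compress_queue', broker_kind=BrokerEnum.REDIS_STREAM)
def compress_task(tempfn):
    ...

def request(url, tempfn):
    download_status = download_task.push(url)
    if download_status.result:      # blocks until the download consumer reports back
        compress_task.push(tempfn)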

cannot set 'is_timeout' attribute of immutable type 'TimeoutError'

Running on Python 3.10.3 produces this:
Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "D:\python\python3.10\lib\site-packages\funboost\__init__.py", line 11, in <module>
    from funboost.assist.user_custom_broker_register import register_custom_broker
  File "D:\python\python3.10\lib\site-packages\funboost\assist\user_custom_broker_register.py", line 2, in <module>
    from funboost.publishers.base_publisher import AbstractPublisher
  File "D:\python\python3.10\lib\site-packages\funboost\publishers\base_publisher.py", line 22, in <module>
    from funboost.concurrent_pool import CustomThreadPoolExecutor
  File "D:\python\python3.10\lib\site-packages\funboost\concurrent_pool\__init__.py", line 14, in <module>
    from .custom_evenlet_pool_executor import CustomEventletPoolExecutor
  File "D:\python\python3.10\lib\site-packages\funboost\concurrent_pool\custom_evenlet_pool_executor.py", line 7, in <module>
    from eventlet import greenpool, monkey_patch, patcher, Timeout
  File "D:\python\python3.10\lib\site-packages\eventlet\__init__.py", line 17, in <module>
    from eventlet import convenience
  File "D:\python\python3.10\lib\site-packages\eventlet\convenience.py", line 7, in <module>
    from eventlet.green import socket
  File "D:\python\python3.10\lib\site-packages\eventlet\green\socket.py", line 4, in <module>
    __import__('eventlet.green._socket_nodns')
  File "D:\python\python3.10\lib\site-packages\eventlet\green\_socket_nodns.py", line 11, in <module>
    from eventlet import greenio
  File "D:\python\python3.10\lib\site-packages\eventlet\greenio\__init__.py", line 3, in <module>
    from eventlet.greenio.base import *  # noqa
  File "D:\python\python3.10\lib\site-packages\eventlet\greenio\base.py", line 32, in <module>
    socket_timeout = eventlet.timeout.wrap_is_timeout(socket.timeout)
  File "D:\python\python3.10\lib\site-packages\eventlet\timeout.py", line 166, in wrap_is_timeout
    base.is_timeout = property(lambda _: True)
TypeError: cannot set 'is_timeout' attribute of immutable type 'TimeoutError'

Can tasks with dependencies on each other be executed?

Very glad to have discovered this project. I'd like to ask about multi-task dependencies: how do I assign priorities, and how do I wait for one task to finish before running the next?
For example: the current time is 2 o'clock and the task parameter is 1 o'clock.
So when it runs at 2 o'clock it processes the 1 o'clock data. If the task times out or fails for some other reason and I rerun the 2 o'clock task, the parameter might already be 3 o'clock. Are the parameters that were used at execution time preserved? And can dependency relationships between tasks, such as A -> b/d/f -> C, be expressed?

Suggestion: improve dependency management

I gave it a try and was about to put it into real use, but ran into a pile of incompatibilities, mostly caused by the packages it pulls in.

  1. Consider using poetry for dependency management and keeping up with new Python versions.
  2. Consider removing unrelated packages, or making some middleware optional installs.

The compatibility issues really are a headache to work with.

supporting python3.10+ maybe?

A very pleasant framework. I had been using celery; since finding this framework, I think it is better both in overall project design and in ease of getting started.

The current problem is that the framework has some incompatibilities on Python 3.10+. Does the author plan to add Python 3.10 support?

A question about deploying and running after setup install

Hi, I integrated funboost into a package of my own located at /root/rdown. It is a command-line tool that I install with python setup.py install into /root/anaconda3/lib so I can use it from the command line. In that case, where should I start the consumer: under /root/rdown or under /root/anaconda3/lib/site-packages/rdown?

Celery actually has the same problem, and I could never get celery's background workers to run properly: running from the command line always works, but as soon as I deploy it in the background with systemctl per the official docs, it breaks.

Is limiting qps per task group supported?

The actual requirement: we call a SaaS service whose limit is 100 qps; exceeding it gets us banned for a while, and the limit is shared across all of that SaaS provider's endpoints.

So the combined qps of a group of our tasks must not exceed 100, yet we also want to stay as close to 100 as possible to maximize resource usage.

I've read the documentation twice,
but there doesn't seem to be a per-group qps limit.

I'd appreciate an answer. Thanks to the author for open-sourcing such an excellent project.

Small wishes

I admire this project a lot and it is very pleasant to use.
One small wish: could setup.py use extras_require?
Installing all components by default is fine, but it would allow components to be installed on demand.
Many thanks for the author's contribution.
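A hedged sketch of what this wish could look like in setup.py; the package groupings and names below are purely illustrative and are not funboost's actual metadata:

from setuptools import setup, find_packages

setup(
    name='funboost',
    packages=find_packages(),
    install_requires=[           # only the always-needed core
        'apscheduler',
        'nb_log',
    ],
    extras_require={             # optional brokers, installed on demand
        'rabbitmq': ['pika'],
        'kafka': ['kafka-python'],
        'redis': ['redis'],
        'all': ['pika', 'kafka-python', 'redis'],
    },
)

Users could then run pip install funboost[redis] to pull in only the broker they need, while pip install funboost[all] would keep today's install-everything behaviour.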

An mqtt project example

Could the author provide an mqtt project example? I couldn't find any usage instructions for it in the documentation.

Copying the code from the docs section "4.4 How to run tasks on a schedule" raises an error straight away

15:28:17 "/Users/zhangxiaodong/PycharmProjects/GameFiBridge/tasks/test.py:7" consumer of queue_test_666
2022-02-07 15:28:20 - apscheduler.scheduler - "/Users/zhangxiaodong/PycharmProjects/GameFiBridge/venv/lib/python3.9/site-packages/apscheduler/schedulers/base.py:988" - _process_jobs - ERROR - Error submitting job "timing_publish_deco.._deco (trigger: interval[0:00:03], next run at: 2022-02-07 15:28:20 CST)" to executor "default"
Traceback (most recent call last):
File "/Users/zhangxiaodong/PycharmProjects/GameFiBridge/venv/lib/python3.9/site-packages/apscheduler/schedulers/base.py", line 979, in _process_jobs
executor.submit_job(job, run_times)
File "/Users/zhangxiaodong/PycharmProjects/GameFiBridge/venv/lib/python3.9/site-packages/apscheduler/executors/base.py", line 71, in submit_job
self._do_submit_job(job, run_times)
File "/Users/zhangxiaodong/PycharmProjects/GameFiBridge/venv/lib/python3.9/site-packages/apscheduler/executors/pool.py", line 28, in _do_submit_job
f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py", line 163, in submit
raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown

Asking for help: running the example code produces this. Is my middleware connection wrong? (Python 3.10 is not supported at the moment)


File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/eventlet/green/_socket_nodns.py", line 11, in from eventlet import greenio File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/eventlet/greenio/__init__.py", line 3, in from eventlet.greenio.base import * # noqa File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/eventlet/greenio/base.py", line 32, in socket_timeout = eventlet.timeout.wrap_is_timeout(socket.timeout) File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/eventlet/timeout.py", line 166, in wrap_is_timeout base.is_timeout = property(lambda _: True) TypeError: cannot set 'is_timeout' attribute of immutable type 'TimeoutError'

Too many packages get pulled in; can dependency management be improved?

I just created a fresh environment specifically to study and test this framework; installing this one package pulls in a long list of dependencies:
Successfully installed AMQPStorm-2.7.1 Automat-20.2.0 Flask-2.1.1 Jinja2-3.1.1 MarkupSafe-2.1.1 Werkzeug-2.1.1 aiohttp-3.8.0 aiosignal-1.2.0 amqp-5.1.0 apscheduler-3.7.0 async-timeout-4.0.2 attrs-21.4.0 bcrypt-3.2.0 blinker-1.4 certifi-2021.10.8 cffi-1.15.0 charset-normalizer-2.0.12 click-8.1.2 concurrent-log-handler-0.9.19 constantly-15.1.0 cryptography-36.0.2 decorator-4.4.0 deprecated-1.2.13 dnspython-1.16.0 dominate-2.6.0 elastic-transport-8.1.1 elasticsearch-8.1.2 eventlet-0.31.0 fabric2-2.6.0 flask-bootstrap-3.3.7.1 flask-login-0.6.0 flask-wtf-1.0.1 frozenlist-1.3.0 funboost-15.9 gevent-21.1.2 gnsq-1.0.1 greenlet-1.1.2 hyperlink-21.0.0 idna-3.3 importlib-metadata-4.11.3 incremental-21.3.0 invoke-1.7.0 itsdangerous-2.1.2 kafka-python-2.0.2 kombu-5.2.4 multidict-6.0.2 nats-python-0.8.0 nb-filelock-0.7 nb-log-7.5 packaging-21.3 paho-mqtt-1.6.1 pamqp-2.3.0 paramiko-2.10.3 pathlib2-2.3.7.post1 persist-queue-0.7.0 pika-1.2.0 pikav0-0.1.23 pikav1-1.0.13 portalocker-2.4.0 psutil-5.9.0 pyasn1-0.4.8 pyasn1-modules-0.2.8 pycparser-2.21 pymongo-4.0.2 pynacl-1.5.0 pyparsing-3.0.7 pysnooper-1.1.0 python-json-logger-0.1.10 pytz-2022.1 pyzmq-22.3.0 rabbitpy-2.0.1 redis-4.2.1 redis2-2.10.6.3 redis3-3.5.2.3 requests-2.27.1 rocketmq-0.4.4 semantic-version-2.9.0 service-identity-21.1.0 setuptools-rust-1.2.0 six-1.16.0 sqlalchemy-1.3.10 sqlalchemy-utils-0.36.1 tomorrow3-1.1.0 tornado-6.1 twisted-22.2.0 typing-extensions-4.1.1 tzlocal-2.1 urllib3-1.26.9 vine-5.0.0 visitor-0.1.3 wrapt-1.14.0 wtforms-3.0.1 yarl-1.7.2 zipp-3.7.0 zmq-0.0.0 zope.event-4.5.0 zope.interface-5.4.0

Suggestions:

  1. Remove the unnecessary dependencies;
  2. If they really are needed, make them optional and installable on demand.

funboost mysql

I'm not going to use celery because it doesn't integrate well with mysql; I plan to test this product instead.

Can broker fallback (degradation) be supported?

Since multiple middlewares are supported, could a configurable fallback chain be supported, e.g. kafka -> MQ -> redis -> local file?

The task is set to run once every 60 s, but the log shows it ran twice within 10 s?

Scheduled-task code:
fsdf_background_scheduler.add_job(timing_publish_deco(xxx_spider), 'interval', id='xxx_add_queue', seconds=300)
fsdf_background_scheduler.add_job(timing_publish_deco(xxx_result), 'interval', id='xxx_result_queue', seconds=60)

Log: RedisConsumer--xxx_result_queue - "base_consumer.py:697" - INFO - executed the function [xxx_result] 2 times within 10 seconds, average runtime 0.009 seconds

Consumers:
xxx_spider.consume()
xxx_result.consume()
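For reference, a hedged sketch of the full scheduled-publish pattern that the snippets above come from, based only on the names shown in this issue and in the section-4.4 issue earlier on this page; the import location of fsdf_background_scheduler and timing_publish_deco should be checked against the installed funboost version, and the queue name is illustrative:

from funboost import boost, BrokerEnum, fsdf_background_scheduler, timing_publish_deco

@boost('xxx_result_queue', broker_kind=BrokerEnum.REDIS_STREAM)
def xxx_result():
    ...

if __name__ == '__main__':
    # publish one task every 60 seconds; the consumer below then executes it
    fsdf_background_scheduler.add_job(timing_publish_deco(xxx_result), 'interval',
                                      id='xxx_result_queue', seconds=60)
    fsdf_background_scheduler.start()
    xxx_result.consume()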

When publishing a delayed task, an async task fails. Is this unsupported, or am I doing it wrong?

@boost("register", broker_kind=BrokerEnum.REDIS_STREAM,
       concurrent_mode=ConcurrentModeEnum.ASYNC, log_level=20)
async def register(cookie_dict: dict, meta_dict: dict, mobile: str):
       pass

register.publish({'cookie_dict': cookie_dict, 'meta_dict': meta_dict, 'mobile': mobile},
                                 priority_control_config=PriorityConsumingControlConfig(countdown=40))

The error:

Error submitting job "AsyncPoolExecutor.submit (trigger: date[2022-04-09 17:55:16 CST], next run at: 2022-04-09 17:55:16 CST)" to executor "default"
Traceback (most recent call last):
  File "C:\ProgramData\Miniconda3\lib\site-packages\apscheduler\schedulers\base.py", line 979, in _process_jobs
    executor.submit_job(job, run_times)
  File "C:\ProgramData\Miniconda3\lib\site-packages\apscheduler\executors\base.py", line 71, in submit_job
    self._do_submit_job(job, run_times)
  File "C:\ProgramData\Miniconda3\lib\site-packages\apscheduler\executors\pool.py", line 28, in _do_submit_job
    f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
  File "C:\ProgramData\Miniconda3\lib\concurrent\futures\thread.py", line 161, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

NameError: name 'sqlalchemy' is not defined

lib/python3.9/site-packages/funboost/queues/sqla_queue.py", line 66, in SessionContext
    def __init__(self, session: sqlalchemy.orm.session.Session):
NameError: name 'sqlalchemy' is not defined

I got this error while running the example (the screenshot failed to upload).

Using mongodb as the queue raises an error

(error screenshot not preserved)

python version: 3.9
funboost version: 14.8

Code:

from funboost import boost, BrokerEnum, ConcurrentModeEnum, FunctionResultStatusPersistanceConfig


@boost(
    "queue2",
    broker_kind=BrokerEnum.MONGOMQ,
    function_result_status_persistance_conf=FunctionResultStatusPersistanceConfig(True, True, 7*25*36000)
)
def print_hi(name):
    # Use a breakpoint in the code line below to debug your script.
    print(f'Hi, {name}')  # Press ⌘F8 to toggle the breakpoint.


# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    for i in range(100):
        pass
    print_hi.push(f"Hello")

    print_hi.consume()
    # print_hi.consume()
    # print_hi.consume()

mongodb docker-compose:

version: '3.1'

services:

  mongo:
    image: mongo
    restart: always
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    volumes:
      - ./data:/data/db

  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
      ME_CONFIG_MONGODB_URL: mongodb://root:example@mongo:27017/

mongo version:

➜  mongo docker exec cb5a5c4c8932 mongo --version
MongoDB shell version v5.0.5
Build Info: {
    "version": "5.0.5",
    "gitVersion": "d65fd89df3fc039b5c55933c0f71d647a54510ae",
    "openSSLVersion": "OpenSSL 1.1.1f  31 Mar 2020",
    "modules": [],
    "allocator": "tcmalloc",
    "environment": {
        "distmod": "ubuntu2004",
        "distarch": "x86_64",
        "target_arch": "x86_64"
    }
}

Callable import error on Python 3.10

Running the test example on Python 3.10 raises an error.

The problem has been solved.

The error:

  File "C:\Users\90957\Documents\2022-1\fastapitest\test.py", line 2, in <module>
    from funboost import boost, BrokerEnum
  File "C:\software\Anaconda\envs\funboost\lib\site-packages\funboost\__init__.py", line 12, in <module>
    from funboost.consumers.base_consumer import (ExceptionForRequeue, ExceptionForRetry,
  File "C:\software\Anaconda\envs\funboost\lib\site-packages\funboost\consumers\base_consumer.py", line 25, in <module>
    from collections import Callable
ImportError: cannot import name 'Callable' from 'collections' (C:\software\Anaconda\envs\funboost\lib\collections\__init__.py)

Cause: importing Callable directly from collections (rather than collections.abc) has been deprecated since Python 3.3, and the alias was removed in Python 3.10, so "from collections import Callable" fails there.
Workaround: use a Python version below 3.10, or restore the alias before importing funboost, as in the sketch below.
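A hedged sketch of that kind of shim; this is a generic stdlib workaround, not an official funboost fix, and simply restores the removed alias before funboost is imported:

# Run this before any import of funboost in the entry script.
import collections
import collections.abc

if not hasattr(collections, 'Callable'):
    # Python 3.10 removed the collections.Callable alias; point it back at collections.abc.Callable.
    collections.Callable = collections.abc.Callable

from funboost import boost, BrokerEnum  # the import no longer fails on 3.10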

Full commands used to set up the environment where the error occurred:

conda create -n env_name python
conda activate env_name 
pip install funboost --upgrade

The code used:

import time
from funboost import boost, BrokerEnum


@boost("task_queue_name1", qps=5, broker_kind=BrokerEnum.PERSISTQUEUE)  # 入参包括20种,运行控制方式非常多,想得到的控制都会有。
def task_fun(x, y):
    print(f'{x} + {y} = {x + y}')
    time.sleep(3)  # The framework automatically works around this blocking and adjusts concurrency, so task_fun still runs 5 times per second no matter how long the function body randomly takes.


if __name__ == "__main__":
    for i in range(100):
        task_fun.push(i, y=i * 2)  # the publisher publishes a task
    task_fun.consume()  # the consumer starts the scheduling loop and consumes tasks concurrently

A question about logging

Following the guide, I attached my real project under the funboost project to make debugging easier.
The production project uses the Aliyun SLS log service.

The project itself uses dictConfig and get_logger(name),
and funboost imports the project via the recommended PYTHONPATH approach.
Given how funboost is organized, if I want to keep using Aliyun SLS it seems I should import nb_log.get_logger and then use logger.addHandler()
and logger.addFilter() to wire it up. Is that right?
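A hedged sketch of the addHandler approach described above, assuming nb_log's get_logger returns a standard logging.Logger so the usual Handler/Filter APIs apply. The SlsHandler class below is a hypothetical placeholder; wire its emit method to whatever SLS client the project already uses:

import logging
from nb_log import get_logger

class SlsHandler(logging.Handler):
    # Hypothetical handler: forward each record to Aliyun SLS via your existing client.
    def emit(self, record):
        msg = self.format(record)
        ...  # send msg with the project's SLS client here

logger = get_logger('my_project')               # the nb_log-configured logger
logger.addHandler(SlsHandler())
logger.addFilter(logging.Filter('my_project'))  # optional, mirroring the addFilter mentioned above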
