关于异步任务队列
这样的老师,我跟着课程学习的过程中没有使用 flask,而是使用了 fastapi,然后数据库也是用的异步的,这样就进而造成 indexing_service 里的方法都是 async 装饰的,但是 celery 里好像没办法调用,请问有什么方案吗?
@shared_task async def build_documents(document_ids: List[int]): """ 异步任务:构建文档 :param document_ids: :return: """ from app.containers import Container indexing_service = Container.indexing_service async with get_session() as session: await indexing_service.build_documents(session, document_ids)
这样是不可以的
10
收起
正在回答 回答被采纳积分+1
1回答
泽辉呀
2025-08-15 09:29:23
Celery修饰的所有任务都是同步的,不需要改成异步的,async装饰的indexing_service可以直接调用同步的方法,也可以调用异步的方法,知识在fastapi中调用celery的任务会通过add这个模块,以下是一个简单的示例:
Celery服务初始化:
from celery import Celery # 连接 Redis 作为 broker 和结果存储 celery_app = Celery( "worker", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1" ) # 如果有多个任务文件,可以用 autodiscover_tasks celery_app.autodiscover_tasks(['tasks'])
异步方法定义:
from celery_app import celery_app import time @celery_app.task def add(x, y): time.sleep(5) # 模拟耗时任务 return x + y
在fastapi中使用celery异步方法:
@app.get("/add")
def queue_add(x: int, y: int):
# 异步调用任务
task = add.delay(x, y)
return {"task_id": task.id}
@app.get("/result/{task_id}")
def get_result(task_id: str):
from celery_app import celery_app
result = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": result.status,
"result": result.result
}
AI Agent 全栈开发工程师
- 参与学习 521 人
- 解答问题 411 个
全流程打造你自己的(Coze/Dify)低代码智能体开发平台;2025年入行正当时,企业急需,人才稀缺,竞争小;无论入行还是转行,首选口碑好课,门槛低、成长高
了解课程
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星