关于异步任务队列

关于异步任务队列

这样的老师,我跟着课程学习的过程中没有使用 flask,而是使用了 fastapi,然后数据库也是用的异步的,这样就进而造成 indexing_service 里的方法都是 async 装饰的,但是 celery 里好像没办法调用,请问有什么方案吗?

@shared_task
async def build_documents(document_ids: List[int]):
    """
    异步任务:构建文档

    :param document_ids:
    :return:
    """
    from app.containers import Container
    indexing_service = Container.indexing_service
    async with get_session() as session:
        await indexing_service.build_documents(session, document_ids)

这样是不可以的

正在回答 回答被采纳积分+1

登陆购买课程后可参与讨论,去登陆

1回答
泽辉呀 2025-08-15 09:29:23

Celery修饰的所有任务都是同步的,不需要改成异步的,async装饰的indexing_service可以直接调用同步的方法,也可以调用异步的方法,知识在fastapi中调用celery的任务会通过add这个模块,以下是一个简单的示例:


Celery服务初始化:


from celery import Celery

# 连接 Redis 作为 broker 和结果存储
celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)

# 如果有多个任务文件,可以用 autodiscover_tasks
celery_app.autodiscover_tasks(['tasks'])


异步方法定义:


from celery_app import celery_app
import time

@celery_app.task
def add(x, y):
    time.sleep(5)  # 模拟耗时任务
    return x + y


在fastapi中使用celery异步方法:


@app.get("/add")
def queue_add(x: int, y: int):
    # 异步调用任务
    task = add.delay(x, y)
    return {"task_id": task.id}

@app.get("/result/{task_id}")
def get_result(task_id: str):
    from celery_app import celery_app
    result = celery_app.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result
    }


问题已解决,确定采纳
还有疑问,暂不采纳

恭喜解决一个难题,获得1积分~

来为老师/同学的回答评分吧

0 星
AI Agent 全栈开发工程师
  • 参与学习       521    人
  • 解答问题       411    个

全流程打造你自己的(Coze/Dify)低代码智能体开发平台;2025年入行正当时,企业急需,人才稀缺,竞争小;无论入行还是转行,首选口碑好课,门槛低、成长高

了解课程
请稍等 ...
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号

在线咨询

领取优惠

免费试听

领取大纲

扫描二维码,添加
你的专属老师