2023/09/16

FastAPI 透過Celery 處理Async task並透過Docker運行

 Celery 是一個強大的分散式任務佇列框架,用於處理非同步任務和分散式任務排程。它在許多應用程式和系統中都有廣泛的用途,包括以下方面:


非同步任務處理:Celery 最常見的用途之一是執行非同步任務。這些任務可以是長時間運行的操作,例如圖像處理、文件上傳、電子郵件發送、資料處理等。透過將這些任務非同步執行,應用程式可以保持響應性,而不會阻塞使用者介面或主要應用程式流程。


定時任務:Celery 可用於建立和執行定時任務,例如每小時執行一次數據備份或每天發送電子郵件報告。透過 Celery 的定時任務功能,您可以輕鬆管理和排程這些重複性任務。


分散式任務:Celery 是一個分散式任務佇列,可用於在多台伺服器或容器之間分發任務。這對於處理大量任務或需要負載平衡的任務非常有用。


訊息處理:Celery 可以用作訊息處理系統,用於處理和分發訊息。這在實時應用程式中非常有用,例如即時聊天或通知系統。


背景處理:在 Web 應用程式中,Celery 可以處理背景任務,例如處理使用者提交的表單資料、生成報告、執行搜尋索引更新等。這有助於提高應用程式的效能和可擴展性。


任務重試和錯誤處理:Celery 具有內建的任務重試和錯誤處理機制,可以自動重試失敗的任務,或將失敗的任務記錄下來以供後續分析和修復。


分散式資料處理:對於需要分散式資料處理的應用程式,Celery 可以幫助將資料處理任務分發到多個節點,以加速資料處理和分析。


非同步通訊:Celery 不僅可以執行任務,還可以用作非同步通訊的機制。應用程式可以使用 Celery 發送訊息、通知其他服務或元件執行操作。


總之,Celery 是一個多功能的工具,適用於需要非同步、分散式、定時或重複性任務處理的各種應用程式。它具有廣泛的用途,可以與多種訊息代理(如 RabbitMQ、Redis 等)和框架集成,以滿足不同應用場景的需求。無論是 Web 開發、資料處理、實時應用程式還是背景任務處理,Celery 都提供了強大的功能和工具,使開發人員能夠更輕鬆地處理各種任務和非同步操作。

專案結構:


main.py代碼:
from celery.result import AsyncResult
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
from enum import IntEnum
from worker import my_task
from Data import Data

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class TaskStatus(IntEnum):
    NONE = 0
    SUCCESS = 1
    PENDING = 2
    FAILURE = 3
    RETRY = 4
    STARTED = 5


@app.get('/get_task_result/{uuid}', status_code=200)
async def get_task_result(uuid: str):
    # 取得執行結果
    task = AsyncResult(uuid)
    if task.status == 'SUCCESS':
        return JSONResponse({"status": TaskStatus.SUCCESS.value, "result": task.result})
    elif task.status == 'PENDING':
        return JSONResponse({"status": TaskStatus.PENDING.value})
    elif task.status == 'FAILURE':
        return JSONResponse({"status": TaskStatus.FAILURE.value})
    elif task.status == 'RETRY':
        return JSONResponse({"status": TaskStatus.RETRY.value})
    elif task.status == 'STARTED':
        return JSONResponse({"status": TaskStatus.STARTED.value})
    else:
        return JSONResponse({"status": TaskStatus.NONE.value})


@app.post("/create_task", status_code=200)
async def create_task(data: Data):
    result = my_task.delay(data.name)
    return JSONResponse({"id": result.id})
Data.py代碼:

from pydantic import BaseModel


# 定義資料結構
class Data(BaseModel):
    name: str

worker代碼:

from celery import Celery
from Data import Data

# 設定Redis
celery = Celery(__name__, broker='redis://redis:6379', backend='redis://redis:6379')


@celery.task()
def my_task(data: Data):
    # 取得原先傳遞的資料
    return {"name": data}

App.vue代碼:
<template>
  <div>
    <input type="text" v-model="msg"><br>
    <button @click="sendMessage">發送</button>
  </div>
</template>

<script>
import axios from 'axios'

export default {
  methods: {
    //取得執行結果
    getResult() {
      axios.get(`http://localhost:7666/get_task_result/${this.taskID}`, {
        headers: {
          'Content-Type': 'application/json'
        }
      }).then(res => {
        const result = res.data
        if (result.status === 1) {
          alert(result.result.name)
          this.taskID = null
        } else if (result.status === 3) {
          alert('Some issue')
          this.taskID = null
        } else {
          setTimeout(this.getResult, 1000)
        }
      })
    },
    sendMessage() {
      if (this.msg && !this.taskID) {
        axios.post('http://localhost:7666/create_task', {name: this.msg}, {
          headers: {
            'Content-Type': 'application/json'
          }
        }).then(res => {
          console.log()
          this.msg = null
          this.taskID = res.data.id
          setTimeout(this.getResult, 1000)
        })
      }
    }
  },
  data() {
    return {
      msg: null,
      taskID: null
    }
  }
}
</script>

<style>
#app {
  font-family: Avenir, Helvetica, Arial, sans-serif;
  -webkit-font-smoothing: antialiased;
  -moz-osx-font-smoothing: grayscale;
  text-align: center;
  color: #2c3e50;
  margin-top: 60px;
}
</style>

Vue的Dockerfile:
FROM node:18 as NODE

WORKDIR /vue
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build

FROM nginx:latest
COPY ./nginx/default.conf /etc/nginx/conf.d/default.conf
COPY --from=NODE /vue/dist /usr/share/nginx/html
CMD ["nginx", "-g", "daemon off;"]
FastAPI的Dockerfile:
FROM python:3.10.0
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7666"]
Docker-Compose:
version: '3'
services:
  fastapi:
    build:
      context: ./fastapi
    ports:
      - "7666:7666"
    depends_on:
      - redis

  vue:
    build:
      context: ./vue
    ports:
      - "80:80"
    depends_on:
      - redis
      - worker
      - fastapi

  worker:
    build:
      context: ./fastapi
    command: celery -A worker.celery worker --loglevel=info
    depends_on:
      - redis

  redis:
    container_name: redis
    image: redis:latest
    ports:
      - "6379:6379"
專案原碼