Celery 是一個強大的分散式任務佇列框架,用於處理非同步任務和分散式任務排程。它在許多應用程式和系統中都有廣泛的用途,包括以下方面:
非同步任務處理:Celery 最常見的用途之一是執行非同步任務。這些任務可以是長時間運行的操作,例如圖像處理、文件上傳、電子郵件發送、資料處理等。透過將這些任務非同步執行,應用程式可以保持響應性,而不會阻塞使用者介面或主要應用程式流程。
定時任務:Celery 可用於建立和執行定時任務,例如每小時執行一次數據備份或每天發送電子郵件報告。透過 Celery 的定時任務功能,您可以輕鬆管理和排程這些重複性任務。
分散式任務:Celery 是一個分散式任務佇列,可用於在多台伺服器或容器之間分發任務。這對於處理大量任務或需要負載平衡的任務非常有用。
訊息處理:Celery 可以用作訊息處理系統,用於處理和分發訊息。這在實時應用程式中非常有用,例如即時聊天或通知系統。
背景處理:在 Web 應用程式中,Celery 可以處理背景任務,例如處理使用者提交的表單資料、生成報告、執行搜尋索引更新等。這有助於提高應用程式的效能和可擴展性。
任務重試和錯誤處理:Celery 具有內建的任務重試和錯誤處理機制,可以自動重試失敗的任務,或將失敗的任務記錄下來以供後續分析和修復。
分散式資料處理:對於需要分散式資料處理的應用程式,Celery 可以幫助將資料處理任務分發到多個節點,以加速資料處理和分析。
非同步通訊:Celery 不僅可以執行任務,還可以用作非同步通訊的機制。應用程式可以使用 Celery 發送訊息、通知其他服務或元件執行操作。
總之,Celery 是一個多功能的工具,適用於需要非同步、分散式、定時或重複性任務處理的各種應用程式。它具有廣泛的用途,可以與多種訊息代理(如 RabbitMQ、Redis 等)和框架集成,以滿足不同應用場景的需求。無論是 Web 開發、資料處理、實時應用程式還是背景任務處理,Celery 都提供了強大的功能和工具,使開發人員能夠更輕鬆地處理各種任務和非同步操作。
main.py代碼:
from celery.result import AsyncResult
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
from enum import IntEnum
from worker import my_task
from Data import Data
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class TaskStatus(IntEnum):
NONE = 0
SUCCESS = 1
PENDING = 2
FAILURE = 3
RETRY = 4
STARTED = 5
@app.get('/get_task_result/{uuid}', status_code=200)
async def get_task_result(uuid: str):
# 取得執行結果
task = AsyncResult(uuid)
if task.status == 'SUCCESS':
return JSONResponse({"status": TaskStatus.SUCCESS.value, "result": task.result})
elif task.status == 'PENDING':
return JSONResponse({"status": TaskStatus.PENDING.value})
elif task.status == 'FAILURE':
return JSONResponse({"status": TaskStatus.FAILURE.value})
elif task.status == 'RETRY':
return JSONResponse({"status": TaskStatus.RETRY.value})
elif task.status == 'STARTED':
return JSONResponse({"status": TaskStatus.STARTED.value})
else:
return JSONResponse({"status": TaskStatus.NONE.value})
@app.post("/create_task", status_code=200)
async def create_task(data: Data):
result = my_task.delay(data.name)
return JSONResponse({"id": result.id})
Data.py代碼:
from pydantic import BaseModel
# 定義資料結構
class Data(BaseModel):
name: str
worker代碼:
from celery import Celery
from Data import Data
# 設定Redis
celery = Celery(__name__, broker='redis://redis:6379', backend='redis://redis:6379')
@celery.task()
def my_task(data: Data):
# 取得原先傳遞的資料
return {"name": data}
App.vue代碼:<template>
<div>
<input type="text" v-model="msg"><br>
<button @click="sendMessage">發送</button>
</div>
</template>
<script>
import axios from 'axios'
export default {
methods: {
//取得執行結果
getResult() {
axios.get(`http://localhost:7666/get_task_result/${this.taskID}`, {
headers: {
'Content-Type': 'application/json'
}
}).then(res => {
const result = res.data
if (result.status === 1) {
alert(result.result.name)
this.taskID = null
} else if (result.status === 3) {
alert('Some issue')
this.taskID = null
} else {
setTimeout(this.getResult, 1000)
}
})
},
sendMessage() {
if (this.msg && !this.taskID) {
axios.post('http://localhost:7666/create_task', {name: this.msg}, {
headers: {
'Content-Type': 'application/json'
}
}).then(res => {
console.log()
this.msg = null
this.taskID = res.data.id
setTimeout(this.getResult, 1000)
})
}
}
},
data() {
return {
msg: null,
taskID: null
}
}
}
</script>
<style>
#app {
font-family: Avenir, Helvetica, Arial, sans-serif;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
text-align: center;
color: #2c3e50;
margin-top: 60px;
}
</style>
Vue的Dockerfile:
FROM node:18 as NODE
WORKDIR /vue
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build
FROM nginx:latest
COPY ./nginx/default.conf /etc/nginx/conf.d/default.conf
COPY --from=NODE /vue/dist /usr/share/nginx/html
CMD ["nginx", "-g", "daemon off;"]
FastAPI的Dockerfile:
FROM python:3.10.0
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7666"]
Docker-Compose:
version: '3'
services:
fastapi:
build:
context: ./fastapi
ports:
- "7666:7666"
depends_on:
- redis
vue:
build:
context: ./vue
ports:
- "80:80"
depends_on:
- redis
- worker
- fastapi
worker:
build:
context: ./fastapi
command: celery -A worker.celery worker --loglevel=info
depends_on:
- redis
redis:
container_name: redis
image: redis:latest
ports:
- "6379:6379"
專案原碼