Tic Tac Toe, часть 6: Flask и Celery/RabbitMQ

Tic Tac Toe: содержание цикла статей

Попробуем подключить Celery/RabbitMQ к нашему проекту. В качестве основы возьмем проект с Flask’ом. Celery займется вычислением случайного числа.


Установка проекта

Клонируем проект на свой компьютер:

git clone https://github.com/nomhoi/tic-tac-toe-part6.git

Запускаем контейнеры:

cd tic-tac-toe-part6
docker-compose up -d

Выполняем сборку веб-приложения:

cd front
npm install
npm run-script build

Открываем броузер по адресу http://localhost.

Docker контейнеры

Сервис nginx остался без изменений. В сервис flask добавили установку пакета Celery в файл requirements.txt и смонтировали папку с исходником проекта Celery:

volumes:
— ./flask:/code
— ./celery/app/proj:/code/proj

Добавились новые сервисы celery и rabbit.

celery:
container_name: celery
build:
context: celery/
dockerfile: Dockerfile
volumes:
— ./celery/app:/app
depends_on:
— rabbit
networks:
— backend

rabbit:
container_name: rabbit
hostname: rabbit
image: rabbitmq:3.7.15-alpine
environment:
— RABBITMQ_DEFAULT_USER=admin
— RABBITMQ_DEFAULT_PASS=CT2gNABH8eJ9yVh
ports:
— «5672:5672»
networks:
— backend
Сервис celery

Сервис celery выполнен на базе этого туториала. Кто не знаком с Celery, имеет смысл тут-же пройтись по этому туториалу:

$ docker exec -it celery python
Python 3.7.3 (default, May 11 2019, 02:00:41)
[GCC 8.3.0] on linux
Type «help», «copyright», «credits» or «license» for more information.
>>> from proj.tasks import add
>>> add.delay(2, 2)
<AsyncResult: 43662174-657f-4dd3-ab1a-22f5950c8794>
>>>

Как видим, наш проект в Celery оформлен в виде пакета proj. В задачи Celery в файл tasks.py добавлена наша задача getnumber:

@app.task
def getnumber():
return randrange(9)
Сервис flask

Напомню, что в этот сервис мы добавили пакет Celery и смонтировали папку с проектом proj. Исходный код этого проекта теперь присутствует в двух сервисах flask и celery.

from flask import Flask, jsonify
from proj.tasks import getnumber
from proj.celery import app as celery

app = Flask(__name__)

@app.route(‘/number’)
def number():
task = getnumber.delay()
return task.id

@app.route(‘/result/<task_id>’)
def result(task_id):
task = getnumber.AsyncResult(task_id)
result = task.get(timeout = 3)
response = {
‘state’: task.state,
‘number’: result,
}
return jsonify(response)

В обработчике number мы вызываем задачу getnumber, которая выполняется в воркере celery и возвращаем идентификатор задачи. В обработчике result мы получаем результат выполненной задачи по идентификатору и возвращаем ответ в JSON формате фронтенду.

Фронтенд

В диспетчере нашей игры по нажатию кнопки Get random number сначала отправляем запрос бэкенду по адресу /number и получаем от него идентификатор задачи Celery. После этого в функции getResult периодически отправляем запрос бэкенду на получение результата по адресу /result/<task_id>.

async function getResult(task_id) {
var i = 1;
var timerId = setTimeout(async function go() {
console.log(«Result request: » + i);
console.log(«Task Id: » + task_id)
const res = await fetch(`result/` + task_id);
const response = await res.text();

if (res.ok) {
let result = JSON.parse(response);
console.log(result)

if (result.state === ‘SUCCESS’) {
let i = parseInt(result.number);

if ($status === 1 || $history.state.squares[i]) {
promise_number = result.number + ‘ — busy’;
return;
}

promise_number = i;
history.push(new Command($history.state, i));
return;
}
}

if (i < 5)
setTimeout(go, 500);
i++;
}, 500);
}

Изменили вывод результатов запросов к бэкенду:

{#await promise}
<p>…подождите</p>
{:then taskid}
<p>Task Id: {taskid}</p>
{:catch error}
<p style=»color: red»>{error.message}</p>
{/await}

{#await promise_number}
<p>…подождите</p>
{:then number}
<p>Number: {number}</p>
{:catch error}
<p style=»color: red»>{error.message}</p>
{/await}
Домашнее задание

На самом деле сейчас результат приходит сразу после первого запроса. Попробуйте нашего интеллектуального агента живущего в celery сделать немного задумчивым, чтобы не сразу выдавал ответ.

Время от времени начинает приходить ошибка от flask‘a «500 (INTERNAL SERVER ERROR)», это в celery поднимается исключение «celery.exceptions.TimeoutError: The operation timed out.». Помогает только перезагрузка сервисов. Пока не копал в чем дело, пожалуйста, посмотрите.

В getResult обрабатывается ответ только с состоянием SUCCESS, в остальных случаях выполняется повторный запрос. Можно добавить обработку ответов с FAILURE и PENDING. Формирование ответа в обработчике result также может зависеть от состояния задачи.

Вместо брокера сообщений RabbitMQ можно попробовать подключить Redis.

Можно попробовать выполнять запросы из приложения через брокеры сообщений.

А также попробовать выполнить это на базе примера с Boost.Beast.

Репозиторий на GitHub

https://github.com/nomhoi/tic-tac-toe-part6

Оставить комментарий