Pub/Sub chatbot using FastAPI and Broadcaster

Автор:

Рубрика

Информационные технологии

Ключевые слова

FastAPI
Broadcaster
Pub/Sub
Redis PUB/SUB
WebSocket
chatbot
Docker Compose
microservices
real-time communication

Аннотация статьи

This article presents the development and deployment of a scalable Pub-Sub chatbot using the FastAPI framework and the Broadcaster library. By leveraging Redis PUB/SUB as the message broker, the chatbot supports real-time two-way communication through WebSockets. The implementation improves upon the limitations of standard WebSocket connection management in FastAPI by isolating user sessions into separate topics using the Pub/Sub pattern. The article also details the system’s deployment using Docker Compose, enabling seamless scalability and future integration with other microservices.

Текст статьи

Introduction

Chatbots have firmly integrated into our lives as a convenient interface for user interaction with products, as dialogue is a natural and familiar way for humans to convey their thoughts. Chatbots are particularly popular in messengers, which are common places for communication. On the other hand, other platforms for hosting chatbots, such as a chat widget on a website, also play an important role. Typically, a user's first encounter with a company occurs on the website, and additional functionality in the form of a chatbot enhances the overall impression.

Often, the implementation of such a widget is based on the WebSocket protocol, which facilitates two-way communication between the client and the server. In this article, we would like to focus on the backend part of such a chat widget using the popular FastAPI framework.

Practical Part

Preparation

First, let's review the FastAPI documentation [1] on WebSocket support. The provided example showcases the framework’s capabilities, but unfortunately, it is not suitable for industrial development – the session information is encapsulated within a single connection manager object, which is stored in memory and operates within a single process. Thus, this:

  1. Prevents seamless updating of the bot to a new version, as restarting would result in the loss of active session information;
  2. Increases code coupling, which can cause difficulties when scaling the system, for example, if you need to distribute functionality across microservices.

Therefore, we will follow the advice from the same documentation and use the broadcaster [2] library to apply the publisher-subscriber or Pub/Sub message passing design pattern. Using such a pattern will offload the responsibility of message management from the service and isolate system components from each other. In this case, each user’s session will be represented by separate topics.

The presented library supports the following message brokers: Redis PUB/SUB, Redis Streams, Apache Kafka, and Postgres LISTEN/NOTIFY. In our example, we will use Redis PUB/SUB, but you can choose another.

Deployment of the System

Now we need to decide how to deploy the system. Obviously, we need to set up two services: the NoSQL database Redis and the chatbot service. To do this, we will use the docker-compose utility, which deploys multi-container applications.

First, we need to define a Dockerfile for the chatbot service, which will be used to build the image:

FROM python:3.12-slim

# install dependencies
COPY ci/requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt 

# copy codebase
COPY src /src
WORKDIR /src

EXPOSE 8000

ENTRYPOINT [ "python3", "api_app.py" ]

In the provided Dockerfile description:

  1. Base Image: We use the official Python image [3] as the base.
  2. Install Dependencies: We install dependencies using the pip package manager.
  3. Copy Codebase: We copy the codebase into the container.
  4. Expose Port: We specify the port that the container will listen to.
  5. Set Entry Point: We define the entry point for the container.

The final step is to define our system configuration in docker-compose.yaml:

services:
  chatbot:
    build:
      context: .
      dockerfile: ci/Dockerfile
    image: $REPOSITORY:$TAG
    networks:
      - chat
    ports:
      - "8000:8000"

  redis:
    image: redis:7.4
    networks:
      - chat
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    command: ["redis-server"]

networks:
  chat:
    driver: bridge

volumes:
  redis-data:
    external: true

Key points in the provided configuration:

  1. Shared Network: Services are in the same chat network, allowing them to communicate with each other.
  2. Published Ports: Ports are published for each service to enable interaction.
  3. External Volume: An external volume is created for storing Redis data.

Implementation

After deciding on the deployment, we proceed to the implementation. The entire subsequent code is available in the repository [4].

First, let’s define the necessary dependencies (ci/requirements.txt):

broadcaster[redis]==0.3.1  # websocket chats
fastapi==0.110.3 # API
uvicorn==0.29.0 # ASGI web server
websockets==13.0 # websoket chats

Next, let’s proceed to the code by creating an object for publishing and subscribing to our topics (src/redis_broadcaster.py):

"""Redis broadcaster."""

from broadcaster import Broadcast

DB_URI = "redis://redis:6379"
broadcast = Broadcast(DB_URI)

Now, define the entry point with the initialization of our FastAPI application (src/api_app.py):

"""API App entry point."""

import uvicorn
from fastapi import FastAPI

from lifespan_events import lifespan
from websocket_chat import websocket_chat_router

app FastAPI(
    title="Chatbot",
   description="Pub/Sub Chatbot",
   lifespan=lifespan,
)

app.include_router(websocket_chat_router)

if __name__ == "__main__":
   uvicorn.run(app, host="0.0.0.0")

Here, it’s important to focus on the imports:

  1. From lifespan_events import lifespan;
  2. From websocket_chat import websocket_chat_router.

The first imported module handles events that occur before the application starts and after it shuts down. In our case, we need to first connect to the message broker and then disconnect from it (src/lifespan_events.py):

"""Lifespan events."""

from contextlib import asynccontextmanager

from fastapi import FastAPI

from redis_broadcaster import broadcast

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
   Manage the lifespan of a FastAPI application.

   This function connects to a broadcast system at the start of the application
   and disconnects from it when the application shuts down.

   Args:
        app (FastAPI): The FastAPI application instance.

   Yields:
        None: This context manager does not yield any values.
   """
    await broadcast.connect()
    yield
    await broadcast.disconnect()

The second imported package handles the business logic of the chatbot based on WebSocket. It includes three submodules:

1) Declares the WebSocket endpoint (src/websocket_chat/routers.py):

"""Websocket chat router."""

from fastapi import APIRouter, WebSocket
from starlette.concurrency import run_until_first_complete

from .pub_sub import chat_ws_receiver, chat_ws_sender

websocket_chat_router = APIRouter()

@websocket_chat_router.websocket("/chatbot/{chat_id}")
async def websocket_endpoint(websocket: WebSocket, chat_id: str):
    """
   Handle websocket chat connections.

   Args:
        websocket (WebSocket): The WebSocket connection instance.
        chat_id (str): The identifier for the chat session.
   """
    await websocket.accept()

    await run_until_first_complete(
       (chat_ws_receiver, {"websocket": websocket, "chat_id": chat_id}),
       (chat_ws_sender, {"websocket": websocket, "chat_id": chat_id}),
    )
2) Handles the publishing and subscribing logic (src/websocket_chat/pub_sub.py):

"""PUB/SUB."""

from fastapi import WebSocket

from redis_broadcaster import broadcast

from .bot import get_response_from_bot

async def chat_ws_receiver(websocket: WebSocket, chat_id: str) -> None:
    """
   Receive messages from a WebSocket connection and broadcasts responses.

   This function listens for incoming text messages from a WebSocket connection,
   generates a response using the bot, and publishes the response to a broadcast
   channel.

   Args:
        websocket (WebSocket): The WebSocket connection to receive messages from.
        chat_id (str): The identifier of the chat channel for broadcasting messages.

   Returns:
        None
   """
    async for message in websocket.iter_text():
       response_message get_response_from_bot(message)
       await broadcast.publish(channel=chat_id, message=response_message)

async def chat_ws_sender(websocket: WebSocket, chat_id: str) -> None:
    """
   Send messages from a broadcast channel to a WebSocket connection.

   This function subscribes to a broadcast channel and sends any received messages
   to the connected WebSocket client.

   Args:
        websocket (WebSocket): The WebSocket connection to send messages to.
        chat_id (str): The identifier of the chat channel to subscribe to.

   Returns:
        None
   """
    async with broadcast.subscribe(channel=chat_id) as subscriber:
        async for event in subscriber:
           await websocket.send_text(event.message)

3) Emulates the bot's response logic (src/websocket_chat/bot.py):

"""Bot."""

def get_response_from_bot(user_message: str) -> str:
    """
   Generate a response from the bot based on the user's message.

   Args:
        user_message (str): The message received from the user.

   Returns:
        str: A formatted response from the bot.
   """
    return f"BOT: {user_message}"

In the end, we have an echo-bot, but in reality, this functionality represents a growth point for the system. You can take this template and implement the third module yourself – up to the point where message publishing to a topic can occur in another service responsible for the bot's internal logic. This way, you achieve communication between two services through a message broker, which is a good practice.

Running the Chatbot

Finally, it's time to launch our bot. To do this:

  1. Build the image: docker compose build chatbot;
  2. Create an external volume: docker create volume redis-data;
  3. Run the services: docker compose up.

As a client, we will use Postman – create a new WebSocket connection: File -> New... -> WebSocket, enter the correct URL: ws://localhost:8000/chatbot/1, and click the Connect button. If everything is done correctly, the status Connected should appear (Fig. 1):

image.png

Fig. 1. Step 1 – WebSocket connection

Now, let’s interact with the bot – enter our message in the left form and click Send (Fig. 2):

image.png

Fig. 2. Step 2 – User interaction with the Bot

The bot responded, and optionally, you can continue the conversation.

Finally, let’s take a look at Redis to see what has been recorded. For this, we use the GUI application Redis Insight. In the welcome window, set up a connection to our database (Fig. 3):

image.png

Fig. 3. Step 3 – Connection to database using Redis Insight

In the next window, you can leave everything as is and simply click Add Redis Database. After that, you will be taken to the list of connections, where your database will be displayed. To view the data, click on the corresponding entry, select Pub/Sub on the left panel, and click the Subscribe button (Fig. 4):

image.png

Fig. 4. Step 4 – Subscribe to the database

These actions will allow messages from all topics to be displayed. Let’s verify this by sending another message in Postman. If everything is done correctly, you will see a new message from the bot in Redis Insight with the topic set – achieving a one-way message transmission through the broker.

Thus, we have obtained an extended version of the FastAPI documentation chatbot, which is now easier to scale and deploy in different environments.

Conclusion

In this work, we successfully implemented a scalable chatbot system using the FastAPI framework and the Pub/Sub messaging pattern with Redis PUB/SUB. The architecture addresses common challenges in WebSocket management, such as session persistence and system scalability. The system is deployed using Docker Compose, which ensures easy setup and potential expansion. This foundational setup allows for the implementation of more complex functionalities, such as multi-service communication through message brokers, providing a robust solution for real-time applications.

Список литературы

  1. Tiangolo S. FastAPI. WebSockets // FastAPI Documentation. URL: https://fastapi.tiangolo.com/advanced/websockets/ (Accessed: October 18, 2024).
  2. Encode. Broadcaster. GitHub. Available at: https://github.com/encode/ broadcaster (Accessed: October 18, 2024).
  3. Docker. Python Official Image. Docker Hub. Available at: https://hub.docker.com/_/python/ (Accessed: October 18, 2024).
  4. Bobrovsky A. pubsub-fastapi-chatbot. GitHub. Available at: https://github.com/anatoly-bobrovsky/pubsub-fastapi-chatbot (Accessed: October 18, 2024).

Поделиться

589

Postovalov R.. Pub/Sub chatbot using FastAPI and Broadcaster // Актуальные исследования. 2024. №42 (224). Ч.I.С. 36-43. URL: https://apni.ru/article/10290-pubsub-chatbot-using-fastapi-and-broadcaster

Обнаружили грубую ошибку (плагиат, фальсифицированные данные или иные нарушения научно-издательской этики)? Напишите письмо в редакцию журнала: info@apni.ru

Другие статьи из раздела «Информационные технологии»

Все статьи выпуска
Актуальные исследования

#3 (238)

Прием материалов

18 января - 24 января

осталось 3 дня

Размещение PDF-версии журнала

29 января

Размещение электронной версии статьи

сразу после оплаты

Рассылка печатных экземпляров

12 февраля