How to connect a Kafka consumer to a Django app? Should I use a new thread for the consumer, a new process, or a new Docker container?

I have a Django app that should consume Kafka messages and handle them with my handlers and existing models. I use the kafka-python library (https://kafka-python.readthedocs.io/en/master/usage.html).

What is the right way to connect a KafkaConsumer to a Django app? Should I use a new daemon thread? A new process? Or maybe a separate Docker container? Where should the code live (a new Django app?), and how do I start it automatically when the Django app is ready? And how do I dynamically update the topics it listens to: should I kill the old consumer and start a new one in a new thread each time?



Solution 1:[1]

I had a similar problem. What I did was create a custom Django management command with a handler method for the consuming logic. In deployment, you can launch it as a sidecar container.

import kafka
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    def handle(self, *args, **options):
        # Blocks forever: poll the topic and dispatch each message to your handler
        consumer = kafka.KafkaConsumer(
            KAFKA_TOPIC_NAME,
            bootstrap_servers=["localhost:9092"],
            group_id=KAFKA_CONSUMER_GROUP,
        )
        for message in consumer:
            handler_method(message)

Run as a sidecar container, it picks up any pending messages on the topic as soon as it starts.
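
For reference, Django discovers management commands from an app's management/commands/ package; assuming you name the file consume_kafka.py (a hypothetical name, which becomes the command name), the layout would be:

your_app/
    management/
        __init__.py
        commands/
            __init__.py
            consume_kafka.py   # the Command class above

You can then start the consumer with python manage.py consume_kafka, which is also what the sidecar container would run as its command.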

Solution 2:[2]

I know it's a late answer.

Regarding this issue, you might give Faust a try; it is a stream-processing library that can be integrated with Django.

In Docker, start two containers that share the same code base, except that one of them runs a command which starts a Faust worker:

version: '3.3'

services:
  backend_service: &backend_service
    container_name: your_container_name
    build:
      context: .
      dockerfile: Dockerfile
    image: backend_service_image
    volumes:
      - ./your_codebase:/your_container_codebase_path
    ports:
      - "8000:8000"
    env_file: .env

  kafka_consumer:
    <<: *backend_service
    image: kafka_consumer_image
    container_name: kafka_consumer
    command: faust -A <your_project_root>.kafka:app worker -l info

Notice that the kafka_consumer container runs a command: faust -A <your_project_root>.kafka:app worker -l info, where <your_project_root> is the namespace of the folder that holds the settings.py file.

In this folder, create a file kafka.py as follows:

import os
import faust
from django.conf import settings

# eventlet is used as a bridge to communicate with asyncio
os.environ.setdefault('FAUST_LOOP', 'eventlet')

os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<your_root_namespace>.settings')

app = faust.App('set_a_name_here', broker=f"kafka://{settings.KAFKA_URL}")

# Specify a topic name
new_topic = app.topic("topic_to_process")

# define a method to process the above topic
@app.agent(new_topic)
async def process_topic(stream):
    async for event in stream:
        ...  # process event

Bear in mind that this code runs asynchronously, so you might run into issues when working with the Django ORM; to properly implement a consumer that uses the ORM, wrap the blocking calls with sync_to_async.
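
For example, as an inline call (a minimal sketch; RandomModel and the random_model_id field are the same placeholders used in the decorator example below):

# kafka.py
from asgiref.sync import sync_to_async

@app.agent(new_topic)
async def process_topic(stream):
    async for event in stream:
        # run the blocking ORM query in a worker thread
        random_model = await sync_to_async(RandomModel.objects.get)(id=event.get('random_model_id'))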

Or use it as a decorator:

# kafka.py
from asgiref.sync import sync_to_async

from your_app.models import RandomModel  # adjust the import path to your model

@sync_to_async
def _do_something(event):
    random_model_id = event.get('random_model_id')
    random_model = RandomModel.objects.get(id=random_model_id)


@app.agent(new_topic)
async def process_topic(stream):
    async for event in stream:
        await _do_something(event)

Then you can inspect the kafka_consumer container to check what is going on. To pick up code changes, you need to restart the container.
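
For example (using the service name from the compose file above):

# follow the Faust worker's logs
docker-compose logs -f kafka_consumer

# restart the worker after a code change
docker-compose restart kafka_consumer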

If you are not using Docker, you need to install Supervisor and configure it to run the command faust -A <your_project_root>.kafka:app worker -l info via your supervisord.conf file.
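
A minimal program section could look like this (a sketch; the program name, working directory, and the <your_project_root> placeholder are assumptions to adapt):

[program:faust_worker]
command=faust -A <your_project_root>.kafka:app worker -l info
directory=/path/to/your_codebase
autostart=true
autorestart=true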

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0. Source: Stack Overflow.

[1] Solution 1: Dharman
[2] Solution 2