How to connect a Kafka consumer to a Django app? Should I use a new thread for the consumer, a new process, or a new Docker container?
I have a Django app which should consume Kafka messages and handle them with my handlers and existing models. I use the https://kafka-python.readthedocs.io/en/master/usage.html library.
What is the right way to connect a KafkaConsumer to a Django app? Should I use a new daemon thread? A new process? Or maybe a separate Docker container? Where should the code live (a new Django app?), and how do I start it automatically when the Django app is ready? And how do I update the topics it listens to dynamically: should I kill the old consumer and start a new one in a new thread each time?
Solution 1:[1]
I had a similar problem. What I did was create a custom Django management command, then add a handler method for the functionality. In deployment, you can launch it as a sidecar container.
```python
import kafka
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    def handle(self, *args, **options):
        # Note: the kafka-python parameter is bootstrap_servers (plural)
        consumer = kafka.KafkaConsumer(
            KAFKA_TOPIC_NAME,
            bootstrap_servers=["localhost:9092"],
            group_id=KAFKA_CONSUMER_GROUP,
        )
        for message in consumer:  # blocks, yielding messages as they arrive
            handler_method(message)
```
As a sidecar, it will pick up any messages on the topic when it starts.
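For manage.py to discover the command, the file has to live in a management/commands/ directory of an installed app. A minimal sketch of the layout, assuming a hypothetical app named yourapp and command file consume_kafka.py:

```
yourapp/
    management/
        __init__.py
        commands/
            __init__.py
            consume_kafka.py   # the Command class above
```

You can then start the consumer, locally or as the sidecar container's command, with python manage.py consume_kafka.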
Solution 2:[2]
I know it's a late answer.
Regarding this issue, you might give Faust a try. It is a stream processing library and can be integrated with Django.
In Docker, start two different containers that share the same codebase, except that one of them runs a command which starts a Faust worker:
```yaml
version: '3.3'

services:
  backend_service: &backend_service
    container_name: your_container_name
    build:
      context: .
      dockerfile: Dockerfile
    image: backend_service_image
    volumes:
      - ./your_codebase:/your_container_codebase_path
    ports:
      - "8000:8000"
    env_file: .env

  kafka_consumer:
    <<: *backend_service
    image: kafka_consumer_image
    container_name: kafka_consumer
    command: faust -A <your_project_root>.kafka:app worker -l info
```
Notice that in the kafka_consumer container a command is run: faust -A <your_project_root>.kafka:app worker -l info, where <your_project_root> is the namespace of the folder which holds the settings.py file.
In this folder, create a file kafka.py, as follows:
```python
import os

import faust
from django.conf import settings

# eventlet is used as a bridge to communicate with asyncio
os.environ.setdefault('FAUST_LOOP', 'eventlet')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<your_root_namespace>.settings')

app = faust.App('set_a_name_here', broker=f"kafka://{settings.KAFKA_URL}")

# Specify a topic name
new_topic = app.topic("topic_to_process")

# Define an agent to process the above topic
@app.agent(new_topic)
async def process_topic(stream):
    async for event in stream:
        ...  # process event
```
Bear in mind that this code runs asynchronously, so you might have issues when working with the Django ORM. To properly implement a consumer which uses the ORM, you can use the sync_to_async wrapper.
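For example, a minimal sketch of wrapping a single ORM call inline (RandomModel and random_model_id are hypothetical placeholders):

```python
# kafka.py
from asgiref.sync import sync_to_async

@app.agent(new_topic)
async def process_topic(stream):
    async for event in stream:
        # Run the synchronous ORM call in a worker thread
        random_model = await sync_to_async(RandomModel.objects.get)(id=event.get('random_model_id'))
```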
Or use it as a decorator:
```python
# kafka.py
from asgiref.sync import sync_to_async

@sync_to_async
def _do_something(event):
    random_model_id = event.get('random_model_id')
    random_model = RandomModel.objects.get(id=random_model_id)

@app.agent(new_topic)
async def process_topic(stream):
    async for event in stream:
        await _do_something(event)
```
Then you can debug the kafka_consumer container to check what is going on. To reflect code changes, you need to restart the container.
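For example, with the Compose file above, you can follow the worker's logs and restart it with the standard Compose commands:

```
docker-compose logs -f kafka_consumer
docker-compose restart kafka_consumer
```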
If you are not using Docker, you need to install a supervisor and configure the command faust -A <your_project_root>.kafka:app worker -l info to run from your supervisord.conf file.
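A minimal sketch of such a supervisord.conf entry (the program name and directory path are placeholders):

```ini
[program:faust_worker]
command=faust -A <your_project_root>.kafka:app worker -l info
directory=/path/to/your_codebase
autostart=true
autorestart=true
```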
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Dharman |
| Solution 2 | |