How to close connections in Celery to avoid 'client unexpectedly closed TCP connection' warnings in RabbitMQ

When running the following simple Celery task, I always get 'client unexpectedly closed TCP connection' warnings in the RabbitMQ log output.

from celery import Celery
import datetime, time
app = Celery('tasks',
            broker='amqp://guest@localhost/my_vhost'
            )

@app.task(ignore_result=True)
def foobar(name):
    with open('mylog.log', 'a') as logfile:
        try:
            nowstr = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
            logfile.write('[%s] start\n' % nowstr)
            logfile.flush()
            time.sleep(5)
            logfile.write(name + '\n')
            logfile.write('ended\n')
        except Exception as e:
            logfile.write('Exception: %s' % e)
        logfile.flush()

if __name__ == '__main__':
    foobar.delay('foobar')

I run RabbitMQ in a Docker container that I start with the following docker-compose file.

version: '2'
services:
  rabbitmq:
    image: rabbitmq
    environment:
      - RABBITMQ_DEFAULT_VHOST=my_vhost
    ports: 
      - "5672:5672"

I start the worker with

celery -f celery.log -A sandbox worker --concurrency 1

How do I correctly close the connection to avoid these warnings?



Solution 1:[1]

I assume the problem is that writing to the file is a blocking operation: while the task blocks, the worker cannot respond to RabbitMQ's periodic liveness checks on the connection, so the broker considers it dead. Moving the blocking work into a separate process should help:

from celery import Celery
import multiprocessing
import datetime, time


app = Celery('tasks', broker='amqp://guest@localhost/my_vhost')

def _write_logs(name):
    with open('mylog.log', 'a') as logfile:
        try:
            nowstr = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
            logfile.write('[%s] start\n' % nowstr)
            logfile.flush()
            time.sleep(5)
            logfile.write(name + '\n')
            logfile.write('ended\n')
        except Exception as e:
            logfile.write('Exception: %s' % e)
        logfile.flush()


@app.task(ignore_result=True)
def foobar(name):
    multiprocessing.Process(target=_write_logs, args=(name, )).start()


if __name__ == '__main__':
    foobar.delay('foobar')
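
The pattern above can be exercised with the standard library alone, without Celery or RabbitMQ. The following is a minimal sketch (the file path, helper names, and shortened sleep are mine, not from the original answer) showing that the caller returns immediately while the child process does the blocking write:

```python
import multiprocessing
import os
import tempfile
import time


def _write_logs(path, name):
    # The blocking work (open/sleep/write) runs in the child process,
    # so the caller -- in the real setup, the Celery task -- returns
    # immediately and the worker can keep servicing its connection.
    with open(path, 'a') as logfile:
        logfile.write('start\n')
        time.sleep(0.5)  # stands in for the 5-second sleep in the task
        logfile.write(name + '\n')
        logfile.write('ended\n')


def run_in_child(path, name):
    proc = multiprocessing.Process(target=_write_logs, args=(path, name))
    proc.start()
    return proc


if __name__ == '__main__':
    log_path = os.path.join(tempfile.mkdtemp(), 'mylog.log')
    started = time.monotonic()
    proc = run_in_child(log_path, 'foobar')
    print('returned after %.3fs' % (time.monotonic() - started))
    proc.join()  # only so this demo script waits before inspecting the file
    with open(log_path) as f:
        print(f.read().splitlines())
```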

EDIT: If the solution above did not help, you are most likely facing a memory leak instead.
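
Another knob worth knowing about (an addition of mine, not part of the original answer) is Celery's `broker_heartbeat` setting, which controls how aggressively the AMQP connection is checked. Raising or disabling it can make the broker more tolerant of tasks that block for a while:

```python
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost/my_vhost')

# 0 disables AMQP heartbeats; a larger value (in seconds) simply makes
# the broker more tolerant of quiet periods. The trade-off is that
# genuinely dead connections are detected more slowly.
app.conf.broker_heartbeat = 0
```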

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
