'What in this python program can make my memory goes to sky and freeze the program?

I have written a python program that connect to my mqtt server and process the data! BUt the memory start very low and get higher and higher over time! I would says pretty fast like in 2 hours it goes from ~20% to ~80% if i don't restat the process, it will just hang and stop the program.

It does over a week i am looking at the code searching for my errors trying different things and nothings seems to fix the problem!!

This code is where i am at now!

Thanx a lot!

#!/usr/bin/python
from __future__ import unicode_literals
import os, sys, django, json, time, datetime, socket, _thread, multiprocessing, queue
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
import django.db

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR) #here store is root folder(means parent).
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "EraIoT.settings")
django.setup()

from vac300s.models import vac300, vac300Stat
from vac200sV1.models import vac200V1

from shared.utils import *

mqtt.Client.connected_flag = False

clear = lambda: os.system('clear')

messagesReceivedQ = queue.Queue()
messagesQueuedQ = queue.Queue()
activeThreadQ = queue.Queue()
processingTimeQ = queue.Queue()
processingTimeMysqlQ = queue.Queue()
processingTimeMemcachedQ = queue.Queue()
processedRequestQ = queue.Queue()

topics = ["data/vac300s/#", "message/vac300s/#", "alarm/vac300s/#"]

q = queue.Queue()

def on_disconnect(mqttc, userdata, rc):
    print("*****************************************")
    print("Disconnected from MQTT server:")
    print(str(rc))
    print("*****************************************")
    mqttc.connected_flag = False


def on_connect(mqttc, obj, flags, rc):
    if rc == 0:
        mqttc.connected_flag = True
        print("Connected to MQTT server succesfully for VAC300s")
        for topic in topics:
            print("Subscibing to: " + topic)                    
            mqttc.subscribe(topic,0)
    else:
        print("There as been a problem connecting to MQTT server.")

def on_publish(mqttc, obj, mid):
    return

def on_subscribe(mqttc, obj, mid, granted_qos):
    print("Subscribed: " + str(mid) + " " + str(granted_qos))

def on_log(mqttc, obj, level, string):
    return
    print(string)

def on_message_msgs(mosq, obj, msg):
    # This callback will only be called for messages with topics that match
    # $SYS/broker/messages/#
    print("MESSAGES: " + msg.topic + " " + str(msg.qos) + " " + str(msg.payload))

def on_message_bytes(mosq, obj, msg):
    # This callback will only be called for messages with topics that match
    # $SYS/broker/bytes/#
    print("BYTES: " + msg.topic + " " + str(msg.qos) + " " + str(msg.payload))

def on_message(mosq, obj, msg):
    # This callback will be called for messages that we receive that do not
    # match any patterns defined in topic specific callbacks, i.e. in this case
    # those messages that do not have topics $SYS/broker/messages/# nor
    # $SYS/broker/bytes/#
    #print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
    return

def on_message(mosq, obj, msg):
    try:
        try:
            msg.payload = msg.payload.decode()
        except:
            pass
        topic = msg.topic.split("/")
        if len(topic[2]) != 9:
                return
        if((topic[2].startswith("0603")) & (topic[2] != "060300000")):
            if(topic[0] == "message"):
                response = {}
                response["serial"] = topic[2]
                response["message"] = msg.payload
                socketSend(response)
                return
    except:
        pass

def start_on_data(mosq, obj, msg):
    messagesReceivedQ.put(".")
    q.put(msg)
    
def on_message_Data(msg):
    _start = time.time()
    django.db.connections['default'].close()
    try:
        try:
            msg.payload = msg.payload.decode()
        except:
            pass
        topic = msg.topic.split("/")
        if len(topic[2]) != 9:
                activeThreadQ.get()
                return
        if((topic[2].startswith("0603")) & (topic[2] != "060300000")):
            try:
                if topic[4] == "ping":
                    activeThreadQ.get()
                    return
            except:
                pass
            response = {}
            start = time.time()
            obj, created = vac300.objects.get_or_create(serial = topic[2])
            response = obj.update(msg)
            processingTimeMysqlQ.put(time.time() - start)
            start = time.time()
            socketSend(response)
            processingTimeMemcachedQ.put(time.time() - start)
            activeThreadQ.get()
            processingTime = time.time() - _start
            return
    
        elif((topic[2] == "060300000") & (topic[4] == "mac")):
            l = vac300.objects.latest('serial')
            lastSerial = int(l.serial[-5:].lstrip("0"))
            newSerial = lastSerial + 1
            if(len(str(lastSerial + 1)) == 5):
                newSerial = "0603"
            elif(len(str(lastSerial + 1)) == 4):
                newSerial = "06030"
            elif(len(str(lastSerial + 1)) == 3):
                newSerial = "060300"
            elif(len(str(lastSerial + 1)) == 2):
                newSerial = "0603000"
            elif(len(str(lastSerial + 1)) == 1):
                newSerial = "06030000"
            newSerial = newSerial + str(lastSerial + 1)
            if(len(newSerial) == 9):
                publish.single("to/vac300s/" + msg.payload +"/setSerial", newSerial, hostname="mqtt.erablitek.com", auth={"username":"erablitekClients", "password":"A23002300a"})
                print("Sending new serial:" + newSerial + " to mac: " + msg.payload)
        else:
            activeThreadQ.get()
            return
    except Exception as e:
        printException(e, msg.topic, msg.payload)
    activeThreadQ.get()

def on_message_alarms(mosq, obj, msg):
    try:
        print(msg.topic + " | " + msg.payload.decode())
    except Exception as e:
        print(e)

def statsPrinterThread():
    print("Starting statsPrinterThread Thread")
    messagesReceived = 0
    processingTimeMysql = 0
    while True:
        try:
            time.sleep(60)            
            messagesReceived = messagesReceivedQ.qsize()
            with messagesReceivedQ.mutex:
                messagesReceivedQ.queue.clear()
                messagesReceivedQ.all_tasks_done.notify_all()
                messagesReceivedQ.unfinished_tasks = 0
            
            qSize = processingTimeMysqlQ.qsize()
            with processingTimeMysqlQ.mutex:
                average = 0
                for i in processingTimeMysqlQ.queue:
                    average += i
                processingTimeMysql = average / qSize
            
            qSize = processingTimeMemcachedQ.qsize()
            with processingTimeMemcachedQ.mutex:
                average = 0
                for i in processingTimeMemcachedQ.queue:
                    average += i
                processingTimeMemcached = average / qSize

            avgTotalProcessingTime = processingTimeMysql + processingTimeMemcached
            obj = vac300Stat()
            obj.mqttMsgPerSecond = messagesReceived / 60.0
            obj.queuedMSg = q.qsize()
            obj.activeThread = activeThreadQ.qsize()
            if avgTotalProcessingTime:
                obj.avgMysqlProcessingTime = round((processingTimeMysql) * 1000, 5)
                obj.avgSocketProcessingTime = round((processingTimeMemcached) * 1000, 5)
                obj.avgTotalProcessingTime = round((avgTotalProcessingTime) * 1000, 5)
            else:
                print("Service seems crashed!!!")
            obj.save()
            print("")
            print(" vac300s ".center(47, "*"))
            print("Messages received / second %5.2f" % (obj.mqttMsgPerSecond))
            print("Queued messages: " + str(obj.queuedMSg))
            print("Active thread: " + str(obj.activeThread))
            print("Average processing time MySql: " + str(obj.avgMysqlProcessingTime) + " ms")
            print("Average processing time Socket: " + str(obj.avgSocketProcessingTime) + " ms")
            print("Average processing time Total: " + str(obj.avgTotalProcessingTime) + " ms")
            print("***********************************************")
            print("")
        except Exception as e:
            print(e)

_thread.start_new_thread( statsPrinterThread, () )

def processThread():
    while True:
        activeThread = activeThreadQ.qsize()
        if activeThread < 75:
            msg = q.get(True)
            activeThreadQ.put(".")
            _thread.start_new_thread( on_message_Data, (msg, ))
        else:
            time.sleep(0.01)
    print("processThread crashed!")
        
_thread.start_new_thread( processThread, () )

mqttc = mqtt.Client()
mqttc.message_callback_add("data/vac300s/#", start_on_data)
mqttc.message_callback_add("message/vac300s/#", on_message)
mqttc.message_callback_add("alarm/vac300s/#", on_message_alarms)
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe

mqttc.username_pw_set("******", "*******")
mqttc.connect("mqtt.*****.com", 1883, 30)

while True:
    try:
        mqttc.loop(1)
    except Exception as e:
        print(e)


Solution 1:[1]

Without reading the whole thing and being familiar with the packages in use, this is not a simple question to answer.

That said, following basic principles, I would time each portion of the run and see where the tie efficiency decreases (where it starts taking longer and longer) and then focus on that area, and areas that flow into it.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 max_settings