Receive UDP packets with Python, packet loss
I'm having a lot of packet loss using UDP in Python. I know I should use TCP if I don't want packet loss, but I don't have (full) control over the sender.
It's a camera that sends 15 images per second using UDP multicast.
Below you see the code I've written now. It uses multiprocessing to allow the producer and consumer function to work in parallel. Producer function catches the packets, consumer function processes them and writes the images to .bmp files.
I've written a class PacketStream which writes the bytes from the packages to a .bmp file.
When the camera sends a new image, it first sends one packet, with first byte = 0x01. This contains information about the image. Then 612 packets are sent with first byte = 0x02. These contain the bytes from the image (508 bytes/packet).
Since 15 images are sent per second, ~9000 packets are sent per second, although within each image they arrive in a burst at a faster rate of ~22 packets/ms.
I can receive all packets perfectly using tcpdump or Wireshark. But using the code below, packets are missed. Surely my Windows 7 PC should be able to handle this? I'm also using it on a Raspberry Pi 3, and there more or less the same number of packets are missed. Therefore I think it's a problem with the code.
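As a quick sanity check of these numbers, here is a small sketch. It assumes that the 508 bytes per packet include the 6-byte header that the code below strips with data[6:]; that split is an assumption, not something the camera documentation confirms here.

```python
# Figures taken from the description above.
IMAGES_PER_SECOND = 15
DATA_PACKETS_PER_IMAGE = 612
PACKET_BYTES = 508
HEADER_BYTES = 6            # assumption: the bytes skipped by data[6:]
IMAGE_BYTES = 640 * 480     # 8-bit grayscale frame

# One info packet (0x01) plus 612 data packets (0x02) per image.
packets_per_second = IMAGES_PER_SECOND * (1 + DATA_PACKETS_PER_IMAGE)

# Maximum payload the data packets of one image can carry.
payload_capacity = DATA_PACKETS_PER_IMAGE * (PACKET_BYTES - HEADER_BYTES)

# Duration of one burst at ~22 packets/ms versus the time between frames.
burst_ms = (1 + DATA_PACKETS_PER_IMAGE) / 22.0
frame_period_ms = 1000.0 / IMAGES_PER_SECOND

print("packets per second: %d" % packets_per_second)
print("payload capacity per image: %d (image needs %d)" % (payload_capacity, IMAGE_BYTES))
print("burst lasts about %.1f ms of every %.1f ms" % (burst_ms, frame_period_ms))
```

Under these assumptions the receiver has to keep up with short bursts and is idle for the rest of each frame period, which is why buffering matters more than average throughput.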
I've tried lots of different things, like threading instead of multiprocessing, Pipe instead of Queue.
I also tried increasing the socket buffer with
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000)
to no avail.
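One thing worth checking here is whether the operating system actually granted the requested size, since the value passed to setsockopt is only a request. A minimal sketch that reads the value back (the helper name set_receive_buffer is made up for illustration):

```python
import socket


def set_receive_buffer(sock, requested):
    """Request a receive buffer size and return what the OS reports afterwards."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, requested)
    return sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)


sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
granted = set_receive_buffer(sock, 3000000)
print("requested 3000000 bytes, OS reports %d" % granted)
sock.close()
```

On Linux the request is typically capped by the net.core.rmem_max setting, and the reported value is double the stored one because the kernel accounts for bookkeeping overhead. If the reported size is far below the request, the larger buffer never took effect.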
Is this at all possible in python?
Thanks in advance,
    import time
    from multiprocessing import Process, Queue
    import socket
    import struct
    from PIL import Image


    class PacketStream:
        def __init__(self, output_path):
            self.output_path = output_path
            self.data_buffer = ''
            self.img_id = -1  # -1 = waiting for start of new image

        def process(self, data):
            message_id = data[0]
            if message_id == '\x01':
                self.wrap_up_last_image()
                self.img_id = ord(data[3])
                self.data_buffer = ''
            if message_id == '\x02':
                self.data_buffer += data[6:]

        def wrap_up_last_image(self):
            if self.img_id > 0:
                n_bytes = len(self.data_buffer)
                if n_bytes == 307200:
                    global i
                    write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp',
                                self.data_buffer)
                    i += 1
                else:
                    print 'Image lost: %s bytes missing.' % (307200 - n_bytes)


    def write_image(path, data):
        im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
        im.save(path)
        print time.time(), path


    def producer(q):
        # setup socket
        MCAST_GRP = '239.255.83.71'
        MCAST_PORT = 2271
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', MCAST_PORT))
        mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        while True:
            q.put(sock.recv(512))


    def consumer(q):
        packet_stream = PacketStream('D:/bmpdump/')
        while True:
            data = q.get()
            packet_stream.process(data)


    i = 0
    if __name__ == '__main__':
        q = Queue()
        t1 = Process(target=producer, args=(q,))
        t1.daemon = True  # so they stop when the main prog stops
        t1.start()
        t2 = Process(target=consumer, args=(q,))
        t2.daemon = True
        t2.start()
        time.sleep(10.0)
        print 'Program finished.'
EDIT
Thanks for all the suggestions.
1) I already tried threading+queue and also the ''.join(); neither seemed to make much difference. I'm quite sure now that the problem is that the producer thread doesn't get enough priority. I can't find how to increase this using Python. Is this even possible?
2) I managed to lose only about 10% using the code below. The processor is at ~25% (on the Raspberry Pi). The key is to consume the data when there's a pause in the packet stream, i.e. when the last data packet of an image has arrived.
    import time
    import socket
    import struct
    from PIL import Image


    def write_image(path, data):
        im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
        im.save(path)
        print time.time(), path


    def consume(data_buffer):
        img_id = ord(data_buffer[0][1])
        real_data_buffer = [data[6:] for data in data_buffer]
        data_string = ''.join(real_data_buffer)
        global i
        write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string)
        i += 1


    def producer(sock):
        print 'Producer start'
        data_buffer = []
        while True:
            data = sock.recvfrom(512)[0]
            if data[0] == '\x01':
                data_buffer = []
            else:
                data_buffer.append(data)
            if len(data_buffer) == 612:
                consume(data_buffer)


    # image counter
    i = 0
    # setup socket
    MCAST_GRP = '239.255.83.71'
    MCAST_PORT = 2271
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((MCAST_GRP, MCAST_PORT))
    mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000)
    producer(sock)
Solution 1:[1]
A few suggestions to improve your code, but first a question: have you measured at all what might be slowing things down? For instance, have you looked at the CPU usage of your system? If you're hitting 100%, that may very well be the reason for the packet loss. If it's mostly idle, there is something else going on and the problem is not related to the performance of the code.
Now, some suggestions to improve the code:
- use `socket.recvfrom` instead of `sock.recv` when dealing with UDP sockets
- don't use multiprocessing with processes; the serialization that has to occur to send data from one process to the other may very well be a performance bottleneck if we're talking ~9000 calls/sec. Try to use threads instead (`threading` + `queue` modules). But as you're not providing any observed numbers it is hard to say really.
- don't use string concatenation to build up the receiver's buffer as it gets packets. That creates large numbers of new temporary string objects and copies data around all the time. Instead, append each packet to a list and, when you have received all of the data, `"".join(packets)` them all together once at the end.
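
The last two suggestions could look roughly like the sketch below. It is written for Python 3 (the question's code is Python 2), so packets are bytes and the module is named queue. The names FrameAssembler, receiver, worker and start are made up for illustration, and the packet layout (first byte 0x01 or 0x02, 6-byte header, 307200 bytes per image) is taken from the question.

```python
import queue
import threading

IMAGE_BYTES = 640 * 480   # 307200, as in the question
HEADER_LEN = 6            # bytes skipped by data[6:] in the question


class FrameAssembler:
    """Collects payload chunks in a list and joins them once per image."""

    def __init__(self):
        self.chunks = []
        self.started = False  # False until the first 0x01 packet is seen

    def feed(self, packet):
        """Return a complete image when the next 0x01 packet arrives, else None."""
        finished = None
        kind = packet[:1]
        if kind == b"\x01":
            if self.started:
                image = b"".join(self.chunks)  # one copy per image
                if len(image) == IMAGE_BYTES:
                    finished = image           # incomplete images are dropped
            self.chunks = []
            self.started = True
        elif kind == b"\x02" and self.started:
            self.chunks.append(packet[HEADER_LEN:])
        return finished


def receiver(sock, packets):
    """Producer thread: receive and enqueue, nothing else."""
    while True:
        data, _addr = sock.recvfrom(512)
        packets.put(data)


def worker(packets, on_image):
    """Consumer thread: assemble packets and hand finished images to on_image."""
    assembler = FrameAssembler()
    while True:
        image = assembler.feed(packets.get())
        if image is not None:
            on_image(image)


def start(sock, on_image):
    """Start both daemon threads for an already configured UDP socket."""
    packets = queue.Queue()
    threading.Thread(target=receiver, args=(sock, packets), daemon=True).start()
    threading.Thread(target=worker, args=(packets, on_image), daemon=True).start()
    return packets
```

Whether this actually reduces the loss depends on where the time goes, so it is worth counting dropped images before and after the change rather than assuming an improvement.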
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Irmen de Jong |