Python stream from FTP server to Flask server for downloading

I have a Python Flask app that receives requests to download a file from a remote FTP server. I have used BytesIO to store the contents of the file downloaded from the FTP server using retrbinary:

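import os

from flask import Flask, request, send_file
from ftplib import FTP
from io import BytesIO

# Placeholder connection settings; replace with the real values.
my_server = "ftp.example.com"
my_username = "user"
my_password = "password"

app = Flask(__name__)

@app.route('/')
def hello_world():
    return 'Hello, World!'

@app.route('/download_content', methods=['GET'])
def download_content():
    filepath = request.args.get("filepath", "").strip()
    f = FTP(my_server)
    f.login(my_username, my_password)
    b = BytesIO()
    # retrbinary only returns after the whole file has been written into b.
    f.retrbinary("RETR " + filepath, b.write)
    f.quit()
    b.seek(0)
    # Flask 2.0 renamed attachment_filename to download_name (the old name was removed in 2.2).
    return send_file(b, as_attachment=True, download_name=os.path.basename(filepath))

if __name__ == "__main__":
    app.run("localhost", port=8080)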

The issue is that when the download_content route is hit, the entire file is first downloaded into the BytesIO object, and only then is it sent to the frontend for downloading.

How can I stream the file to the frontend while it is still being downloaded from the FTP server? I can't wait for the whole file to be downloaded into the BytesIO object and then call send_file, as that is both memory-inefficient and more time-consuming.

I have read that Flask can stream a response built from a generator object (rather than with send_file), but how can I make the BytesIO object yield its contents in chunks?
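For reference, a minimal sketch of turning a file-like object such as the BytesIO buffer into a generator of chunks. Flask streams a generator like this when it is passed to `Response(...)` instead of `send_file`. The function name `iter_chunks` and the chunk size are illustrative assumptions. Note that this alone does not fix the problem in the question: retrbinary still fills the BytesIO completely before the first chunk is yielded, so the blocks have to be handed over while retrbinary is still running.

```python
from io import BytesIO


def iter_chunks(fileobj, chunk_size=8192):
    """Yield successive chunks from a binary file-like object until EOF."""
    # iter(callable, sentinel) keeps calling read() until it returns b"".
    for chunk in iter(lambda: fileobj.read(chunk_size), b""):
        yield chunk


# Example: a fully buffered file read back in 4-byte chunks.
buffer = BytesIO(b"0123456789")
chunks = list(iter_chunks(buffer, chunk_size=4))
print(chunks)  # [b'0123', b'4567', b'89']
```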



Solution 1:[1]

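It looks like you need to set up a worker thread to run the download with retrbinary, because retrbinary blocks until the whole file has been transferred and only hands out the data through its callback.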

I have made a quick Gist for this as we have come across the same problem. This method seems to work.

https://gist.github.com/Richard-Mathie/ffecf414553f8ca4c56eb5b06e791b6f

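from ftplib import FTP
from queue import Queue, Empty
from threading import Thread, Event


class FTPDownloader(object):
    def __init__(self, host, user, password, timeout=0.01):
        self.ftp = FTP(host)
        self.ftp.login(user, password)
        self.timeout = timeout

    def getBytes(self, filename):
        # Runs in the worker thread; retrbinary calls self.bytes.put once per block.
        try:
            self.ftp.retrbinary("RETR {}".format(filename), self.bytes.put)
            # Wait for all blocks in the queue to be processed. If the client
            # disconnects mid-stream, task_done() is never called for the
            # remaining blocks and this join() blocks.
            self.bytes.join()
        finally:
            self.finished.set()  # mark streaming as finished (also if retrbinary raises)

    def sendBytes(self):
        # Generator consumed by Flask: yields each block as soon as it is queued.
        while not self.finished.is_set():
            try:
                yield self.bytes.get(timeout=self.timeout)
                self.bytes.task_done()
            except Empty:
                self.finished.wait(self.timeout)
        self.worker.join()

    def download(self, filename):
        self.bytes = Queue()
        self.finished = Event()
        self.worker = Thread(target=self.getBytes, args=(filename,))
        self.worker.start()
        return self.sendBytes()

# Usage in a Flask view (sketch; Response is imported from flask):
#     downloader = FTPDownloader(my_server, my_username, my_password)
#     return Response(downloader.download(filepath),
#                     mimetype="application/octet-stream")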

It should probably also have some timeouts and logic to handle connections timing out, etc., but this is the basic form.
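A minimal sketch of what that extra handling could look like, assuming a different coordination scheme from the one above: instead of an Event plus Queue.join(), the worker puts either an end-of-stream marker or the exception it caught onto the queue, the generator re-raises that exception, and a timeout on get() surfaces a stalled connection as queue.Empty. The names `stream_from_callback`, `_DONE` and `flaky_producer` are hypothetical; `produce` stands in for something like `lambda cb: ftp.retrbinary("RETR " + path, cb)`.

```python
import threading
from queue import Queue

_DONE = object()  # hypothetical end-of-stream marker


def stream_from_callback(produce, timeout=30):
    """Run produce(callback) in a worker thread and yield each block it emits."""
    q = Queue()

    def worker():
        try:
            produce(q.put)
            q.put(_DONE)      # normal end of the transfer
        except Exception as exc:
            q.put(exc)        # hand the error over to the consuming side

    threading.Thread(target=worker, daemon=True).start()
    while True:
        # Raises queue.Empty if nothing arrives within `timeout` seconds.
        item = q.get(timeout=timeout)
        if item is _DONE:
            return
        if isinstance(item, Exception):
            raise item
        yield item


def flaky_producer(callback):
    """Hypothetical stand-in for retrbinary that fails part-way through."""
    callback(b"abc")
    callback(b"def")
    raise TimeoutError("simulated FTP timeout")


received, error = [], None
try:
    for block in stream_from_callback(flaky_producer, timeout=5):
        received.append(block)
except TimeoutError as exc:
    error = exc
print(received)  # [b'abc', b'def']
```

Because the queue is unbounded, a client that disconnects does not block the worker, but the rest of the file is still read into memory; giving Queue a maxsize trades that for a worker that can block on put().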

Explanation

An empty queue doesn't guarantee that the worker thread (getBytes) has finished, so you need an Event to tell the generator sendBytes when the worker is done. However, the worker must first wait until every block in the queue has been processed, hence the self.bytes.join() before setting finished.
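The ordering described above can be checked without an FTP server. The sketch below reproduces the answer's Queue + Event + join() coordination with a hypothetical `fake_retrbinary` that calls the callback once per block, the way retrbinary does; `stream` is likewise an illustrative name. Because `finished` is only set after `q.join()` returns, the generator cannot stop while a block is still waiting in the queue.

```python
from queue import Queue, Empty
from threading import Thread, Event


def fake_retrbinary(callback, blocks):
    """Hypothetical stand-in for FTP.retrbinary: calls callback once per block."""
    for block in blocks:
        callback(block)


def stream(blocks, timeout=0.01):
    q = Queue()
    finished = Event()

    def worker():
        fake_retrbinary(q.put, blocks)
        q.join()        # wait until task_done() has been called for every block
        finished.set()  # only now is it safe for the generator to stop

    t = Thread(target=worker)
    t.start()
    while not finished.is_set():
        try:
            yield q.get(timeout=timeout)
            q.task_done()
        except Empty:
            # Queue momentarily empty: the worker may still be producing.
            finished.wait(timeout)
    t.join()


print(list(stream([b"a", b"b", b"c"])))  # [b'a', b'b', b'c']
```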

I'd be interested if anyone can think of a more elegant way of doing this.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1