Mac web requests hanging after thousands of requests

I'm facing a very strange issue and am looking for advice on how to debug it more than I am on a simple fix, since I've been unable to create a simple reproducible case.

Over the course of a few hours I'm opening 10,000-100,000 async requests to remote web domains with httpx. Specifically I'm using a shared pool of context managers to share TCP sockets / other resources across requests. I'll only have a few thousand requests pending at any one time. My code at its core is doing the following:

import asyncio
from asyncio import gather
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from random import choice

from httpx import AsyncClient

clients = []

for _ in range(200):
    client = AsyncClient()
    clients.append(await client.__aenter__())

async def run_request(url):
    try:
        client = choice(clients)
        response = await client.get(url, timeout=15)
    except Exception as e:
        return

with ProcessPoolExecutor() as executor:
    await gather(
        *[
            asyncio.get_event_loop().run_in_executor(
                executor,
                partial(run_request, url=url)
            )
            for url in urls
        ]
    )

Sometimes the request raises, in the case of a timeout or an inaccessible host, and the exception handler swallows it.

At some point my whole machine will hang when trying to create new connections. Chrome freezes, a locally hosted postgres instance freezes, even lsof -i -a freezes. Yet none of them actually times out; they just spin forever. It seems as if the OS is unable to allocate new sockets to communicate with remote hosts, but I'm not sure that explains the postgres or lsof behavior.

Is it possible sockets are being leaked and never released, despite the context managers? Has anyone seen something similar? What profiling methods should I explore to determine the root cause?



Solution 1:[1]

The "proper" way to debug a problem like this in OSX would be to grab DTrace and look at syscalls. If you don't want to go down that rabbit hole, then spin up tcpdump and monitor HTTP(s) traffic to check out if the process is actually doing work or is literally in a loop.

That said, the issue is shared mutable state making the code thread-unsafe. Below I theorize why the OS might be freezing up, propose some solutions, and then give my own code, which aims to reduce resource usage and behave well toward the servers it might be hammering.


What you are doing is fundamentally not thread-safe, so the hang is probably due to that. The AsyncClients contain mutable state which can be mutated by more than one worker. Since there is no synchronization, one worker could change the state of a client that another worker is using. For example, it could move the client's internal state machine from "response received, waiting to be read by Python" to "awaiting a response from the server", a response which will never arrive, thereby locking a worker in place; with enough of these indefinite awaits, the OS's event notification mechanism (kqueue, IIRC) is overwhelmed.

There isn't enough context to say more. Debugging thread-safety problems is hard, especially when more is going on than the snippets you shared. It's a big part of why Rust is so popular, since it prevents these kinds of issues from even compiling.

There are a few ways to handle this problem. One is to hand a dedicated AsyncClient to each worker (creating one client per url):

with ProcessPoolExecutor() as executor:
    await gather(
        *[
            asyncio.get_event_loop().run_in_executor(
                executor,
                partial(run_request, url=url, client=client)
            )
            for url, client in zip(urls, clients)
        ]
    )

Or, create the client inside each worker after it's spawned:

async def run_request(url):
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, timeout=15)
            return response
        except Exception as e:
            return None

You could also just put an asyncio.Lock around each client and let the workers share them, but you're better off not doing that. A minimal sketch of that approach follows.
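For illustration only, here is the lock-per-client idea, reusing the clients list from the question (again, not recommended; also note an asyncio.Lock only synchronizes tasks on one event loop, not across processes):

import asyncio
from random import randrange

# one lock per shared client; a task must hold the lock to use the client
locks = [asyncio.Lock() for _ in clients]

async def run_request(url):
    i = randrange(len(clients))
    async with locks[i]:
        return await clients[i].get(url, timeout=15)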

My preferred method is to group your urls per domain (e.g. {"reddit.com": {<set of reddit urls>}, ...}), pass each set of domain-specific urls into the function being run in parallel, and have that function create its own client.

from httpx import AsyncClient
from concurrent.futures import ProcessPoolExecutor
import asyncio
from functools import partial
from urllib.parse import urlparse

async def run_request(urls):
    async with AsyncClient(timeout=15) as client:
        resps = []
        deadletter = []
        for url in urls:
            try:
                resp = await client.get(url)
                resps.append(resp)
            except Exception as e:
                deadletter.append((url, e))
        return resps, deadletter

def run_request_sync(urls):
    # run_in_executor calls a plain function in the worker process,
    # so drive the coroutine with its own event loop there
    return asyncio.run(run_request(urls))

def get_domain_from_url(url):
    c = urlparse(url)
    return c.netloc


async def entrypoint(urls):
    domain_url_map = {}
    for url in urls:
        domain = get_domain_from_url(url)

        # using a set deduplicates the urls
        domain_url_map[domain] = domain_url_map.get(domain, set())

        # set.add == list.append
        domain_url_map[domain].add(url)

    with ProcessPoolExecutor() as executor:
        results = await asyncio.gather(
            *[
                asyncio.get_event_loop().run_in_executor(
                    executor,
                    partial(run_request_sync, urls=urls)
                )
                for urls in domain_url_map.values()
            ]
        )
        # each entry in results is a (responses, deadletter) pair for one domain
        responses = [r for resps, _ in results for r in resps]
        errors = [err for _, dead in results for err in dead]

This way you greatly reduce the number of clients (and presumably sockets), which is good. Domains don't affect each other, so rate limiting on one domain is unlikely to stall the other clients. You also get deduplicated urls, and you can easily add per-domain rate limits, as sketched below.
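A minimal sketch of such a per-domain rate limit, assuming a fixed delay between requests to the same domain (the 0.5 second value is an arbitrary placeholder; tune it per domain):

async def run_request(urls, delay=0.5):
    async with AsyncClient(timeout=15) as client:
        resps, deadletter = [], []
        for url in urls:
            try:
                resps.append(await client.get(url))
            except Exception as e:
                deadletter.append((url, e))
            # each client owns one domain, so pausing here throttles only that domain
            await asyncio.sleep(delay)
        return resps, deadletter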

Solution 2:[2]

Reviewing the code you have posted, it seems you are not handling the release/closing of the AsyncClient as described in the httpx async documentation:

Use async with httpx.AsyncClient() if you want a context-managed client

async with httpx.AsyncClient() as client:
    <your code here>

Alternatively, use await client.aclose() if you want to close a client explicitly:

client = httpx.AsyncClient() 
<your code here> 
await client.aclose()

As stated in the documentation, I think the best approach is to open a context manager inside your run_request function.

You can also find a similar issue reported in this GitHub issue on the httpx repo; their approach is to use the context manager.

Besides their own repo and docs, just to provide an example of how it should look, something like:

async def run_request(url):
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, timeout=15)
            return response
        except Exception as e:
            return None
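For completeness, a hypothetical driver that fans these coroutines out on a single event loop (the urls variable is assumed from the question):

import asyncio

async def main(urls):
    # one task per url; each task opens and closes its own client
    await asyncio.gather(*(run_request(url) for url in urls))

asyncio.run(main(urls))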

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: plunker
Solution 2: bjornaer