Django ORM and Async

I'm trying to build a polling system inside a Django management command, partly for fun and partly to learn async Django.

I'm using django_tenants, although that isn't central to the problem. There is a table that holds the tenants, and I want to loop over it inside an outer infinite loop. For each tenant that isn't already in the async queue (for example, because the task started the last time that tenant was seen hasn't finished yet), I want to add a new asyncio task.

In my test case, one tenant deliberately takes 5 seconds, so I can check that the internal queue/loop is working.

For example, say there are three tenants, t_a, t_b and t_c, where t_a takes 2 seconds to run, t_b takes 5 and t_c takes 1.

Ultimately, I would expect the infinite loop to produce output like this:

t_c Done
t_c Done
t_a Done
t_c Done
t_c Done
t_a Done
t_c Done
t_b Done   <-- the long-running one
t_c Done
t_a Done
....

In other words, the slow t_b doesn't stop the other tenants from being re-run.
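The expected sequence is plain arithmetic: if every tenant restarts as soon as it finishes, a tenant with duration d completes at d, 2d, 3d, and so on. The small Python sketch below merges those completion times for the durations in the example (t_a 2 s, t_b 5 s, t_c 1 s). It assumes instant restarts and no polling delay, and lists simultaneous completions shortest-duration first; expected_order is a hypothetical helper, not part of the command.

```python
def expected_order(durations, until):
    """Completion events up to `until` seconds, assuming every tenant restarts
    the instant it finishes and polling adds no delay (an idealization)."""
    events = []
    for name, duration in durations.items():
        finish = duration
        while finish <= until:
            # Sort key is (time, duration), so ties at the same instant are
            # listed shortest-duration first, as in the example output.
            events.append((finish, duration, name))
            finish += duration
    return [f"{name} Done" for _, _, name in sorted(events)]


print(expected_order({"t_a": 2, "t_b": 5, "t_c": 1}, until=6))
```

For until=6 this gives the same ten-line sequence as the example, with t_b finishing once while t_c finishes six times.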

Each pass of the outer loop fetches the tenant list again, because new tenants may be created between runs.

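import asyncio
from random import randint

from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand
from django_tenants.utils import tenant_context

# Queue and Tenant are this project's own models (their import paths aren't shown here).


class Command(BaseCommand):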
    help = 'Processes the webhook queue'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.tenant_queue = []  # IDs of tenants that currently have a task in flight

    def add_arguments(self, parser):
        parser.add_argument(
            '--schema',
            action='store_true',
            help='Process on a single schema',
        )

    async def get_tenant_queue_count(self, tenant):
        with tenant_context(tenant):
            count = await sync_to_async(Queue.objects.count)()
            if tenant.schema_name == 't_b':
                random_int = 10
            else:
                random_int = randint(1, 3)

            await asyncio.sleep(random_int)
            print(self.tenant_queue)
            print(f'{tenant.name}: {count}, time: {random_int}')
            self.tenant_queue.pop(self.tenant_queue.index(tenant.id))

    def handle(self, *args, **options):
        def _get_tenants():
            return Tenant.objects.all()

        async def _main_routine():

            while True:
                tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
                new_tenants = []
                for t in tenants:
                    if t.id not in self.tenant_queue:
                        self.tenant_queue.append(t.id)
                        new_tenants.append(t)  # only tenants that weren't already queued

                processing_schemas = [self.get_tenant_queue_count(t) for t in new_tenants]

                await asyncio.gather(*processing_schemas)

        asyncio.run(_main_routine())

Now this dies every time with:

django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.

  File "/app/app/webhook/management/commands/process_webhooks.py", line 48, in _main_routine
    for t in tenants:
  File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 280, in __iter__
    self._fetch_all()
  File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 1324, in _fetch_all
    self._result_cache = list(self._iterable_class(self))
  File "/usr/local/lib/python3.8/site-packages/django/db/models/query.py", line 51, in __iter__
    results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
  File "/usr/local/lib/python3.8/site-packages/django/db/models/sql/compiler.py", line 1173, in execute_sql
    cursor = self.connection.cursor()
  File "/usr/local/lib/python3.8/site-packages/django/utils/asyncio.py", line 31, in inner
    raise SynchronousOnlyOperation(message)
django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.

How am I supposed to loop through the tenants?



Solution 1:[1]

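Ahh, figured it out. QuerySets are lazy: Tenant.objects.all() doesn't hit the database until something iterates over it. My _get_tenants returned the unevaluated QuerySet, so the query actually ran when the for loop iterated it inside the async function, and that's what raised the error. To prevent this, I needed to convert the QuerySet to a list inside the sync_to_async function, so the query runs in the worker thread and the async code only ever sees a plain list.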
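To see why list() matters, here is a stdlib-only analogue of what happened. LazyQuery is a hypothetical stand-in for a QuerySet, its __iter__ loosely mimics the check Django performs before touching the database (it refuses when the current thread is running an event loop), and loop.run_in_executor stands in for sync_to_async, since both run a sync function in a worker thread. This is a sketch of the mechanism, not Django's actual code.

```python
import asyncio

TENANTS = ["t_a", "t_b", "t_c"]  # hypothetical tenant names


class LazyQuery:
    """Hypothetical stand-in for a QuerySet: nothing is fetched until it is iterated."""

    def __init__(self, rows):
        self._rows = rows

    def __iter__(self):
        # Loosely mimics Django's async-safety check: refuse to "query"
        # from a thread that is currently running an event loop.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return iter(self._rows)  # no running loop in this thread, so it's safe
        raise RuntimeError("You cannot call this from an async context")


def get_tenants_lazy():
    return LazyQuery(TENANTS)  # returns the unevaluated "QuerySet"


def get_tenants_eager():
    return list(LazyQuery(TENANTS))  # list() forces evaluation in this (worker) thread


async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor plays the role of sync_to_async: the function runs in a worker thread.
    eager = await loop.run_in_executor(None, get_tenants_eager)
    lazy = await loop.run_in_executor(None, get_tenants_lazy)
    try:
        list(lazy)  # evaluation happens here, on the event-loop thread
        error = None
    except RuntimeError as exc:
        error = str(exc)
    return eager, error


eager, error = asyncio.run(main())
print(eager)  # ['t_a', 't_b', 't_c']
print(error)  # You cannot call this from an async context
```

Returning the lazy object lets it leave the worker thread unevaluated, so the failure only surfaces where it is first iterated, which matches the traceback pointing at the for t in tenants: line.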


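import asyncio
from random import randint

from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand
from django_tenants.utils import tenant_context

# Queue and Tenant are this project's own models (their import paths aren't shown here).


class Command(BaseCommand):
    help = 'Processes the webhook queue'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = []     # IDs of tenants that currently have a task in flight
        self.tasks = set()  # strong references, so running tasks aren't garbage-collected

    async def get_queue_count(self, tenant):
        with tenant_context(tenant):
            count = await sync_to_async(Queue.objects.count)()
            if tenant.schema_name == 'company_xyz':
                random_int = 10
            else:
                random_int = randint(1, 3)

            await asyncio.sleep(random_int)
            print(self.queue)
            print(f'{tenant.name}: {count}, time: {random_int}')
            self.queue.pop(self.queue.index(tenant.id))  # free this tenant's slot

    def handle(self, *args, **options):
        def _get_tenants():
            # Wrapping the QuerySet in list() runs the query here, in the sync thread,
            # instead of later when the async loop iterates it.
            return list(Tenant.objects.all())

        async def _main_routine():
            while True:
                tenants = await sync_to_async(_get_tenants, thread_sensitive=True)()
                for tenant in tenants:
                    if tenant.id not in self.queue:
                        self.queue.append(tenant.id)
                        # Schedule without awaiting, so a slow tenant doesn't block the others.
                        task = asyncio.create_task(self.get_queue_count(tenant))
                        self.tasks.add(task)
                        task.add_done_callback(self.tasks.discard)

        asyncio.run(_main_routine())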

Then, to make it run indefinitely without blocking, I create a task for each tenant as the loop finds it (rather than gathering them and waiting), and use self.queue to skip any tenant whose ID is already there, i.e. whose previous task hasn't finished yet.
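Here is a stdlib-only sketch of that scheduling pattern, with simulated tenants in place of the ORM so it runs anywhere. fetch_tenants, process and poll are hypothetical names, and the durations are the question's example scaled down 10x. Beyond the create-a-task-and-track-it idea, it makes three assumptions of its own: it keeps strong references to the tasks (the asyncio docs warn that the event loop only holds weak references), it frees a tenant's slot in a finally block so a tenant whose work raises isn't stuck forever, and it sleeps briefly between passes so the outer loop doesn't re-fetch the tenant list as fast as it can.

```python
import asyncio

# Hypothetical tenants and durations from the question's example, scaled down 10x
# (t_a: 2 s -> 0.2 s, t_b: 5 s -> 0.5 s, t_c: 1 s -> 0.1 s).
DURATIONS = {"t_a": 0.2, "t_b": 0.5, "t_c": 0.1}


def fetch_tenants():
    # Hypothetical stand-in for re-reading the tenant table on every pass.
    return list(DURATIONS)


async def process(name, in_flight, log):
    try:
        await asyncio.sleep(DURATIONS[name])  # pretend to do the tenant's work
        log.append(f"{name} Done")
    finally:
        in_flight.discard(name)  # free the slot even if the work raises


async def poll(run_for=1.05, poll_interval=0.01):
    in_flight = set()  # tenants whose task hasn't finished yet
    tasks = set()      # strong references so tasks aren't garbage-collected mid-run
    log = []
    loop = asyncio.get_running_loop()
    deadline = loop.time() + run_for
    while loop.time() < deadline:
        for name in fetch_tenants():
            if name not in in_flight:
                in_flight.add(name)
                task = asyncio.create_task(process(name, in_flight, log))
                tasks.add(task)
                task.add_done_callback(tasks.discard)
        await asyncio.sleep(poll_interval)  # yield to the tasks instead of spinning
    # Stop the demo cleanly: cancel whatever is still running and wait for it.
    pending = list(tasks)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return log


log = asyncio.run(poll())
print(log)
```

The slow t_b only ever occupies its own slot, so "t_c Done" keeps appearing roughly every 0.1 s while t_b is still running.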

This is working as expected now! Hope this helps someone.
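One more thing worth checking, inferred from how Django and django_tenants manage connections rather than from running this command: Django keeps database connections per thread, tenant_context switches the schema on the current thread's connection, and sync_to_async runs Queue.objects.count in a worker thread. If that holds, the with tenant_context(tenant): block in get_queue_count switches the schema on the event-loop thread's connection while the count runs on a different one. Putting both the with block and the query in one synchronous function, and passing that function to sync_to_async, keeps them in the same thread. The stdlib sketch below models only the threading part: threading.local stands in for per-thread connection state, loop.run_in_executor stands in for sync_to_async, and current_schema and set_schema_and_read are hypothetical names.

```python
import asyncio
import threading

# Hypothetical stand-in for per-thread connection state, such as the active schema.
state = threading.local()


def current_schema():
    return getattr(state, "schema", "public")


def set_schema_and_read(schema):
    # Switch and "query" inside one sync function, so both happen in the same thread.
    state.schema = schema
    try:
        return current_schema()
    finally:
        state.schema = "public"  # restore, like leaving a tenant_context block


async def main():
    loop = asyncio.get_running_loop()

    # Switch on the event-loop thread, then read from a worker thread.
    state.schema = "t_a"
    seen_from_worker = await loop.run_in_executor(None, current_schema)
    state.schema = "public"

    # Switch and read inside the same worker-thread call.
    seen_together = await loop.run_in_executor(None, set_schema_and_read, "t_a")
    return seen_from_worker, seen_together


result = asyncio.run(main())
print(result)  # ('public', 't_a')
```

The worker thread never sees the schema set on the event-loop thread, while the combined call does.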

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Trent