'Django channels testing with HttpCommunicator

I have a DRF application, which also has a single websocket consumer

Now I'm trying to make a test case, which inherits from a Djano 3.1 TestCase. Said test case should register a user via rest_auth registration endpoint and than connect to the channels consumer

To register a user in a test case I use HttpCommunicator like so:

class TestRoomWebsockets(APITestCase):
    async def test_connect(self):
        communicator = HttpCommunicator(
            application, "POST", reverse(UrlUtils.Users.REGISTER), 
            body=json.dumps({"username": ..., "password1": ..., "password2": ...}).encode("utf-8")
        )
        response = await communicator.get_response()
        self.assertEqual(response["status"], 200)

But it fails with status code 400. The response is

{'status': 400, 'headers': [(b'Content-Type', b'application/json'), (b'Vary', b'Accept'), (b'Allow', b'POST, OPTIONS'), (b'X-Frame-Options', b'DENY'), (b'Content-Length', b'120'), (b'X-Content-Type-Options', b'nosniff'), (b'Referrer-Policy', b'same-origin')], 'body': b'{"username":["This field is required."],"password1":["This field is required."],"password2":["This field is required."]}'}

I have no idea, why the data is lost somewhere. Could someone please explain, what am I doing wrong? Please tell, if more details are required to solve the issue.

Some additional files

asgi application looks like this:

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": TokenAuthMiddlewareStack(
        URLRouter([
            path('ws/', my_router),
        ])
    )
})


Solution 1:[1]

If you need to use database transactions with channels in test you have to use SimpleTestCase. APITestCase or TestCase dont allow transactions, you have to use SimpleTestCase from django test, and set databases to all. Just with that difference i think it will work. Note that the transactions will be saved between test, and not rolled back after the test.

from django.test import SimpleTestCase
class TestRoomWebsockets(APITestCase):
    databases = '__all__'
    async def test_connect(self):
        communicator = HttpCommunicator(
        application, "POST", reverse(UrlUtils.Users.REGISTER), 
        body=json.dumps({"username": ..., "password1": ..., "password2": ...}).encode("utf-8")
    )
        response = await communicator.get_response()
        self.assertEqual(response["status"], 200)

here are the information of SimpleTestCase https://docs.djangoproject.com/en/4.0/topics/testing/tools/

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Pablo Estevez