Discussion forum for David Beazley

Backpressure with curio socket as stream


Caveat, I am new to async programming in python and this is my first attempt at it.

I am using curio as a potential solution to a producer / consumer application I am writing. I have an HTTP server that will stream data at extremely high rates to the client. I set up an async generator to yield parts of the stream. This generator is iterated in the producer, which adds the yielded values to a curio Queue. The consumer then pops items from that queue and processes them further. The processing methods that this consumer calls are basically synchronous code right now, even though I am using async def and await in the calls. They are not really blocking functions; they are a few if statements with simple math that modifies state on an instance.

However, during periods of time when the socket is pushing a lot of data out, things tend to get backed up (not unexpectedly). Where is this backup occurring, and would it be alleviated if I changed the approach in the consumer and, instead of directly awaiting the methods, spawned them as tasks? I am thinking this may improve things, as it would allow the context to switch back to the async generator in the producer and make more progress on the socket data before switching back to processing. Is this a valid approach, or should I try something else to deal with the “backpressure” in the system?

Here is my generator function:

    async def stream(self, symbols):
        sock = await curio.open_connection(self.host, self.port)
        req = f'GET /stream?symbol={self._format_symbols(symbols)} '
        req += 'HTTP/1.1\r\nHost: localhost:5000\r\n\r\n'
        async with sock:
            await sock.sendall(req.encode())
            stream = sock.as_stream()
            # Skip the status line and response headers
            for _ in range(3):
                await stream.readline()
            while True:
                yield (await stream.readline()).strip(), datetime.now()

Here is my producer / consumer:

    async def start(self):
        listener_task = await curio.spawn(self._listener)
        agg_task = await curio.spawn(self._aggregator)
        await listener_task.join()
        await agg_task.cancel()

    async def _listener(self):
        async for data, time_stamp in self._conn.stream(self._symbols):
            data = data.strip()
            if data[0] == 84:  # 84 == ord('T')
                _, symbol, *__, c, s, date = data.split(b',')
                await self._q.put((symbol, c, s, date, time_stamp))
        await self._q.join()

    async def _aggregator(self):
        while True:
            args = await self._q.get()
            await self.process(*args)
            await self._q.task_done()

The process method does several things and calls other methods, none of which involve I/O or should block much at all.

Thanks for any input or advice!


Going to spawn the process call, as well as the calls that it makes downstream. That should keep things moving and better distribute the processing time across each line rather than backing up quickly during occasional bursts of data.


Hmmm, so it seems my first attempt at spawning these methods didn’t work out. I got a bunch of log messages about my tasks / coroutines never being joined. I looked this up and found the recommendation to spawn from a TaskGroup instead, which will automatically join for me. I did this and now the warnings are gone, but the program eventually crashes due to memory usage. It’s as if the tasks are just getting added to the scheduler but never actually run and removed. While I created the group as a context manager in the start method above, I assigned it as an instance variable and other methods spawn off it (although execution at that point is still nested in the context manager).

I didn’t really think I wanted to join anywhere. The tasks that I am queuing up simply adjust state and don’t return a value, so I am not going to be calling .result on them. The rationale for spawning the process call instead of calling it is that it does take some time, and the socket seems to be getting backed up when there is a high burst of lines coming out. This means I end up with a small number of outlier cases with high latency between the source data and the end of my process call. I figured that by switching context back to the socket immediately after queuing up the task to run later, I would “spread out” the latency across all my calls, narrowing the distribution of latency around an expected average instead of having some calls that are very slow and some that are very quick.

This program is meant to run indefinitely. What is the best way to go about adding tasks to the kernel’s schedule without also calling join() on them (I assume join will block until the task is complete)?

I am new to single threaded concurrency, so any help on this would be greatly appreciated.


A few thoughts on this. First, internally, curio is not performing any I/O buffering or queuing. The kernel is mainly concerned with scheduling of tasks, but all I/O is pushed out to user code. This is probably a good thing in the sense that there is no hidden magic going on (i.e., Curio queuing up things in the background without your knowledge).

That said, there are a few ways in which back pressure might be applied in the system.

  1. Use a fixed-size queue. Queues have a maxsize parameter, and an attempt to push something into a full queue will cause the producer to block.

  2. Adjust the internal buffer settings on the low-level socket objects using various combinations of socket options (sock.setsockopt(), etc.). This approach transfers the problem of back pressure back to the operating system. In theory, something like TCP flow control would kick in and throttle things.

  3. Use something like a Semaphore to throttle things. For example, this could be used to control the amount of concurrency taking place.

Regarding tasks that are never joined, you probably want to spawn a task using the daemon option. For example, await spawn(task, daemon=True). Daemonic tasks are understood to never be joined.

That’s interesting on the TaskGroup running out of memory. The finished tasks are probably being accumulated internally with the expectation of gathering their results. If you’re not going to join with a task, I think you want the daemon flag more than you want TaskGroups.


Thanks Dave, This is very helpful, especially the daemon option (I should have seen that before). It sounds like the OS is handling any build up in the socket, thanks for the clarification on that.

What’s still not entirely clear is when the spawned tasks run, and how they are scheduled relative to the “driving code” (the code that calls spawn). I will plan to review the source some on this and will post in a separate thread if it’s not clear.



With cooperative multitasking (such as Curio), a spawned task will first run as soon as the task that called spawn() blocks for some reason. From that point forward, task switching only occurs on blocking operations (indicated by the use of await statements).


Thanks Dave.

I see, I wasn’t sure if the kernel kept track and switched context on the awaits in the outermost code that was not “spawned” (the “driving code”). Without digging into the source yet, I assumed that the kernel only managed spawned tasks and that the driving code was just regular Python, where await was just syntax for priming and driving a coroutine, blocking until said coroutine returned.