Discussion forum for David Beazley

Batching and synchronization with Curio


How can I synchronize tasks waiting on the same ressource using the async programming model?

Imagine a webservice that squares integers in batches of 1000. We need an HTTP client that is able to accumulate queries until we hit the required batch size, call the webservice and then dispatch results to the callers. Obviously, calls should be asynchronous.


>>> t1 = curio.spawn(client.get_square_of(2))  # batch size 1
>>> t2 = curio.spawn(client.get_square_of(3))  # batch size 2


>>> tn = curio.spawn(client.get_square_of(6))  # batch size reached => call the webservice!
>>> print(await t1.join())
>>> print(await t2.join())
>>> print(await tn.join())

Sounds simple enough. I can think of a solution using threads and callbacks, but I just cannot think of a way to implement that elegantly with Curio. I managed to get something working using a bunch of flags and curio.sleep(), but it does not feel right.

Is it something that the current curio API can help me with? If so, what would be your suggestions?



Interesting puzzle! Here’s one possible approach I threw together:

from collections import namedtuple
import curio

WorkOrder = namedtuple("WorkOrder", ["data", "reply_queue"])

class Batcher:
    def __init__(self, submit_batch_fn, max_batch_size, max_time):
        self._submit_batch_fn = submit_batch_fn
        self._max_batch_size = max_batch_size
        self._max_time = max_time
        self._inbox = curio.Queue()
        self._batch_task = None

    async def _batch_submit_loop(self):
        # Loop over batches
        closing = False
        while not closing:
            # Loop gathering up a single batch
            batch = []
            async with curio.ignore_after(self._max_time):
                while len(batch) < self._max_batch_size:
                    work_order = await self._inbox.get()
                    if work_order is None:
                        closing = True
            # Either len(batch) == batch_size, or timeout expired, or we're
            # closing. In any case, we should submit what we have.
            if not batch:
            datas = [work.data for work in batch]
            reply_queues = [work.reply_queue for work in batch]
            responses = await self._submit_batch_fn(datas)
            for reply_queue, response in zip(reply_queues, responses):
                await reply_queue.put(response)

    async def process_item(self, data):
        if self._inbox is None:
            raise RuntimeError("Batcher is closed")
        if self._batch_task is None:
            # This relies on spawn being synchronous...
            self._batch_task = await curio.spawn(self._batch_submit_loop(), daemon=True)
        reply_queue = curio.Queue()
        await self._inbox.put(WorkOrder(data, reply_queue))
        return await reply_queue.get()

    async def close(self):
        inbox = self._inbox
        self._inbox = None
        await inbox.put(None)
        await self._batch_task.join()

    # Unfortunately we can't implement __del__, because inbox.put is async-colored

### Example usage

async def example():
    async def batched_get_square_of(numbers):
        # pretend this is a web service
        return [n**2 for n in numbers]

    batcher = Batcher(batched_get_square_of, 100, 1)
    # Will block for 1 second waiting for other work to be submitted, before
    # giving up and submitting a single-item batch:
    print(await batcher.process_item(10))
    await batcher.close()


What do you think?

[Edit: let’s call that code CC0]


Thanks @njsmith for an amazing reply!

What I definitely missed in my tentative implementation is the idea of having a looping task (_batch_submit_loop here). I tried to handle everything in the the equivalent of the process_item method here and it was an absolute mess.
I just learned about the existence of the ignore_after context manager thanks to you. I must admit I did not explore the API very thoroughly, I just used what my IDE auto-completion would suggest :slight_smile: I guess it’s time to get a good hard look at the fine manual.

Well, this solves the problem pretty elegantly and it gives me a ton of insight about what can be done with Curio and the async programming model in general. Thank you very much!

I’m currently working on a tech talk I’m going to give to my coworkers on the subject of async programming in general and its application in Python in particular. The thing is, I don’t know much about the subject myself and I’m still in the process of wrapping my head around the stuff.

One thing is for sure is that I would like to not talk about asyncio too much. It is in the stdlib and it is important to know it exists, but I’d rather use Curio in this talk because it conveys the concept of pure async programming much better than asyncio in my opinion. So, would you mind if your code above makes an appearance in a couple slides? This specific problem is something we had to solve recently in Java using Futures/Callbacks and your approach nicely illustrates the difference with the async model.