Discussion forum for David Beazley

Creating Pipelines of Tasks


I want to create a pipeline of tasks. I’m not sure my terminology is quite right, so here’s an example:

I receive a stack of purchase orders. Each purchase order contains 0 to 5 line items. The handwriting on these purchase orders is pretty bad, so it takes between 0 and 10 seconds to figure out how many line items there are.

Once I know how many items there are, I need to go and fetch each item from the shelves, which again takes between 0 and 10 seconds.

In principle this can extend, with each “fetched item” requiring more work to be done.

Enough with the example! Here’s my code implementing this set of tasks:

import random

import curio

async def read_purchase_order(order_id):
    units = random.randint(0, 5)
    await curio.sleep(random.randrange(0, 10))
    print(f'order {order_id} has {units} items')
    return order_id, units

async def fetch_item_for_order(order_id):
    await curio.sleep(random.randrange(0, 10))
    print(f'found an item from order {order_id}')

async def main():
    order_ids = range(4)
    async with curio.TaskGroup() as read_group:
        for i in order_ids:
            await read_group.spawn(read_purchase_order(i))

        async with curio.TaskGroup() as fetch_group:
            async for task in read_group:
                order_id = task.result[0]
                for _ in range(task.result[1]):
                    await fetch_group.spawn(fetch_item_for_order(order_id))

    await fetch_group.join()
    print('all items fetched...coffee break time!')

if __name__ == '__main__':
    curio.run(main)

So here’s my question: is this the right way to approach this problem? My instinct is no. If the second step produces more work, and that work produces more again, I’d end up with deeper and deeper nesting of async for task in some_task_group.

(Just for context, the actual problem I’m working on involves taking a list of RSS feeds, fetching them, parsing them, and then fetching and parsing each item in each feed. I like my harried shipping manager example a bit better though :) )


For pipelines, I have always had a fondness for using generators, for example with the techniques described in “Generator Tricks for Systems Programmers” (https://speakerdeck.com/dabeaz/generator-tricks-for-systems-programmers). With Python 3.6, you can now write “async generators” and set up pipelines in much the same manner. So, I might be inclined to investigate that.
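As a rough illustration of the async-generator idea, here is a minimal sketch of the purchase-order example written as three chained stages. It is not taken from the presentation, and it makes several assumptions: it uses the standard library's asyncio rather than Curio to drive the generators, the ORDERS table is made-up data standing in for the random item counts, and the names read_orders, expand_items, fetch_items, and fetch_all are hypothetical.

```python
import asyncio

# Hypothetical stand-in data: order id -> number of line items.
ORDERS = {0: 2, 1: 0, 2: 3}

async def read_orders(order_ids):
    """Stage 1: yield (order_id, units) for each purchase order."""
    for order_id in order_ids:
        await asyncio.sleep(0)  # placeholder for the slow "read" step
        yield order_id, ORDERS[order_id]

async def expand_items(orders):
    """Stage 2: turn each order into one entry per line item."""
    async for order_id, units in orders:
        for item_no in range(units):
            yield order_id, item_no

async def fetch_items(items):
    """Stage 3: 'fetch' each item and yield a description of it."""
    async for order_id, item_no in items:
        await asyncio.sleep(0)  # placeholder for the slow "fetch" step
        yield f'order {order_id} item {item_no}'

async def run_pipeline(order_ids):
    # Stages are composed by nesting calls, not by nesting blocks.
    pipeline = fetch_items(expand_items(read_orders(order_ids)))
    return [result async for result in pipeline]

def fetch_all(order_ids):
    return asyncio.run(run_pipeline(order_ids))
```

Adding a further stage means wrapping the pipeline in one more generator, so the nesting depth of the code stays flat. Note that this sketch pulls items through one at a time; it shows the pipeline structure only and does not run the stages concurrently the way the task groups above do.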

Your example above is kind of interesting though. If I understand it correctly, you throw a bunch of work out to a task group and then take the results from that (as they come back) and put them into another task group. I honestly can’t say if that’s the “right way” to do this or not. I kind of wonder if this idea could be generalized somehow. Hmmm.


Thanks for the response! I’ll take a look at that presentation tomorrow, hopefully.

Your understanding is correct. The solution I evolved my way into looks like this:

async def main():
    order_ids = range(4)

    readers = curio.TaskGroup(name='readers')
    fetchers = curio.TaskGroup(name='fetchers')

    for i in order_ids:
        await readers.spawn(read_purchase_order, i)

    async for reader in readers:
        order_id, units = reader.result
        for _ in range(units):
            await fetchers.spawn(fetch_item_for_order, order_id)

In this way, I don’t end up with something akin to a callback pyramid and I don’t have to join on the task groups…I let the async for take care of that for me. Since all the task group finalization is just join, I don’t think I’m leaving anything hanging here.

I agree with you, though–I’m not sure if this is the “right”/best way to go about this. My application is still just a plaything for async programming, so I’ll be thinking about the architecture more over the coming days.

EDIT: Just to be clear, I don’t think I actually need to be using the task groups here…I was just using them as a convenient way of grouping things together. I don’t use any of the wait for some tasks, cancel all tasks, etc. functionality.


It looks like the solution you ended up with could be simplified even more to

async def main():
    order_ids = range(4)

    async def read_then_fetch(order_id):
        _, units = await read_purchase_order(order_id)
        # Fetch every line item on the order, not just one
        for _ in range(units):
            await fetch_item_for_order(order_id)

    for i in order_ids:
        await curio.spawn(read_then_fetch, i)

The general principle here is that it’s usually better to structure control flow as ordinary Python code when you can, and async/await makes this possible in many more cases than traditional async libraries.


You raise a good point there. I was reflexively using Tasks because that’s what Curio is all about, right? But I don’t actually know what benefit I would derive from using Tasks over just a standard async function. Is there such a benefit?


The main purpose of using Tasks is if you want to do multiple things at the same time. So if you want to look up multiple purchase orders at the same time, you can put those into separate tasks. But if for each purchase order you want to first read, then fetch, in sequence, then that can be a simple function; those two stages don’t need to be put into separate tasks.
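To make that distinction concrete, here is a small sketch: one task per purchase order, with the read and fetch steps inside each task written as plain sequential awaits. It rests on assumptions. It uses the standard library's asyncio as a stand-in for Curio, every order is hard-coded to two items, and the log list and the run_orders helper are hypothetical additions used only to record the order of events.

```python
import asyncio

async def read_purchase_order(order_id, log):
    await asyncio.sleep(0)  # placeholder for the slow "read" step
    log.append(('read', order_id))
    return order_id, 2  # assumption: every order has exactly 2 items

async def fetch_item_for_order(order_id, log):
    await asyncio.sleep(0)  # placeholder for the slow "fetch" step
    log.append(('fetch', order_id))

async def read_then_fetch(order_id, log):
    # Within one order the steps run in sequence: no extra tasks needed.
    _, units = await read_purchase_order(order_id, log)
    for _ in range(units):
        await fetch_item_for_order(order_id, log)

async def process_orders(order_ids):
    log = []
    # Across orders the work is concurrent: one task per order.
    tasks = [asyncio.create_task(read_then_fetch(i, log)) for i in order_ids]
    await asyncio.gather(*tasks)
    return log

def run_orders(order_ids):
    return asyncio.run(process_orders(order_ids))
```

The log returned by run_orders shows the reads for different orders interleaving with each other, while each individual order still sees its read before its fetches.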

I guess in curio there’s also a secondary reason why you might want to use a Task: if you want to manually cancel something, then you can only do that to a task. (In trio, cancellation and tasks are decoupled, so Tasks are only about doing multiple things at the same time.)


I don’t follow this whole distinction you’re making here. Tasks in Curio are used to do multiple things at the same time. There is no other purpose than that so far as I know.


It’s just a minor point. But if I have some function I want to call, and I want it to be possible for some other code to cancel it when some arbitrary condition occurs (i.e. not a timeout), then sticking it into a Task is the only way to do it, right? Obviously this is not the main reason people use tasks, but I was trying to think of situations where it might make sense to spawn a Task and then immediately join it.
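The situation described here, where a call is wrapped in a task so that other code can cancel it on a non-timeout condition, might look roughly like the sketch below. It is only an illustration under stated assumptions: it uses the standard library's asyncio instead of Curio, so it says nothing about which cancellation options Curio itself offers, and the names slow_fetch, watcher, demo, and the stop event are all hypothetical.

```python
import asyncio

async def slow_fetch(state):
    """A long-running call that records how it ended."""
    try:
        await asyncio.sleep(3600)
        state['outcome'] = 'finished'
    except asyncio.CancelledError:
        state['outcome'] = 'cancelled'
        raise  # let the cancellation propagate as usual

async def demo():
    state = {}
    stop = asyncio.Event()  # the "arbitrary condition"
    task = asyncio.create_task(slow_fetch(state))

    async def watcher():
        # Some other piece of code decides when to cancel.
        await stop.wait()
        task.cancel()

    watch = asyncio.create_task(watcher())
    await asyncio.sleep(0)  # give both tasks a chance to start
    stop.set()              # the condition occurs
    await watch
    try:
        await task          # join the task that was spawned for the call
    except asyncio.CancelledError:
        pass
    return state['outcome']
```

Here the caller spawns the task and then effectively just waits on it; the task exists only to give the watcher a handle to cancel.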


I’m not exactly sure what a cancellation would be applying to other than some code running in a Task (given that all code in Curio runs within a Task). That said, I don’t think it’s necessary to encapsulate code in an additional Task just to apply a different kind of cancellation to it. The cancel() method of Tasks allows custom exceptions to be passed in and could be used at a fairly low level by code that wanted to do non-timeout related things. Maybe I’m missing something.