Properly reading messages in a background task


#1

Hello again! I’m still working on a sans-io AMQP client called amqproto, and I have another general-ish question, this time about receiving exceptions from background tasks.

In the asyncio adapter, I have a long-running background task reading data from the broker, parsing and handling frames.
By the design of asyncio tasks, in order to receive an exception raised from the parsing code, I need to await this background task, which I do in the close method.
However, this is definitely not the only place where I should await the task. User-defined code that publishes/consumes messages doesn’t know anything about the fact that the task has failed, nor that it should await the task or close the connection. And since the task isn’t running, it isn’t receiving responses from the broker, so the whole application looks stalled. Ctrl-C helps, for sure (since it goes through the __aexit__ routines, which await self.close()), but I’m looking for a better solution. Does one exist?

As usual, thanks for your ideas!


#2

If I may paraphrase, is the main problem here that code is publishing messages (on a queue?) to another task, but it doesn’t know if the destination task has died or not? Just trying to better wrap my brain around it.


#3

Let me show some simpler code:

import asyncio


async def background_task(queue):
    for i in range(10):
        # Imagine here constantly receiving messages from the broker
        await asyncio.sleep(1)
        await queue.put(f'Message received: {i}')
        # At some point something bad happens
        # For example, a bug in the parser, or just an unexpected exception
        if i == 5:
            raise RuntimeError('something went horribly wrong')


async def received_messages(queue):
    # Helper function to illustrate my case
    while True:
        message = await queue.get()
        yield message


async def main():
    queue = asyncio.Queue()
    task = asyncio.ensure_future(background_task(queue))
    try:
        async for message in received_messages(queue):
            print(f'[Handling message] {message}')
    finally:
        # Actually, this happens in an async context manager's __aexit__,
        # but it really doesn't matter
        await task


asyncio.get_event_loop().run_until_complete(main())

If you run this, the output will be

[Handling message] Message received: 0
[Handling message] Message received: 1
[Handling message] Message received: 2
[Handling message] Message received: 3
[Handling message] Message received: 4
[Handling message] Message received: 5

… and then it hangs, because the background task raised RuntimeError and the exception was never propagated to main(), which is still blocked on queue.get().


#4

Turns out, this can be solved by using a “poison pill” like this:

import asyncio

async def background_task(queue):
    try:
        for i in range(10):
            await asyncio.sleep(1)
            await queue.put(f'Message received: {i}')
            if i == 5:
                raise RuntimeError('something went horribly wrong')
    except Exception as exc:
        await queue.put(exc)

async def received_messages(queue):
    while True:
        message = await queue.get()
        if isinstance(message, Exception):
            raise message
        yield message

async def main():
    queue = asyncio.Queue()
    task = asyncio.ensure_future(background_task(queue))
    try:
        async for message in received_messages(queue):
            print(f'[Handling message] {message}')
    finally:
        await task

asyncio.get_event_loop().run_until_complete(main())

#5

How tightly coupled is the “received_messages()” functionality to the background task? I can’t speak for asyncio specifically, but in the context of Curio or Trio, I sort of wonder how this might utilize a TaskGroup or Nursery concept.


#6

I’d say they aren’t coupled at all. received_messages() is just an example of the public API that works with the queue internally, but there are many similar public methods.

I can’t imagine how a TaskGroup or Nursery can help in my case. Maybe I’m missing something.
This background task lives until the connection is closed, so my first attempt was to await it in the close method. While this works perfectly for errors coming from the broker, client-side errors aren’t propagated at all. With a task group/nursery, I think I’ll have the same problem: when do I “join” the task group/nursery?
I can’t expose this as a public API of my library, nor can I ask users to create a task group/nursery just to use my library. IMO, the usage should be straightforward:

from asyncio import run
from amqproto.adapters.asyncio_adapter import AsyncioConnection
async def main():
    async with AsyncioConnection(host='localhost') as connection:
        async with connection.get_channel() as channel:
            await channel.queue_declare('hello')
            await channel.basic_publish(
                b'Hello, world!',
                exchange='',
                routing_key='hello',
            )
            print(" [x] Sent 'Hello World!'")
run(main())

#7

My thinking on TaskGroups was more about their cancellation behavior. By putting tasks in a task group, they live together: if one dies with an exception, the others get cancelled. So, doing something like this might be one way to avoid the whole “poison pill” code. The TaskGroup could certainly be an internal detail, not exposed in the API.


#8

I’ve been playing with TaskGroups a bit, and I’m not sure I fully understand how cancellation works:

In [1]: import curio

In [2]: async def echo(what, to_sleep):
   ...:     print(f'sleeping in echo for {to_sleep} seconds')
   ...:     await curio.sleep(to_sleep)
   ...:     if what == 'bar':
   ...:         raise RuntimeError('not a single bar')
   ...:     print(what)
   ...:

In [3]: async def main():
   ...:     async with curio.TaskGroup() as tg:
   ...:         await tg.spawn(echo, 'foo', 5)
   ...:         await tg.spawn(echo, 'bar', 2)
   ...:         await curio.sleep(10)
   ...:

In [4]: curio.run(main)
sleeping in echo for 5 seconds
sleeping in echo for 2 seconds
Task Crash: Task(id=4, name='echo', state='TERMINATED')
Traceback (most recent call last):
# snip
~/Projects/ideas/amqproto/venv/lib/python3.6/site-packages/curio/task.py in join(self, wait)
    521         # If there are exceptions on any task, we raise a TaskGroupError
    522         if exceptional:
--> 523             raise TaskGroupError(exceptional)
    524
    525     async def __aenter__(self):

TaskGroupError: TaskGroupError(RuntimeError)

In [5]:

From your description, I was expecting that "foo" wouldn’t be printed because of the error in the other task, and that the await curio.sleep(10) in the main function would be cancelled as well (overall, this code runs for 15 seconds). Am I missing something? Maybe this is a bug, because Trio’s Nursery works as I would expect (with a waaaaaaaaay longer traceback, which I’m not going to include in full):

In [1]: import trio

In [2]: async def echo(what, to_sleep):
   ...:     print(f'sleeping in echo for {to_sleep} seconds')
   ...:     await trio.sleep(to_sleep)
   ...:     if what == 'bar':
   ...:         raise RuntimeError('not a single bar')
   ...:     print(what)
   ...:

In [3]: async def main():
   ...:     async with trio.open_nursery() as nursery:
   ...:         nursery.start_soon(echo, 'foo', 5)
   ...:         nursery.start_soon(echo, 'bar', 2)
   ...:         await trio.sleep(10)
   ...:

In [4]: trio.run(main)
sleeping in echo for 5 seconds
sleeping in echo for 2 seconds
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-4-4b3b16db9147> in <module>()
----> 1 trio.run(main)
# snip
<ipython-input-2-a74c803da98e> in echo(what, to_sleep)
      3     await trio.sleep(to_sleep)
      4     if what == 'bar':
----> 5         raise RuntimeError('not a single bar')
      6     print(what)
      7

RuntimeError: not a single bar

In [5]:

(This code runs for just 2 seconds)


#9

Interesting. So, Trio actually cancels the trio.sleep(10) call inside the nursery block as well?

I’m not sure Curio does that (in fact, I’m almost certain it doesn’t). Instead, an error originating in a spawned task wouldn’t be caught until the task group is in a “joining” mode at the end of the context-block. Let me ponder this further.


#10

Then, unfortunately, a TaskGroup isn’t really useful in my use case: I have no single place where I can join the task group. It’s more like there’s a background daemon thread that works independently from the main thread, communicating with it via a queue. BUT I want to (maybe periodically) detect failures in this daemon thread from the main thread, without joining the daemon thread.
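For what it’s worth, in asyncio one way to approximate “detect failures without joining” is to poll the background task: Task.done() is non-blocking, and Task.result() re-raises whatever exception killed the task. A rough sketch, with invented class and method names (this is not amqproto’s actual API):

```python
import asyncio


class Connection:
    # Hypothetical connection object; only the failure-detection part
    # is sketched here.

    def __init__(self):
        self._reader_task = None

    async def connect(self):
        self._reader_task = asyncio.ensure_future(self._read_loop())

    async def _read_loop(self):
        # Imagine the real frame-reading loop here; it crashes immediately
        # just to demonstrate the polling.
        raise RuntimeError('parser blew up')

    def _check_background_task(self):
        # Non-blocking check: if the reader task has already died,
        # .result() re-raises the exception that killed it.
        if self._reader_task is not None and self._reader_task.done():
            self._reader_task.result()

    async def basic_publish(self, message):
        self._check_background_task()  # fail fast instead of stalling
        ...  # the actual publishing would go here


async def demo():
    conn = Connection()
    await conn.connect()
    await asyncio.sleep(0)  # let the reader task run (and crash)
    try:
        await conn.basic_publish(b'hello')
    except RuntimeError as exc:
        print(f'Detected background failure: {exc}')


asyncio.run(demo())
```

Every public method pays a tiny polling cost, but no public method can stall forever on a dead reader.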


#11

The thing I need to think about is whether or not the body of a TaskGroup, when used as a context manager, executes as part of the task group (i.e., does the curio.sleep(10) execute as part of the group?). Right now it doesn’t. However, I’m not married to that idea. Honestly, I hadn’t even thought of it until it was brought up here. I’m going to play around with it further.


#12

Right, in Trio 0.1.0, exception propagation, auto-cancellation, etc. only worked when the parent task was blocked in the nursery’s __aexit__. We called this the “parenting is a full time job” rule – you were supposed to make sure that after you opened a nursery, you proceeded quickly to the __aexit__, without doing anything that would block for long.

It turns out this was confusing, and in particular caused problems for libraries that needed background tasks, like for AMQP or Websockets or whatever. So in 0.3.0 we got rid of this rule, and made it so that the body of the async with open_nursery() block was treated like a child task, and the exception propagation/cancellation stuff happens magically in the background as part of the runtime, instead of just in __aexit__. And the new way also turns out to be way simpler to reason about, explain, etc.

Change log: https://trio.readthedocs.io/en/latest/history.html#trio-0-3-0-2017-12-28

The issue where this was discussed: https://github.com/python-trio/trio/issues/136


So, for @malinoff’s original question: trio does force you to use some kind of async with block to create the background task, so the idiomatic way would be to tell your users to write:

async with amqproto.open_connection(HOST, PORT) as amqp_connection:
    await amqp_connection.send(...)  # or whatever

and you’d implement this as something like:

# This decorator is in contextlib in Python 3.7+, or in the async_generator library
@asynccontextmanager
async def open_connection(host, port):
    async with trio.open_nursery() as nursery:
        # You could also expose AMQPConnection directly as an "expert API"
        # (e.g. for those who want to use their own nursery)
        amqp_connection = AMQPConnection()
        await amqp_connection.connect(host, port, nursery)
        async with amqp_connection:  # make sure to close the connection at the end
            yield amqp_connection

This feels a bit weird at first because now your users have to use async with to create a connection. (Of course they probably should be doing this anyway, but usually in Python this is something we recommend as being good style, not something you get forced to do. So it’s a bit weird.)

But once you do that, Trio automatically takes care of the problem you raised at the beginning of the thread: if the background task you spawn into that nursery crashes, then trio will immediately notice and make sure that the exception doesn’t get lost.

(Of course if you want some other behavior you can do that too, e.g. you could catch exceptions in the background task and set a flag so that the next time the user calls a method on the AMQP connection it raises an error.)
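That last alternative, sketched framework-agnostically (all names here are invented for illustration): the background task catches its own crash and stores it, and every user-facing method re-raises it on the next call.

```python
import asyncio


class FlaggingConnection:
    # Hypothetical sketch of the "catch the exception and set a flag"
    # approach; none of these names come from a real library.

    def __init__(self):
        self._crashed = None  # the exception that killed the reader, if any

    async def _read_loop(self):
        try:
            # Imagine the real frame-reading loop here.
            raise RuntimeError('broker sent garbage')
        except Exception as exc:
            # Don't let the exception die with the task; remember it.
            self._crashed = exc

    async def send(self, data):
        if self._crashed is not None:
            # Surface the stored failure on the next user call.
            raise self._crashed
        ...  # the actual sending would go here


async def demo():
    conn = FlaggingConnection()
    await conn._read_loop()  # in real code this runs as a background task
    try:
        await conn.send(b'hello')
    except RuntimeError as exc:
        print(f'Next call raised: {exc}')


asyncio.run(demo())
```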


#13

Thanks for the links on the Trio discussion. I’ll have to digest that a bit. I don’t know if I’ll change Curio’s TaskGroup or not, but I’d definitely consider it. I probably need to fool around with it for a bit; there are some uses of TaskGroups that might break, so I’d need to investigate.