Discussion forum for David Beazley

Die Threads - EuroPython 2018



I just thought I would post a few followup thoughts on my “Die Threads” keynote from EuroPython 2018. A screencast is available here:

https://www.youtube.com/watch?v=U66KuyD3T0M

The interaction between async and threads is a topic that I have been exploring for a few years now, primarily in the context of the Curio project (https://github.com/dabeaz/curio). In the talk, I presented a library “thredo” (https://github.com/dabeaz/thredo) that supported a variety of features such as thread groups, thread cancellation, queues, and various magic not normally associated with threads. In reality, this was Curio in disguise–in particular, an application of Curio’s “Async Thread” feature.

You can read quite a bit about Curio, threads, and async/thread interaction at https://curio.readthedocs.io/en/latest/devel.html#programming-with-threads. In particular, pay close attention to the part about “Asynchronous Threads”. You’ll find that the “thredo” library is a tiny layer on top of this. For example, the thredo run() function merely sets up an execution environment and launches into a thread like this:

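import curio

def run(callable, *args):
    async def _runner():
        # Helper task that services requests coming from async threads
        t = await curio.spawn(curio.thread.thread_handler)
        try:
            # The body of this block executes in a real thread
            async with curio.spawn_thread():
                return callable(*args)
        finally:
            await t.cancel()
    return curio.run(_runner)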

Likewise, if you look at the implementation of something like the thredo.Queue object (used in the talk), it looks like this:

import curio
from curio.thread import AWAIT

class Queue(object):
    def __init__(self, maxsize=0):
        self._queue = curio.Queue(maxsize)

    def empty(self):
        return self._queue.empty()

    def full(self):
        return self._queue.full()

    def get(self):
        return AWAIT(self._queue.get)

    def join(self):
        return AWAIT(self._queue.join)

    def put(self, item):
        return AWAIT(self._queue.put, item)

    def qsize(self):
        return self._queue.qsize()

    def task_done(self):
        return AWAIT(self._queue.task_done)

There is no real “implementation” here. It’s purely an abstraction layer on top of curio.Queue with some magic AWAIT calls thrown in for good measure. What’s AWAIT, you ask? It delegates the operation over to the async event loop. There are some fiddly bits involved, but behind the scenes, every real thread has a backing coroutine with which it communicates. That coroutine is responsible for managing blocking, along with other aspects of thread-async interaction (e.g., communicating requests, getting results, and handling cancellation). This is really the whole nugget of the idea presented in the talk: you can build a threading library where blocking operations are handled in an async library as opposed to blocking in the operating system kernel. By blocking in an async library in this way, you get all of the fancy new magic like task groups, cancellation, and more. AWAIT is what enables that on the thread side.
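To make the delegation idea a bit more concrete, here is a rough sketch using only the standard library. It is not how Curio implements AWAIT (there is no per-thread backing coroutine here). It assumes a single asyncio event loop running in a background thread, and the names _loop, _make_queue, ThreadQueue, and demo are made up for illustration. The AWAIT below is a simplified stand-in: it hands a coroutine to the loop and parks the calling thread until the result comes back.

```python
import asyncio
import threading

# Assumption: one shared event loop running in a daemon thread stands in
# for the async kernel. This is NOT Curio's mechanism, only an analogy.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()

def AWAIT(corofunc, *args):
    """Run corofunc(*args) on the event loop; block this thread until done."""
    future = asyncio.run_coroutine_threadsafe(corofunc(*args), _loop)
    return future.result()

async def _make_queue():
    # Create the queue inside the loop so it is tied to the right loop
    return asyncio.Queue()

class ThreadQueue:
    """Hypothetical thread-facing wrapper around asyncio.Queue."""
    def __init__(self):
        self._queue = AWAIT(_make_queue)

    def put(self, item):
        return AWAIT(self._queue.put, item)

    def get(self):
        # The waiting happens in the event loop, not in a kernel-level lock
        return AWAIT(self._queue.get)

def demo():
    q = ThreadQueue()
    results = []

    def consumer():
        for _ in range(3):
            results.append(q.get())

    t = threading.Thread(target=consumer)
    t.start()
    for n in range(3):
        q.put(n)
    t.join()
    return results
```

Calling demo() starts a consumer thread that waits on get() while the main thread feeds it items. The wrapper has the same shape as the thredo.Queue class above: every potentially blocking method is a one-line AWAIT.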

Although thredo is implemented on Curio, there’s no real reason why you couldn’t make something similar work on other async libraries. The key concept is that if a thread is going to block, you just make sure it happens on an async event loop, not in the OS. In other words, if you want to make a “modern” thread library, you’d implement it on top of async, because, well, of course you would!
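As a small illustration of what blocking on the event loop buys you on another library, here is a hedged asyncio sketch of cancelling a blocked thread. It is not taken from thredo or Curio. The demo function and its local names are hypothetical, and it relies only on asyncio.run_coroutine_threadsafe and the cancel() method of the future it returns. The worker "blocks" by parking a long sleep on the loop, and another thread cancels that wait.

```python
import asyncio
import concurrent.futures
import threading

def demo():
    # Assumption: a private event loop in a daemon thread plays the
    # role of the async library that threads block on.
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()

    pending = {}
    blocked = threading.Event()
    outcome = []

    def worker():
        # Instead of time.sleep(), park a coroutine on the event loop
        fut = asyncio.run_coroutine_threadsafe(asyncio.sleep(3600), loop)
        pending["fut"] = fut
        blocked.set()
        try:
            fut.result()              # the thread waits here
            outcome.append("finished")
        except concurrent.futures.CancelledError:
            outcome.append("cancelled")

    t = threading.Thread(target=worker)
    t.start()
    blocked.wait()                    # worker has submitted its wait
    pending["fut"].cancel()           # cancel the wait from outside
    t.join(timeout=5)
    loop.call_soon_threadsafe(loop.stop)
    return outcome
```

A thread stuck in time.sleep(3600) could not be interrupted this way. Because the wait lives on the event loop, cancelling it is just a matter of cancelling a future.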