What is AsyncThread.target for?


#1

AsyncThread.__init__ receives a target parameter:

class AsyncThread(object):

    def __init__(self, target, args=(), kwargs={}, daemon=False):
       # hold the target
        self.target = target

And when the thread runs, it calls self.target:

async def start(self):
    self._task = await spawn(self._coro_runner, daemon=True)
    # when the thread start running, it would call self._func_runner
    self._thread = threading.Thread(target=self._func_runner, daemon=True)
    # start thread
    self._thread.start()

def _func_runner(self):
    _locals.thread = self
    try:
        # call self.target
        self._result_value = self.target(*self.args, **self.kwargs)
        self._result_exc = None
    except BaseException as e:
        self._result_value = None
        self._result_exc = e
    # we send None to start coro, but what about self._result_value?
    self._request.set_result(None)
    self._terminate_evt.set()

It seems that we call self.target and assign the result to self._result_value, and nothing more.

So, what is the target for? I am so confused.


#2

The target argument specifies the synchronous function that’s going to run in the thread — the same as for threading.Thread objects in the standard library. You can see it used in _func_runner(), where it calls self.target(*self.args, **self.kwargs).


#3

Thanks for the reply!

I know how a threading.Thread runs; I just can't understand the relationship between self._func_runner and self._coro_runner, and I am trying to find out how to use AsyncThread correctly.


Await AsyncThread.join to get the result of a sync function that is running in a thread

In curio, we can await AsyncThread.join to get the result of self.target, which runs in a thread.

1. how self._func_runner works

In self._func_runner, we call self.target(*self.args, **self.kwargs) and then set self._terminate_evt.

self._func_runner itself runs in another thread.

def _func_runner(self):
    _locals.thread = self
    try:
        # ** here, got the result of self.target **
        self._result_value = self.target(*self.args, **self.kwargs)
        self._result_exc = None
    except BaseException as e:
        self._result_value = None
        self._result_exc = e

    # ** here, we break self._coro_runner by sending None**
    self._request.set_result(None)

    # ** here, AsyncThread.join would return**
    self._terminate_evt.set()

join for result:

async def join(self):
    # ** here, wait self._terminate_evt set, which set in self._func_runner **
    await self._terminate_evt.wait()
    if self._result_exc:
        raise errors.TaskError() from self._result_exc
    else:
        # ** here, return result **
        return self._result_value

2. self._coro_runner breaks, but nothing more happens

self._coro_runner breaks out of its loop if self._coro is None:

        await disable_cancellation(_future_wait(self._request))
        self._coro = self._request.result()
        self._request = Future()

        # **here, If no coroutine, we're shutting down **
        if not self._coro:
            break

And when self.target is done, None is sent to self._request, so self._coro_runner breaks.

def _func_runner(self):
    # omit self.target

   # ** here **
    self._request.set_result(None)

So it seems that we do not need self._coro_runner.


Run a coroutine from a thread, and get the result in a synchronous way

In self._coro_runner, we run self._coro and then set an event:

        # Run the the coroutine
        try:
            self._result_value = await self._coro
            self._result_exc = None

        except BaseException as e:
            self._result_value = None
            self._result_exc = e

        # ** here, Hand it back to the thread **
        self._done_evt.set()

And then we can call AsyncThread.AWAIT (a plain call, without await) to get the result:

def AWAIT(self, coro):
   # ** here, send coro to self._coro_runner **
    self._request.set_result(coro)

   # ** wait event set **
    self._done_evt.wait()

But if we want to call AsyncThread.AWAIT, we must first start an AsyncThread, which means we have to call AsyncThread.start, and then self._coro_runner breaks once self.target is done.

If self._coro_runner has already exited, then we block in AsyncThread.AWAIT forever, because self._done_evt will never be set.


Here are some examples

1. await AsyncThread.join

It works well:

import curio


def syn_thread_func(a, b):
    return sum([a, b])


async def test_async_thread():
    athread = curio.thread.AsyncThread(target=syn_thread_func, args=(1, 2))
    await athread.start()
    data = await athread.join()
    print('done, data is: %s' % data)
    return


def main():
    curio.run(test_async_thread(), with_monitor=True)
    return


if __name__ == '__main__':
    main()

2. block in AsyncThread.AWAIT

import curio


async def async_func():
    return 'async func'


def syn_thread_func(a, b):
    return sum([a, b])


async def test_async_thread():
    athread = curio.thread.AsyncThread(target=syn_thread_func, args=(1, 2))
    await athread.start()
    data = athread.AWAIT(async_func())
    print('done, data is: %s' % data)
    return


def main():
    curio.run(test_async_thread(), with_monitor=True)
    return


if __name__ == '__main__':
    main()

We must pass a sync function to AsyncThread.__init__ to set up an AsyncThread.

And then we block!

3. Maybe use curio.thread.AWAIT

This raises errors.AsyncOnlyError:

import curio


async def async_func():
    return 'async func'


async def test_async_thread():
    data = curio.thread.AWAIT(async_func())
    print('done, data is: %s' % data)
    return


def main():
    curio.run(test_async_thread(), with_monitor=True)
    return


if __name__ == '__main__':
    main()

Looking forward to your reply!


#4

The only valid example is (1). The AWAIT method of an AsyncThread must never be called from async code (doing so is an operational error).

More details about async threads and usage can be found at http://curio.readthedocs.io/en/latest/devel.html#programming-with-threads

Also: http://curio.readthedocs.io/en/latest/reference.html#asynchronous-threads


#5

thanks!

now i kind of got it.