Discussion forum for David Beazley

Adventures in Adding ZeroMQ Support


So, I woke up this morning thinking about ZeroMQ and how it might be supported in Curio. Looking around in the source code for pyzmq, I noticed that it defines its own custom selector class for use with asyncio (https://github.com/zeromq/pyzmq/blob/master/zmq/asyncio/__init__.py).

Even though Curio doesn’t use any part of asyncio, it does use the same underlying selector classes for I/O polling. So, I got to wondering if I could just plug this selector into Curio and have it work. It turns out that the answer is yes. With that selector, a small Curio-compatible version of the Socket class, and a custom zmq context, it seems to work. See the code: https://github.com/dabeaz/curio/blob/master/curio/zmq.py

Using that, you can write some fairly normal-looking zmq code using Curio. For example, here is some code that pushes messages out on a ZMQ PUSH socket:

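import curio.zmq as zmq

async def pusher(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUSH)
    sock.bind(address)   # assumed: the PUSH side binds to the address
    for n in range(100):
        await sock.send(b'Message %d' % n)
    # Tell the puller to stop once every message has been sent
    await sock.send(b'exit')

if __name__ == '__main__':
    # The body was missing from the post; zmq.run() and the address are assumptions
    zmq.run(pusher, 'tcp://*:9000')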

Here is the corresponding client code:

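import curio.zmq as zmq

async def puller(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PULL)
    sock.connect(address)   # assumed: the PULL side connects to the pusher
    while True:
        msg = await sock.recv()
        if msg == b'exit':
            break           # sentinel from the pusher: stop receiving
        print('Got:', msg)

if __name__ == '__main__':
    # The body was missing from the post; zmq.run() and the address are assumptions
    zmq.run(puller, 'tcp://localhost:9000')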

For a day of coding, I’m probably missing something critical, but for now it all seems to work. Cool.


Cool. I also use a custom selector class that conforms to the Selector interface. It also works with Curio just fine.


@kdart: neat! Mind saying a bit more about what you’re doing? I’m sure folks would be interested.


Yes, I’d be interested in hearing more too. Implementing a custom selector has always been an open avenue for integrating Curio with other things. However, it’s not something that I’ve really explored until recently with the ZeroMQ support.


Sure. It’s an I/O event handler I had written previously that is basically a more full-featured, Pythonic wrapper around the kqueue system call (I expect that it will only be run on OS X). For example, it also handles signals and timers. I looked at the selectors interface and made it conform to that so it could also be used with asyncio. It also supports other interfaces needed by other frameworks, such as urwid. This way I can keep one event loop for everything.
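
To make "conforms to the Selector interface" a bit more concrete, here is a minimal sketch using only the standard library. It is not the kqueue wrapper described above; CountingSelector and demo are hypothetical names, and the class simply delegates to the platform's default selector while counting wake-ups. The point is only that anything implementing register, unregister, select, and get_map can stand in where a selector is expected.

```python
import selectors
import socket

class CountingSelector(selectors.BaseSelector):
    """Hypothetical selector: wraps the default selector and counts wake-ups."""

    def __init__(self):
        self._inner = selectors.DefaultSelector()
        self.wakeups = 0

    def register(self, fileobj, events, data=None):
        return self._inner.register(fileobj, events, data)

    def unregister(self, fileobj):
        return self._inner.unregister(fileobj)

    def select(self, timeout=None):
        ready = self._inner.select(timeout)
        if ready:
            self.wakeups += 1   # extra bookkeeping a custom selector might do
        return ready

    def get_map(self):
        return self._inner.get_map()

    def close(self):
        self._inner.close()

def demo():
    """Register one end of a socket pair and wait for it to become readable."""
    a, b = socket.socketpair()
    sel = CountingSelector()
    try:
        sel.register(a, selectors.EVENT_READ, data="reader")
        b.send(b"ping")
        ready = sel.select(timeout=1.0)
        return [(key.data, mask) for key, mask in ready], sel.wakeups
    finally:
        sel.close()
        a.close()
        b.close()
```

A real implementation would replace the delegation with its own polling mechanism (kqueue in the case above) and add whatever extra event sources it needs.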

I am using Curio in a test automation framework where I’d like to keep the basic flow as a more typical “script”, but also handle asynchronous events. I use it in two ways. Sometimes I want to deal synchronously with multiple file descriptors (e.g. some kind of copy operation) that may be any type of fd, such as a socket, tty, or pipe. So I invoke it (in a wrapper function) and the script of course blocks until it’s done.
The other way, for persistent copy services, is to first spawn a thread and then run a curio/async loop in the thread (with a new kernel).
I like the flexibility of async/await plus Curio to pop into and out of async loops when needed from a more traditional “script”.
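
The "spawn a thread, then run a loop inside it" pattern can be sketched roughly as follows. Note the substitution: this uses the standard library's asyncio as a stand-in for a Curio kernel so that the example runs without extra packages. run_in_background and copy_chunks are hypothetical names, and the "copy" is just joining byte chunks.

```python
import asyncio
import threading

async def copy_chunks(chunks):
    """Stand-in for a copy service: gathers chunks, yielding between each."""
    copied = []
    for chunk in chunks:
        await asyncio.sleep(0)   # give other tasks in this loop a chance to run
        copied.append(chunk)
    return b"".join(copied)

def run_in_background(coro_factory):
    """Start a thread that runs its own event loop until the coroutine finishes."""
    result = {}

    def worker():
        # asyncio.run() creates a fresh event loop that belongs to this thread
        result["value"] = asyncio.run(coro_factory())

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread, result
```

The calling script stays synchronous: it starts the thread, carries on with other work, and joins the thread when it needs the result. With Curio, the worker would presumably call curio.run() in place of asyncio.run().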


Just took a look at this again. It seems like you could get rid of the need to use a special selector by fetching the notification fd via zmq_getsockopt(..., ZMQ_FD), and whenever you get zmq.Again doing an await _read_wait(zmq_fd)? AFAICT this would literally be like a 3 line change…

[Edit: on further thought I guess there’s one tricky bit, which is that if you have one task blocked waiting to read and one task blocked waiting to write, then they’ll both be blocked on _read_wait(the_same_fd), and we would need the semantics to be that this is legal and both tasks get woken up. Interesting that this is an actual use case for that! But, uh, also annoying. I guess the janky workaround would be to have _zmq_{read,write}_wait(zmq_fd) traps that allow one task to block on read and one on write, and picks which to wake up using ZMQ_EVENTS. Basically the same stuff the kernel already does, except it’d be kinda ugly to have ZMQ-specific stuff in the kernel like that.]
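
As a rough model of that "pick which to wake up using ZMQ_EVENTS" step, here is a pure-Python sketch. It is not Curio kernel code and does not touch a real socket; tasks_to_wake is a hypothetical helper, and the events argument stands for the flags that pyzmq would report via sock.getsockopt(zmq.EVENTS). The two constants mirror the numeric values of zmq.POLLIN and zmq.POLLOUT.

```python
POLLIN = 1    # same numeric value as zmq.POLLIN
POLLOUT = 2   # same numeric value as zmq.POLLOUT

def tasks_to_wake(events, reader=None, writer=None):
    """Return the waiting tasks that should be rescheduled.

    events: bit flags as reported by ZMQ_EVENTS
    reader: task blocked waiting to receive, or None
    writer: task blocked waiting to send, or None
    """
    woken = []
    if reader is not None and events & POLLIN:
        woken.append(reader)
    if writer is not None and events & POLLOUT:
        woken.append(writer)
    return woken
```

Both tasks sit behind the same notification fd, so when it becomes readable the flags decide who actually runs: tasks_to_wake(POLLIN, "r", "w") gives back only the reader, while POLLIN | POLLOUT wakes both.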


Reading up on that ZMQ_FD option seems to indicate that it’s a particularly tricky beast to work with. Interesting.

On the subject of ZeroMQ sockets though, is it technically legal to simultaneously have pending recv()/send() operations active on the same ZMQ socket? I don’t know if I’ve ever used ZeroMQ in that way. Many of the sockets have very specific patterns of usage that you’re supposed to follow.

This might require some further experimentation.