asyncio.StreamWriter's write, drain and the absence of yield


#1

Hi, I am working on a sans-io implementation of an AMQP client. So far I have only implemented an asyncio adapter, and I ran into an issue: if one coroutine does nothing but write and drain on a connection that is not paused, drain short-circuits instead of yielding to the event loop, which makes it impossible for other coroutines to run in between.

My use case is publishing lots of messages, and a publish in AMQP does not get a reply, so there is nothing I can wait for. However, if a publish fails (for example, because of buggy serialization code), the broker sends an exception asynchronously, which is read by another coroutine running as a task in the “background”. Since drain short-circuits, this task never runs, and I can’t handle the exception until all messages have been published.
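A minimal sketch of the starvation, assuming a hypothetical `fake_drain()` that behaves like `StreamWriter.drain()` on a transport whose buffer is below the high-water mark, i.e. it returns without ever suspending. The background task is created first, yet it only runs once the publisher finally awaits something that really suspends:

```python
import asyncio


async def fake_drain() -> None:
    # Hypothetical stand-in for StreamWriter.drain() on a non-paused
    # transport: it completes immediately and never suspends.
    return None


async def publisher(n: int, log: list) -> None:
    for _ in range(n):
        # writer.write(...) would go here; awaiting fake_drain() does not
        # hand control back to the event loop, so no other task can run.
        await fake_drain()
        log.append("publish")


async def background_reader(log: list) -> None:
    # Stands in for the task that reads asynchronous errors from the broker.
    log.append("background")


async def main() -> list:
    log: list = []
    reader = asyncio.create_task(background_reader(log))
    await publisher(5, log)
    await reader  # the first real suspension point in main()
    return log


print(asyncio.run(main()))
# ['publish', 'publish', 'publish', 'publish', 'publish', 'background']
```

An `await` only gives other tasks a turn if the awaited object actually suspends; awaiting a coroutine that returns straight away is just a function call.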

I’m looking for ideas on how to deal with this situation.
Of course, I can put await asyncio.sleep(0) right after await writer.drain(), but that makes everything 2.5 to 3.5 times slower. Or I can pretend this is not an issue and that nobody will ever send millions of messages, some of which could be malformed.
I’d like to know if there is something in between: something as fast as the current version that still yields to the event loop somehow, so that messages from the broker can be received.
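For comparison, a sketch of the `sleep(0)`-after-every-drain workaround, with a hypothetical non-suspending `fake_drain()` standing in for `StreamWriter.drain()`. The background task now runs right after the first publish, at the cost of one trip through the event loop per message, which is plausibly where the slowdown comes from:

```python
import asyncio


async def fake_drain() -> None:
    # Hypothetical stand-in for a drain() that does not suspend.
    return None


async def publisher(n: int, log: list) -> None:
    for _ in range(n):
        await fake_drain()
        log.append("publish")
        # Explicit yield: the current task is rescheduled behind every
        # other ready task, so each of them gets one step.
        await asyncio.sleep(0)


async def background_reader(log: list) -> None:
    log.append("background")


async def main() -> list:
    log: list = []
    reader = asyncio.create_task(background_reader(log))
    await publisher(3, log)
    await reader
    return log


print(asyncio.run(main()))
# ['publish', 'background', 'publish', 'publish']
```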

Thanks in advance to anyone who shares their ideas!


#2

I don’t know if there is a better option than await asyncio.sleep(0). Maybe fut = asyncio.Future(); fut.set_result(None); await fut will be a bit faster? Or you could keep a counter and insert a sleep(0) after every, say, 10 messages? (Trio’s networking layer always yields to the event loop when you write, exactly to avoid these kinds of issues – make it right, then make it fast :slight_smile: – and I’ve thought about using the “only yield occasionally” trick to speed things up if we need to.)
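The `Future` idea can be checked with a small harness. In asyncio, awaiting a future that is already done returns its result immediately without suspending, so that trick probably does not give other tasks a turn, whereas `asyncio.sleep(0)` does. A sketch; `count_switches` and `done_future` are hypothetical helpers written only for this comparison:

```python
import asyncio


async def count_switches(make_awaitable, iterations: int = 100) -> int:
    """Count how often a background ticker ran while we awaited
    make_awaitable() `iterations` times."""
    ticks = 0
    stop = False

    async def ticker() -> None:
        nonlocal ticks
        while not stop:
            ticks += 1
            await asyncio.sleep(0)

    task = asyncio.create_task(ticker())
    for _ in range(iterations):
        await make_awaitable()
    stop = True
    await task
    return ticks


def done_future() -> asyncio.Future:
    # The suggestion from the post: a future whose result is already set.
    fut = asyncio.get_running_loop().create_future()
    fut.set_result(None)
    return fut


async def main() -> None:
    print("done future:", await count_switches(done_future))
    print("sleep(0):", await count_switches(lambda: asyncio.sleep(0)))


asyncio.run(main())
```

The ticker never runs in the done-future case (count 0), but does run in the `sleep(0)` case, so a counter plus `sleep(0)` seems to be the more reliable combination.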


#3

Thanks, Nathaniel. The counter definitely works.
I set the limit to 50 writer.write(); await writer.drain() calls, followed by an explicit await asyncio.sleep(0). The publish rate is still around 12k messages per second, so there is no performance penalty at all!
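A sketch of the counter approach as a small wrapper. `YieldingWriter` and `FakeWriter` are hypothetical names; the wrapper only calls `write()` and `drain()`, so it should also accept a real `asyncio.StreamWriter` (e.g. `YieldingWriter(writer, yield_every=50)` with `writer` from `asyncio.open_connection()`). The value 50 comes from the post and is worth re-measuring for other workloads:

```python
import asyncio


class YieldingWriter:
    """Hypothetical wrapper: yields to the event loop once every
    `yield_every` writes on a StreamWriter-like object."""

    def __init__(self, writer, yield_every: int = 50) -> None:
        if yield_every < 1:
            raise ValueError("yield_every must be >= 1")
        self._writer = writer
        self._yield_every = yield_every
        self._since_yield = 0

    async def send(self, data: bytes) -> None:
        self._writer.write(data)
        # drain() only suspends when the transport is paused (buffer above
        # the high-water mark); otherwise it returns immediately.
        await self._writer.drain()
        self._since_yield += 1
        if self._since_yield >= self._yield_every:
            self._since_yield = 0
            await asyncio.sleep(0)  # let other tasks (e.g. the reader) run


class FakeWriter:
    # Hypothetical in-memory writer used only for this demo.
    def __init__(self) -> None:
        self.chunks: list = []

    def write(self, data: bytes) -> None:
        self.chunks.append(data)

    async def drain(self) -> None:
        return None


async def main() -> tuple:
    ticks = 0
    stop = False

    async def ticker() -> None:
        nonlocal ticks
        while not stop:
            ticks += 1
            await asyncio.sleep(0)

    task = asyncio.create_task(ticker())
    fake = FakeWriter()
    out = YieldingWriter(fake, yield_every=50)
    for i in range(200):
        await out.send(b"msg %d" % i)
    stop = True
    await task
    return len(fake.chunks), ticks


print(asyncio.run(main()))
# (200, 4)
```

With 200 writes and `yield_every=50`, the background ticker gets exactly four turns: one scheduler round trip per 50 messages instead of one per message.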

It looks like this problem is tricky enough that all existing asyncio-based AMQP client implementations simply don’t bother to solve it.


#4

With respect to Curio, you would also need to take explicit steps to yield (such as inserting a sleep call or putting a counter in the code).

I must admit that I am generally curious about these sorts of things, though. For example, is it common to see real-world applications that saturate I/O at such a high rate that they would never block due to latency or a slowdown elsewhere in the system?

Big picture, it’s probably not something I would modify Curio to handle implicitly. I’d punt it up to the application and have it give further guidance, in the form of explicit yields or some other mechanism, to make sure other things can run if needed.


#5

Maybe it’s not that common: even to publish 12k messages per second, you have to get the payload from somewhere. It would probably be an HTTP request payload, so there are other coroutines to run and await.

Having said that, I still think that a library (in my case, amqproto) should behave correctly. Partly because it is attractive to say in the README that this library behaves more correctly than the others. I’m sure you and Nathaniel understand this :slight_smile:

Partly because new users will probably run a test similar to the one I wrote to play with the library, and it won’t be a pleasant experience if things go wrong from the very beginning, on a developer’s machine. On the other hand, it could be a pleasant surprise if things go right! :slight_smile:

I’m also concerned that somebody will have such an I/O-intensive application and come up with a (probably) wrong solution, like sleeping every time they write to the socket or, even worse, taking a lock around the write/drain calls (and most likely they will put these solutions in their own codebase instead of making a PR). It would be really difficult to explain why these solutions are wrong and how to fix the issue properly if I had never thought about it and tried to solve it myself.


#6

The point about “correctness” is well taken, but I’ve never viewed correctness as a simple on-off switch. There are many dimensions to how systems might behave, and performance is certainly one of them. You yourself mentioned a 2.5-3.5x slowdown due to the added asyncio.sleep(0) calls and a desire to avoid that.

My comment about Curio is that it takes no position whatsoever about “correctness” in this I/O case. That is, it performs the I/O operation immediately if it can and blocks if it must. Beyond that, it doesn’t dictate any additional policy about task scheduling. I’m not suggesting that your users would have to deal with this themselves, but as a library writer, it’s something you’d certainly need to be aware of when using Curio. I don’t think it’s hard to solve; there are a variety of knobs that can be turned to make Curio do what you need it to do.


#7

Sure, it isn’t! And I’m definitely not asking asyncio, Curio or Trio to provide a “default knob” that doesn’t need to be turned.
Still, it would be good if this little nuance were documented somewhere, so that I, as a client author, could be aware of it before running into it in production.


#8

Oh, definitely! I’m already thinking about how I might add some things to the docs. Anyway, this whole discussion is pretty interesting.