Version
tokio v0.3.2
mio v0.7.4
Platform
macOS 10.15.7
Description
While upgrading trust-dns to tokio 0.3, I've run into an issue with tokio, or possibly mio, involving UdpSocket. The Trust-DNS proto library has two implementations that run over a UdpSocket: UdpClientStream and UdpStream. UdpClientStream appears to have no issues, but UdpStream, which worked fine with tokio 0.2, is now experiencing issues. I haven't tracked this down yet, but I suspect that interest on the UdpSocket may not be set properly. Given that we multiplex the socket, is it possible that the order of read/write operations is the problem?
We hang on recv here: https://github.com/bluejekyll/trust-dns/blob/2941221fe50326a01d2ccab5489623e30d4edc79/crates/proto/src/udp/udp_stream.rs#L154
The test I'm using to produce this error is here: https://github.com/bluejekyll/trust-dns/blob/2941221fe50326a01d2ccab5489623e30d4edc79/crates/proto/src/tests/udp.rs#L20-L108
And the hung block_on is here: https://github.com/bluejekyll/trust-dns/blob/2941221fe50326a01d2ccab5489623e30d4edc79/crates/proto/src/tests/udp.rs#L97
The server defined here (with std::net::UdpSocket) gets one message, returns it, and then hangs waiting for the next message from the client here: https://github.com/bluejekyll/trust-dns/blob/2941221fe50326a01d2ccab5489623e30d4edc79/crates/proto/src/tests/udp.rs#L64
It's going to take some effort to set up my local environment to get to the root cause of this, so I'm hoping someone else has seen this or something similar.
One possibility that immediately comes to mind is that with Tokio 0.3, you must set the socket to non-blocking mode manually before using UdpSocket::from_std, if you create the socket through that method. I can't immediately see whether you are using that method, but it has tripped up several people so far, so I might as well mention it before investigating further.
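A std-only sketch of what non-blocking mode changes: after `set_nonblocking(true)`, a receive on an idle socket returns `ErrorKind::WouldBlock` immediately instead of parking the thread, which is the behavior Tokio's reactor relies on. The helpers `prepare_for_tokio` and `probe_idle_recv` are hypothetical, and the hand-off to `tokio::net::UdpSocket::from_std` is only shown as a comment so the example runs without Tokio.

```rust
use std::io;
use std::net::UdpSocket;

/// Hypothetical helper: bind a std socket and switch it to non-blocking mode,
/// which Tokio 0.3 expects before the socket is passed to `from_std`.
fn prepare_for_tokio(addr: &str) -> io::Result<UdpSocket> {
    let socket = UdpSocket::bind(addr)?;
    // Without this call, a read with no pending datagram blocks the calling
    // thread (inside a runtime, that would stall a worker thread).
    socket.set_nonblocking(true)?;
    // In real code: let socket = tokio::net::UdpSocket::from_std(socket)?;
    Ok(socket)
}

/// Hypothetical probe: attempt one receive and report the error kind, if any.
fn probe_idle_recv(socket: &UdpSocket) -> Option<io::ErrorKind> {
    let mut buf = [0u8; 512];
    match socket.recv_from(&mut buf) {
        Ok(_) => None,
        Err(e) => Some(e.kind()),
    }
}
```

On a freshly bound socket nobody has sent to, `probe_idle_recv` reports `WouldBlock`; with the `set_nonblocking` line removed, the same call would simply hang.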
I saw that note. I'm using the Tokio UdpSocket directly, but even so, because of the issue you mentioned, I added some code to ensure non-blocking mode was set on the socket, just to double-check. It did not resolve the issue for me.
If it blocks in a non-blocking call, that implies to me that the socket is not a non-blocking socket. We would need to track down how that happens.
I haven't tracked this down in enough detail yet; I'm not sure whether we are actually "blocked" on the socket, or whether we've missed a readable event for some reason and are stuck waiting for another one that will never come.
Ok, I'm going to close this. I refactored the code from a manually built Future to an async fn, and this resolved the issue I was seeing. My only guess is that something changed with UdpSocket in Tokio or mio such that a single poll is no longer enough to drive a send or recv in certain circumstances. I did not track down why that's the case, though.
I'm actually going to reopen this, as I think there really is an issue here. I've found a workaround: calling `cx.waker().wake_by_ref();` before returning `Poll::Pending` in the Stream impl of the UdpStream linked above.
This is obviously not ideal... It's odd that the version refactored to an async fn doesn't have this problem. I'll continue to investigate.
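The `wake_by_ref` workaround can be modeled without Tokio. In this sketch, `SelfWaking` (a future that stays pending for a few polls), the `Notify` waker, and the single-future `block_on` executor are all hypothetical stand-ins. Because the future wakes itself before returning `Poll::Pending`, the executor re-polls it right away, so it makes progress even if the real readiness notification was lost; the cost is that the task busy-polls instead of sleeping until the socket is actually ready.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that is "not ready" for `remaining` polls.
struct SelfWaking {
    remaining: usize,
}

impl Future for SelfWaking {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        // The workaround: reschedule ourselves before returning Pending.
        // With no real readiness registration, this amounts to a busy loop.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that records a pending notification and counts wake-ups.
struct Notify {
    woken: Mutex<bool>,
    cvar: Condvar,
    count: AtomicUsize,
}

impl Wake for Notify {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.count.fetch_add(1, Ordering::SeqCst);
        *self.woken.lock().unwrap() = true;
        self.cvar.notify_one();
    }
}

/// Minimal executor: poll, then sleep until woken. Returns the output and
/// the number of times the waker fired.
fn block_on<F: Future>(fut: F) -> (F::Output, usize) {
    let notify = Arc::new(Notify {
        woken: Mutex::new(false),
        cvar: Condvar::new(),
        count: AtomicUsize::new(0),
    });
    let waker = Waker::from(notify.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, notify.count.load(Ordering::SeqCst));
        }
        // If nothing ever wakes the task, this wait never returns:
        // the same symptom as a lost readiness event.
        let mut woken = notify.woken.lock().unwrap();
        while !*woken {
            woken = notify.cvar.wait(woken).unwrap();
        }
        *woken = false;
    }
}
```

Here `block_on(SelfWaking { remaining: 3 })` completes after three self-wakes. Remove the `wake_by_ref` call and the executor waits on the condition variable forever, which mirrors the hang seen in the test.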
Oh, I had a closer look. These two lines create a future, poll it once, then drop it, cancelling the future.
```rust
// Each statement builds a future, polls it once, and drops it at the end of the statement.
ready!(socket.send_to(message.bytes(), addr).poll_unpin(cx))?;
let (len, src) = ready!(socket.recv_from(&mut buf).poll_unpin(cx))?;
```
You should be calling the `poll_*` methods on Tokio's `UdpSocket` rather than the async fns, whose futures have a destructor that de-registers the waker.
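The failure mode can be reproduced without Tokio. In this std-only sketch, `MockSocket`, `RecvFuture`, and `Counter` are hypothetical stand-ins: the future registers the task's waker when polled and, like the futures returned by Tokio 0.3's async socket methods, removes that registration in its destructor. Creating the future, polling it once, and dropping it therefore leaves nobody to wake when a datagram finally arrives.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical in-memory socket: a datagram queue plus one waiter slot.
#[derive(Default)]
struct MockSocket {
    queue: RefCell<VecDeque<Vec<u8>>>,
    waiter: RefCell<Option<Waker>>,
}

impl MockSocket {
    /// Simulates a datagram arriving: enqueue it and wake the registered task, if any.
    fn deliver(&self, data: &[u8]) {
        self.queue.borrow_mut().push_back(data.to_vec());
        let waiter = self.waiter.borrow_mut().take();
        if let Some(w) = waiter {
            w.wake();
        }
    }

    /// Hypothetical async-style receive: the returned future owns the registration.
    fn recv(&self) -> RecvFuture<'_> {
        RecvFuture { socket: self }
    }
}

struct RecvFuture<'a> {
    socket: &'a MockSocket,
}

impl Future for RecvFuture<'_> {
    type Output = Vec<u8>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        let next = self.socket.queue.borrow_mut().pop_front();
        match next {
            Some(data) => Poll::Ready(data),
            None => {
                *self.socket.waiter.borrow_mut() = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

impl Drop for RecvFuture<'_> {
    fn drop(&mut self) {
        // Cancelling the future removes the interest it registered.
        *self.socket.waiter.borrow_mut() = None;
    }
}

/// Waker that only counts how often it is woken.
struct Counter(AtomicUsize);

impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Mimics `ready!(socket.recv_from(..).poll_unpin(cx))` in a hand-written
/// `poll_next`, then delivers a datagram and reports how many wake-ups followed.
fn wakeups_after_poll_once_then_drop() -> usize {
    let counter = Arc::new(Counter(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let socket = MockSocket::default();

    // The temporary future is dropped at the end of this statement.
    let first = Pin::new(&mut socket.recv()).poll(&mut cx);
    assert!(first.is_pending());

    socket.deliver(b"response");
    counter.0.load(Ordering::SeqCst)
}
```

`wakeups_after_poll_once_then_drop()` returns 0: the datagram arrives, but the registration was removed when the temporary future was dropped, so the task is never told to poll again.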
If you wish to keep the signature of your `UdpSocket` trait, you can implement it as follows:
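```rust
use std::io;
use std::net::SocketAddr;

use async_trait::async_trait;
use futures::future::poll_fn;
use tokio::io::ReadBuf;

// `UdpSocket` is trust-dns' own trait, implemented here for Tokio's socket type.
#[cfg(feature = "tokio-runtime")]
#[async_trait]
impl UdpSocket for tokio::net::UdpSocket {
    type Time = crate::TokioTime;

    async fn bind(addr: &SocketAddr) -> io::Result<Self> {
        tokio::net::UdpSocket::bind(addr).await
    }

    async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
        // In Tokio 0.3, `poll_recv_from` fills a `ReadBuf` and returns only the sender address.
        let mut read_buf = ReadBuf::new(buf);
        let addr = poll_fn(|cx| self.poll_recv_from(cx, &mut read_buf)).await?;
        Ok((read_buf.filled().len(), addr))
    }

    async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
        // `poll_fn` futures have no destructor logic, so nothing is de-registered on drop.
        poll_fn(|cx| self.poll_send_to(cx, buf, target)).await
    }
}
```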
Be aware, though, that using it like this will cause a memory allocation on every call, since `#[async_trait]` boxes each future it returns. I would recommend adding non-async `poll_*` variants to the trait itself.
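A sketch of what non-async `poll_*` variants on the trait could look like. `PollUdp`, its `poll_recv_from` method, the `recv_from` wrapper, the in-memory `Loopback` type, and `receive_preloaded` are all hypothetical and deliberately simplified (std only, no Tokio or trust-dns types). The poll method is the primitive; async callers get a thin wrapper built with `std::future::poll_fn` (stable since Rust 1.64), which returns a concrete future type and so needs no boxing.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::io;
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical trait: the poll-based method is the primitive operation.
trait PollUdp {
    fn poll_recv_from(
        &self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<(usize, SocketAddr)>>;
}

/// Async convenience wrapper over the primitive; no per-call boxing.
fn recv_from<'a, S: PollUdp>(
    socket: &'a S,
    buf: &'a mut [u8],
) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + 'a {
    poll_fn(move |cx| socket.poll_recv_from(cx, buf))
}

/// Hypothetical in-memory socket holding queued datagrams and one waiter.
struct Loopback {
    queue: RefCell<VecDeque<(Vec<u8>, SocketAddr)>>,
    waiter: RefCell<Option<Waker>>,
}

impl PollUdp for Loopback {
    fn poll_recv_from(
        &self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<(usize, SocketAddr)>> {
        let next = self.queue.borrow_mut().pop_front();
        match next {
            Some((data, from)) => {
                // Copy as much as fits, like a UDP receive into a small buffer.
                let n = data.len().min(buf.len());
                buf[..n].copy_from_slice(&data[..n]);
                Poll::Ready(Ok((n, from)))
            }
            None => {
                // The registration lives in the socket, not in any future.
                *self.waiter.borrow_mut() = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that does nothing; enough for a single poll in this demo.
struct Noop;

impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

/// Receives one preloaded datagram through the async wrapper, polled once.
fn receive_preloaded(payload: &[u8], from: SocketAddr) -> Option<(Vec<u8>, SocketAddr)> {
    let socket = Loopback {
        queue: RefCell::new(VecDeque::from([(payload.to_vec(), from)])),
        waiter: RefCell::new(None),
    };
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);
    let mut buf = [0u8; 512];
    let polled = {
        let fut = pin!(recv_from(&socket, &mut buf));
        fut.poll(&mut cx)
    };
    match polled {
        Poll::Ready(Ok((n, addr))) => Some((buf[..n].to_vec(), addr)),
        _ => None,
    }
}
```

A hand-written `poll_next` would call `socket.poll_recv_from(cx, &mut buf)` directly, while async code awaits `recv_from(&socket, &mut buf)`; both paths share one implementation and no future needs to be boxed.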
@Darksonn just to be clear: even if you implement the trait that way, if you call it from a `Poll`-returning method (say, a `Stream` impl) with `socket.recv_from(&mut buf).poll_unpin(cx)`, you will run into the same registration issues, yes?
If you use the `poll_fn`-based implementation I posted, then the destructor of the future doesn't do anything, and thus it _would_ work.
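To see why a `poll_fn`-based future can be polled once and dropped safely, here is a std-only sketch. `Readiness`, `poll_ready`, `set_ready`, and `Counter` are hypothetical stand-ins for the socket's registration: the waker is stored by the poll function inside the socket, and `std::future::PollFn` has no `Drop` impl of its own, so dropping the future does not undo the registration.

```rust
use std::cell::{Cell, RefCell};
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical readiness state owned by a socket.
#[derive(Default)]
struct Readiness {
    ready: Cell<bool>,
    waiter: RefCell<Option<Waker>>,
}

impl Readiness {
    /// `poll_*`-style method: the waker is stored in the socket itself.
    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
        if self.ready.get() {
            Poll::Ready(())
        } else {
            *self.waiter.borrow_mut() = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    /// Simulates the reactor reporting the socket as readable.
    fn set_ready(&self) {
        self.ready.set(true);
        let waiter = self.waiter.borrow_mut().take();
        if let Some(w) = waiter {
            w.wake();
        }
    }
}

/// Waker that only counts how often it is woken.
struct Counter(AtomicUsize);

impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls a `poll_fn` future once, drops it, then signals readiness.
/// Returns how many wake-ups the task received.
fn wakeups_after_drop() -> usize {
    let counter = Arc::new(Counter(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let socket = Readiness::default();
    {
        let fut = pin!(poll_fn(|task_cx| socket.poll_ready(task_cx)));
        assert!(fut.poll(&mut cx).is_pending());
    } // The future is dropped here; the stored waker is untouched.
    socket.set_ready();
    counter.0.load(Ordering::SeqCst)
}
```

`wakeups_after_drop()` returns 1, because the waker stored by `poll_ready` survives the future being dropped and is still there when readiness is signaled.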
Oh, that would be a simple fix. I think we might want to drop that `async_trait` impl. I really appreciate the feedback on this @Darksonn. I should have responded earlier, but I've been distracted with other events over the weekend.
@leshow and @Darksonn, I just confirmed this locally. This fixes the issue; thank you for the pointer. I'll leave a note to go back and add the poll methods to that interface, as I agree that it would be more performant.
I really appreciate your help here.