Version
tokio v0.2.21
Platform
Linux
Description
I noticed that poll_write is called instead of poll_write_buf when I call write_buf on a boxed writer or on a mutable reference to a writer.
I tried this code:
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use tokio::io::{self, AsyncWrite, AsyncWriteExt};

struct MockWrite {}

impl AsyncWrite for MockWrite {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        println!("poll_write");
        Poll::Ready(Ok(0))
    }

    fn poll_write_buf<B: Buf>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        println!("poll_write_buf");
        Poll::Ready(Ok(0))
    }

    #[inline]
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

#[tokio::main(core_threads = 1)]
async fn main() {
    // Case 1: the concrete type.
    let mut buf = Bytes::from("1");
    let mut io = MockWrite {};
    io.write_buf(&mut buf).await.unwrap();

    // Case 2: a boxed writer.
    let mut buf2 = Bytes::from("1");
    let mut io2 = Box::new(MockWrite {});
    io2.write_buf(&mut buf2).await.unwrap();

    // Case 3: a mutable reference to a writer.
    let mut buf3 = Bytes::from("1");
    let mut io3 = MockWrite {};
    let mut io3 = &mut io3;
    (&mut io3).write_buf(&mut buf3).await.unwrap();
}
I expected to see this happen: (output of the code)
poll_write_buf
poll_write_buf
poll_write_buf
Instead, this happened:
poll_write_buf
poll_write
poll_write
The same issue applies to the read_buf in AsyncReadExt as well.
This issue may not affect the correctness of write_buf/read_buf, but it may hurt performance because it disables vectored I/O.
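As a rough illustration of what losing the vectored path means, here is a small sketch using only the standard library's synchronous `std::io::Write` rather than tokio. `PlainOnly` and `vectored_len` are hypothetical names made up for this example. A writer that implements only `write` gets the default `write_vectored`, which hands over just the first non-empty buffer, while `Vec<u8>` overrides it and takes every buffer in one call.

```rust
use std::io::{self, IoSlice, Write};

// Hypothetical writer that implements only `write`, so `write_vectored`
// falls back to the standard library's default implementation.
struct PlainOnly(Vec<u8>);

impl Write for PlainOnly {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// Issues a single vectored write of two 2-byte buffers and reports how many
// bytes that one call accepted.
fn vectored_len<W: Write>(mut w: W) -> usize {
    let parts = [IoSlice::new(b"ab"), IoSlice::new(b"cd")];
    w.write_vectored(&parts).unwrap()
}

fn main() {
    // Default implementation: only the first non-empty buffer is written.
    println!("{}", vectored_len(PlainOnly(Vec::new())));
    // `Vec<u8>` overrides `write_vectored` and accepts both buffers.
    println!("{}", vectored_len(Vec::<u8>::new()));
}
```

With the fallback, the caller needs a second call to write the remaining buffer, which is the kind of extra work the report is concerned about.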
I believe I have a fix for it.
Thanks! To fix it, you should modify this part of the code. Do you want to open a PR with that too?
The compilation failure in #2611 explains why the buf version is not implemented: It is a generic function, so it cannot be called on a trait object. Sorry, this is not possible to fix due to how the Rust programming language works.
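The restriction can be reproduced without tokio. The following is a minimal sketch, not tokio's actual code: `Chunk`, `Sink`, and `Mock` are hypothetical stand-ins for `Buf`, `AsyncWrite`, and `MockWrite`. It assumes the generic method carries a `Self: Sized` bound so the trait stays usable as a trait object, and that the forwarding impl for `Box<T>` accepts unsized `T`.

```rust
// Hypothetical stand-in for a buffer trait such as `bytes::Buf`.
trait Chunk {
    fn bytes(&self) -> &[u8];
}

impl Chunk for Vec<u8> {
    fn bytes(&self) -> &[u8] {
        self
    }
}

// Hypothetical stand-in for a writer trait with one plain and one generic method.
trait Sink {
    fn write(&mut self, data: &[u8]) -> &'static str;

    // A generic method cannot be dispatched through a vtable, so it is
    // restricted to sized implementors to keep `dyn Sink` usable.
    fn write_chunk<C: Chunk>(&mut self, chunk: &C) -> &'static str
    where
        Self: Sized,
    {
        // Default body: fall back to the non-generic method.
        self.write(chunk.bytes())
    }
}

struct Mock;

impl Sink for Mock {
    fn write(&mut self, _data: &[u8]) -> &'static str {
        "write"
    }

    fn write_chunk<C: Chunk>(&mut self, _chunk: &C) -> &'static str
    where
        Self: Sized,
    {
        "write_chunk"
    }
}

// Forwarding impl that also covers `Box<dyn Sink>`. Because `T` may be
// unsized, only `write` can be forwarded; `write_chunk` keeps its default body.
impl<T: Sink + ?Sized> Sink for Box<T> {
    fn write(&mut self, data: &[u8]) -> &'static str {
        (**self).write(data)
    }
}

// Calls the generic method on the concrete type.
fn direct() -> &'static str {
    Mock.write_chunk(&vec![1u8])
}

// Calls the generic method on a boxed value, which resolves to the Box impl.
fn boxed() -> &'static str {
    Box::new(Mock).write_chunk(&vec![1u8])
}

fn main() {
    println!("{}", direct());
    println!("{}", boxed());
}
```

Trying to forward `write_chunk` inside the `Box<T>` impl fails to compile, because `(**self).write_chunk(..)` would require `T: Sized`. Tightening the impl to sized `T` would allow it, at the cost of no longer covering boxed trait objects.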
Sad!
I realized that I can have a workaround in my code by creating a minimal clone of AsyncWrite.
That solves my problem without requiring the real issue inside tokio to be fixed.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteVec<'a, W, B> {
    writer: &'a mut W,
    buf: &'a mut B,
}

pub trait AsyncWriteVec {
    fn poll_write_vec<B: Buf>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _buf: &mut B,
    ) -> Poll<io::Result<usize>>;

    fn write_vec<'a, B>(&'a mut self, src: &'a mut B) -> WriteVec<'a, Self, B>
    where
        Self: Sized,
        B: Buf,
    {
        WriteVec { writer: self, buf: src }
    }
}

impl<W, B> Future for WriteVec<'_, W, B>
where
    W: AsyncWriteVec,
    B: Buf,
{
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
        // safety: no data is moved from self
        unsafe {
            let me = self.get_unchecked_mut();
            Pin::new_unchecked(&mut *me.writer).poll_write_vec(cx, &mut me.buf)
        }
    }
}
and
impl AsyncWriteVec for MockWrite {
    fn poll_write_vec<B: Buf>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        self.poll_write_buf(cx, buf)
    }
}
Hopefully the snippet above helps others who encounter the same issue.
@eaufavor
    // safety: no data is moved from self
    unsafe {
        let me = self.get_unchecked_mut();
        Pin::new_unchecked(&mut *me.writer).poll_write_vec(cx, &mut me.buf)
    }
BTW, this is a Pin<&mut Type> to Pin<Field> projection, and it is unsound if W is not Unpin (you can move W after the WriteVec is dropped):
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=2145ea8ea52a73e63f65b1d5b34850c2
The correct projection is Pin<&mut Type> to Pin<&mut Field>. In this case, it is Pin<&mut WriteVec<'_, W, B>> to Pin<&mut &mut W>, and it needs to add W: Unpin bounds to convert Pin<&mut &mut W> to Pin<&mut W>.
Filed #2612 as I confirmed that read_buf and write_buf have the same unsafe code.
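For reference, the sound variant described above might look roughly like this. It is a sketch under assumptions, not the change made in #2612: `PollPut`, `Put`, `Counter`, `NoopWake`, and `poll_once` are hypothetical names, and the buffer is a plain byte slice so that only the standard library is needed. With a `W: Unpin` bound, no `unsafe` is required.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the thread's `AsyncWriteVec`, simplified to a
// plain byte slice.
trait PollPut {
    fn poll_put(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<usize>;
}

// Future that borrows the writer, analogous to `WriteVec`.
struct Put<'a, W> {
    writer: &'a mut W,
    buf: &'a [u8],
}

impl<W> Future for Put<'_, W>
where
    W: PollPut + Unpin,
{
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        // `Put` holds only references, so it is `Unpin` and `get_mut` is safe.
        let me = self.get_mut();
        // `W: Unpin` is what makes pinning the reborrowed writer safe.
        Pin::new(&mut *me.writer).poll_put(cx, me.buf)
    }
}

// Hypothetical writer that just counts the bytes it was given.
struct Counter {
    total: usize,
}

impl PollPut for Counter {
    fn poll_put(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<usize> {
        self.total += buf.len();
        Poll::Ready(buf.len())
    }
}

// Waker that does nothing, so the future can be polled without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the future once and returns the poll result plus the writer's count.
fn poll_once() -> (Poll<usize>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut writer = Counter { total: 0 };
    let mut fut = Put { writer: &mut writer, buf: b"abc" };
    let res = Pin::new(&mut fut).poll(&mut cx);
    (res, writer.total)
}

fn main() {
    println!("{:?}", poll_once());
}
```

The trade-off is that writers which are not `Unpin` must be pinned by the caller first (for example with `Box::pin`) before they can be used with such a future.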