Beast: strand/executor issue

Created on 23 Apr 2019  路  26Comments  路  Source: boostorg/beast

Hi,

my WebSession class looked like this:

class WebSession : public enable_shared_from_this<WebSession>
{
public:
    explicit WebSession(tcp::socket socket) :
        m_Buffer(),
        m_WebSocket(std::move(socket)),
        m_Strand(m_WebSocket.get_executor())
    {
    }

private:
    websocket::stream<tcp::socket> m_WebSocket;
    boost::asio::strand<boost::asio::io_context::executor_type> m_Strand;
    boost::beast::multi_buffer m_Buffer;

};

With the latest version boost 1.70 the code doesn't compile anymore at the following line

m_Strand(m_WebSocket.get_executor())

error C2664: 'boost::asio::strand::strand(boost::asio::strand &&) noexcept': cannot convert argument 1 from 'boost::asio::executor' to 'const boost::asio::io_context::executor_type &'
note: Reason: cannot convert from 'boost::asio::executor' to 'const boost::asio::io_context::executor_type'
note: No user-defined-conversion operator available that can perform this conversion, or the operator cannot be called

Any idea how to fix this?

Most helpful comment

Yes just like that

All 26 comments

Yes, remove m_Strand and construct the websocket with boost::asio::make_strand(socket.get_executor())

Thanks for your fast reply. And what about the following calls?

m_WebSocket.async_accept(boost::asio::bind_executor(m_Strand, std::bind(&WebSession::OnAccept, shared_from_this(), std::placeholders::_1)));

m_WebSocket.async_read(m_Buffer, boost::asio::bind_executor(m_Strand, std::bind(&WebSession::OnRead, shared_from_this(), std::placeholders::_1, std::placeholders::_2)));

m_WebSocket.async_write(m_Buffer.data(), boost::asio::bind_executor(m_Strand, std::bind(&WebSession::OnWrite, shared_from_this(), std::placeholders::_1, std::placeholders::_2)));

By constructing the websocket with the strand, you do not need to call bind_executor anymore, see:
https://github.com/boostorg/asio/commit/59066d80b26e1d5b83b60d127ee17948d9ae9702

See also:
http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p1322r0.html

Ok, that solved it. Thanks again.

What about running_in_this_thread(). How to use this if strand is incapsulated in ws stream object?

Use ws.get_executor().running_in_this_thread()

error: no member named 'running_in_this_thread' in 'boost::asio::executor'
https://wandbox.org/permlink/ErNto4m8i0RwHF1a

You have to use a NextLayer that names the strand instead of the polymorphic executor, something like:

beast::websocket::stream<
    asio::ip::tcp::basic_socket<
        asio::strand<net::io_context::executor_type>>> ws;

Thanks, so in ssl context smth like

boost::beast::websocket::stream<boost::beast::ssl_stream< boost::beast::basic_stream<boost::asio::ip::tcp, boost::asio::strand<boost::asio::io_context::executor_type>, boost::beast::unlimited_rate_policy>>> ?

Yes just like that

Thanks

@vinniefalco
How the example from docs:
https://www.boost.org/doc/libs/1_71_0/libs/beast/example/websocket/client/async/websocket_client_async.cpp
will look like in case of applying strand from https://github.com/boostorg/beast/issues/1589#issuecomment-530162897:

https://wandbox.org/permlink/CtsYIKpNIjelFNUw

Will the use of strand executor make possible to concurrently call async_write from several threads, or is a queue necessary?

Will the use of strand executor make possible to concurrently call async_write from several threads, or is a queue necessary?

You need a queue. See: https://github.com/boostorg/beast/blob/develop/example/websocket/server/chat-multi/websocket_session.cpp#L102

Thanks, @vinniefalco
I saw this implementation, but it鈥檚 not entirely clear to me how the queue (queue_) is protected from concurrent access from multiple threads.

Right now I am using an atomic write counter with the read-modify-write operation to prevent race condition in call async_write from multiple threads. And as a queue I use boost::asio - in case the writing is already in progress, I just call the boost::post.

Sorry for the stupid question, but if post to strand protect this members from from concurrent access why we need to protect member ws_ (call async_write) from concurrent execution.

Why can't we just do something like this:

// Can be executed from any thread
void websocket_session::write(std::string &&ss) {
  if (!ws.get_executor().running_in_this_thread()) {
    net::post(ws_.get_executor(),
              beast::bind_front_handler(&websocket_session::write, shared_from_this(), std::move(ss)));
  } else {
    ws_.async_write(net::buffer(ss), beast::bind_front_handler(&websocket_session::on_write, shared_from_this()));
  }
}

In your code, ss goes out of scope after ws_.async_write(net::buffer(ss)... returns and invalidates the buffer, resulting in undefined behavior.

In your code, ss goes out of scope after ws_.async_write(net::buffer(ss)... returns and invalidates the buffer, resulting in undefined behavior.

Thanks!
You are absolutely right!!! Perhaps the queue option is not so bad, although it looks a bit clumsy.

@vinniefalco sorry for disturbing you again... We were also using an algorithm with atomics to try to fix the problem with multiple async calls, we've found your comment recently and wanted to give it a try.

While trying to implement this I got very confused with the strand concept.

Before 1.70.0 we were creating the strands explicitly, just like OP did. Then we updated to 1.70.0 and did the changes to make it work: here is a discussion regarding this.

So let's start from the beginning, a strand object is(according to the official docs):

The io_context::strand class provides the ability to post and dispatch handlers with the guarantee that none of those handlers will execute concurrently.

All clear.

Now before 1.70.0 we were creating the strands explicit:

ws(std::move(socket(io_context)));

strand(io_context);

And when we wanted to use it, we would pass the strand to the async call:

ws.async_accept(
  boost::asio::bind_executor(
     strand, std::bind(&WebSession::on_accept, shared_from_this(), std::placeholders::_1)
  )
);

This gave us the freedom to use multiple strands on the same ws. For example I could have a read strand and a write strand, because I do not care if the completion handlers for write and read operations would execute concurrently. I only cared that the read completion handlers will not execute concurrently with each other and the same thing with the write completion handlers.

Now in 1.7x.0 things have changed, we are creating the ws like this:

boost::asio::ip::tcp::socket socket( ::boost::asio::make_strand( io_context ) );

boost::beast::tcp_stream stream(::std::move(socket));
 ::boost::beast::ssl_stream<::boost::beast::tcp_stream> ssl_stream(::std::move(stream));

::boost::beast::websocket::stream<
    ::boost::beast::ssl_stream<::boost::beast::tcp_stream>>
    ws(::std::move(ssl_stream));

And calling async methods like this:

ws.async_write( boost::asio::buffer( "Message"),
      std::bind( &ws_server::on_write_websocket, this, message_ptr,
        std::placeholders::_1,
        std::placeholders::_2 ) );

All clear, but after reading this issue I got a little bit confused. Notice that I have declared the ws as:

::boost::beast::websocket::stream<
    ::boost::beast::ssl_stream<::boost::beast::tcp_stream>>
    ws(::std::move(ssl_stream));

Not like you said:

You have to use a NextLayer that names the strand instead of the polymorphic executor, something like:

beast::websocket::stream<
    asio::ip::tcp::basic_socket<
        asio::strand<net::io_context::executor_type>>> ws;

Does this mean that we are not using strand, even if we did create the socket using make_strand?

On which completion handlers does that strand apply(the one we created with make_strand and passed to the socket)? Read? Write? Both? How can I manually specify which strand to apply on which completion handlers?

Is this a valid case: to use read and write strands? I'm asking this because I see it as a performance improvement to use two strands, the reads and writes are separated and can be done concurrently without any problem...

What is the purpose of asio::strand<net::io_context::executor_type> in the type of the ws? It's just so we can access more methods? Like running_in_this_thread(methods from the strand object)?

LE:

I forgot to mention, we are using tcp_stream from beast which is declared like this:

using tcp_stream = basic_stream<
    net::ip::tcp,
    net::executor,
    unlimited_rate_policy>;

To apply the strand layer we should do something like this: ?

using tcp_stream_strand =
  basic_stream<net::ip::tcp,
  boost::asio::strand<boost::asio::io_context::executor_type>,
  unlimited_rate_policy>

Thanks :smile:

Is this a valid case: to use read and write strands?

Nope, that will cause undefined behavior, because two different threads could touch the websocket::stream object at the same time. Generally speaking, I/O objects are not thread safe and may not be accessed concurrently.

The purpose of putting the strand executor on the stream is for convenience. It makes calls to initiating functions easier to read (because you don't have to bind the strand executor to the callback every time).

Hey @vinniefalco,
like I said in the previous comment:

I forgot to mention, we are using tcp_stream from beast which is declared like this..

I did replace tcp_stream from beast with:

using tcp_stream_strand = ::boost::beast::basic_stream<::boost::asio::ip::tcp,
  boost::asio::strand<boost::asio::io_context::executor_type>,
  ::boost::beast::unlimited_rate_policy>;

But I got an error and I do not know how to fix it:

/celibs/boost_1_71_0/boost/beast/core/impl/basic_stream.hpp:38:17: error: no matching function for call to 'boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::strand<boost::asio::io_context::executor_type> >::basic_stream_socket(boost::asio::basic_stream_socket<boost::asio::ip::tcp>)'

   38 |     , timer(ex())

Here's the source code, it's only 3 lines in main: https://godbolt.org/z/UwLdY1

using tcp_stream_strand = ::boost::beast::basic_stream<::boost::asio::ip::tcp,
  boost::asio::strand<boost::asio::io_context::executor_type>,
  ::boost::beast::unlimited_rate_policy>;

int main()
{
    boost::asio::io_context ioc;
    boost::asio::ip::tcp::socket socket(::boost::asio::make_strand(ioc));
    tcp_stream_strand stream(::std::move(socket)); // here is the error
   // boost::beast::tcp_stream stream(::std::move(socket)); // This works
    return 0;
}

I do not understand why it doesn't work... What am I doing wrong?

I do not understand why it doesn't work... What am I doing wrong?

https://github.com/boostorg/beast/blob/develop/include/boost/beast/core/basic_stream.hpp#L240

Should read:

        net::basic_waitable_timer<
            chrono::steady_clock,
                Executor> timer; // rate timer;

I don't think that's your problem though, I'm working on it.

I do not understand why it doesn't work... What am I doing wrong?

I see the problem. You are attempting to use this constructor overload:

https://www.boost.org/doc/libs/1_71_0/doc/html/boost_asio/reference/basic_stream_socket/basic_stream_socket/overload10.html

However, this overload is only available if the following condition is met:

is_convertible< Protocol1, Protocol >::value &&is_convertible< Executor1, Executor >::value

Here, _Executor1_ is net::executor while _Executor_ is net::io_context::executor_type. You cannot construct a net::io_context::executor_type from a net::executor (you can go the other way though).

If you want to use the concrete type in the beast::basic_stream rather than the polymorphic executor wrapper net::executor, then you must also use the concrete type on the socket. This should compile for you:

net::basic_stream_socket<
    net::ip::tcp,
    net::strand<
        net::io_context::executor_type>
            > sock(net::make_strand(ioc));
basic_stream<
    net::ip::tcp,
    net::strand<
        net::io_context::executor_type>,
    unlimited_rate_policy> stream(std::move(sock));
BOOST_STATIC_ASSERT(
    std::is_convertible<
        decltype(sock)::executor_type,
        decltype(stream)::executor_type>::value);

Thanks, that works :smile:

Today I've reread your previous answer again and there are some things that I thought I knew how they work in boost asio...

I though that the purpose of the strand is only to avoid having multiple completion handlers invoked concurrently.(serialize completion handlers of the async operations)

But the strand also serializes the async calls. Let me give you an example:

If I'm using a strand for a stream, when I'm calling two async methods from two different threads: async_read and async_write(on the same stream), those calls aren't done in parallel, those calls are serialize. Right?

It's like when you call async_read, this will be posted in the internal strand of the stream, same thing for the async_write. Then the worker thread will get the async_read from the strand and execute it(which should return immediately) and then it calls async_write(I'm not talking about completion handler, those will also be posted after the async calls are made).

Otherwise there would be two calls on the stream object from two different threads, and there would be a data race. Because:

I/O objects are not thread safe and may not be accessed concurrently.

Can you confirm this? Or the async calls do not modify the stream so there is no need to serialize them?

Thank you again, really. You've helped me alot. :smile:

If I'm using a strand for a stream, when I'm calling two async methods from two different threads: async_read and async_write(on the same stream), those calls aren't done in parallel, those calls are serialize. Right?

This is undefined behavior. You cannot call any member functions concurrently. You must make sure that only one thread is accessing the socket at a time. The easiest way to do this is to use a strand. See:
image

Perfect. Thanks. This is something that I didn't knew :smile:

Was this page helpful?
0 / 5 - 0 ratings

Related issues

vinniefalco picture vinniefalco  路  4Comments

inetic picture inetic  路  4Comments

tensor5375 picture tensor5375  路  4Comments

ksergey picture ksergey  路  6Comments

MarcoRhayden picture MarcoRhayden  路  6Comments