'How to implement sending a heartbeat message using a boost beast websocket client

I need to implement sending a heartbeat message to a third party server with regular intervals. The server uses these heartbeat messages to determine if that the client is still connected to it. When receiving heartbeat messages, the server does not respond to them, but simply takes note of them. If within a certain time (20 seconds) the server does not receive the next heartbeat message, then it terminates the connection.

A similar topic was raised here. sehe gave an example of how to send heartbeat messages in asynchronous mode from the boost tcp server to the client. The example works great.

However, when I try to implement something like this using websocket (boost beast), I almost immediately get the error:

// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);

Below is a sample code to illustrate the problem.

Server (taken from example from here with minor change):

namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

//------------------------------------------------------------------------------

// Report a failure
void
fail(beast::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << "\n";
}

// Echoes back all received WebSocket messages
class session : public std::enable_shared_from_this<session>
{
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer buffer_;

public:
    // Take ownership of the socket
    explicit
    session(tcp::socket&& socket)
            : ws_(std::move(socket))
    {
    }

    // Get on the correct executor
    void
    run()
    {
        // We need to be executing within a strand to perform async operations
        // on the I/O objects in this session. Although not strictly necessary
        // for single-threaded contexts, this example code is written to be
        // thread-safe by default.
        net::dispatch(ws_.get_executor(),
                      beast::bind_front_handler(
                              &session::on_run,
                              shared_from_this()));
    }

    // Start the asynchronous operation
    void
    on_run()
    {
        // Set suggested timeout settings for the websocket
        ws_.set_option(
                websocket::stream_base::timeout::suggested(
                        beast::role_type::server));

        // Set a decorator to change the Server of the handshake
        ws_.set_option(websocket::stream_base::decorator(
                [](websocket::response_type& res)
                {
                    res.set(http::field::server,
                            std::string(BOOST_BEAST_VERSION_STRING) +
                            " websocket-server-async");
                }));
        // Accept the websocket handshake
        ws_.async_accept(
                beast::bind_front_handler(
                        &session::on_accept,
                        shared_from_this()));
    }

    void
    on_accept(beast::error_code ec)
    {
        if(ec)
            return fail(ec, "accept");

        // Read a message
        do_read();
    }

    void
    do_read()
    {
        // Read a message into our buffer
        ws_.async_read(
                buffer_,
                beast::bind_front_handler(
                        &session::on_read,
                        shared_from_this()));
    }

    void
    on_read(
            beast::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        // This indicates that the session was closed
        if(ec == websocket::error::closed)
            return;

        if(ec)
            fail(ec, "read");

        std::string message = boost::beast::buffers_to_string(buffer_.data());
        if (message != std::string("ZZZ"))
        {
            // Echo the message
            ws_.text(ws_.got_text());
            ws_.async_write(
                    buffer_.data(),
                    beast::bind_front_handler(
                            &session::on_write,
                            shared_from_this()));
        }

    }

    void
    on_write(
            beast::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
            return fail(ec, "write");

        // Clear the buffer
        buffer_.consume(buffer_.size());

        // Do another read
        do_read();
    }
};

//------------------------------------------------------------------------------

// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener>
{
    net::io_context& ioc_;
    tcp::acceptor acceptor_;

public:
    listener(
            net::io_context& ioc,
            tcp::endpoint endpoint)
            : ioc_(ioc)
            , acceptor_(ioc)
    {
        beast::error_code ec;

        // Open the acceptor
        acceptor_.open(endpoint.protocol(), ec);
        if(ec)
        {
            fail(ec, "open");
            return;
        }

        // Allow address reuse
        acceptor_.set_option(net::socket_base::reuse_address(true), ec);
        if(ec)
        {
            fail(ec, "set_option");
            return;
        }

        // Bind to the server address
        acceptor_.bind(endpoint, ec);
        if(ec)
        {
            fail(ec, "bind");
            return;
        }

        // Start listening for connections
        acceptor_.listen(
                net::socket_base::max_listen_connections, ec);
        if(ec)
        {
            fail(ec, "listen");
            return;
        }
    }

    // Start accepting incoming connections
    void
    run()
    {
        do_accept();
    }

private:
    void
    do_accept()
    {
        // The new connection gets its own strand
        acceptor_.async_accept(
                net::make_strand(ioc_),
                beast::bind_front_handler(
                        &listener::on_accept,
                        shared_from_this()));
    }

    void
    on_accept(beast::error_code ec, tcp::socket socket)
    {
        if(ec)
        {
            fail(ec, "accept");
        }
        else
        {
            // Create the session and run it
            std::make_shared<session>(std::move(socket))->run();
        }

        // Accept another connection
        do_accept();
    }
};

//------------------------------------------------------------------------------

int main(int argc, char* argv[])
{
    // Check command line arguments.
    if (argc != 4)
    {
        std::cerr <<
                  "Usage: websocket-server-async <address> <port> <threads>\n" <<
                  "Example:\n" <<
                  "    websocket-server-async 0.0.0.0 8080 1\n";
        return EXIT_FAILURE;
    }
    auto const address = net::ip::make_address(argv[1]);
    auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
    auto const threads = std::max<int>(1, std::atoi(argv[3]));

    // The io_context is required for all I/O
    net::io_context ioc{threads};

    // Create and launch a listening port
    std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();

    // Run the I/O service on the requested number of threads
    std::vector<std::thread> v;
    v.reserve(threads - 1);
    for(auto i = threads - 1; i > 0; --i)
        v.emplace_back(
                [&ioc]
                {
                    ioc.run();
                });
    ioc.run();

    return EXIT_SUCCESS;
}

Asynchronous boost beast client (an example from here is taken as a basis):

namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

//------------------------------------------------------------------------------

// Report a failure
void
fail(beast::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << "\n";
}

// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session>
{
    boost::asio::ip::tcp::socket socket_;
    tcp::resolver resolver_;
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer buffer_;
    std::string host_;
    std::string text_;
    const std::string hbmsg = "ZZZ";
    boost::asio::high_resolution_timer hbtimer { socket_.get_executor() };

public:
    // Resolver and socket require an io_context

    explicit
    session(net::io_context& ioc)
            : socket_ (net::make_strand(ioc)),
              resolver_(net::make_strand(ioc)),
              ws_(net::make_strand(ioc))
    {
    }

    //explicit
    //session(boost::asio::ip::tcp::socket&& s) : socket_(std::move(s)), ws_(std::move(socket_)) {}

    // Start the asynchronous operation
    void
    run(
            char const* host,
            char const* port,
            char const* text)
    {
        // Save these for later
        host_ = host;
        text_ = text;

        // Look up the domain name

        resolver_.async_resolve(
                host,
                port,
                beast::bind_front_handler(
                        &session::on_resolve,
                        shared_from_this()));
    }

    void
    on_resolve(
            beast::error_code ec,
            tcp::resolver::results_type results)
    {
        if(ec)
            return fail(ec, "resolve");

        // Set the timeout for the operation
        beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

        // Make the connection on the IP address we get from a lookup
        beast::get_lowest_layer(ws_).async_connect(
                results,
                beast::bind_front_handler(
                        &session::on_connect,
                        shared_from_this()));
    }

    void
    on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type)
    {
        if(ec)
            return fail(ec, "connect");

        // Turn off the timeout on the tcp_stream, because
        // the websocket stream has its own timeout system.
        beast::get_lowest_layer(ws_).expires_never();

        // Set suggested timeout settings for the websocket
        ws_.set_option(
                websocket::stream_base::timeout::suggested(
                        beast::role_type::client));

        // Set a decorator to change the User-Agent of the handshake
        ws_.set_option(websocket::stream_base::decorator(
                [](websocket::request_type& req)
                {
                    req.set(http::field::user_agent,
                            std::string(BOOST_BEAST_VERSION_STRING) +
                            " websocket-client-async");
                }));

        // Perform the websocket handshake
        ws_.async_handshake(host_, "/",
                            beast::bind_front_handler(
                                    &session::on_handshake,
                                    shared_from_this()));
    }

    void
    on_handshake(beast::error_code ec)
    {
        if(ec)
            return fail(ec, "handshake");

        // Send the message
        /*
        ws_.async_write(
                net::buffer(text_),
                beast::bind_front_handler(
                        &session::on_write,
                        shared_from_this()));
        */
        hb_wait();
        req_loop();

    }

    void hb_wait(boost::beast::error_code ec = {}) {
        if(ec)
            return fail(ec, "hb_wait");

        hbtimer.expires_from_now(std::chrono::milliseconds(1000));
        hbtimer.async_wait([this](boost::system::error_code ec) { hb_send(ec); });
    }

    void hb_send(boost::beast::error_code ec) {
        if(ec)
            return fail(ec, "hb_send");

        ws_.async_write(boost::asio::buffer(hbmsg), [this](boost::system::error_code ec, size_t) { hb_wait(ec); });
    }

    void req_loop(boost::beast::error_code ec = {}, std::size_t bytes_transferred = 0) {
        if(ec)
            return fail(ec, "req_loop");

        ws_.async_write(
                net::buffer(text_),
                beast::bind_front_handler(
                        &session::on_write,
                        shared_from_this()));
    }


    void
    on_write(
            beast::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
            return fail(ec, "write");

        // Read a message into our buffer
        ws_.async_read(
                buffer_,
                beast::bind_front_handler(
                        &session::on_read,
                        shared_from_this()));
    }

    void
    on_read(
            beast::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
            return fail(ec, "read");

        std::string message = boost::beast::buffers_to_string(buffer_.data());
        std::cout << beast::make_printable(buffer_.data()) << std::endl;
        buffer_.consume(buffer_.size());

        ws_.async_write(
                boost::asio::buffer(message),
                beast::bind_front_handler(
                        &session::req_loop,
                        shared_from_this()));

        //ws_.async_write(boost::asio::buffer(message), [this](boost::beast::error_code ec, size_t) { req_loop(ec); });



    }

};

//------------------------------------------------------------------------------

int main(int argc, char** argv)
{
    // Check command line arguments.
    if(argc != 4)
    {
        std::cerr <<
                  "Usage: websocket-client-async <host> <port> <text>\n" <<
                  "Example:\n" <<
                  "    websocket-client-async echo.websocket.org 80 \"Hello, world!\"\n";
        return EXIT_FAILURE;
    }
    auto const host = argv[1];
    auto const port = argv[2];
    auto const text = argv[3];

    // The io_context is required for all I/O
    net::io_context ioc;
    boost::asio::ip::tcp::socket sock(ioc);
    // Launch the asynchronous operation
    std::make_shared<session>(ioc)->run(host, port, text);

    // Run the I/O service. The call will return when
    // the socket is closed.
    ioc.run();

    return EXIT_SUCCESS;
}

How to solve the problem of sending heartbeat messages in boost beast in asynchronous mode?
Is this possible in principle? Sorry for my bad english.



Solution 1:[1]

Of curse boost can do such works, And I'm also writing the heartbeat websocket using boost these days. Here's some of my implement details and other problems, which already been solved in this issue.

Generally, I meet similar error, and assertion info says that:

you are attempting to issue two of the same asynchronous I/O operation at the same time, without waiting for the first one to complete. For example, attempting two simultaneous calls to async_read_some. Only one pending call of each I/O type (read and write) is permitted.

which means you may successively call the same async_xxxx intentionally or unintentionally.

here's boost thread asynchronous Operations doc, which said:

Like a regular Boost.Asio socket, a stream is not thread safe. Callers are responsible for synchronizing operations on the socket using an implicit or explicit strand, as per the Asio documentation. The websocket stream asynchronous interface supports one of each of the following operations to be active at the same time:

  • async_read or async_read_some
  • async_write or async_write_some
  • async_ping or async_pong
  • async_close

the following code is produces undefined behavior, because the program is attempting to perform two simultaneous reads:

ws.async_read(b, [](error_code, std::size_t){});
ws.async_read(b, [](error_code, std::size_t){});

For example, you async_connected twice in a short time, and then these async_connect handler calls their own async_write or async_read at the same time following the flow of your codes.

Hope this helps. If there's any thing wring or mistakes in explanation, please pointing out. Thanks.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1