How to implement sending a heartbeat message using a Boost.Beast WebSocket client
I need to send a heartbeat message to a third-party server at regular intervals. The server uses these heartbeat messages to determine whether the client is still connected. The server does not respond to heartbeat messages; it simply takes note of them. If the server does not receive the next heartbeat message within a certain time (20 seconds), it terminates the connection.
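To make the server-side rule concrete, here is a minimal sketch of the deadline bookkeeping described above. It is only an illustration: the `HeartbeatMonitor` class and its member names are hypothetical and are not part of the third-party server or of Boost. The caller passes in the current time, so the logic can be checked without real waiting.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper (not a Boost type): remembers when the last heartbeat
// arrived and reports whether the peer has been silent for too long.
class HeartbeatMonitor {
public:
    using Clock = std::chrono::steady_clock;

    HeartbeatMonitor(Clock::duration timeout, Clock::time_point now)
        : timeout_(timeout), last_seen_(now) {
        assert(timeout > Clock::duration::zero());
    }

    // Called whenever a heartbeat message is received; no reply is sent.
    void note_heartbeat(Clock::time_point now) { last_seen_ = now; }

    // True once more than `timeout_` has elapsed since the last heartbeat.
    bool expired(Clock::time_point now) const {
        return now - last_seen_ > timeout_;
    }

private:
    Clock::duration timeout_;
    Clock::time_point last_seen_;
};
```

With a 20-second timeout, a client that sends a heartbeat every second (as the client code below tries to do) leaves a wide margin before `expired` would return true.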
A similar topic was raised here. sehe gave an example of how to send heartbeat messages in asynchronous mode from the boost tcp server to the client. The example works great.
However, when I try to implement something like this using websocket (boost beast), I almost immediately get the error:
// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);
Below is sample code to illustrate the problem.
Server (taken from the example here, with a minor change):
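#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
namespace beast = boost::beast; // from <boost/beast.hpp>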
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
//------------------------------------------------------------------------------
// Report a failure
void
fail(beast::error_code ec, char const* what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
// Echoes back all received WebSocket messages
class session : public std::enable_shared_from_this<session>
{
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
public:
// Take ownership of the socket
explicit
session(tcp::socket&& socket)
: ws_(std::move(socket))
{
}
// Get on the correct executor
void
run()
{
// We need to be executing within a strand to perform async operations
// on the I/O objects in this session. Although not strictly necessary
// for single-threaded contexts, this example code is written to be
// thread-safe by default.
net::dispatch(ws_.get_executor(),
beast::bind_front_handler(
&session::on_run,
shared_from_this()));
}
// Start the asynchronous operation
void
on_run()
{
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::server));
// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res)
{
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
}));
// Accept the websocket handshake
ws_.async_accept(
beast::bind_front_handler(
&session::on_accept,
shared_from_this()));
}
void
on_accept(beast::error_code ec)
{
if(ec)
return fail(ec, "accept");
// Read a message
do_read();
}
void
do_read()
{
// Read a message into our buffer
ws_.async_read(
buffer_,
beast::bind_front_handler(
&session::on_read,
shared_from_this()));
}
void
on_read(
beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
// This indicates that the session was closed
if(ec == websocket::error::closed)
return;
if(ec)
return fail(ec, "read");
std::string message = boost::beast::buffers_to_string(buffer_.data());
if (message != std::string("ZZZ"))
{
// Echo the message
ws_.text(ws_.got_text());
ws_.async_write(
buffer_.data(),
beast::bind_front_handler(
&session::on_write,
shared_from_this()));
}
else
{
// Heartbeat: take note of it without replying, then keep reading
buffer_.consume(buffer_.size());
do_read();
}
}
void
on_write(
beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "write");
// Clear the buffer
buffer_.consume(buffer_.size());
// Do another read
do_read();
}
};
//------------------------------------------------------------------------------
// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener>
{
net::io_context& ioc_;
tcp::acceptor acceptor_;
public:
listener(
net::io_context& ioc,
tcp::endpoint endpoint)
: ioc_(ioc)
, acceptor_(ioc)
{
beast::error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
if(ec)
{
fail(ec, "open");
return;
}
// Allow address reuse
acceptor_.set_option(net::socket_base::reuse_address(true), ec);
if(ec)
{
fail(ec, "set_option");
return;
}
// Bind to the server address
acceptor_.bind(endpoint, ec);
if(ec)
{
fail(ec, "bind");
return;
}
// Start listening for connections
acceptor_.listen(
net::socket_base::max_listen_connections, ec);
if(ec)
{
fail(ec, "listen");
return;
}
}
// Start accepting incoming connections
void
run()
{
do_accept();
}
private:
void
do_accept()
{
// The new connection gets its own strand
acceptor_.async_accept(
net::make_strand(ioc_),
beast::bind_front_handler(
&listener::on_accept,
shared_from_this()));
}
void
on_accept(beast::error_code ec, tcp::socket socket)
{
if(ec)
{
fail(ec, "accept");
}
else
{
// Create the session and run it
std::make_shared<session>(std::move(socket))->run();
}
// Accept another connection
do_accept();
}
};
//------------------------------------------------------------------------------
int main(int argc, char* argv[])
{
// Check command line arguments.
if (argc != 4)
{
std::cerr <<
"Usage: websocket-server-async <address> <port> <threads>\n" <<
"Example:\n" <<
" websocket-server-async 0.0.0.0 8080 1\n";
return EXIT_FAILURE;
}
auto const address = net::ip::make_address(argv[1]);
auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
auto const threads = std::max<int>(1, std::atoi(argv[3]));
// The io_context is required for all I/O
net::io_context ioc{threads};
// Create and launch a listening port
std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();
// Run the I/O service on the requested number of threads
std::vector<std::thread> v;
v.reserve(threads - 1);
for(auto i = threads - 1; i > 0; --i)
v.emplace_back(
[&ioc]
{
ioc.run();
});
ioc.run();
return EXIT_SUCCESS;
}
Asynchronous Boost.Beast client (based on the example here):
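#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <boost/asio/strand.hpp>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
namespace beast = boost::beast; // from <boost/beast.hpp>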
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
//------------------------------------------------------------------------------
// Report a failure
void
fail(beast::error_code ec, char const* what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session>
{
boost::asio::ip::tcp::socket socket_;
tcp::resolver resolver_;
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
std::string host_;
std::string text_;
const std::string hbmsg = "ZZZ";
boost::asio::high_resolution_timer hbtimer { socket_.get_executor() };
public:
// Resolver and socket require an io_context
explicit
session(net::io_context& ioc)
: socket_ (net::make_strand(ioc)),
resolver_(net::make_strand(ioc)),
ws_(net::make_strand(ioc))
{
}
//explicit
//session(boost::asio::ip::tcp::socket&& s) : socket_(std::move(s)), ws_(std::move(socket_)) {}
// Start the asynchronous operation
void
run(
char const* host,
char const* port,
char const* text)
{
// Save these for later
host_ = host;
text_ = text;
// Look up the domain name
resolver_.async_resolve(
host,
port,
beast::bind_front_handler(
&session::on_resolve,
shared_from_this()));
}
void
on_resolve(
beast::error_code ec,
tcp::resolver::results_type results)
{
if(ec)
return fail(ec, "resolve");
// Set the timeout for the operation
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(
results,
beast::bind_front_handler(
&session::on_connect,
shared_from_this()));
}
void
on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type)
{
if(ec)
return fail(ec, "connect");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-async");
}));
// Perform the websocket handshake
ws_.async_handshake(host_, "/",
beast::bind_front_handler(
&session::on_handshake,
shared_from_this()));
}
void
on_handshake(beast::error_code ec)
{
if(ec)
return fail(ec, "handshake");
// Send the message
/*
ws_.async_write(
net::buffer(text_),
beast::bind_front_handler(
&session::on_write,
shared_from_this()));
*/
hb_wait();
req_loop();
}
void hb_wait(boost::beast::error_code ec = {}) {
if(ec)
return fail(ec, "hb_wait");
hbtimer.expires_after(std::chrono::milliseconds(1000)); // expires_from_now is deprecated
hbtimer.async_wait([this](boost::system::error_code ec) { hb_send(ec); });
}
void hb_send(boost::beast::error_code ec) {
if(ec)
return fail(ec, "hb_send");
ws_.async_write(boost::asio::buffer(hbmsg), [this](boost::system::error_code ec, size_t) { hb_wait(ec); });
}
void req_loop(boost::beast::error_code ec = {}, std::size_t bytes_transferred = 0) {
if(ec)
return fail(ec, "req_loop");
ws_.async_write(
net::buffer(text_),
beast::bind_front_handler(
&session::on_write,
shared_from_this()));
}
void
on_write(
beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "write");
// Read a message into our buffer
ws_.async_read(
buffer_,
beast::bind_front_handler(
&session::on_read,
shared_from_this()));
}
void
on_read(
beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "read");
std::string message = boost::beast::buffers_to_string(buffer_.data());
std::cout << beast::make_printable(buffer_.data()) << std::endl;
buffer_.consume(buffer_.size());
ws_.async_write(
boost::asio::buffer(message),
beast::bind_front_handler(
&session::req_loop,
shared_from_this()));
//ws_.async_write(boost::asio::buffer(message), [this](boost::beast::error_code ec, size_t) { req_loop(ec); });
}
};
//------------------------------------------------------------------------------
int main(int argc, char** argv)
{
// Check command line arguments.
if(argc != 4)
{
std::cerr <<
"Usage: websocket-client-async <host> <port> <text>\n" <<
"Example:\n" <<
" websocket-client-async echo.websocket.org 80 \"Hello, world!\"\n";
return EXIT_FAILURE;
}
auto const host = argv[1];
auto const port = argv[2];
auto const text = argv[3];
// The io_context is required for all I/O
net::io_context ioc;
boost::asio::ip::tcp::socket sock(ioc);
// Launch the asynchronous operation
std::make_shared<session>(ioc)->run(host, port, text);
// Run the I/O service. The call will return when
// the socket is closed.
ioc.run();
return EXIT_SUCCESS;
}
How can I send heartbeat messages asynchronously with Boost.Beast?
Is this possible in principle? Sorry for my bad English.
Solution 1:[1]
Of course Boost can do this. I have been writing a WebSocket heartbeat with Boost recently myself. Here are some details of my implementation and of related problems that have already been solved in this issue.
I ran into a similar error, and the assertion message says:
you are attempting to issue two of the same asynchronous I/O operation at the same time, without waiting for the first one to complete. For example, attempting two simultaneous calls to async_read_some. Only one pending call of each I/O type (read and write) is permitted.
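This means that you are calling the same async_xxxx operation again, intentionally or unintentionally, before the previous call has completed. In the client above, hb_send and the req_loop/on_write/on_read chain both call ws_.async_write, so a heartbeat write can start while a request write is still pending.
The Boost.Beast documentation on asynchronous operations says: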
Like a regular Boost.Asio socket, a stream is not thread safe. Callers are responsible for synchronizing operations on the socket using an implicit or explicit strand, as per the Asio documentation. The websocket stream asynchronous interface supports one of each of the following operations to be active at the same time:
async_read or async_read_some
async_write or async_write_some
async_ping or async_pong
async_close
The following code produces undefined behavior, because the program is attempting to perform two simultaneous reads:
ws.async_read(b, [](error_code, std::size_t){});
ws.async_read(b, [](error_code, std::size_t){});
For example, if you call async_connect twice in a short time, the two async_connect handlers will each go on to call their own async_write or async_read at the same time as the flow of your code proceeds.
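One common way to respect the "one write at a time" rule is to route every outgoing message, heartbeats included, through a single queue and start the next write only from the completion handler of the previous one. The sketch below models that idea with the standard library only, so it can be read and run without Boost. `WriteQueue`, `send`, `on_write_done` and the `start_write` callback are hypothetical names; in a real Beast client the callback would be the place to call `ws_.async_write`, and both `send` and `on_write_done` would have to run on the WebSocket's strand.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper: funnels every outgoing message through one queue so
// that at most one write is in flight at any time.
class WriteQueue {
public:
    // Stand-in for the real asynchronous call (assumption: in a Beast client
    // this would wrap ws_.async_write with a handler that calls on_write_done).
    using StartWrite = std::function<void(const std::string&)>;

    explicit WriteQueue(StartWrite start_write)
        : start_write_(std::move(start_write)) {}

    // Called by both the heartbeat timer and the normal request path.
    void send(std::string msg) {
        pending_.push_back(std::move(msg));
        if (pending_.size() == 1)  // nothing in flight, start right away
            start_front();
    }

    // Must be called from the write completion handler.
    void on_write_done() {
        assert(in_flight_);
        in_flight_ = false;
        pending_.pop_front();      // drop the message that just finished
        if (!pending_.empty())
            start_front();         // start the next queued write
    }

    bool in_flight() const { return in_flight_; }
    std::size_t size() const { return pending_.size(); }

private:
    void start_front() {
        assert(!in_flight_);       // never two writes at once
        in_flight_ = true;
        start_write_(pending_.front());
    }

    StartWrite start_write_;
    std::deque<std::string> pending_;
    bool in_flight_ = false;
};
```

A side benefit of this design is that each message stays alive in the queue until its write completes, which matters because an Asio buffer only refers to the string's memory and does not copy it.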
Hope this helps. If anything in this explanation is wrong, please point it out. Thanks.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow