MQTT client waits indefinitely during publish of message

I am trying to implement an asynchronous MQTT client with the Paho library that receives messages on the topic "request", formulates a string, and publishes the response on the topic "response". I use callbacks to handle the incoming messages.

#include <cctype>
#include <iostream>
#include <string>

#include "mqtt/async_client.h"
#include "mqtt/topic.h"

const std::string SERVER_ADDRESS {"tcp://localhost:2883"};
const std::string CLIENT_ID {"test_client"};

class TestCallback : public virtual mqtt::callback
{
    // the mqtt client
    mqtt::async_client& cli_;

    // (re)connection success
    void connected(const std::string& cause) override
    {
        cli_.subscribe("request", 0);
    }

    // callback for when a message arrives.
    void message_arrived(mqtt::const_message_ptr msg) override 
    {
        if( msg->get_topic() == "request" )
        {   
            /* format the response message here and put it into (string) response */
            std::string response = msg->to_string();  // placeholder: echoes the request payload

            mqtt::message_ptr pubmsg = mqtt::make_message("response", response);
            pubmsg->set_qos(2);

            //// PROBLEMATIC CODE ////
            cli_.publish(pubmsg)->wait();
            //////////////////////////
        }
    }

public:
    TestCallback(mqtt::async_client& cli)
        : cli_(cli) {}
};



int main(int argc, char** argv)
{    
    mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
    TestCallback cb(cli);
    cli.set_callback(cb);

    mqtt::connect_options connOpts = mqtt::connect_options_builder()
        .clean_session(false)
        .automatic_reconnect()
        .finalize();

    try
    {
        cli.connect(connOpts)->wait();
    }
    catch (const mqtt::exception& exc)
    {
        std::cerr << "[ERROR] " << exc.what() << std::endl;
        return 1;
    }

    // run until the application is shut down
    while (std::tolower(std::cin.get()) != 'q')
            ;

    try
    {
        cli.disconnect()->wait();
    }
    catch (const mqtt::exception& exc)
    {
        std::cerr << "[ERROR] " << exc.what() << std::endl;
        return 1;
    }

    return 0;
}

The problem arises when I try to publish the response message: the client seems to wait indefinitely. The cause is the wait() function, which is called on a token to track the status of the published message (reference). As I understand it, this has to be done, especially with higher QoS levels, to ensure everything went well.

Upon removal of the call to wait(), it works as expected. But I am not sure whether this still ensures that messages are published correctly.

What is the correct way to do this?



Solution 1:[1]

I'm going to make a guess here, because I don't really know how async works in C++.

The MQTT client has a single message handling thread, which deals with all the incoming and outgoing TCP packets as they arrive or depart on the socket. When a new MQTT message arrives, that thread calls the message handler callback (message_arrived), in which you call publish and wait for it to complete. But because the call to wait blocks message_arrived, the message handling thread cannot continue. This means it cannot process the rest of the QoS 2 handshake (PUBREC, PUBREL, PUBCOMP) required for the publish to complete, hence it hangs.

I will also guess that if you changed the publish to QoS 0 it would complete, but that it would still hang with QoS 1, as that requires the message handling thread to receive the PUBACK before the publish can complete.

Not waiting for the publish to complete is probably the correct solution.

Solution 2:[2]

Yes, @hardillb is right: the problem is that you cannot make a blocking call to the library from within a callback. wait() is a blocking call, so it deadlocks the callback thread.

There's a single thread processing the incoming packets from the MQTT connection, and that thread is used to invoke the callbacks. When you call wait() on a QoS 1 publish, it blocks the input processing, so it can't process the PUBACK to complete the wait.
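The deadlock can be modelled without the library. The following is only a toy sketch, not Paho code: EventLoop, wait_for_ack and the two demo functions are made-up names, the "thread" is a plain queue drained by one loop, and the wait is bounded by a poll count so that the sketch terminates instead of hanging.

#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Toy stand-in for the single handler thread: one loop runs every event,
// whether it is a user callback or an incoming acknowledgement.
class EventLoop {
    std::queue<std::function<void()>> events_;
public:
    void post(std::function<void()> ev) { events_.push(std::move(ev)); }
    void run() {
        while (!events_.empty()) {
            auto ev = std::move(events_.front());
            events_.pop();
            ev();  // the next event cannot start until this one returns
        }
    }
};

// Stand-in for a blocking wait, bounded so the demo ends:
// poll the flag a fixed number of times, then give up.
bool wait_for_ack(const bool& acked, int max_polls) {
    for (int i = 0; i < max_polls; ++i)
        if (acked) return true;
    return false;
}

// Blocking style: the callback waits for an acknowledgement that only
// the same loop could deliver, so the wait can never succeed.
bool blocking_publish_completes() {
    EventLoop loop;
    bool completed = false;
    loop.post([&] {                            // plays the role of message_arrived
        auto acked = std::make_shared<bool>(false);
        loop.post([acked] { *acked = true; }); // the "PUBACK", queued behind this callback
        completed = wait_for_ack(*acked, 1000);
    });
    loop.run();
    return completed;
}

// Callback style: the callback returns at once, and a completion handler
// runs later, when the loop gets to the acknowledgement.
bool callback_publish_completes() {
    EventLoop loop;
    bool completed = false;
    loop.post([&] {
        loop.post([&] { completed = true; });  // ack triggers the completion handler
    });
    loop.run();
    return completed;
}

In this model blocking_publish_completes() gives up and returns false, because the acknowledgement is stuck in the queue behind the callback that is waiting for it, while callback_publish_completes() returns true.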

If you're going to use callbacks, you sort of need to go "all in" on them and use an additional callback to indicate success/failure completion of the publish.
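As a rough sketch of what going "all in" on callbacks might look like for the code in the question: the publish is started without wait(), and an mqtt::iaction_listener reports the outcome later. The class names PublishListener and ResponderCallback are made up for illustration, the response payload is a placeholder, and the log messages are arbitrary; treat this as one possible arrangement rather than the library's recommended pattern.

#include <iostream>
#include <string>

#include "mqtt/async_client.h"

// Hypothetical listener: invoked by the library when a publish finishes.
class PublishListener : public virtual mqtt::iaction_listener
{
    void on_success(const mqtt::token& tok) override
    {
        std::cout << "[INFO] publish completed, message id "
                  << tok.get_message_id() << std::endl;
    }

    void on_failure(const mqtt::token& tok) override
    {
        std::cerr << "[ERROR] publish failed, message id "
                  << tok.get_message_id() << std::endl;
    }
};

class ResponderCallback : public virtual mqtt::callback
{
    mqtt::async_client& cli_;
    // Member, so the listener outlives every publish that refers to it.
    PublishListener pubListener_;

    void connected(const std::string& /*cause*/) override
    {
        cli_.subscribe("request", 0);
    }

    void message_arrived(mqtt::const_message_ptr msg) override
    {
        if (msg->get_topic() != "request")
            return;

        // Placeholder response: echoes the request payload.
        std::string response = msg->to_string();

        mqtt::message_ptr pubmsg = mqtt::make_message("response", response);
        pubmsg->set_qos(2);

        try
        {
            // No wait() here: the call returns immediately and the
            // listener is notified once the QoS 2 exchange has finished.
            cli_.publish(pubmsg, nullptr, pubListener_);
        }
        catch (const mqtt::exception& exc)
        {
            std::cerr << "[ERROR] " << exc.what() << std::endl;
        }
    }

public:
    explicit ResponderCallback(mqtt::async_client& cli)
        : cli_(cli) {}
};

It would be installed the same way as in the question, with cli.set_callback(cb) before connecting. The callback interface also has a delivery_complete(mqtt::delivery_token_ptr) method that can be overridden instead, if one notification point for all publishes is enough.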

Honestly, I was never a big fan of callback-driven asynchronous I/O; it's confusing and puts a heavy burden of thread synchronization on the app. But the initial goal of the C++ lib was to make it similar to the earlier IBM Java library.

I much prefer the future/promise (async/await) style. I think if/when there's a revamped v2.0 of the library it'll just implement that style.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 hardillb
Solution 2 Frank P