How to use gRPC C++ ClientAsyncReader<Message> for server-side streams

I am using a very simple proto in which Message contains a single string field:

service LongLivedConnection {  
  // Starts a grpc connection
  rpc Connect(Connection) returns (stream Message) {}
}

message Connection{
  string userId = 1;
}

message Message{
  string serverMessage = 1;
}

The use case is that the client connects to the server, and the server then uses this gRPC stream to push messages to the client.

Now, for the client code, assuming that I am already in a worker thread, how do I properly set it up so that I can continuously receive messages that arrive from the server at random times?

void StartConnection(const std::string& user) {
  Connection request;
  // protoc lowercases field names in C++ accessors, so userId becomes set_userid()
  request.set_userid(user);

  Message reply;
  ClientContext context;

  stub_->Connect(&context, request, &reply);

  // What should I do from now on? 

  // notify(serverMessage);
}

void notify(std::string message) {
  // generate message events and pass to main event loop
}



Solution 1:[1]

I figured out how to use the API. It is quite flexible, but it still feels a little odd, given that I would normally expect an async API to take some kind of lambda callback.

The code below is blocking, so you will have to run it in a separate thread to avoid blocking your application.

I believe multiple threads can access the same CompletionQueue, but in my case a single thread handles this gRPC connection.

GrpcConnection.h file:

public:
void StartGrpcConnection();

private:
std::shared_ptr<grpc::Channel> m_channel;
// Synchronous reader type; not used by the async code below
std::unique_ptr<grpc::ClientReader<LongLiveConnection::Message>> m_reader;
std::unique_ptr<LongLiveConnection::LongLiveConnectionService::Stub> m_stub;

GrpcConnection.cpp file:
...
void GrpcConnectionService::StartGrpcConnection()
{
    m_channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials());
    LongLiveConnection::Connect request;
    request.set_user_id(12345);
    m_stub = LongLiveConnection::LongLiveConnectionService::NewStub(m_channel);

    grpc::ClientContext context;
    grpc::CompletionQueue cq;
    std::unique_ptr<grpc::ClientAsyncReader<LongLiveConnection::Message>> reader =
            m_stub->PrepareAsyncConnect(&context, request, &cq);

    void* got_tag = nullptr;
    bool ok = false;
    LongLiveConnection::Message reply;
    grpc::Status status;

    // Hardcoded tags: 1 = StartCall, 2 = Read, 3 = any other operation, 4 = Finish
    reader->StartCall((void*)1);

    // Next() blocks until an event is ready; it returns false only once the queue is shut down
    while (cq.Next(&got_tag, &ok))
    {
        if (got_tag == (void*)1)
        {
            if (ok)
            {
                // StartCall() succeeded: start the first read with a different hardcoded tag
                reader->Read(&reply, (void*)2);
            }
            else
            {
                // the call never started: ask for the final status
                reader->Finish(&status, (void*)4);
            }
        }
        else if (got_tag == (void*)2)
        {
            if (!ok)
            {
                // a failed read means the server will send no more messages
                reader->Finish(&status, (void*)4);
                continue;
            }

            // this is the message from the server
            std::string body = reply.server_message();
            // do whatever you want with body; in my case I push it to my application's event stream to be processed by other components

            // lastly, start another read
            reader->Read(&reply, (void*)2);
        }
        else if (got_tag == (void*)3)
        {
            // if you start any other operation on this queue, such as listening for gRPC channel state changes, pass it a different hardcoded tag; you will be notified here when its result arrives
        }
        else if (got_tag == (void*)4)
        {
            // status now holds the final result of the RPC
            break;
        }
    }
}
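The hardcoded `(void*)1`, `(void*)2` and `(void*)3` tags in the code above can be given names. The following is only a sketch: the `Tag` enum and the `ToTag`/`FromTag` helpers are hypothetical names introduced here, not part of gRPC, which only ever sees an opaque `void*`.

```cpp
#include <cstdint>

// Hypothetical names for the hardcoded tag values used in the answer.
enum class Tag : std::intptr_t {
    StartCall = 1,  // completion of reader->StartCall()
    Read = 2,       // completion of reader->Read()
    Other = 3       // any other operation queued on the same CompletionQueue
};

// Convert a named tag to the opaque pointer the completion queue expects.
inline void* ToTag(Tag t) {
    return reinterpret_cast<void*>(static_cast<std::intptr_t>(t));
}

// Convert the pointer handed back by the completion queue to a named tag.
inline Tag FromTag(void* p) {
    return static_cast<Tag>(reinterpret_cast<std::intptr_t>(p));
}
```

With these helpers, a read would be issued as `reader->Read(&reply, ToTag(Tag::Read));` and the loop could dispatch with `switch (FromTag(got_tag))` instead of a chain of pointer comparisons.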

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Zhen Liu