Pub/Sub push subscription not acknowledging messages

This is my setup.

Subscription A is a push subscription that POSTs messages to a Cloud Run deployment.

That deployment exposes an HTTP endpoint, processes the message, posts the result to Topic B, and responds 200 to subscription A's POST request. The whole process takes ~1.5 seconds.

Therefore, for every message in subscription A, I should end up with 1 message in Topic B. This is what my code looks like.

My app starts an Express server:

const express = require('express');
const bodyParser = require('body-parser');

const _ = require('lodash');

const startBrowser = require('./startBrowser');
const tab = require('./tab');
const createMessage = require('./publishMessage');
const domain = 'https://example.com';

require('dotenv').config();

const app = express();
app.use(bodyParser.json());


const port = process.env.PORT || 8080;
app.listen(port, async () => {
  console.log('Listening on port', port);
});

The endpoint where all the magic happens:

app.post('/', async (req, res) => {

  // Define the success and fail functions, that respond status 200 and 500 respectively
  const failed = () => res.status(500).send();
  const completed = async () => {
    const response = await res.status(200).send();
    if (response && res.writableEnded) {
      console.log('successfully responded 200');
    }
  };

  //Process the data coming from Subscription A
  let pubsubMessage = decodeBase64Json(req.body.message.data);
  let parsed =  await processor(pubsubMessage);
  
  //Post the processed data to topic B
  let messageId = await postParsedData(parsed);


  if (messageId) {
  // ACK the message once the data has been processed and posted to topic B.
    completed();

  } else {
    console.log('Didnt get a message id');
    // failed();
  }
});

//define the functions that post data to Topic B

const postParsedData = async (parsed) => {
  if (!_.isEmpty(parsed)) {
    const topicName = 'topic-B';
    // createMessage is expected to resolve to the published message's ID
    const messageId = await createMessage(parsed, topicName);
    return messageId;
  } else {
    console.log('Parsed is Empty');
    return null;
  }
};

function decodeBase64Json(data) {
  return JSON.parse(Buffer.from(data, 'base64').toString());
}
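
For readers unfamiliar with the push format, here is a minimal sketch of the request body that a push subscription POSTs and a defensive way to decode it. The example values and the `safeDecode` helper are hypothetical and are not part of the original code; the helper simply returns null on a malformed body instead of throwing the way `decodeBase64Json` would.

```javascript
// Shape of a push delivery body; all values here are made up for illustration.
const exampleBody = {
  message: {
    data: Buffer.from(JSON.stringify({ url: 'https://example.com/page' })).toString('base64'),
    messageId: '1234567890',
    attributes: {},
  },
  subscription: 'projects/my-project/subscriptions/subscription-A',
};

// Hypothetical defensive variant of decodeBase64Json: returns null instead of throwing.
function safeDecode(body) {
  const data = body && body.message && body.message.data;
  if (typeof data !== 'string') return null; // missing or malformed envelope
  try {
    return JSON.parse(Buffer.from(data, 'base64').toString('utf8'));
  } catch (err) {
    return null; // payload was not valid JSON
  }
}

const decoded = safeDecode(exampleBody);
console.log(decoded);
```

Whether a malformed message should be dropped or retried is a design choice for the handler; this sketch only makes the failure visible to the caller.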

Execution takes ~1.5 seconds, and I can see the successful responses logged on Cloud Run every ~1.5 seconds. That adds up to ~2400 messages/hour per Cloud Run instance.

Topic B is getting new messages at ~2400 messages/hour, but Subscription A's acknowledgement rate is only ~200 messages/hour, so messages are being redelivered many times.
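
To make the mismatch concrete, the arithmetic behind these figures can be sketched as follows. The numbers come from the description above; the variable names are illustrative only.

```javascript
// Figures reported in the question (approximate).
const secondsPerMessage = 1.5;
const ackedPerHour = 200;

// One instance handling one message at a time processes 3600 / 1.5 messages per hour.
const processedPerHour = 3600 / secondsPerMessage;

// Average number of deliveries processed for each message that actually gets ACKed.
const deliveriesPerAck = processedPerHour / ackedPerHour;

console.log(processedPerHour, deliveriesPerAck);
```

With these numbers, roughly 12 deliveries are processed for every acknowledged message, where a ratio of 1 was expected.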

Subscription A's acknowledgement deadline is 600 seconds. The request timeout in Cloud Run is 300 seconds.

I've tried ACKing messages before they're published to topic-B or even before parsing, but I'm getting the same result.

Edit: added a screenshot of the pending messages and processed messages. Many more messages are processed than are ACKed; the ratio should be 1:1.

Screenshot

Thanks for your help

Solution: GCP support could not reproduce this error. It did not occur when running a large number of Cloud Run instances, so the fix was simply to increase the number of worker instances.



Solution 1:[1]

You need to await your completed() function call, like this:

....
 if (messageId) {
  // ACK the message once the data has been processed and posted to topic B.
    await completed();

  } else {
    console.log('Didnt get a message id');
    // failed();
  }
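
A related point that may be worth checking: a push delivery only counts as acknowledged when the endpoint returns a success status such as 200 or 204, and in the question's code the else branch never responds at all because failed() is commented out. Below is a minimal sketch of a handler core that always produces a status code. The `handlePush` helper is hypothetical, and its `processor` and `publish` parameters stand in for the question's processor() and postParsedData(). Returning 500 when no message ID comes back is an assumption about the desired retry behaviour.

```javascript
// Hypothetical helper: decides the HTTP status for one push delivery.
// It never throws, so the route handler can always send a response.
async function handlePush(body, processor, publish) {
  try {
    const data = JSON.parse(Buffer.from(body.message.data, 'base64').toString('utf8'));
    const parsed = await processor(data);
    const messageId = await publish(parsed);
    // Success status = ACK; anything else asks Pub/Sub to redeliver.
    return messageId ? 204 : 500;
  } catch (err) {
    console.error('Processing failed:', err.message);
    return 500;
  }
}

// Possible wiring in the Express route from the question (assumes `app` exists):
// app.post('/', async (req, res) => {
//   res.status(await handlePush(req.body, processor, postParsedData)).send();
// });
```

Keeping the decision in a plain function makes it easy to unit test without starting a server.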

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 guillaume blaquiere