'How do I convert (node js) sequential processing of AWS log stream to mysql local db to parallel processing, without Error: Pool is closed. message?

The node js code operates as follows; fnGetLogEvents() calls AWS GetLogEventsCommand to get the events of the log stream putCloudwatchLog is called to convert each dynoPObj object, one of the events, to mysql rec and write via a pooled connection, which works in series by waiting for each db write to complete

require('dotenv').config();
const { CloudWatchLogsClient, DescribeLogStreamsCommand, GetLogEventsCommand } = require('@aws-sdk/client-cloudwatch-logs');

// a client can be shared by different commands.
const client = new CloudWatchLogsClient({ region: process.env.AWS_REGN });

const mysql = require('mysql2/promise');

const config = {
    host: process.env.DB_HOST,
    user: process.env.DB_USER,
    password: process.env.DB_PASS,
    database: process.env.DB_NAME,
    waitForConnections: true,
    connectionLimit: 10,
    queueLimit: 0
};
const pool = mysql.createPool(config);

const paramsEvents = {
  logGroupName: process.env.AWS_LG_GRP
};

const commandEvents = new GetLogEventsCommand(paramsEvents);

// START of INSERT into mysqldb
async function putCloudwatchLog(pool, cloudwatchArray, tableName) {
    console.log('putCloudwatchLog START');

    let uiVar = [];
    let uiStmt = 'INSERT INTO ' + tableName + ' (';
    let uiVal = 'VALUES (' ;

    let kvCntr = 0;
    let kvLength = Object.keys(cloudwatchArray).length

    for (const keyValue in cloudwatchArray) {
        if (++kvCntr == kvLength) {
          uiStmt = uiStmt + keyValue + ') ' ;
          uiVal  = uiVal  + '?)' ;
        } else {
          uiStmt = uiStmt + keyValue + ',' ;
          uiVal  = uiVal  + '?,' ;
        }
        uiVar.push(cloudwatchArray[keyValue]);
    }
    uiStmt += uiVal ;
  
    //  START
    const connection = await pool.getConnection();
    console.log('putCloudwatchLog POST CONNECTION');
    try {
        // V4 - additional cli parameters to specify group and index
        console.log('putCloudwatchLog PRE-QUERY', uiStmt, uiVar);
        const results = await connection.query(uiStmt, uiVar);

        connection.release();
        return results[0];

    }   catch (err) {
        console.log('putCloudwatchLog ERR', err);
        connection.release();
        throw err; // not connected!
    }
    //  END
}
// END of dbInsert

async function fnGetLogEvents(logStreamName) {
  try {
    paramsEvents.logStreamName = logStreamName ;
    const response = await client.send(commandEvents);
    
    let i=0;
    for (var record of response.events) {
        ++i;

        var switchStr = record.message.replace(/^[\{\"]*([a-zA-Z0-9_]+).*\n/, '$1');
        process.stdout.write('SWITCH:' + switchStr + ':');

        // start
        switch (switchStr) {
            case 'dynoParams':
                let dynoPObj = {};
                let dynoObjNew = JSON.parse(record.message);
                dynoPObj.request_ID = dynoObjNew.dynoParams.awsRequestId;
                dynoPObj.message_ID = dynoObjNew.dynoParams.MessageId;
                dynoPObj.log_stream_name = dynoObjNew.dynoParams.logStreamName;
                dynoPObj.uuid = dynoObjNew.dynoParams.uuid;
                console.log('dynoParamsObj:',dynoPObj);
                
                // insert putCloudWatchLog here with dynoPObj !!
                await putCloudwatchLog(pool, dynoPObj, 'CLOUDWATCH_PARAMS_LOG');

                break;
            default:
                console.log(" record" );
        }
        //end

    }
  } catch (error) {
    // error handling.
    console.log(error);
  } finally {
    // finally.
    console.log("fnGetLogEvents: END");
    return;
  }
}

async function main() {
  try {
    await fnGetLogEvents(process.env.AWS_LG_STRM);
  } catch (error) {
    // error handling.
    console.log(error);
  } finally {
    // finally.
    console.log("main: END");
    pool.end();
  }
}

main();

I need help converting this to parallel processing, without exhausting the connection pool. If I remove the await when calling connection.query(uiStmt, uiVar); I get the following error;

(node:11449) UnhandledPromiseRejectionWarning: Error: Pool is closed.
    at /home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/pool.js:57:21
    at PoolConnection.<anonymous> (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/connection.js:777:13)
    at Object.onceWrapper (events.js:483:26)
    at PoolConnection.emit (events.js:376:20)
    at ClientHandshake.<anonymous> (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/connection.js:121:14)
    at ClientHandshake.emit (events.js:376:20)
    at ClientHandshake.execute (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/commands/command.js:49:10)
    at PoolConnection.handlePacket (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/connection.js:456:32)
    at PacketParser.onPacket (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/connection.js:85:12)
    at PacketParser.executeStart (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/packet_parser.js:75:16)



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source