'How do I convert (node js) sequential processing of AWS log stream to mysql local db to parallel processing, without Error: Pool is closed. message?
The node js code operates as follows; fnGetLogEvents() calls AWS GetLogEventsCommand to get the events of the log stream putCloudwatchLog is called to convert each dynoPObj object, one of the events, to mysql rec and write via a pooled connection, which works in series by waiting for each db write to complete
require('dotenv').config();
const { CloudWatchLogsClient, DescribeLogStreamsCommand, GetLogEventsCommand } = require('@aws-sdk/client-cloudwatch-logs');
// a client can be shared by different commands.
const client = new CloudWatchLogsClient({ region: process.env.AWS_REGN });
const mysql = require('mysql2/promise');
const config = {
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASS,
database: process.env.DB_NAME,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
};
const pool = mysql.createPool(config);
const paramsEvents = {
logGroupName: process.env.AWS_LG_GRP
};
const commandEvents = new GetLogEventsCommand(paramsEvents);
// START of INSERT into mysqldb
async function putCloudwatchLog(pool, cloudwatchArray, tableName) {
console.log('putCloudwatchLog START');
let uiVar = [];
let uiStmt = 'INSERT INTO ' + tableName + ' (';
let uiVal = 'VALUES (' ;
let kvCntr = 0;
let kvLength = Object.keys(cloudwatchArray).length
for (const keyValue in cloudwatchArray) {
if (++kvCntr == kvLength) {
uiStmt = uiStmt + keyValue + ') ' ;
uiVal = uiVal + '?)' ;
} else {
uiStmt = uiStmt + keyValue + ',' ;
uiVal = uiVal + '?,' ;
}
uiVar.push(cloudwatchArray[keyValue]);
}
uiStmt += uiVal ;
// START
const connection = await pool.getConnection();
console.log('putCloudwatchLog POST CONNECTION');
try {
// V4 - additional cli parameters to specify group and index
console.log('putCloudwatchLog PRE-QUERY', uiStmt, uiVar);
const results = await connection.query(uiStmt, uiVar);
connection.release();
return results[0];
} catch (err) {
console.log('putCloudwatchLog ERR', err);
connection.release();
throw err; // not connected!
}
// END
}
// END of dbInsert
async function fnGetLogEvents(logStreamName) {
try {
paramsEvents.logStreamName = logStreamName ;
const response = await client.send(commandEvents);
let i=0;
for (var record of response.events) {
++i;
var switchStr = record.message.replace(/^[\{\"]*([a-zA-Z0-9_]+).*\n/, '$1');
process.stdout.write('SWITCH:' + switchStr + ':');
// start
switch (switchStr) {
case 'dynoParams':
let dynoPObj = {};
let dynoObjNew = JSON.parse(record.message);
dynoPObj.request_ID = dynoObjNew.dynoParams.awsRequestId;
dynoPObj.message_ID = dynoObjNew.dynoParams.MessageId;
dynoPObj.log_stream_name = dynoObjNew.dynoParams.logStreamName;
dynoPObj.uuid = dynoObjNew.dynoParams.uuid;
console.log('dynoParamsObj:',dynoPObj);
// insert putCloudWatchLog here with dynoPObj !!
await putCloudwatchLog(pool, dynoPObj, 'CLOUDWATCH_PARAMS_LOG');
break;
default:
console.log(" record" );
}
//end
}
} catch (error) {
// error handling.
console.log(error);
} finally {
// finally.
console.log("fnGetLogEvents: END");
return;
}
}
async function main() {
try {
await fnGetLogEvents(process.env.AWS_LG_STRM);
} catch (error) {
// error handling.
console.log(error);
} finally {
// finally.
console.log("main: END");
pool.end();
}
}
main();
I need help converting this to parallel processing, without exhausting the connection pool. If I remove the await when calling connection.query(uiStmt, uiVar); I get the following error;
(node:11449) UnhandledPromiseRejectionWarning: Error: Pool is closed.
at /home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/pool.js:57:21
at PoolConnection.<anonymous> (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/connection.js:777:13)
at Object.onceWrapper (events.js:483:26)
at PoolConnection.emit (events.js:376:20)
at ClientHandshake.<anonymous> (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/connection.js:121:14)
at ClientHandshake.emit (events.js:376:20)
at ClientHandshake.execute (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/commands/command.js:49:10)
at PoolConnection.handlePacket (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/connection.js:456:32)
at PacketParser.onPacket (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/connection.js:85:12)
at PacketParser.executeStart (/home/ec2-user/AWS/r2data-index/br-cloudwatch-logs-2-mysql/node_modules/mysql2/lib/packet_parser.js:75:16)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|