diff --git a/docker-compose.yml b/docker-compose.yml index 119e3c89..b7aaa381 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,7 +13,7 @@ services: COUCHBASE_PASSWORD: "password" PSTATUS_WORKFLOW_NOTIFICATIONS_BASE_URL: "" depends_on: - - couchbase-setup + - couchbase restart: "always" report-sink: @@ -35,7 +35,7 @@ services: RABBITMQ_PASSWORD: "guest" RABBITMQ_VIRTUAL_HOST: "/" depends_on: - - couchbase-setup + - couchbase - rabbitmq restart: "always" diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/AWSSQS.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/AWSSQS.kt index 99a24199..55b92cac 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/AWSSQS.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/AWSSQS.kt @@ -2,16 +2,15 @@ package gov.cdc.ocio.processingstatusapi.plugins import aws.sdk.kotlin.runtime.AwsServiceException import aws.sdk.kotlin.runtime.ClientException + import aws.sdk.kotlin.runtime.auth.credentials.StaticCredentialsProvider import aws.sdk.kotlin.services.sqs.SqsClient import aws.sdk.kotlin.services.sqs.model.* -import aws.smithy.kotlin.runtime.net.url.Url -import gov.cdc.ocio.processingstatusapi.models.ValidationComponents + import gov.cdc.ocio.processingstatusapi.utils.SchemaValidation import io.ktor.server.application.* import io.ktor.server.application.hooks.* import io.ktor.server.config.* -import java.util.Base64 import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.delay @@ -19,13 +18,11 @@ import kotlinx.coroutines.launch import org.apache.qpid.proton.TimeoutException /** - * The `AWSSQServiceConfiguration` class configures and initializes connection AWS SQS based on - * settings provided in an `ApplicationConfig`. This class extracts necessary AWS credentials and - * configuration details, such as the SQS queue URL, access key, secret key, and region, using the - * provided configuration path as a prefix. + * The `AWSSQServiceConfiguration` class configures and initializes connection AWS SQS based on settings provided in an `ApplicationConfig`. + * This class extracts necessary AWS credentials and configuration details, such as the SQS queue URL, access key, secret key, and region, + * using the provided configuration path as a prefix. * @param config `ApplicationConfig` containing the configuration settings for AWS SQS. - * @param configurationPath represents prefix used to locate environment variables specific to AWS - * within the configuration. + * @param configurationPath represents prefix used to locate environment variables specific to AWS within the configuration. */ class AWSSQServiceConfiguration(config: ApplicationConfig, configurationPath: String? = null) { private val configPath = if (configurationPath != null) "$configurationPath." else "" @@ -33,10 +30,9 @@ class AWSSQServiceConfiguration(config: ApplicationConfig, configurationPath: St private val accessKeyID = config.tryGetString("${configPath}access_key_id") ?: "" private val secretAccessKey = config.tryGetString("${configPath}secret_access_key") ?: "" private val region = config.tryGetString("${configPath}region") ?: "us-east-1" - private val endpoint: Url? = config.tryGetString("${configPath}endpoint")?.let { Url.parse(it) } - fun createSQSClient(): SqsClient { - return SqsClient { + fun createSQSClient(): SqsClient{ + return SqsClient{ credentialsProvider = StaticCredentialsProvider { accessKeyId = this@AWSSQServiceConfiguration.accessKeyID secretAccessKey = this@AWSSQServiceConfiguration.secretAccessKey @@ -47,152 +43,122 @@ class AWSSQServiceConfiguration(config: ApplicationConfig, configurationPath: St } } -data class SQSMessage(val Message: String) - -val AWSSQSPlugin = - createApplicationPlugin( - name = "AWS SQS", - configurationPath = "aws", - createConfiguration = ::AWSSQServiceConfiguration - ) { - lateinit var sqsClient: SqsClient - lateinit var queueUrl: String - val components = ValidationComponents.getComponents() +val AWSSQSPlugin = createApplicationPlugin( + name = "AWS SQS", + configurationPath = "aws", + createConfiguration = ::AWSSQServiceConfiguration +) { + lateinit var sqsClient: SqsClient + lateinit var queueUrl: String + try { + sqsClient = pluginConfig.createSQSClient() + queueUrl = pluginConfig.queueURL + SchemaValidation.logger.info("Connection to the AWS SQS was successfully established") + } catch (e: SqsException) { + SchemaValidation.logger.error("Failed to create AWS SQS client ${e.message}") + } catch (e: QueueDoesNotExist) { + SchemaValidation.logger.error("AWS SQS URL provided does not exist ${e.message}") + } catch (e: TimeoutException) { + SchemaValidation.logger.error("Timeout occurred ${e.message}") + } catch (e: Exception) { + SchemaValidation.logger.error("Unexpected error occurred ${e.message}") + } + /** + * Deletes messages from AWS SQS Service that has been validated + * @param receivedMessages the list of message(s) received from the queue to be deleted + * @throws Exception + */ + suspend fun deleteMessage(receivedMessages: ReceiveMessageResponse) { + receivedMessages.messages?.forEach { message -> try { - sqsClient = pluginConfig.createSQSClient() - queueUrl = pluginConfig.queueURL - SchemaValidation.logger.info( - "Connection to the AWS SQS was successfully established" - ) - } catch (e: SqsException) { - SchemaValidation.logger.error("Failed to create AWS SQS client ${e.message}") - } catch (e: QueueDoesNotExist) { - SchemaValidation.logger.error("AWS SQS URL provided does not exist ${e.message}") - } catch (e: TimeoutException) { - SchemaValidation.logger.error("Timeout occurred ${e.message}") - } catch (e: Exception) { - SchemaValidation.logger.error("Unexpected error occurred ${e.message}") - } - /** - * Deletes messages from AWS SQS Service that has been validated - * @param receivedMessages the list of message(s) received from the queue to be deleted - * @throws Exception - */ - suspend fun deleteMessage(receivedMessages: ReceiveMessageResponse) { - receivedMessages.messages?.forEach { message -> - try { - retryWithBackoff(numOfRetries = 5) { - val deleteMessageRequest = DeleteMessageRequest { - this.queueUrl = queueUrl - this.receiptHandle = message.receiptHandle - } - sqsClient.deleteMessage(deleteMessageRequest) - } - SchemaValidation.logger.info( - "Successfully deleted processed report from AWS SQS" - ) - } catch (e: Exception) { - SchemaValidation.logger.error( - "Something went wrong while deleting the report from the queue ${e.message}" - ) + retryWithBackoff(numOfRetries = 5) { + val deleteMessageRequest = DeleteMessageRequest { + this.queueUrl = queueUrl + this.receiptHandle = message.receiptHandle } + sqsClient.deleteMessage(deleteMessageRequest) } + SchemaValidation.logger.info("Successfully deleted processed report from AWS SQS") + }catch (e: Exception) { + SchemaValidation.logger.error("Something went wrong while deleting the report from the queue ${e.message}") } - /** - * Validates messages from the AWS SQS Service - * @param receivedMessages the list of message(s) received from the queue to be - * validated - * @throws Exception thrown during validation `and` it's important to delete the message - * as it will be persisted to dead-letter container - * ``` - * in configured database - * ``` - */ - suspend fun validate(receivedMessages: ReceiveMessageResponse) { - try { - receivedMessages.messages?.forEach { message -> - SchemaValidation.logger.info( - "Received message from AWS SQS: ${message.body}" - ) - val awsSQSProcessor = AWSSQSProcessor() - message.body?.let { - val rawMessage = components.gson.fromJson(it, SQSMessage::class.java) - - awsSQSProcessor.processMessage( - String(Base64.getDecoder().decode(rawMessage.Message)) - ) - } - } - deleteMessage(receivedMessages) - } catch (e: Exception) { - SchemaValidation.logger.error( - "An Exception occurred during validation ${e.message}" - ) - deleteMessage(receivedMessages) + } + } + /** + * Validates messages from the AWS SQS Service + * @param receivedMessages the list of message(s) received from the queue to be validated + * @throws Exception thrown during validation `and` it's important to delete the message as it will be persisted to dead-letter container + * in configured database + */ + suspend fun validate(receivedMessages: ReceiveMessageResponse) { + try { + receivedMessages.messages?.forEach { message -> + SchemaValidation.logger.info("Received message from AWS SQS: ${message.body}") + val awsSQSProcessor = AWSSQSProcessor() + message.body?.let { + awsSQSProcessor.processMessage(it) } } - /** - * The `consumeMessages` function continuously listens for and processes messages from - * an AWS SQS queue. This function runs in a non-blocking coroutine, retrieving messages - * from the queue, validating them using `AWSSQSProcessor`, and then deleting the - * processed messages from the queue. - * - * @throws Exception - * @throws AwsServiceException - */ - fun consumeMessages() { - SchemaValidation.logger.info("Consuming messages from AWS SQS") - CoroutineScope(Dispatchers.IO).launch { - while (true) { - var receivedMessages: ReceiveMessageResponse? - try { - receivedMessages = - retryWithBackoff(numOfRetries = 5) { - val receiveMessageRequest = ReceiveMessageRequest { - this.queueUrl = queueUrl - maxNumberOfMessages = 5 - } - sqsClient.receiveMessage(receiveMessageRequest) - } - validate(receivedMessages) - } catch (e: AwsServiceException) { - SchemaValidation.logger.error( - "AwsServiceException occurred while processing the request ${e.message} with requestID: ${e.sdkErrorMetadata.requestId}" - ) - throw e - } catch (e: ClientException) { - SchemaValidation.logger.error( - "ClientException occurred either while trying to send request to AWS or while trying to parse a response from AWS ${e.message}" - ) - throw e - } catch (e: Exception) { - SchemaValidation.logger.error( - "AWS service exception occurred: ${e.message}" - ) - throw e + deleteMessage(receivedMessages) + }catch (e: Exception) { + SchemaValidation.logger.error("An Exception occurred during validation ${e.message}") + deleteMessage(receivedMessages) + } + } + /** + * The `consumeMessages` function continuously listens for and processes messages from an AWS SQS queue. + * This function runs in a non-blocking coroutine, retrieving messages from the queue, validating them using + * `AWSSQSProcessor`, and then deleting the processed messages from the queue. + * + * @throws Exception + * @throws AwsServiceException + */ + fun consumeMessages() { + SchemaValidation.logger.info("Consuming messages from AWS SQS") + CoroutineScope(Dispatchers.IO).launch { + while (true) { + var receivedMessages: ReceiveMessageResponse? + try { + receivedMessages = retryWithBackoff(numOfRetries = 5){ + val receiveMessageRequest = ReceiveMessageRequest { + this.queueUrl = queueUrl + maxNumberOfMessages = 5 } + sqsClient.receiveMessage(receiveMessageRequest) } + validate(receivedMessages) + } catch (e: AwsServiceException) { + SchemaValidation.logger.error("AwsServiceException occurred while processing the request ${e.message} with requestID: ${e.sdkErrorMetadata.requestId}") + throw e + } catch (e: ClientException) { + SchemaValidation.logger.error("ClientException occurred either while trying to send request to AWS or while trying to parse a response from AWS ${e.message}") + throw e + } catch (e: Exception) { + SchemaValidation.logger.error("AWS service exception occurred: ${e.message}") + throw e } } + } + } - on(MonitoringEvent(ApplicationStarted)) { application -> - application.log.info("Application started successfully.") - consumeMessages() - } + on(MonitoringEvent(ApplicationStarted)) { application -> + application.log.info("Application started successfully.") + consumeMessages() + } - on(MonitoringEvent(ApplicationStopped)) { application -> - application.log.info("Application stopped successfully.") - cleanupResourcesAndUnsubscribe(application, sqsClient) - } - } + on(MonitoringEvent(ApplicationStopped)) { application -> + application.log.info("Application stopped successfully.") + cleanupResourcesAndUnsubscribe(application, sqsClient) + } +} /** * We need to clean up the resources and unsubscribe from application life events. * - * @param application The Ktor instance, provides access to the environment monitor used for - * unsubscribing from events. - * @param sqsClient `sqsClient` used to receive and then delete messages from AWS SQS + * @param application The Ktor instance, provides access to the environment monitor used + * for unsubscribing from events. + * @param sqsClient `sqsClient` used to receive and then delete messages from AWS SQS */ private fun cleanupResourcesAndUnsubscribe(application: Application, sqsClient: SqsClient) { application.log.info("Closing SQS client") @@ -201,39 +167,40 @@ private fun cleanupResourcesAndUnsubscribe(application: Application, sqsClient: application.environment.monitor.unsubscribe(ApplicationStopped) {} } -/** The main application module which runs always */ +/** + * The main application module which runs always + */ fun Application.awsSQSModule() { install(AWSSQSPlugin) } /** - * The `retryWithBackoff` retries block of code with exponential backoff, doubling the delay before - * each retry until `maxDelay` is reached or specified number of retries is exhausted. + * The `retryWithBackoff` retries block of code with exponential backoff, doubling the delay before each retry + * until `maxDelay` is reached or specified number of retries is exhausted. * * @param numOfRetries The number of times to retry attempts. Default is 3. * @param baseDelay The initial delay between retries in milliseconds. Default is 1000 ms. * @param maxDelay The maximum delay between retries, in milliseconds. Default is 6000 ms. * @param block The block of code to be executed. + * */ -suspend fun
retryWithBackoff( - numOfRetries: Int = 3, - baseDelay: Long = 1000, - maxDelay: Long = 6000, - block: suspend () -> P -): P { +suspend fun
retryWithBackoff( + numOfRetries: Int = 3, + baseDelay:Long = 1000, + maxDelay: Long = 6000, + block:suspend()-> P +): P{ var currentDelay = baseDelay repeat(numOfRetries) { - try { + try{ return block() - } catch (e: Exception) { - SchemaValidation.logger.error( - "Attempt failed with exception: ${e.message}. Retrying again in $currentDelay" - ) + }catch (e:Exception){ + SchemaValidation.logger.error("Attempt failed with exception: ${e.message}. Retrying again in $currentDelay") delay(currentDelay) - currentDelay = (currentDelay * 2).coerceAtMost(maxDelay) + currentDelay = (currentDelay *2).coerceAtMost(maxDelay) } } - // This is the last attempt, and if it fails again will throw an exception + //This is the last attempt, and if it fails again will throw an exception SchemaValidation.logger.error("Last Attempt, if it fails again exception will be thrown") return block() }