diff --git a/libs/schema-validation/build.gradle.kts b/libs/schema-validation/build.gradle.kts index e6716143..5251f04b 100644 --- a/libs/schema-validation/build.gradle.kts +++ b/libs/schema-validation/build.gradle.kts @@ -35,6 +35,8 @@ dependencies { implementation ("com.google.code.gson:gson:2.10.1") implementation ("com.sun.activation:javax.activation:1.2.0") + implementation(project(":libs:commons-database")) + implementation(project(":libs:commons-types")) implementation("io.insert-koin:koin-ktor:3.5.6") // AWS SDK for S3 implementation("software.amazon.awssdk:s3:2.20.91") diff --git a/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/AWSS3Configuration.kt b/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/AWSS3Configuration.kt new file mode 100644 index 00000000..2a17488c --- /dev/null +++ b/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/AWSS3Configuration.kt @@ -0,0 +1,15 @@ +package gov.cdc.ocio.reportschemavalidator.utils + +import io.ktor.server.config.* + +/** + * AWS S3 configuration class + * @param config ApplicationConfig + * @param configurationPath String? + */ +class AWSS3Configuration(config: ApplicationConfig, configurationPath: String? = null) { + private val configPath = if (configurationPath != null) "$configurationPath." else "" + val s3Bucket = config.tryGetString("${configPath}s3.report_schema_bucket") ?: "" + val s3Region = config.tryGetString("${configPath}s3.report_schema_region") ?: "" + +} \ No newline at end of file diff --git a/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/AzureBlobStorageConfiguration.kt b/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/AzureBlobStorageConfiguration.kt new file mode 100644 index 00000000..64c51f69 --- /dev/null +++ b/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/AzureBlobStorageConfiguration.kt @@ -0,0 +1,15 @@ +package gov.cdc.ocio.reportschemavalidator.utils + +import io.ktor.server.config.* + +/** + * Blob storage configuration class + * @param config ApplicationConfig + * @param configurationPath String? + */ +class AzureBlobStorageConfiguration(config: ApplicationConfig, configurationPath: String? = null) { + private val configPath = if (configurationPath != null) "$configurationPath." else "" + val connectionString = config.tryGetString("${configPath}blob_storage.connection_string") ?: "" + val container = config.tryGetString("${configPath}blob_storage.container") ?: "" + +} \ No newline at end of file diff --git a/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/CloudSchemaLoaderConfiguration.kt b/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/CloudSchemaLoaderConfiguration.kt new file mode 100644 index 00000000..50ebbc6f --- /dev/null +++ b/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/CloudSchemaLoaderConfiguration.kt @@ -0,0 +1,38 @@ +package gov.cdc.ocio.reportschemavalidator.utils + + +import gov.cdc.ocio.reportschemavalidator.loaders.CloudSchemaLoader +import io.ktor.server.application.* +import io.ktor.server.config.* + +class CloudSchemaLoaderConfiguration(environment: ApplicationEnvironment){ + private val schemaLoaderSystem = environment.config.tryGetString("ktor.schema_loader_system")?: "" + private val s3Bucket = environment.config.tryGetString("aws.s3.report_schema_bucket") ?: "" + private val s3Region = environment.config.tryGetString("aws.s3.report_schema_region") ?: "" + private val connectionString = environment.config.tryGetString("azure.blob_storage.connection_string") ?: "" + private val container = environment.config.tryGetString("azure.blob_storage.container") ?: "" + + fun createSchemaLoader(): CloudSchemaLoader { + when (schemaLoaderSystem.lowercase()) { + SchemaLoaderSystemType.S3.toString().lowercase() -> { + val config = mapOf( + "REPORT_SCHEMA_S3_BUCKET" to s3Bucket, + "REPORT_SCHEMA_S3_REGION" to s3Region + ) + return CloudSchemaLoader(schemaLoaderSystem, config) + } + + SchemaLoaderSystemType.BLOB_STORAGE.toString().lowercase() -> { + val config = mapOf( + "REPORT_SCHEMA_BLOB_CONNECTION_STR" to connectionString, + "REPORT_SCHEMA_BLOB_CONTAINER" to container + ) + return CloudSchemaLoader(schemaLoaderSystem, config) + } + else ->throw IllegalArgumentException( "Unsupported schema loader type: $schemaLoaderSystem") + + } + + } + +} \ No newline at end of file diff --git a/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/CloudSchemaLoaderConfigurationKoinCreator.kt b/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/CloudSchemaLoaderConfigurationKoinCreator.kt new file mode 100644 index 00000000..062d24fd --- /dev/null +++ b/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/CloudSchemaLoaderConfigurationKoinCreator.kt @@ -0,0 +1,49 @@ +package gov.cdc.ocio.reportschemavalidator.utils + +import io.ktor.server.application.* +import mu.KotlinLogging +import org.koin.core.module.Module +import org.koin.dsl.module + +/** + * Helper class for creating koin modules for a report schema loader. + */ +class CloudSchemaLoaderConfigurationKoinCreator { + + companion object { + + /** + * The class which loads the specific cloud schema loader configuration based on the env vars + * @param environment ApplicationEnvironment + * @return SchemaLoader + */ + fun getSchemaLoaderConfigurationFromAppEnv(environment: ApplicationEnvironment): Module { + val logger = KotlinLogging.logger {} + + val schemaLoaderSystemModule = module { + val schemaLoaderSystem = environment.config.property("ktor.schema_loader_system").getString() + val schemaLoaderSystemType: SchemaLoaderSystemType + when (schemaLoaderSystem.lowercase()) { + SchemaLoaderSystemType.S3.toString().lowercase() -> { + single { AWSS3Configuration(environment.config,configurationPath = "aws") } + schemaLoaderSystemType = SchemaLoaderSystemType.S3 + } + + SchemaLoaderSystemType.BLOB_STORAGE.toString().lowercase() -> { + single { AzureBlobStorageConfiguration(environment.config,configurationPath = "azure") } + schemaLoaderSystemType = SchemaLoaderSystemType.BLOB_STORAGE + } + + else -> { + val msg = "Unsupported schema loader type: $schemaLoaderSystem" + logger.error { msg } + throw IllegalArgumentException(msg) + } + + } + single { schemaLoaderSystemType } // add databaseType to Koin Modules + } + return schemaLoaderSystemModule + } + } +} \ No newline at end of file diff --git a/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/SchemaLoaderSystemType.kt b/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/SchemaLoaderSystemType.kt new file mode 100644 index 00000000..02a27c47 --- /dev/null +++ b/libs/schema-validation/src/main/kotlin/gov/cdc/ocio/reportschemavalidator/utils/SchemaLoaderSystemType.kt @@ -0,0 +1,7 @@ +package gov.cdc.ocio.reportschemavalidator.utils + +enum class SchemaLoaderSystemType { + S3, + BLOB_STORAGE, + UNKNOWN +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/build.gradle b/pstatus-report-sink-ktor/build.gradle index f3362782..4e0a969b 100644 --- a/pstatus-report-sink-ktor/build.gradle +++ b/pstatus-report-sink-ktor/build.gradle @@ -74,12 +74,6 @@ dependencies { implementation 'org.danilopianini:khttp:1.3.1' implementation group: 'com.google.code.gson', name: 'gson', version: '2.8.9' - //Azure Blob - implementation 'com.azure:azure-storage-blob:12.22.0' - - //AWS SDK S3 - implementation 'software.amazon.awssdk:s3:2.20.140' - // MongoDB implementation 'org.mongodb:mongodb-driver-sync:5.1.3' diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt index 13053e1f..84a2931c 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt @@ -2,6 +2,7 @@ package gov.cdc.ocio.processingstatusapi import gov.cdc.ocio.database.utils.DatabaseKoinCreator import gov.cdc.ocio.processingstatusapi.plugins.* +import gov.cdc.ocio.reportschemavalidator.utils.CloudSchemaLoaderConfigurationKoinCreator import io.ktor.serialization.jackson.* import io.ktor.server.application.* import io.ktor.server.engine.* @@ -17,10 +18,7 @@ enum class MessageSystem { RABBITMQ } -enum class SchemaLoaderSystem { - S3, - BLOB_STORAGE -} + /** * Load the environment configuration values @@ -32,7 +30,7 @@ enum class SchemaLoaderSystem { fun KoinApplication.loadKoinModules(environment: ApplicationEnvironment): KoinApplication { val databaseModule = DatabaseKoinCreator.moduleFromAppEnv(environment) val healthCheckDatabaseModule = DatabaseKoinCreator.dbHealthCheckModuleFromAppEnv(environment) - val schemaLoaderSystem = environment.config.property("ktor.schema_loader_system").getString() + val cloudSchemaConfigurationModule = CloudSchemaLoaderConfigurationKoinCreator.getSchemaLoaderConfigurationFromAppEnv(environment) val messageSystemModule = module { val msgType = environment.config.property("ktor.message_system").getString() single {msgType} // add msgType to Koin Modules @@ -40,7 +38,7 @@ fun KoinApplication.loadKoinModules(environment: ApplicationEnvironment): KoinAp when (msgType) { MessageSystem.AZURE_SERVICE_BUS.toString() -> { single(createdAtStart = true) { - AzureConfiguration(environment.config, configurationPath = "azure") } + AzureServiceBusConfiguration(environment.config, configurationPath = "azure") } } MessageSystem.RABBITMQ.toString() -> { @@ -48,35 +46,21 @@ fun KoinApplication.loadKoinModules(environment: ApplicationEnvironment): KoinAp RabbitMQServiceConfiguration(environment.config, configurationPath = "rabbitMQ") } - //For local and/or when the msg system is RabbitMQ -we need to access the cloud storage either blob or s3 - when (schemaLoaderSystem.lowercase()) { - SchemaLoaderSystem.S3.toString().lowercase() -> { - single(createdAtStart = true) { - AWSConfiguration(environment.config, configurationPath = "aws") - } - } - SchemaLoaderSystem.BLOB_STORAGE.toString().lowercase() -> { - single(createdAtStart = true) { - AzureConfiguration(environment.config, configurationPath = "azure") - } - } - else ->throw IllegalArgumentException( "Unsupported schema loader type: $schemaLoaderSystem") - - } } MessageSystem.AWS.toString() -> { single(createdAtStart = true) { - AWSConfiguration(environment.config, configurationPath = "aws") + AWSSQSServiceConfiguration(environment.config, configurationPath = "aws") } } } } - // FOR HEALTH CHECK - val schemaLoaderSystemModule = module { - single(createdAtStart = true) { - SchemaLoaderConfiguration(environment) } - } - return modules(listOf(databaseModule,healthCheckDatabaseModule, messageSystemModule, schemaLoaderSystemModule)) +// FOR HEALTH CHECK +/* val schemaLoaderSystemModule = module { + single(createdAtStart = true) { + CloudSchemaLoaderConfiguration(environment) } + }*/ + + return modules(listOf(databaseModule,healthCheckDatabaseModule, messageSystemModule,cloudSchemaConfigurationModule)) //, schemaLoaderSystemModule } /** @@ -104,14 +88,14 @@ fun Application.module() { when (messageSystem) { MessageSystem.AZURE_SERVICE_BUS -> { - azureModule() + serviceBusModule() } MessageSystem.RABBITMQ -> { rabbitMQModule() } MessageSystem.AWS -> { - awsModule() + awsSQSModule() } else -> log.error("Invalid message system configuration") } diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/HealthQueryService.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/HealthQueryService.kt index afe930b9..2912671c 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/HealthQueryService.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/HealthQueryService.kt @@ -3,15 +3,14 @@ package gov.cdc.ocio.processingstatusapi.health import gov.cdc.ocio.database.DatabaseType import gov.cdc.ocio.database.health.* import gov.cdc.ocio.processingstatusapi.MessageSystem -import gov.cdc.ocio.processingstatusapi.SchemaLoaderSystem import gov.cdc.ocio.processingstatusapi.health.messagesystem.HealthCheckAWSSQS import gov.cdc.ocio.processingstatusapi.health.messagesystem.HealthCheckRabbitMQ import gov.cdc.ocio.processingstatusapi.health.messagesystem.HealthCheckServiceBus import gov.cdc.ocio.processingstatusapi.health.messagesystem.HealthCheckUnsupportedMessageSystem -import gov.cdc.ocio.processingstatusapi.health.schemaLoadersystem.HealthCheckBlobContainer -import gov.cdc.ocio.processingstatusapi.health.schemaLoadersystem.HealthCheckS3Bucket -import gov.cdc.ocio.processingstatusapi.health.schemaLoadersystem.HealthCheckUnsupportedSchemaLoaderSystem -import gov.cdc.ocio.processingstatusapi.plugins.SchemaLoaderConfiguration +import gov.cdc.ocio.reportschemavalidator.health.schemaLoadersystem.HealthCheckBlobContainer +import gov.cdc.ocio.reportschemavalidator.health.schemaLoadersystem.HealthCheckS3Bucket +import gov.cdc.ocio.reportschemavalidator.health.schemaLoadersystem.HealthCheckUnsupportedSchemaLoaderSystem +import gov.cdc.ocio.reportschemavalidator.utils.SchemaLoaderSystemType import gov.cdc.ocio.types.health.HealthCheck import gov.cdc.ocio.types.health.HealthCheckSystem import gov.cdc.ocio.types.health.HealthStatusType @@ -35,7 +34,7 @@ class HealthQueryService: KoinComponent { private val msgType: String by inject() - private val schemaLoaderConfiguration by inject() + private val schemaLoaderSystemType:SchemaLoaderSystemType by inject() /** * Returns a HealthCheck object with the overall health of the report-sink service and its dependencies. @@ -66,9 +65,9 @@ class HealthQueryService: KoinComponent { } messageSystemHealthCheck.doHealthCheck() - schemaLoaderSystemHealthCheck = when (schemaLoaderConfiguration.schemaLoaderSystem) { - SchemaLoaderSystem.S3.toString().lowercase() -> HealthCheckS3Bucket() - SchemaLoaderSystem.BLOB_STORAGE.toString().lowercase() -> HealthCheckBlobContainer() + schemaLoaderSystemHealthCheck = when (schemaLoaderSystemType.toString().lowercase()) { + SchemaLoaderSystemType.S3.toString().lowercase() -> HealthCheckS3Bucket() + SchemaLoaderSystemType.BLOB_STORAGE.toString().lowercase() -> HealthCheckBlobContainer() else -> HealthCheckUnsupportedSchemaLoaderSystem() } schemaLoaderSystemHealthCheck.doHealthCheck() diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/messagesystem/HealthCheckAWSSQS.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/messagesystem/HealthCheckAWSSQS.kt index f7c762be..8fe4ea3a 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/messagesystem/HealthCheckAWSSQS.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/messagesystem/HealthCheckAWSSQS.kt @@ -3,7 +3,7 @@ package gov.cdc.ocio.processingstatusapi.health.messagesystem import aws.sdk.kotlin.services.sqs.SqsClient import com.fasterxml.jackson.annotation.JsonIgnoreProperties import gov.cdc.ocio.processingstatusapi.exceptions.BadStateException -import gov.cdc.ocio.processingstatusapi.plugins.AWSConfiguration +import gov.cdc.ocio.processingstatusapi.plugins.AWSSQSServiceConfiguration import gov.cdc.ocio.types.health.HealthCheckSystem import gov.cdc.ocio.types.health.HealthStatusType import org.koin.core.component.KoinComponent @@ -16,7 +16,7 @@ import org.koin.core.component.inject @JsonIgnoreProperties("koin") class HealthCheckAWSSQS : HealthCheckSystem("AWS SQS"), KoinComponent { - private val awsSqsServiceConfiguration by inject() + private val awsSqsServiceConfiguration by inject() /** * Checks and sets AWSSQSHealth status @@ -41,7 +41,7 @@ class HealthCheckAWSSQS : HealthCheckSystem("AWS SQS"), KoinComponent { * @return Boolean */ @Throws(BadStateException::class) - private fun isAWSSQSHealthy(config: AWSConfiguration): Boolean { + private fun isAWSSQSHealthy(config: AWSSQSServiceConfiguration): Boolean { val sqsClient: SqsClient? return try { sqsClient = config.createSQSClient() diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/messagesystem/HealthCheckServiceBus.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/messagesystem/HealthCheckServiceBus.kt index d754ee40..9a000423 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/messagesystem/HealthCheckServiceBus.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/messagesystem/HealthCheckServiceBus.kt @@ -4,7 +4,7 @@ import com.azure.core.exception.ResourceNotFoundException import com.azure.messaging.servicebus.ServiceBusException import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder import com.fasterxml.jackson.annotation.JsonIgnoreProperties -import gov.cdc.ocio.processingstatusapi.plugins.AzureConfiguration +import gov.cdc.ocio.processingstatusapi.plugins.AzureServiceBusConfiguration import gov.cdc.ocio.types.health.HealthCheckSystem import gov.cdc.ocio.types.health.HealthStatusType import org.koin.core.component.KoinComponent @@ -17,7 +17,7 @@ import org.koin.core.component.inject @JsonIgnoreProperties("koin") class HealthCheckServiceBus : HealthCheckSystem("Azure Service Bus"), KoinComponent { - private val azureServiceBusConfiguration by inject() + private val azureServiceBusConfiguration by inject() /** * Checks and sets azureServiceBusHealth status @@ -41,7 +41,7 @@ class HealthCheckServiceBus : HealthCheckSystem("Azure Service Bus"), KoinCompon * @return Boolean */ @Throws(ResourceNotFoundException::class, ServiceBusException::class) - private fun isServiceBusHealthy(config: AzureConfiguration): Boolean { + private fun isServiceBusHealthy(config: AzureServiceBusConfiguration): Boolean { val adminClient = ServiceBusAdministrationClientBuilder() .connectionString(config.connectionString) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/schemaLoadersystem/HealthCheckBlobContainer.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/schemaLoadersystem/HealthCheckBlobContainer.kt deleted file mode 100644 index 91110112..00000000 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/schemaLoadersystem/HealthCheckBlobContainer.kt +++ /dev/null @@ -1,60 +0,0 @@ -package gov.cdc.ocio.processingstatusapi.health.schemaLoadersystem - - -import com.azure.storage.blob.BlobContainerClientBuilder -import com.fasterxml.jackson.annotation.JsonIgnoreProperties -import gov.cdc.ocio.processingstatusapi.exceptions.BadStateException -import gov.cdc.ocio.processingstatusapi.plugins.AzureConfiguration -import gov.cdc.ocio.types.health.HealthCheckSystem -import gov.cdc.ocio.types.health.HealthStatusType -import org.koin.core.component.KoinComponent -import org.koin.core.component.inject - - -/** - * Concrete implementation of the blob container health checks. - */ -@JsonIgnoreProperties("koin") -class HealthCheckBlobContainer : HealthCheckSystem("blob_storage"), KoinComponent { - - private val azureConfiguration by inject() - - /** - * Checks and sets blob container accessible status - * - * @return HealthCheckBlobContainer object with updated status - */ - override fun doHealthCheck() { - try { - if (isAzureBlobContainerHealthy(azureConfiguration)) { - status = HealthStatusType.STATUS_UP - } - - } catch (ex: Exception) { - logger.error("Blob container is not accessible and hence not healthy $ex.message") - healthIssues = ex.message - } - } - - /** - * Check whether blob container is accessible. - * - * @return Boolean - */ - @Throws(BadStateException::class) - fun isAzureBlobContainerHealthy(config:AzureConfiguration): Boolean { - return try { - val blobContainerClient = BlobContainerClientBuilder() - .connectionString(config.blobStorageConnectionString) - .containerName(config.container) - .buildClient() - - // Attempt to list blobs to ensure access - val blobs = blobContainerClient.listBlobs().iterator() - blobs.hasNext() - } catch (e: Exception) { - throw Exception("Failed to establish connection to blob storage.") - } - } - -} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/schemaLoadersystem/HealthCheckS3Bucket.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/schemaLoadersystem/HealthCheckS3Bucket.kt deleted file mode 100644 index 86f64a6a..00000000 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/schemaLoadersystem/HealthCheckS3Bucket.kt +++ /dev/null @@ -1,59 +0,0 @@ -package gov.cdc.ocio.processingstatusapi.health.schemaLoadersystem - - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties -import gov.cdc.ocio.processingstatusapi.exceptions.BadStateException -import gov.cdc.ocio.processingstatusapi.plugins.AWSConfiguration -import gov.cdc.ocio.types.health.HealthCheckSystem -import gov.cdc.ocio.types.health.HealthStatusType -import org.koin.core.component.KoinComponent -import org.koin.core.component.inject -import software.amazon.awssdk.services.s3.S3Client -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response - -/** - * Concrete implementation of the S3 Bucket health checks. - */ -@JsonIgnoreProperties("koin") -class HealthCheckS3Bucket : HealthCheckSystem("s3"), KoinComponent { - - private val awsServiceConfiguration by inject() - - /** - * Checks and sets S3 Bucket accessible status - * - * @return HealthCheck S3 object with updated status - */ - override fun doHealthCheck() { - try { - if (isS3FolderHealthy(awsServiceConfiguration)) { - status = HealthStatusType.STATUS_UP - } - - } catch (ex: Exception) { - logger.error("S3 bucket is not accessible and hence not healthy $ex.message") - healthIssues = ex.message - } - } - - /** - * Check whether S3 Buket is accessible - * - * @return Boolean - */ - @Throws(BadStateException::class) - fun isS3FolderHealthy(config:AWSConfiguration): Boolean { - return try { - val s3Client = S3Client.builder().region(software.amazon.awssdk.regions.Region.of(config.s3Region)).build() - val request = ListObjectsV2Request.builder() - .bucket(config.s3Bucket) - .maxKeys(1) // one file - lightweight check - .build() - val response: ListObjectsV2Response = s3Client.listObjectsV2(request) - response.contents().isNotEmpty() - } catch (e: Exception) { - throw Exception("Failed to establish connection to S3 bucket.") - } - } -} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/schemaLoadersystem/HealthCheckUnsupportedSchemaLoaderSystem.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/schemaLoadersystem/HealthCheckUnsupportedSchemaLoaderSystem.kt deleted file mode 100644 index dc0b8b57..00000000 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/health/schemaLoadersystem/HealthCheckUnsupportedSchemaLoaderSystem.kt +++ /dev/null @@ -1,18 +0,0 @@ -package gov.cdc.ocio.processingstatusapi.health.schemaLoadersystem - -import gov.cdc.ocio.types.health.HealthCheckSystem -import gov.cdc.ocio.types.health.HealthStatusType - - -/** - * Concrete implementation of the unsupported messaging service health checks. - */ -class HealthCheckUnsupportedSchemaLoaderSystem : HealthCheckSystem("Cloud Schema Loader") { - - /** - * No health check - just inform unsupported - */ - override fun doHealthCheck() { - status = HealthStatusType.STATUS_UNSUPPORTED - } -} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/AWSSQS.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/AWSSQS.kt index 0187f11a..1aff8505 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/AWSSQS.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/AWSSQS.kt @@ -9,7 +9,7 @@ import aws.sdk.kotlin.services.sqs.model.* import aws.smithy.kotlin.runtime.net.url.Url import gov.cdc.ocio.processingstatusapi.utils.SchemaValidation -import gov.cdc.ocio.reportschemavalidator.loaders.CloudSchemaLoader +import gov.cdc.ocio.reportschemavalidator.utils.CloudSchemaLoaderConfiguration import io.ktor.server.application.* import io.ktor.server.application.hooks.* import io.ktor.server.config.* @@ -28,7 +28,7 @@ import java.nio.file.Path * @param config `ApplicationConfig` containing the configuration settings for AWS SQS. * @param configurationPath represents prefix used to locate environment variables specific to AWS within the configuration. */ -class AWSConfiguration(config: ApplicationConfig, configurationPath: String? = null) { +class AWSSQSServiceConfiguration(config: ApplicationConfig, configurationPath: String? = null) { private val configPath = if (configurationPath != null) "$configurationPath." else "" val queueURL: String = config.tryGetString("${configPath}sqs.url") ?: "" private val roleArn: String = config.tryGetString("${configPath}role_arn") ?: "" @@ -37,16 +37,14 @@ class AWSConfiguration(config: ApplicationConfig, configurationPath: String? = n private val secretAccessKey = config.tryGetString("${configPath}secret_access_key") ?: "" private val region = config.tryGetString("${configPath}region") ?: "us-east-1" private val endpoint: Url? = config.tryGetString("${configPath}endpoint")?.let { Url.parse(it) } - val s3Bucket = config.tryGetString("${configPath}s3.report_schema_bucket") ?: "" - val s3Region = config.tryGetString("${configPath}s3.report_schema_region") ?: "" fun createSQSClient(): SqsClient{ return SqsClient { if (accessKeyId.isNotEmpty() && secretAccessKey.isNotEmpty()) { StaticCredentialsProvider { - accessKeyId = this@AWSConfiguration.accessKeyId - secretAccessKey = this@AWSConfiguration.secretAccessKey + accessKeyId = this@AWSSQSServiceConfiguration.accessKeyId + secretAccessKey = this@AWSSQSServiceConfiguration.secretAccessKey } } else if (webIdentityTokenFile.isNotEmpty() && roleArn.isNotEmpty()) { WebIdentityTokenFileCredentialsProvider.builder() @@ -56,28 +54,23 @@ class AWSConfiguration(config: ApplicationConfig, configurationPath: String? = n } else { throw IllegalArgumentException("No valid credentials provided.") } - region = this@AWSConfiguration.region - endpointUrl = this@AWSConfiguration.endpoint + region = this@AWSSQSServiceConfiguration.region + endpointUrl = this@AWSSQSServiceConfiguration.endpoint } } - fun createSchemaLoader():CloudSchemaLoader{ - val config = mapOf( - "REPORT_SCHEMA_S3_BUCKET" to this@AWSConfiguration.s3Bucket, - "REPORT_SCHEMA_S3_REGION" to this@AWSConfiguration.s3Region - ) - return CloudSchemaLoader("s3", config) - } + } -val AWSPlugin = createApplicationPlugin( - name = "AWS Plugin", +val AWSSQSPlugin = createApplicationPlugin( + name = "AWS SQS", configurationPath = "aws", - createConfiguration = ::AWSConfiguration + createConfiguration = ::AWSSQSServiceConfiguration ) { lateinit var sqsClient: SqsClient lateinit var queueUrl: String - val schemaLoader: CloudSchemaLoader = pluginConfig.createSchemaLoader() + val environment: ApplicationEnvironment = this@createApplicationPlugin.application.environment + val schemaLoader = CloudSchemaLoaderConfiguration(environment).createSchemaLoader() // Create the schema loader here try { sqsClient = pluginConfig.createSQSClient() @@ -199,8 +192,8 @@ private fun cleanupResourcesAndUnsubscribe(application: Application, sqsClient: /** * The main application module which runs always */ -fun Application.awsModule() { - install(AWSPlugin) +fun Application.awsSQSModule() { + install(AWSSQSPlugin) } /** diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/RabbitMQ.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/RabbitMQ.kt index d8b4ca6a..dad7271e 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/RabbitMQ.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/RabbitMQ.kt @@ -1,48 +1,13 @@ package gov.cdc.ocio.processingstatusapi.plugins import com.rabbitmq.client.* -import gov.cdc.ocio.processingstatusapi.SchemaLoaderSystem import gov.cdc.ocio.processingstatusapi.utils.SchemaValidation -import gov.cdc.ocio.reportschemavalidator.loaders.CloudSchemaLoader import io.ktor.server.application.* import io.ktor.server.application.hooks.* import io.ktor.server.config.* import org.apache.qpid.proton.TimeoutException import java.io.IOException - -class SchemaLoaderConfiguration(environment: ApplicationEnvironment){ - val schemaLoaderSystem = environment.config.tryGetString("ktor.schema_loader_system")?: "" - private val s3Bucket = environment.config.tryGetString("aws.s3.report_schema_bucket") ?: "" - private val s3Region = environment.config.tryGetString("aws.s3.report_schema_region") ?: "" - private val connectionString = environment.config.tryGetString("azure.blob_storage.connection_string") ?: "" - private val container = environment.config.tryGetString("azure.blob_storage.container") ?: "" - - fun createSchemaLoader(): CloudSchemaLoader { - when (schemaLoaderSystem.lowercase()) { - SchemaLoaderSystem.S3.toString().lowercase() -> { - val config = mapOf( - "REPORT_SCHEMA_S3_BUCKET" to s3Bucket, - "REPORT_SCHEMA_S3_REGION" to s3Region - ) - return CloudSchemaLoader(schemaLoaderSystem, config) - } - - SchemaLoaderSystem.BLOB_STORAGE.toString().lowercase() -> { - val config = mapOf( - "REPORT_SCHEMA_BLOB_CONNECTION_STR" to connectionString, - "REPORT_SCHEMA_BLOB_CONTAINER" to container - ) - return CloudSchemaLoader(schemaLoaderSystem, config) - } - else ->throw IllegalArgumentException( "Unsupported schema loader type: $schemaLoaderSystem") - - } - - } - -} - /** * The `RabbitMQServiceConfiguration` class configures and initializes `RabbitMQ` connection factory based on settings provided in an `ApplicationConfig`. * @param config `ApplicationConfig` containing the configuration settings for RabbitMQ, including connection details. @@ -87,8 +52,7 @@ val RabbitMQPlugin = createApplicationPlugin( //val schemaLoader: CloudSchemaLoader = pluginConfig.createSchemaLoader() lateinit var connection: Connection lateinit var channel: Channel - val environment: ApplicationEnvironment = this@createApplicationPlugin.application.environment - val schemaLoader = SchemaLoaderConfiguration(environment).createSchemaLoader() // Create the schema loader here + try { connection = factory.newConnection() SchemaValidation.logger.info("Connection to the RabbitMQ server was successfully established") @@ -125,7 +89,7 @@ val RabbitMQPlugin = createApplicationPlugin( val message = String(body, Charsets.UTF_8) SchemaValidation.logger.info("Message received from RabbitMQ queue $queueName with routingKey $routingKey.") - rabbitMQProcessor.processMessage(message,schemaLoader) + // rabbitMQProcessor.processMessage(message,schemaLoader) // Acknowledge the message channel.basicAck(deliveryTag, false) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt index b5bfa80c..b929f452 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt @@ -6,6 +6,7 @@ import com.azure.messaging.servicebus.* import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException import gov.cdc.ocio.processingstatusapi.utils.SchemaValidation.Companion.logger import gov.cdc.ocio.reportschemavalidator.loaders.CloudSchemaLoader +import gov.cdc.ocio.reportschemavalidator.utils.CloudSchemaLoaderConfiguration import io.ktor.server.application.* import io.ktor.server.application.hooks.* import io.ktor.server.config.* @@ -19,34 +20,26 @@ import java.util.concurrent.TimeUnit * @param config ApplicationConfig * */ -class AzureConfiguration(config: ApplicationConfig, configurationPath: String? = null) { +class AzureServiceBusConfiguration(config: ApplicationConfig, configurationPath: String? = null) { private val configPath = if (configurationPath != null) "$configurationPath." else "" val connectionString = config.tryGetString("${configPath}service_bus.connection_string") ?: "" val queueName = config.tryGetString("${configPath}service_bus.queue_name") ?: "" val topicName = config.tryGetString("${configPath}service_bus.topic_name") ?: "" val subscriptionName = config.tryGetString("${configPath}service_bus.subscription_name") ?: "" - val blobStorageConnectionString = config.tryGetString("${configPath}blob_storage.connection_string") ?: "" - val container = config.tryGetString("${configPath}blob_storage.container") ?: "" - fun createSchemaLoader():CloudSchemaLoader{ - val config = mapOf( - "REPORT_SCHEMA_BLOB_CONNECTION_STR" to this@AzureConfiguration.blobStorageConnectionString, - "REPORT_SCHEMA_BLOB_CONTAINER" to this@AzureConfiguration.container - ) - return CloudSchemaLoader("blob_storage", config) - } } -val AzurePlugin = createApplicationPlugin( - name = "AzureConfiguration", +val AzureServiceBus = createApplicationPlugin( + name = "AzureServiceBus", configurationPath = "azure", - createConfiguration = ::AzureConfiguration) { + createConfiguration = ::AzureServiceBusConfiguration) { val connectionString = pluginConfig.connectionString val queueName = pluginConfig.queueName val topicName = pluginConfig.topicName val subscriptionName = pluginConfig.subscriptionName - val schemaLoader: CloudSchemaLoader = pluginConfig.createSchemaLoader() + val environment: ApplicationEnvironment = this@createApplicationPlugin.application.environment + val schemaLoader = CloudSchemaLoaderConfiguration(environment).createSchemaLoader() // Create the schema loader here // Initialize Service Bus client for queue val processorQueueClient by lazy { ServiceBusClientBuilder() @@ -182,6 +175,6 @@ private fun cleanupResourcesAndUnsubscribe(processorQueueClient: ServiceBusProc /** * The main application module which runs always */ -fun Application.azureModule() { - install(AzurePlugin) +fun Application.serviceBusModule() { + install(AzureServiceBus) }