Skip to content

Commit

Permalink
Modified code to move the health check on cloud schema loader to sche…
Browse files Browse the repository at this point in the history
…ma validation lib. Also, added the AWS S3 and Azure BLOB configuration on its own as opposed to being in the Azure SBUS and AWS SQS config respectively
  • Loading branch information
manu-govind committed Dec 19, 2024
1 parent 25f65e8 commit ce4419c
Show file tree
Hide file tree
Showing 17 changed files with 179 additions and 263 deletions.
2 changes: 2 additions & 0 deletions libs/schema-validation/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ dependencies {
implementation ("com.google.code.gson:gson:2.10.1")
implementation ("com.sun.activation:javax.activation:1.2.0")

implementation(project(":libs:commons-database"))
implementation(project(":libs:commons-types"))
implementation("io.insert-koin:koin-ktor:3.5.6")
// AWS SDK for S3
implementation("software.amazon.awssdk:s3:2.20.91")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package gov.cdc.ocio.reportschemavalidator.utils

import io.ktor.server.config.*

/**
* AWS S3 configuration class
* @param config ApplicationConfig
* @param configurationPath String?
*/
class AWSS3Configuration(config: ApplicationConfig, configurationPath: String? = null) {
private val configPath = if (configurationPath != null) "$configurationPath." else ""
val s3Bucket = config.tryGetString("${configPath}s3.report_schema_bucket") ?: ""
val s3Region = config.tryGetString("${configPath}s3.report_schema_region") ?: ""

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package gov.cdc.ocio.reportschemavalidator.utils

import io.ktor.server.config.*

/**
* Blob storage configuration class
* @param config ApplicationConfig
* @param configurationPath String?
*/
class AzureBlobStorageConfiguration(config: ApplicationConfig, configurationPath: String? = null) {
private val configPath = if (configurationPath != null) "$configurationPath." else ""
val connectionString = config.tryGetString("${configPath}blob_storage.connection_string") ?: ""
val container = config.tryGetString("${configPath}blob_storage.container") ?: ""

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package gov.cdc.ocio.reportschemavalidator.utils


import gov.cdc.ocio.reportschemavalidator.loaders.CloudSchemaLoader
import io.ktor.server.application.*
import io.ktor.server.config.*

class CloudSchemaLoaderConfiguration(environment: ApplicationEnvironment){
private val schemaLoaderSystem = environment.config.tryGetString("ktor.schema_loader_system")?: ""
private val s3Bucket = environment.config.tryGetString("aws.s3.report_schema_bucket") ?: ""
private val s3Region = environment.config.tryGetString("aws.s3.report_schema_region") ?: ""
private val connectionString = environment.config.tryGetString("azure.blob_storage.connection_string") ?: ""
private val container = environment.config.tryGetString("azure.blob_storage.container") ?: ""

fun createSchemaLoader(): CloudSchemaLoader {
when (schemaLoaderSystem.lowercase()) {
SchemaLoaderSystemType.S3.toString().lowercase() -> {
val config = mapOf(
"REPORT_SCHEMA_S3_BUCKET" to s3Bucket,
"REPORT_SCHEMA_S3_REGION" to s3Region
)
return CloudSchemaLoader(schemaLoaderSystem, config)
}

SchemaLoaderSystemType.BLOB_STORAGE.toString().lowercase() -> {
val config = mapOf(
"REPORT_SCHEMA_BLOB_CONNECTION_STR" to connectionString,
"REPORT_SCHEMA_BLOB_CONTAINER" to container
)
return CloudSchemaLoader(schemaLoaderSystem, config)
}
else ->throw IllegalArgumentException( "Unsupported schema loader type: $schemaLoaderSystem")

}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package gov.cdc.ocio.reportschemavalidator.utils

import io.ktor.server.application.*
import mu.KotlinLogging
import org.koin.core.module.Module
import org.koin.dsl.module

/**
* Helper class for creating koin modules for a report schema loader.
*/
class CloudSchemaLoaderConfigurationKoinCreator {

companion object {

/**
* The class which loads the specific cloud schema loader configuration based on the env vars
* @param environment ApplicationEnvironment
* @return SchemaLoader
*/
fun getSchemaLoaderConfigurationFromAppEnv(environment: ApplicationEnvironment): Module {
val logger = KotlinLogging.logger {}

val schemaLoaderSystemModule = module {
val schemaLoaderSystem = environment.config.property("ktor.schema_loader_system").getString()
val schemaLoaderSystemType: SchemaLoaderSystemType
when (schemaLoaderSystem.lowercase()) {
SchemaLoaderSystemType.S3.toString().lowercase() -> {
single { AWSS3Configuration(environment.config,configurationPath = "aws") }
schemaLoaderSystemType = SchemaLoaderSystemType.S3
}

SchemaLoaderSystemType.BLOB_STORAGE.toString().lowercase() -> {
single { AzureBlobStorageConfiguration(environment.config,configurationPath = "azure") }
schemaLoaderSystemType = SchemaLoaderSystemType.BLOB_STORAGE
}

else -> {
val msg = "Unsupported schema loader type: $schemaLoaderSystem"
logger.error { msg }
throw IllegalArgumentException(msg)
}

}
single { schemaLoaderSystemType } // add databaseType to Koin Modules
}
return schemaLoaderSystemModule
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package gov.cdc.ocio.reportschemavalidator.utils

enum class SchemaLoaderSystemType {
S3,
BLOB_STORAGE,
UNKNOWN
}
6 changes: 0 additions & 6 deletions pstatus-report-sink-ktor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ dependencies {
implementation 'org.danilopianini:khttp:1.3.1'
implementation group: 'com.google.code.gson', name: 'gson', version: '2.8.9'

//Azure Blob
implementation 'com.azure:azure-storage-blob:12.22.0'

//AWS SDK S3
implementation 'software.amazon.awssdk:s3:2.20.140'


// MongoDB
implementation 'org.mongodb:mongodb-driver-sync:5.1.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gov.cdc.ocio.processingstatusapi

import gov.cdc.ocio.database.utils.DatabaseKoinCreator
import gov.cdc.ocio.processingstatusapi.plugins.*
import gov.cdc.ocio.reportschemavalidator.utils.CloudSchemaLoaderConfigurationKoinCreator
import io.ktor.serialization.jackson.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
Expand All @@ -17,10 +18,7 @@ enum class MessageSystem {
RABBITMQ
}

enum class SchemaLoaderSystem {
S3,
BLOB_STORAGE
}


/**
* Load the environment configuration values
Expand All @@ -32,51 +30,37 @@ enum class SchemaLoaderSystem {
fun KoinApplication.loadKoinModules(environment: ApplicationEnvironment): KoinApplication {
val databaseModule = DatabaseKoinCreator.moduleFromAppEnv(environment)
val healthCheckDatabaseModule = DatabaseKoinCreator.dbHealthCheckModuleFromAppEnv(environment)
val schemaLoaderSystem = environment.config.property("ktor.schema_loader_system").getString()
val cloudSchemaConfigurationModule = CloudSchemaLoaderConfigurationKoinCreator.getSchemaLoaderConfigurationFromAppEnv(environment)
val messageSystemModule = module {
val msgType = environment.config.property("ktor.message_system").getString()
single {msgType} // add msgType to Koin Modules

when (msgType) {
MessageSystem.AZURE_SERVICE_BUS.toString() -> {
single(createdAtStart = true) {
AzureConfiguration(environment.config, configurationPath = "azure") }
AzureServiceBusConfiguration(environment.config, configurationPath = "azure") }

}
MessageSystem.RABBITMQ.toString() -> {
single(createdAtStart = true) {
RabbitMQServiceConfiguration(environment.config, configurationPath = "rabbitMQ")
}

//For local and/or when the msg system is RabbitMQ -we need to access the cloud storage either blob or s3
when (schemaLoaderSystem.lowercase()) {
SchemaLoaderSystem.S3.toString().lowercase() -> {
single(createdAtStart = true) {
AWSConfiguration(environment.config, configurationPath = "aws")
}
}
SchemaLoaderSystem.BLOB_STORAGE.toString().lowercase() -> {
single(createdAtStart = true) {
AzureConfiguration(environment.config, configurationPath = "azure")
}
}
else ->throw IllegalArgumentException( "Unsupported schema loader type: $schemaLoaderSystem")

}
}
MessageSystem.AWS.toString() -> {
single(createdAtStart = true) {
AWSConfiguration(environment.config, configurationPath = "aws")
AWSSQSServiceConfiguration(environment.config, configurationPath = "aws")
}
}
}
}
// FOR HEALTH CHECK
val schemaLoaderSystemModule = module {
single(createdAtStart = true) {
SchemaLoaderConfiguration(environment) }
}
return modules(listOf(databaseModule,healthCheckDatabaseModule, messageSystemModule, schemaLoaderSystemModule))
// FOR HEALTH CHECK
/* val schemaLoaderSystemModule = module {
single(createdAtStart = true) {
CloudSchemaLoaderConfiguration(environment) }
}*/

return modules(listOf(databaseModule,healthCheckDatabaseModule, messageSystemModule,cloudSchemaConfigurationModule)) //, schemaLoaderSystemModule
}

/**
Expand Down Expand Up @@ -104,14 +88,14 @@ fun Application.module() {

when (messageSystem) {
MessageSystem.AZURE_SERVICE_BUS -> {
azureModule()
serviceBusModule()
}
MessageSystem.RABBITMQ -> {
rabbitMQModule()
}

MessageSystem.AWS -> {
awsModule()
awsSQSModule()
}
else -> log.error("Invalid message system configuration")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ package gov.cdc.ocio.processingstatusapi.health
import gov.cdc.ocio.database.DatabaseType
import gov.cdc.ocio.database.health.*
import gov.cdc.ocio.processingstatusapi.MessageSystem
import gov.cdc.ocio.processingstatusapi.SchemaLoaderSystem
import gov.cdc.ocio.processingstatusapi.health.messagesystem.HealthCheckAWSSQS
import gov.cdc.ocio.processingstatusapi.health.messagesystem.HealthCheckRabbitMQ
import gov.cdc.ocio.processingstatusapi.health.messagesystem.HealthCheckServiceBus
import gov.cdc.ocio.processingstatusapi.health.messagesystem.HealthCheckUnsupportedMessageSystem
import gov.cdc.ocio.processingstatusapi.health.schemaLoadersystem.HealthCheckBlobContainer
import gov.cdc.ocio.processingstatusapi.health.schemaLoadersystem.HealthCheckS3Bucket
import gov.cdc.ocio.processingstatusapi.health.schemaLoadersystem.HealthCheckUnsupportedSchemaLoaderSystem
import gov.cdc.ocio.processingstatusapi.plugins.SchemaLoaderConfiguration
import gov.cdc.ocio.reportschemavalidator.health.schemaLoadersystem.HealthCheckBlobContainer
import gov.cdc.ocio.reportschemavalidator.health.schemaLoadersystem.HealthCheckS3Bucket
import gov.cdc.ocio.reportschemavalidator.health.schemaLoadersystem.HealthCheckUnsupportedSchemaLoaderSystem
import gov.cdc.ocio.reportschemavalidator.utils.SchemaLoaderSystemType
import gov.cdc.ocio.types.health.HealthCheck
import gov.cdc.ocio.types.health.HealthCheckSystem
import gov.cdc.ocio.types.health.HealthStatusType
Expand All @@ -35,7 +34,7 @@ class HealthQueryService: KoinComponent {

private val msgType: String by inject()

private val schemaLoaderConfiguration by inject<SchemaLoaderConfiguration>()
private val schemaLoaderSystemType:SchemaLoaderSystemType by inject()

/**
* Returns a HealthCheck object with the overall health of the report-sink service and its dependencies.
Expand Down Expand Up @@ -66,9 +65,9 @@ class HealthQueryService: KoinComponent {
}
messageSystemHealthCheck.doHealthCheck()

schemaLoaderSystemHealthCheck = when (schemaLoaderConfiguration.schemaLoaderSystem) {
SchemaLoaderSystem.S3.toString().lowercase() -> HealthCheckS3Bucket()
SchemaLoaderSystem.BLOB_STORAGE.toString().lowercase() -> HealthCheckBlobContainer()
schemaLoaderSystemHealthCheck = when (schemaLoaderSystemType.toString().lowercase()) {
SchemaLoaderSystemType.S3.toString().lowercase() -> HealthCheckS3Bucket()
SchemaLoaderSystemType.BLOB_STORAGE.toString().lowercase() -> HealthCheckBlobContainer()
else -> HealthCheckUnsupportedSchemaLoaderSystem()
}
schemaLoaderSystemHealthCheck.doHealthCheck()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package gov.cdc.ocio.processingstatusapi.health.messagesystem
import aws.sdk.kotlin.services.sqs.SqsClient
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import gov.cdc.ocio.processingstatusapi.exceptions.BadStateException
import gov.cdc.ocio.processingstatusapi.plugins.AWSConfiguration
import gov.cdc.ocio.processingstatusapi.plugins.AWSSQSServiceConfiguration
import gov.cdc.ocio.types.health.HealthCheckSystem
import gov.cdc.ocio.types.health.HealthStatusType
import org.koin.core.component.KoinComponent
Expand All @@ -16,7 +16,7 @@ import org.koin.core.component.inject
@JsonIgnoreProperties("koin")
class HealthCheckAWSSQS : HealthCheckSystem("AWS SQS"), KoinComponent {

private val awsSqsServiceConfiguration by inject<AWSConfiguration>()
private val awsSqsServiceConfiguration by inject<AWSSQSServiceConfiguration>()

/**
* Checks and sets AWSSQSHealth status
Expand All @@ -41,7 +41,7 @@ class HealthCheckAWSSQS : HealthCheckSystem("AWS SQS"), KoinComponent {
* @return Boolean
*/
@Throws(BadStateException::class)
private fun isAWSSQSHealthy(config: AWSConfiguration): Boolean {
private fun isAWSSQSHealthy(config: AWSSQSServiceConfiguration): Boolean {
val sqsClient: SqsClient?
return try {
sqsClient = config.createSQSClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.azure.core.exception.ResourceNotFoundException
import com.azure.messaging.servicebus.ServiceBusException
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import gov.cdc.ocio.processingstatusapi.plugins.AzureConfiguration
import gov.cdc.ocio.processingstatusapi.plugins.AzureServiceBusConfiguration
import gov.cdc.ocio.types.health.HealthCheckSystem
import gov.cdc.ocio.types.health.HealthStatusType
import org.koin.core.component.KoinComponent
Expand All @@ -17,7 +17,7 @@ import org.koin.core.component.inject
@JsonIgnoreProperties("koin")
class HealthCheckServiceBus : HealthCheckSystem("Azure Service Bus"), KoinComponent {

private val azureServiceBusConfiguration by inject<AzureConfiguration>()
private val azureServiceBusConfiguration by inject<AzureServiceBusConfiguration>()

/**
* Checks and sets azureServiceBusHealth status
Expand All @@ -41,7 +41,7 @@ class HealthCheckServiceBus : HealthCheckSystem("Azure Service Bus"), KoinCompon
* @return Boolean
*/
@Throws(ResourceNotFoundException::class, ServiceBusException::class)
private fun isServiceBusHealthy(config: AzureConfiguration): Boolean {
private fun isServiceBusHealthy(config: AzureServiceBusConfiguration): Boolean {

val adminClient = ServiceBusAdministrationClientBuilder()
.connectionString(config.connectionString)
Expand Down
Loading

0 comments on commit ce4419c

Please sign in to comment.