From a049cdbf7784a763245fda6438f198be6432611b Mon Sep 17 00:00:00 2001 From: Aleksei Novikov Date: Wed, 29 Sep 2021 15:12:03 +0300 Subject: [PATCH 1/3] Added akka cluster support --- build.gradle.kts | 19 ++++ k8s/application.yaml | 15 +++- .../alekseinovikov/akaes/actor/ClassActor.kt | 66 ++++++++++++++ .../akaes/annotations/Serialization.kt | 3 + .../alekseinovikov/akaes/config/AkkaConfig.kt | 29 +++++++ .../alekseinovikov/akaes/message/Messages.kt | 4 +- .../alekseinovikov/akaes/model/ClassState.kt | 4 +- .../me/alekseinovikov/akaes/model/Commands.kt | 12 +++ .../me/alekseinovikov/akaes/model/Events.kt | 19 ++++ .../akaes/props/EventSourcingProperties.kt | 15 ++++ .../akaes/service/ClassServiceImpl.kt | 71 +++++++++++---- src/main/resources/akka_kube.cfg | 86 +++++++++++++++++++ src/main/resources/akka_local.cfg | 67 +++++++++++++++ src/main/resources/application.properties | 2 + 14 files changed, 390 insertions(+), 22 deletions(-) create mode 100644 src/main/kotlin/me/alekseinovikov/akaes/actor/ClassActor.kt create mode 100644 src/main/kotlin/me/alekseinovikov/akaes/annotations/Serialization.kt create mode 100644 src/main/kotlin/me/alekseinovikov/akaes/config/AkkaConfig.kt create mode 100644 src/main/kotlin/me/alekseinovikov/akaes/model/Commands.kt create mode 100644 src/main/kotlin/me/alekseinovikov/akaes/model/Events.kt create mode 100644 src/main/kotlin/me/alekseinovikov/akaes/props/EventSourcingProperties.kt create mode 100644 src/main/resources/akka_kube.cfg create mode 100644 src/main/resources/akka_local.cfg diff --git a/build.gradle.kts b/build.gradle.kts index 8d0d6c1..f88ae86 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,6 +8,13 @@ plugins { id("com.google.cloud.tools.jib") version "3.1.4" } +val scalaBinaryVersion = "2.13" +val akkaVersion = "2.6.16" +val slickVersion = "3.3.3" +val akkaManagementVersion = "1.1.1" +val postgresDriverVersion = "42.2.24" + + group = "me.alekseinovikov.akaes" version = "0.0.1-SNAPSHOT" java.sourceCompatibility = JavaVersion.VERSION_11 @@ -17,6 +24,18 @@ repositories { } dependencies { + implementation(platform("com.typesafe.akka:akka-bom_${scalaBinaryVersion}:${akkaVersion}")) + implementation("org.postgresql:postgresql:${postgresDriverVersion}") + implementation("com.typesafe.akka:akka-cluster-sharding-typed_${scalaBinaryVersion}") + implementation("com.typesafe.akka:akka-persistence-typed_${scalaBinaryVersion}") + implementation("com.typesafe.akka:akka-serialization-jackson_${scalaBinaryVersion}") + implementation("com.lightbend.akka:akka-persistence-jdbc_${scalaBinaryVersion}:5.0.4") + implementation("com.typesafe.akka:akka-persistence-query_${scalaBinaryVersion}:${akkaVersion}") + implementation("com.typesafe.slick:slick_${scalaBinaryVersion}:${slickVersion}") + implementation("com.typesafe.slick:slick-hikaricp_${scalaBinaryVersion}:${slickVersion}") + implementation("com.lightbend.akka.discovery:akka-discovery-kubernetes-api_${scalaBinaryVersion}:${akkaManagementVersion}") + implementation("com.lightbend.akka.management:akka-management-cluster-bootstrap_${scalaBinaryVersion}:${akkaManagementVersion}") + implementation("org.springframework.boot:spring-boot-starter-webflux") implementation("org.springframework.boot:spring-boot-starter-amqp") implementation("com.fasterxml.jackson.module:jackson-module-kotlin") diff --git a/k8s/application.yaml b/k8s/application.yaml index 5ea56df..aecc6b3 100644 --- a/k8s/application.yaml +++ b/k8s/application.yaml @@ -27,6 +27,10 @@ spec: - containerPort: 8080 protocol: TCP name: web + - containerPort: 8558 + name: management + - containerPort: 2552 + name: remoting --- apiVersion: v1 @@ -37,7 +41,16 @@ metadata: name: "akaes-service" spec: ports: - - port: 8080 + - name: web + port: 8080 + protocol: TCP + - name: management + port: 8558 + targetPort: 8558 + protocol: TCP + - name: remoting + port: 2552 + targetPort: 2552 protocol: TCP selector: app: akaes diff --git a/src/main/kotlin/me/alekseinovikov/akaes/actor/ClassActor.kt b/src/main/kotlin/me/alekseinovikov/akaes/actor/ClassActor.kt new file mode 100644 index 0000000..437eff3 --- /dev/null +++ b/src/main/kotlin/me/alekseinovikov/akaes/actor/ClassActor.kt @@ -0,0 +1,66 @@ +package me.alekseinovikov.akaes.actor + +import akka.actor.typed.Behavior +import akka.actor.typed.SupervisorStrategy +import akka.actor.typed.javadsl.ActorContext +import akka.actor.typed.javadsl.Behaviors +import akka.cluster.sharding.typed.javadsl.EntityTypeKey +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.javadsl.* +import me.alekseinovikov.akaes.model.* +import me.alekseinovikov.akaes.props.EventSourcingProperties +import java.time.Duration + +class ClassActor( + private val className: String, + private val context: ActorContext, + private val props: EventSourcingProperties, + persistenceId: PersistenceId?, +) : + EventSourcedBehavior( + persistenceId, SupervisorStrategy.restartWithBackoff( + Duration.ofSeconds(props.restartBackOffMinSeconds), + Duration.ofSeconds(props.restartBackOffMaxSeconds), + props.restartBackOffRandomFactor + ) + ) { + + companion object { + var ENTITY_TYPE_KEY: EntityTypeKey = EntityTypeKey.create(ClassCommand::class.java, "Class") + fun createBehaviour( + className: String, + persistenceId: PersistenceId, + props: EventSourcingProperties + ): Behavior = Behaviors.setup { context -> ClassActor(className, context, props, persistenceId) } + } + + override fun retentionCriteria(): SnapshotCountRetentionCriteria = RetentionCriteria + .snapshotEvery(props.numberOfEvents, props.keepSnapshots) + .withDeleteEventsOnSnapshot() + + override fun emptyState() = ClassState(className) + override fun eventHandler(): EventHandler = EventHandler { state, event -> event.applyTo(state) } + + override fun commandHandler(): CommandHandler = + newCommandHandlerBuilder() + .forAnyState() + .onCommand(AddStudentCommand::class.java, this::addStudent) + .onCommand(DeleteStudentCommand::class.java, this::deleteStudent) + .onCommand(GetAllStudentsCommand::class.java, this::getState) + .build() + + private fun addStudent(command: AddStudentCommand): Effect = AddStudentEvent(command.studentName).let { + Effect() + .persist(it) + } + + private fun deleteStudent(command: DeleteStudentCommand): Effect = DeleteStudentEvent(command.studentName).let { + Effect() + .persist(it) + } + + private fun getState(command: GetAllStudentsCommand): Effect = Effect() + .none() + .thenRun { newState -> command.replyTo.tell(newState) } + +} \ No newline at end of file diff --git a/src/main/kotlin/me/alekseinovikov/akaes/annotations/Serialization.kt b/src/main/kotlin/me/alekseinovikov/akaes/annotations/Serialization.kt new file mode 100644 index 0000000..6b5c093 --- /dev/null +++ b/src/main/kotlin/me/alekseinovikov/akaes/annotations/Serialization.kt @@ -0,0 +1,3 @@ +package me.alekseinovikov.akaes.annotations + +interface CborSerializable \ No newline at end of file diff --git a/src/main/kotlin/me/alekseinovikov/akaes/config/AkkaConfig.kt b/src/main/kotlin/me/alekseinovikov/akaes/config/AkkaConfig.kt new file mode 100644 index 0000000..dc3e2bb --- /dev/null +++ b/src/main/kotlin/me/alekseinovikov/akaes/config/AkkaConfig.kt @@ -0,0 +1,29 @@ +package me.alekseinovikov.akaes.config + +import akka.actor.typed.ActorSystem +import akka.actor.typed.javadsl.Behaviors +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.core.env.Environment + +@Configuration +class AkkaConfig { + + @Bean + fun akkaConfiguration(env: Environment): Config { + val filePostfix = if (env.activeProfiles.contains("local")) "local" else "kube" + return ConfigFactory + .parseResources("akka_$filePostfix.cfg") + .resolve(); + } + + @Bean + fun actorSystem( + config: Config, + @Value("\${akka.cluster.system.name}") systemName: String + ): ActorSystem = ActorSystem.create(Behaviors.empty(), systemName, config) + +} \ No newline at end of file diff --git a/src/main/kotlin/me/alekseinovikov/akaes/message/Messages.kt b/src/main/kotlin/me/alekseinovikov/akaes/message/Messages.kt index a1cd555..2f45ad6 100644 --- a/src/main/kotlin/me/alekseinovikov/akaes/message/Messages.kt +++ b/src/main/kotlin/me/alekseinovikov/akaes/message/Messages.kt @@ -1,5 +1,7 @@ package me.alekseinovikov.akaes.message +import me.alekseinovikov.akaes.annotations.CborSerializable + enum class StudentActionType { ADD, DELETE @@ -9,4 +11,4 @@ data class StudentActionMessage( val className: String, val studentName: String, val actionType: StudentActionType -) \ No newline at end of file +) : CborSerializable \ No newline at end of file diff --git a/src/main/kotlin/me/alekseinovikov/akaes/model/ClassState.kt b/src/main/kotlin/me/alekseinovikov/akaes/model/ClassState.kt index 135efab..1148c85 100644 --- a/src/main/kotlin/me/alekseinovikov/akaes/model/ClassState.kt +++ b/src/main/kotlin/me/alekseinovikov/akaes/model/ClassState.kt @@ -1,6 +1,8 @@ package me.alekseinovikov.akaes.model +import me.alekseinovikov.akaes.annotations.CborSerializable + data class ClassState( val name: String, val students: MutableSet = mutableSetOf() -) \ No newline at end of file +): CborSerializable \ No newline at end of file diff --git a/src/main/kotlin/me/alekseinovikov/akaes/model/Commands.kt b/src/main/kotlin/me/alekseinovikov/akaes/model/Commands.kt new file mode 100644 index 0000000..3c4fab4 --- /dev/null +++ b/src/main/kotlin/me/alekseinovikov/akaes/model/Commands.kt @@ -0,0 +1,12 @@ +package me.alekseinovikov.akaes.model + +import akka.actor.typed.ActorRef +import me.alekseinovikov.akaes.annotations.CborSerializable + +interface ClassCommand: CborSerializable + +abstract class StudentClassCommand(val studentName: String) : ClassCommand + +class AddStudentCommand(studentName: String) : StudentClassCommand(studentName) +class DeleteStudentCommand(studentName: String) : StudentClassCommand(studentName) +class GetAllStudentsCommand(val replyTo: ActorRef): ClassCommand \ No newline at end of file diff --git a/src/main/kotlin/me/alekseinovikov/akaes/model/Events.kt b/src/main/kotlin/me/alekseinovikov/akaes/model/Events.kt new file mode 100644 index 0000000..bfa92ca --- /dev/null +++ b/src/main/kotlin/me/alekseinovikov/akaes/model/Events.kt @@ -0,0 +1,19 @@ +package me.alekseinovikov.akaes.model + +import me.alekseinovikov.akaes.annotations.CborSerializable + +interface ClassEvent : CborSerializable { + fun applyTo(classState: ClassState): ClassState +} + +data class AddStudentEvent(val studentName: String) : ClassEvent { + override fun applyTo(classState: ClassState) = classState.apply { + classState.students.add(studentName) + } +} + +data class DeleteStudentEvent(val studentName: String) : ClassEvent { + override fun applyTo(classState: ClassState) = classState.apply { + classState.students.remove(studentName) + } +} \ No newline at end of file diff --git a/src/main/kotlin/me/alekseinovikov/akaes/props/EventSourcingProperties.kt b/src/main/kotlin/me/alekseinovikov/akaes/props/EventSourcingProperties.kt new file mode 100644 index 0000000..a6e8c79 --- /dev/null +++ b/src/main/kotlin/me/alekseinovikov/akaes/props/EventSourcingProperties.kt @@ -0,0 +1,15 @@ +package me.alekseinovikov.akaes.props + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.stereotype.Component + +@Component +@ConfigurationProperties(prefix = "akka.event-sourcing", ignoreUnknownFields = true) +data class EventSourcingProperties( + var restartBackOffMinSeconds: Long = 10, + var restartBackOffMaxSeconds:Long = 30, + var restartBackOffRandomFactor: Double = 0.2, + var numberOfEvents: Int = 10, + var keepSnapshots: Int = 2, + var askTimeoutSeconds: Long = 120 +) \ No newline at end of file diff --git a/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt b/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt index e932d13..7c8b650 100644 --- a/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt +++ b/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt @@ -1,35 +1,68 @@ package me.alekseinovikov.akaes.service +import akka.actor.typed.ActorSystem +import akka.cluster.sharding.typed.javadsl.ClusterSharding +import akka.cluster.sharding.typed.javadsl.Entity +import akka.management.cluster.bootstrap.ClusterBootstrap +import akka.management.javadsl.AkkaManagement +import akka.persistence.jdbc.testkit.javadsl.SchemaUtils +import akka.persistence.typed.PersistenceId +import me.alekseinovikov.akaes.actor.ClassActor import me.alekseinovikov.akaes.message.StudentActionMessage import me.alekseinovikov.akaes.message.StudentActionType +import me.alekseinovikov.akaes.model.AddStudentCommand import me.alekseinovikov.akaes.model.ClassState +import me.alekseinovikov.akaes.model.DeleteStudentCommand +import me.alekseinovikov.akaes.model.GetAllStudentsCommand +import me.alekseinovikov.akaes.props.EventSourcingProperties +import org.springframework.core.env.Environment import org.springframework.stereotype.Component -import java.util.concurrent.ConcurrentHashMap +import java.time.Duration +import java.util.concurrent.CompletionStage +import javax.annotation.PostConstruct @Component -class ClassServiceImpl : ClassService { +class ClassServiceImpl( + private val actorSystem: ActorSystem, + private val props: EventSourcingProperties, + private val env: Environment +) : ClassService { - private val classMap: MutableMap = ConcurrentHashMap() + private val sharding = ClusterSharding.get(actorSystem) + private val classes = mutableSetOf() + private val askDuration = Duration.ofSeconds(props.askTimeoutSeconds) - - override fun applyStudentAction(action: StudentActionMessage) { - classMap.computeIfPresent(action.className) { _, oldState -> - when (action.actionType) { - StudentActionType.ADD -> oldState.students.add(action.studentName) - StudentActionType.DELETE -> oldState.students.remove(action.studentName) - } - - return@computeIfPresent oldState + @PostConstruct + fun init() { + if (env.activeProfiles.contains("local").not()) { + AkkaManagement.get(actorSystem).start() //Enable management of cluster + ClusterBootstrap.get(actorSystem).start() //And nodes auto discovery via kube service } - classMap.computeIfAbsent(action.className) { - when (action.actionType) { - StudentActionType.ADD -> ClassState(action.className).also { it.students.add(action.studentName) } - StudentActionType.DELETE -> null - } - } + SchemaUtils.createIfNotExists(actorSystem) + .toCompletableFuture() + .get() //Wait for DB schema creation + + sharding.init(Entity.of(ClassActor.ENTITY_TYPE_KEY) { context -> + ClassActor.createBehaviour( + context.entityId, + PersistenceId.of(context.entityTypeKey.name(), context.entityId), + props + ) + }) } - override fun getCurrentStates(): List = this.classMap.values.filterNotNull().toList() + + override fun applyStudentAction(action: StudentActionMessage) = when (action.actionType) { + StudentActionType.ADD -> action.className.entityRef().tell(AddStudentCommand(action.studentName)) + StudentActionType.DELETE -> action.className.entityRef().tell(DeleteStudentCommand(action.studentName)) + }.also { classes.add(action.className) } + + override fun getCurrentStates(): List = classes + .map> { className -> className.entityRef().ask({ replyTo -> GetAllStudentsCommand(replyTo) }, askDuration) } + .map { it.toCompletableFuture() } + .map { it.get() } + + private fun String.entityRef() = sharding.entityRefFor(ClassActor.ENTITY_TYPE_KEY, this) } \ No newline at end of file diff --git a/src/main/resources/akka_kube.cfg b/src/main/resources/akka_kube.cfg new file mode 100644 index 0000000..e6d92ed --- /dev/null +++ b/src/main/resources/akka_kube.cfg @@ -0,0 +1,86 @@ +akka { + extensions = ["akka.management.cluster.bootstrap.ClusterBootstrap"] + actor { + provider = "cluster" + serialization-bindings { + "me.alekseinovikov.akaes.annotations.CborSerializable" = jackson-json + } + } + + cluster { + downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + sharding { + remember-entities = on + remember-entities-store = eventsourced + journal-plugin-id = "jdbc-journal" + snapshot-plugin-id = "jdbc-snapshot-store" + } + } + + persistence { + journal { + plugin = "jdbc-journal" + auto-start-journals = ["jdbc-journal"] + } + snapshot-store { + plugin = "jdbc-snapshot-store" + auto-start-snapshot-stores = ["jdbc-snapshot-store"] + } + max-concurrent-recoveries = 50 + } + + management { + cluster.bootstrap { + contact-point-discovery { + port-name = "management" + protocol = "tcp" + service-name = "akaes-service" + discovery-method = akka-dns + } + } + + http { + port = 8558 + bind-hostname = "0.0.0.0" + } + } +} + +akka-persistence-jdbc { + shared-databases { + slick { + profile = "slick.jdbc.PostgresProfile$" + db { + host = localhost + host = ${?DB_HOST} + port = 5432 + port = ${?DB_PORT} + db_name = class + db_name = ${?DB_NAME} + url = "jdbc:postgresql://"${akka-persistence-jdbc.shared-databases.slick.db.host}":"${akka-persistence-jdbc.shared-databases.slick.db.port}"/"${akka-persistence-jdbc.shared-databases.slick.db.db_name}"" + user = "postgres" + user = ${?DB_USER} + password = "postgres" + password = ${?DB_PASSWORD} + driver = "com.mysql.cj.jdbc.Driver" + numThreads = 40 + maxConnections = 40 + minConnections = 1 + } + } + } +} + +jdbc-journal { + use-shared-db = "slick" +} + +# the akka-persistence-snapshot-store in use +jdbc-snapshot-store { + use-shared-db = "slick" +} + +# the akka-persistence-query provider in use +jdbc-read-journal { + use-shared-db = "slick" +} diff --git a/src/main/resources/akka_local.cfg b/src/main/resources/akka_local.cfg new file mode 100644 index 0000000..99e8308 --- /dev/null +++ b/src/main/resources/akka_local.cfg @@ -0,0 +1,67 @@ +akka { + actor { + provider = "cluster" + serialization-bindings { + "me.alekseinovikov.akaes.annotations.CborSerializable" = jackson-json + } + } + remote.artery { + canonical { + hostname = "127.0.0.1" + port = 2552 + } + } + + cluster { + seed-nodes = ["akka://ClassActorSystem@127.0.0.1:2552"] + downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + roles = ["Master"] + sharding { + role = "Master" + } + } + + persistence { + journal { + plugin = "jdbc-journal" + auto-start-journals = ["jdbc-journal"] + } + snapshot-store { + plugin = "jdbc-snapshot-store" + auto-start-snapshot-stores = ["jdbc-snapshot-store"] + } + max-concurrent-recoveries = 50 + } +} + +akka-persistence-jdbc { + shared-databases { + slick { + profile = "slick.jdbc.PostgresProfile$" + db { + host = localhost + url = "jdbc:postgresql://"${akka-persistence-jdbc.shared-databases.slick.db.host}":5432/class" + user = "root" + password = "123456" + driver = "org.postgresql.Driver" + numThreads = 40 + maxConnections = 40 + minConnections = 1 + } + } + } +} + +jdbc-journal { + use-shared-db = "slick" +} + +# the akka-persistence-snapshot-store in use +jdbc-snapshot-store { + use-shared-db = "slick" +} + +# the akka-persistence-query provider in use +jdbc-read-journal { + use-shared-db = "slick" +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index d33e8ae..5bb6683 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,3 +2,5 @@ spring.rabbitmq.listener.simple.concurrency=100 spring.rabbitmq.listener.simple.prefetch=1 spring.rabbitmq.host=localhost + +akka.cluster.system.name=ClassActorSystem From e4595ada56f3f58ac6f400d0afcc4aa5b99fc0e5 Mon Sep 17 00:00:00 2001 From: Aleksei Novikov Date: Wed, 29 Sep 2021 15:45:34 +0300 Subject: [PATCH 2/3] Added coroutines support for get method --- .../me/alekseinovikov/akaes/service/ClassServiceImpl.kt | 6 ++++-- .../kotlin/me/alekseinovikov/akaes/service/Interfaces.kt | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt b/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt index 7c8b650..db64783 100644 --- a/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt +++ b/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt @@ -7,6 +7,7 @@ import akka.management.cluster.bootstrap.ClusterBootstrap import akka.management.javadsl.AkkaManagement import akka.persistence.jdbc.testkit.javadsl.SchemaUtils import akka.persistence.typed.PersistenceId +import kotlinx.coroutines.reactor.awaitSingle import me.alekseinovikov.akaes.actor.ClassActor import me.alekseinovikov.akaes.message.StudentActionMessage import me.alekseinovikov.akaes.message.StudentActionType @@ -17,6 +18,7 @@ import me.alekseinovikov.akaes.model.GetAllStudentsCommand import me.alekseinovikov.akaes.props.EventSourcingProperties import org.springframework.core.env.Environment import org.springframework.stereotype.Component +import reactor.kotlin.core.publisher.toMono import java.time.Duration import java.util.concurrent.CompletionStage import javax.annotation.PostConstruct @@ -58,10 +60,10 @@ class ClassServiceImpl( StudentActionType.DELETE -> action.className.entityRef().tell(DeleteStudentCommand(action.studentName)) }.also { classes.add(action.className) } - override fun getCurrentStates(): List = classes + override suspend fun getCurrentStates(): List = classes .map> { className -> className.entityRef().ask({ replyTo -> GetAllStudentsCommand(replyTo) }, askDuration) } .map { it.toCompletableFuture() } - .map { it.get() } + .map { it.toMono().awaitSingle() } private fun String.entityRef() = sharding.entityRefFor(ClassActor.ENTITY_TYPE_KEY, this) diff --git a/src/main/kotlin/me/alekseinovikov/akaes/service/Interfaces.kt b/src/main/kotlin/me/alekseinovikov/akaes/service/Interfaces.kt index 0d0bfa7..e8579ad 100644 --- a/src/main/kotlin/me/alekseinovikov/akaes/service/Interfaces.kt +++ b/src/main/kotlin/me/alekseinovikov/akaes/service/Interfaces.kt @@ -5,5 +5,5 @@ import me.alekseinovikov.akaes.model.ClassState interface ClassService { fun applyStudentAction(action: StudentActionMessage) - fun getCurrentStates(): List + suspend fun getCurrentStates(): List } \ No newline at end of file From 28c562178fb394cafea3f87da858012adf78c99e Mon Sep 17 00:00:00 2001 From: alekseinovikov Date: Wed, 29 Sep 2021 21:32:43 +0300 Subject: [PATCH 3/3] Moved configuration of Akka Cluster to config file --- .../alekseinovikov/akaes/config/AkkaConfig.kt | 35 +++++++++++++++++ .../akaes/service/ClassServiceImpl.kt | 39 +++---------------- src/main/resources/akka_kube.cfg | 2 +- src/main/resources/application.properties | 2 +- 4 files changed, 42 insertions(+), 36 deletions(-) diff --git a/src/main/kotlin/me/alekseinovikov/akaes/config/AkkaConfig.kt b/src/main/kotlin/me/alekseinovikov/akaes/config/AkkaConfig.kt index dc3e2bb..4aee7bc 100644 --- a/src/main/kotlin/me/alekseinovikov/akaes/config/AkkaConfig.kt +++ b/src/main/kotlin/me/alekseinovikov/akaes/config/AkkaConfig.kt @@ -2,8 +2,16 @@ package me.alekseinovikov.akaes.config import akka.actor.typed.ActorSystem import akka.actor.typed.javadsl.Behaviors +import akka.cluster.sharding.typed.javadsl.ClusterSharding +import akka.cluster.sharding.typed.javadsl.Entity +import akka.management.cluster.bootstrap.ClusterBootstrap +import akka.management.javadsl.AkkaManagement +import akka.persistence.jdbc.testkit.javadsl.SchemaUtils +import akka.persistence.typed.PersistenceId import com.typesafe.config.Config import com.typesafe.config.ConfigFactory +import me.alekseinovikov.akaes.actor.ClassActor +import me.alekseinovikov.akaes.props.EventSourcingProperties import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -26,4 +34,31 @@ class AkkaConfig { @Value("\${akka.cluster.system.name}") systemName: String ): ActorSystem = ActorSystem.create(Behaviors.empty(), systemName, config) + @Bean + fun clusterSharding( + actorSystem: ActorSystem, + env: Environment, + props: EventSourcingProperties + ): ClusterSharding { + val sharding = ClusterSharding.get(actorSystem) + if (env.activeProfiles.contains("local").not()) { + AkkaManagement.get(actorSystem).start() //Enable management of cluster + ClusterBootstrap.get(actorSystem).start() //And nodes auto discovery via kube service + } + + SchemaUtils.createIfNotExists(actorSystem) + .toCompletableFuture() + .get() //Wait for DB schema creation + + sharding.init(Entity.of(ClassActor.ENTITY_TYPE_KEY) { context -> + ClassActor.createBehaviour( + context.entityId, + PersistenceId.of(context.entityTypeKey.name(), context.entityId), + props + ) + }) + + return sharding + } + } \ No newline at end of file diff --git a/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt b/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt index db64783..5777d3c 100644 --- a/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt +++ b/src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt @@ -1,12 +1,6 @@ package me.alekseinovikov.akaes.service -import akka.actor.typed.ActorSystem import akka.cluster.sharding.typed.javadsl.ClusterSharding -import akka.cluster.sharding.typed.javadsl.Entity -import akka.management.cluster.bootstrap.ClusterBootstrap -import akka.management.javadsl.AkkaManagement -import akka.persistence.jdbc.testkit.javadsl.SchemaUtils -import akka.persistence.typed.PersistenceId import kotlinx.coroutines.reactor.awaitSingle import me.alekseinovikov.akaes.actor.ClassActor import me.alekseinovikov.akaes.message.StudentActionMessage @@ -16,52 +10,29 @@ import me.alekseinovikov.akaes.model.ClassState import me.alekseinovikov.akaes.model.DeleteStudentCommand import me.alekseinovikov.akaes.model.GetAllStudentsCommand import me.alekseinovikov.akaes.props.EventSourcingProperties -import org.springframework.core.env.Environment import org.springframework.stereotype.Component import reactor.kotlin.core.publisher.toMono import java.time.Duration import java.util.concurrent.CompletionStage -import javax.annotation.PostConstruct @Component class ClassServiceImpl( - private val actorSystem: ActorSystem, - private val props: EventSourcingProperties, - private val env: Environment + props: EventSourcingProperties, + private val sharding: ClusterSharding ) : ClassService { - private val sharding = ClusterSharding.get(actorSystem) private val classes = mutableSetOf() private val askDuration = Duration.ofSeconds(props.askTimeoutSeconds) - @PostConstruct - fun init() { - if (env.activeProfiles.contains("local").not()) { - AkkaManagement.get(actorSystem).start() //Enable management of cluster - ClusterBootstrap.get(actorSystem).start() //And nodes auto discovery via kube service - } - - SchemaUtils.createIfNotExists(actorSystem) - .toCompletableFuture() - .get() //Wait for DB schema creation - - sharding.init(Entity.of(ClassActor.ENTITY_TYPE_KEY) { context -> - ClassActor.createBehaviour( - context.entityId, - PersistenceId.of(context.entityTypeKey.name(), context.entityId), - props - ) - }) - } - - override fun applyStudentAction(action: StudentActionMessage) = when (action.actionType) { StudentActionType.ADD -> action.className.entityRef().tell(AddStudentCommand(action.studentName)) StudentActionType.DELETE -> action.className.entityRef().tell(DeleteStudentCommand(action.studentName)) }.also { classes.add(action.className) } override suspend fun getCurrentStates(): List = classes - .map> { className -> className.entityRef().ask({ replyTo -> GetAllStudentsCommand(replyTo) }, askDuration) } + .map> { className -> + className.entityRef().ask({ replyTo -> GetAllStudentsCommand(replyTo) }, askDuration) + } .map { it.toCompletableFuture() } .map { it.toMono().awaitSingle() } diff --git a/src/main/resources/akka_kube.cfg b/src/main/resources/akka_kube.cfg index e6d92ed..ae0b4a8 100644 --- a/src/main/resources/akka_kube.cfg +++ b/src/main/resources/akka_kube.cfg @@ -62,7 +62,7 @@ akka-persistence-jdbc { user = ${?DB_USER} password = "postgres" password = ${?DB_PASSWORD} - driver = "com.mysql.cj.jdbc.Driver" + driver = "org.postgresql.Driver" numThreads = 40 maxConnections = 40 minConnections = 1 diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 5bb6683..5da8ff7 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,4 +1,4 @@ -spring.rabbitmq.listener.simple.concurrency=100 +spring.rabbitmq.listener.simple.concurrency=1 spring.rabbitmq.listener.simple.prefetch=1 spring.rabbitmq.host=localhost