Skip to content

Commit

Permalink
Merge pull request #1 from alekseinovikov/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
alekseinovikov authored Sep 29, 2021
2 parents 3da4d60 + 28c5621 commit afc7229
Show file tree
Hide file tree
Showing 15 changed files with 404 additions and 28 deletions.
19 changes: 19 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ plugins {
id("com.google.cloud.tools.jib") version "3.1.4"
}

val scalaBinaryVersion = "2.13"
val akkaVersion = "2.6.16"
val slickVersion = "3.3.3"
val akkaManagementVersion = "1.1.1"
val postgresDriverVersion = "42.2.24"


group = "me.alekseinovikov.akaes"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11
Expand All @@ -17,6 +24,18 @@ repositories {
}

dependencies {
implementation(platform("com.typesafe.akka:akka-bom_${scalaBinaryVersion}:${akkaVersion}"))
implementation("org.postgresql:postgresql:${postgresDriverVersion}")
implementation("com.typesafe.akka:akka-cluster-sharding-typed_${scalaBinaryVersion}")
implementation("com.typesafe.akka:akka-persistence-typed_${scalaBinaryVersion}")
implementation("com.typesafe.akka:akka-serialization-jackson_${scalaBinaryVersion}")
implementation("com.lightbend.akka:akka-persistence-jdbc_${scalaBinaryVersion}:5.0.4")
implementation("com.typesafe.akka:akka-persistence-query_${scalaBinaryVersion}:${akkaVersion}")
implementation("com.typesafe.slick:slick_${scalaBinaryVersion}:${slickVersion}")
implementation("com.typesafe.slick:slick-hikaricp_${scalaBinaryVersion}:${slickVersion}")
implementation("com.lightbend.akka.discovery:akka-discovery-kubernetes-api_${scalaBinaryVersion}:${akkaManagementVersion}")
implementation("com.lightbend.akka.management:akka-management-cluster-bootstrap_${scalaBinaryVersion}:${akkaManagementVersion}")

implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-amqp")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
Expand Down
15 changes: 14 additions & 1 deletion k8s/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ spec:
- containerPort: 8080
protocol: TCP
name: web
- containerPort: 8558
name: management
- containerPort: 2552
name: remoting
---

apiVersion: v1
Expand All @@ -37,7 +41,16 @@ metadata:
name: "akaes-service"
spec:
ports:
- port: 8080
- name: web
port: 8080
protocol: TCP
- name: management
port: 8558
targetPort: 8558
protocol: TCP
- name: remoting
port: 2552
targetPort: 2552
protocol: TCP
selector:
app: akaes
Expand Down
66 changes: 66 additions & 0 deletions src/main/kotlin/me/alekseinovikov/akaes/actor/ClassActor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package me.alekseinovikov.akaes.actor

import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.javadsl.ActorContext
import akka.actor.typed.javadsl.Behaviors
import akka.cluster.sharding.typed.javadsl.EntityTypeKey
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.javadsl.*
import me.alekseinovikov.akaes.model.*
import me.alekseinovikov.akaes.props.EventSourcingProperties
import java.time.Duration

class ClassActor(
private val className: String,
private val context: ActorContext<ClassCommand>,
private val props: EventSourcingProperties,
persistenceId: PersistenceId?,
) :
EventSourcedBehavior<ClassCommand, ClassEvent, ClassState>(
persistenceId, SupervisorStrategy.restartWithBackoff(
Duration.ofSeconds(props.restartBackOffMinSeconds),
Duration.ofSeconds(props.restartBackOffMaxSeconds),
props.restartBackOffRandomFactor
)
) {

companion object {
var ENTITY_TYPE_KEY: EntityTypeKey<ClassCommand> = EntityTypeKey.create(ClassCommand::class.java, "Class")
fun createBehaviour(
className: String,
persistenceId: PersistenceId,
props: EventSourcingProperties
): Behavior<ClassCommand> = Behaviors.setup { context -> ClassActor(className, context, props, persistenceId) }
}

override fun retentionCriteria(): SnapshotCountRetentionCriteria = RetentionCriteria
.snapshotEvery(props.numberOfEvents, props.keepSnapshots)
.withDeleteEventsOnSnapshot()

override fun emptyState() = ClassState(className)
override fun eventHandler(): EventHandler<ClassState, ClassEvent> = EventHandler { state, event -> event.applyTo(state) }

override fun commandHandler(): CommandHandler<ClassCommand, ClassEvent, ClassState> =
newCommandHandlerBuilder()
.forAnyState()
.onCommand(AddStudentCommand::class.java, this::addStudent)
.onCommand(DeleteStudentCommand::class.java, this::deleteStudent)
.onCommand(GetAllStudentsCommand::class.java, this::getState)
.build()

private fun addStudent(command: AddStudentCommand): Effect<ClassEvent, ClassState> = AddStudentEvent(command.studentName).let {
Effect()
.persist(it)
}

private fun deleteStudent(command: DeleteStudentCommand): Effect<ClassEvent, ClassState> = DeleteStudentEvent(command.studentName).let {
Effect()
.persist(it)
}

private fun getState(command: GetAllStudentsCommand): Effect<ClassEvent, ClassState> = Effect()
.none()
.thenRun<ClassState> { newState -> command.replyTo.tell(newState) }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package me.alekseinovikov.akaes.annotations

interface CborSerializable
64 changes: 64 additions & 0 deletions src/main/kotlin/me/alekseinovikov/akaes/config/AkkaConfig.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package me.alekseinovikov.akaes.config

import akka.actor.typed.ActorSystem
import akka.actor.typed.javadsl.Behaviors
import akka.cluster.sharding.typed.javadsl.ClusterSharding
import akka.cluster.sharding.typed.javadsl.Entity
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.javadsl.AkkaManagement
import akka.persistence.jdbc.testkit.javadsl.SchemaUtils
import akka.persistence.typed.PersistenceId
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import me.alekseinovikov.akaes.actor.ClassActor
import me.alekseinovikov.akaes.props.EventSourcingProperties
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.env.Environment

@Configuration
class AkkaConfig {

@Bean
fun akkaConfiguration(env: Environment): Config {
val filePostfix = if (env.activeProfiles.contains("local")) "local" else "kube"
return ConfigFactory
.parseResources("akka_$filePostfix.cfg")
.resolve();
}

@Bean
fun actorSystem(
config: Config,
@Value("\${akka.cluster.system.name}") systemName: String
): ActorSystem<Any> = ActorSystem.create(Behaviors.empty(), systemName, config)

@Bean
fun clusterSharding(
actorSystem: ActorSystem<Any>,
env: Environment,
props: EventSourcingProperties
): ClusterSharding {
val sharding = ClusterSharding.get(actorSystem)
if (env.activeProfiles.contains("local").not()) {
AkkaManagement.get(actorSystem).start() //Enable management of cluster
ClusterBootstrap.get(actorSystem).start() //And nodes auto discovery via kube service
}

SchemaUtils.createIfNotExists(actorSystem)
.toCompletableFuture()
.get() //Wait for DB schema creation

sharding.init(Entity.of(ClassActor.ENTITY_TYPE_KEY) { context ->
ClassActor.createBehaviour(
context.entityId,
PersistenceId.of(context.entityTypeKey.name(), context.entityId),
props
)
})

return sharding
}

}
4 changes: 3 additions & 1 deletion src/main/kotlin/me/alekseinovikov/akaes/message/Messages.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package me.alekseinovikov.akaes.message

import me.alekseinovikov.akaes.annotations.CborSerializable

enum class StudentActionType {
ADD,
DELETE
Expand All @@ -9,4 +11,4 @@ data class StudentActionMessage(
val className: String,
val studentName: String,
val actionType: StudentActionType
)
) : CborSerializable
4 changes: 3 additions & 1 deletion src/main/kotlin/me/alekseinovikov/akaes/model/ClassState.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package me.alekseinovikov.akaes.model

import me.alekseinovikov.akaes.annotations.CborSerializable

data class ClassState(
val name: String,
val students: MutableSet<String> = mutableSetOf()
)
): CborSerializable
12 changes: 12 additions & 0 deletions src/main/kotlin/me/alekseinovikov/akaes/model/Commands.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package me.alekseinovikov.akaes.model

import akka.actor.typed.ActorRef
import me.alekseinovikov.akaes.annotations.CborSerializable

interface ClassCommand: CborSerializable

abstract class StudentClassCommand(val studentName: String) : ClassCommand

class AddStudentCommand(studentName: String) : StudentClassCommand(studentName)
class DeleteStudentCommand(studentName: String) : StudentClassCommand(studentName)
class GetAllStudentsCommand(val replyTo: ActorRef<ClassState>): ClassCommand
19 changes: 19 additions & 0 deletions src/main/kotlin/me/alekseinovikov/akaes/model/Events.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package me.alekseinovikov.akaes.model

import me.alekseinovikov.akaes.annotations.CborSerializable

interface ClassEvent : CborSerializable {
fun applyTo(classState: ClassState): ClassState
}

data class AddStudentEvent(val studentName: String) : ClassEvent {
override fun applyTo(classState: ClassState) = classState.apply {
classState.students.add(studentName)
}
}

data class DeleteStudentEvent(val studentName: String) : ClassEvent {
override fun applyTo(classState: ClassState) = classState.apply {
classState.students.remove(studentName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package me.alekseinovikov.akaes.props

import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.stereotype.Component

@Component
@ConfigurationProperties(prefix = "akka.event-sourcing", ignoreUnknownFields = true)
data class EventSourcingProperties(
var restartBackOffMinSeconds: Long = 10,
var restartBackOffMaxSeconds:Long = 30,
var restartBackOffRandomFactor: Double = 0.2,
var numberOfEvents: Int = 10,
var keepSnapshots: Int = 2,
var askTimeoutSeconds: Long = 120
)
52 changes: 29 additions & 23 deletions src/main/kotlin/me/alekseinovikov/akaes/service/ClassServiceImpl.kt
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
package me.alekseinovikov.akaes.service

import akka.cluster.sharding.typed.javadsl.ClusterSharding
import kotlinx.coroutines.reactor.awaitSingle
import me.alekseinovikov.akaes.actor.ClassActor
import me.alekseinovikov.akaes.message.StudentActionMessage
import me.alekseinovikov.akaes.message.StudentActionType
import me.alekseinovikov.akaes.model.AddStudentCommand
import me.alekseinovikov.akaes.model.ClassState
import me.alekseinovikov.akaes.model.DeleteStudentCommand
import me.alekseinovikov.akaes.model.GetAllStudentsCommand
import me.alekseinovikov.akaes.props.EventSourcingProperties
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
import reactor.kotlin.core.publisher.toMono
import java.time.Duration
import java.util.concurrent.CompletionStage

@Component
class ClassServiceImpl : ClassService {

private val classMap: MutableMap<String, ClassState?> = ConcurrentHashMap()


override fun applyStudentAction(action: StudentActionMessage) {
classMap.computeIfPresent(action.className) { _, oldState ->
when (action.actionType) {
StudentActionType.ADD -> oldState.students.add(action.studentName)
StudentActionType.DELETE -> oldState.students.remove(action.studentName)
}

return@computeIfPresent oldState
}

classMap.computeIfAbsent(action.className) {
when (action.actionType) {
StudentActionType.ADD -> ClassState(action.className).also { it.students.add(action.studentName) }
StudentActionType.DELETE -> null
}
class ClassServiceImpl(
props: EventSourcingProperties,
private val sharding: ClusterSharding
) : ClassService {

private val classes = mutableSetOf<String>()
private val askDuration = Duration.ofSeconds(props.askTimeoutSeconds)

override fun applyStudentAction(action: StudentActionMessage) = when (action.actionType) {
StudentActionType.ADD -> action.className.entityRef().tell(AddStudentCommand(action.studentName))
StudentActionType.DELETE -> action.className.entityRef().tell(DeleteStudentCommand(action.studentName))
}.also { classes.add(action.className) }

override suspend fun getCurrentStates(): List<ClassState> = classes
.map<String, CompletionStage<ClassState>> { className ->
className.entityRef().ask({ replyTo -> GetAllStudentsCommand(replyTo) }, askDuration)
}
}
.map { it.toCompletableFuture() }
.map { it.toMono().awaitSingle() }

override fun getCurrentStates(): List<ClassState> = this.classMap.values.filterNotNull().toList()
private fun String.entityRef() = sharding.entityRefFor(ClassActor.ENTITY_TYPE_KEY, this)

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import me.alekseinovikov.akaes.model.ClassState

interface ClassService {
fun applyStudentAction(action: StudentActionMessage)
fun getCurrentStates(): List<ClassState>
suspend fun getCurrentStates(): List<ClassState>
}
Loading

0 comments on commit afc7229

Please sign in to comment.