Persisting data to MySQL with an Akka PersistentActor (event sourcing)

    xiaoxiao, 2021-07-24

    1. Create the MySQL database and tables

    CREATE DATABASE IF NOT EXISTS akka_persistence_sql_async;
    -- Select the new database so the tables below are created inside it.
    USE akka_persistence_sql_async;

    CREATE TABLE IF NOT EXISTS persistence_metadata (
      persistence_key BIGINT NOT NULL AUTO_INCREMENT,
      persistence_id VARCHAR(255) NOT NULL,
      sequence_nr BIGINT NOT NULL,
      PRIMARY KEY (persistence_key),
      UNIQUE (persistence_id)
    ) ENGINE = InnoDB;

    CREATE TABLE IF NOT EXISTS persistence_journal (
      persistence_key BIGINT NOT NULL,
      sequence_nr BIGINT NOT NULL,
      message LONGBLOB NOT NULL,
      PRIMARY KEY (persistence_key, sequence_nr),
      FOREIGN KEY (persistence_key) REFERENCES persistence_metadata (persistence_key)
    ) ENGINE = InnoDB;

    CREATE TABLE IF NOT EXISTS persistence_snapshot (
      persistence_key BIGINT NOT NULL,
      sequence_nr BIGINT NOT NULL,
      created_at BIGINT NOT NULL,
      snapshot LONGBLOB NOT NULL,
      PRIMARY KEY (persistence_key, sequence_nr),
      FOREIGN KEY (persistence_key) REFERENCES persistence_metadata (persistence_key)
    ) ENGINE = InnoDB;

    2. Create an sbt project named PersistTest

    Add the dependencies to build.sbt:

    name := "PersistTest"

    version := "1.0"

    scalaVersion := "2.12.6"

    lazy val akkaVersion = "2.6.0-M1"
    lazy val sparkVersion = "2.4.3"

    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % akkaVersion,
      "com.typesafe.akka" %% "akka-testkit" % akkaVersion,
      "org.scalatest" %% "scalatest" % "3.0.5" % "test",
      // Note: this version differs from akkaVersion; Akka modules are normally kept on one version.
      "com.typesafe.akka" %% "akka-persistence" % "2.5.6",
      "com.okumin" %% "akka-persistence-sql-async" % "0.5.1",
      "com.github.mauricio" %% "mysql-async" % "0.2.21",
      // "com.github.mauricio" %% "postgresql-async" % "0.2.20", // for postgres, but this example is for mysql, so not needed
      // spark-sql is not used by the code in this post.
      "org.apache.spark" %% "spark-sql" % sparkVersion
    )

    3. Configure the database

    Create the file src\main\resources\application.conf.

    Add the following configuration to application.conf. Be sure to change password to the password of your own MySQL database.

    akka {
      persistence {
        journal.plugin = "akka-persistence-sql-async.journal"
        snapshot-store.plugin = "akka-persistence-sql-async.snapshot-store"
      }
    }

    akka-persistence-sql-async {
      journal.class = "akka.persistence.journal.sqlasync.MySQLAsyncWriteJournal"
      snapshot-store.class = "akka.persistence.snapshot.sqlasync.MySQLSnapshotStore"

      # For PostgreSQL
      # journal.class = "akka.persistence.journal.sqlasync.PostgreSQLAsyncWriteJournal"
      # snapshot-store.class = "akka.persistence.snapshot.sqlasync.PostgreSQLSnapshotStore"

      user = "root"
      password = "123456"
      url = "jdbc:mysql://localhost/akka_persistence_sql_async"
      max-pool-size = 4
      wait-queue-capacity = 10000

      metadata-table-name = "persistence_metadata"
      journal-table-name = "persistence_journal"
      snapshot-table-name = "persistence_snapshot"

      connect-timeout = 5s
      query-timeout = 5s
    }

    4. Write the code

    package com.example

    import java.util

    import akka.actor.{ActorSystem, Props}
    import akka.persistence.{PersistentActor, SnapshotOffer}

    sealed trait Command
    case class Add(str: String) extends Command
    case class Get() extends Command
    case class Clear() extends Command
    case class Save() extends Command

    // Snapshot payload: the full list of strings at the time of saving.
    case class Snapshot(Strs: util.ArrayList[String])

    object Run extends App {
      val actorSystem = ActorSystem("myhellosys")
      val myakka = actorSystem.actorOf(Props[Mypersistence], "myakaactor")

      myakka ! Clear()
      myakka ! Get()
      for (i <- 1 to 10) {
        myakka ! Add(i.toString())
        if (i == 5) myakka ! Save() // save a snapshot after the fifth Add
      }
      myakka ! Get()
    }

    class Mypersistence extends PersistentActor {
      val ID: Int = -1
      var c = 0 // number of Add events replayed during recovery
      var Strs: util.ArrayList[String] = new util.ArrayList[String]

      override def persistenceId: String = s"this persistentId $ID"

      // Called on startup: first the latest snapshot (if any), then the later events.
      override def receiveRecover: Receive = {
        case Add(str) =>
          Strs.add(str)
          c += 1
        case Clear() =>
          // Replay Clear events too, so recovery without a snapshot rebuilds the same state.
          Strs.clear()
        case SnapshotOffer(_, snapshot: Snapshot) =>
          Strs = snapshot.Strs
          println("snapshot")
      }

      override def receiveCommand: Receive = {
        case Add(str) =>
          persist(Add(str)) { e => Strs.add(str) }
        case Get() =>
          sender() ! Strs
          println("mystrs:" + Strs.toString)
          println("conut:" + c)
        case Clear() =>
          persist(Clear())(e => Strs.clear())
        case Save() =>
          // Save a copy so later Adds cannot mutate the snapshot while it is being stored.
          saveSnapshot(Snapshot(new util.ArrayList[String](Strs)))
      }
    }

    5. Run results

    mystrs:[]
    conut:0
    mystrs:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    conut:0

    Now modify the code by commenting out the lines that send the Clear, Add and Save commands:

    object Run extends App {
      val actorSystem = ActorSystem("myhellosys")
      val myakka = actorSystem.actorOf(Props[Mypersistence], "myakaactor")

      // myakka ! Clear()
      // myakka ! Get()
      // for (i <- 1 to 10) {
      //   myakka ! Add(i.toString())
      //   if (i == 5) myakka ! Save() // save a snapshot after the fifth Add
      // }
      myakka ! Get()
    }

    Run it again. The output is:

    snapshot
    mystrs:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    conut:5

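    As the output shows, on the second run the actor first restores its state from the snapshot and then replays, in order, the events that were persisted after the snapshot. The count of 5 is the number of Add events (6 through 10) replayed after the snapshot, which was saved after the fifth Add.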
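
    The recovery order described above can be modelled without Akka or MySQL. The following is a minimal sketch in plain Scala, not the plugin's actual implementation. The names Event, Added, Cleared, SnapshotRecord and RecoveryModel are hypothetical and introduced only for illustration, and the sequence numbers assume that the journal was empty before the first run (so Clear is event 1 and the snapshot covers events up to number 6).

```scala
// Plain-Scala model of snapshot-plus-replay recovery; no Akka involved.
// All names here are hypothetical and used only for illustration.
sealed trait Event
final case class Added(value: String) extends Event
case object Cleared extends Event

// A snapshot: the saved state and the last sequence number it covers.
final case class SnapshotRecord(sequenceNr: Long, state: Vector[String])

object RecoveryModel {
  // Single state-transition function, usable for live handling and for replay.
  def applyEvent(state: Vector[String], event: Event): Vector[String] = event match {
    case Added(v) => state :+ v
    case Cleared  => Vector.empty
  }

  // Start from the snapshot (if any), then fold over the later events in order.
  // Returns the recovered state and the number of events replayed.
  def recover(journal: Vector[(Long, Event)],
              snapshot: Option[SnapshotRecord]): (Vector[String], Int) = {
    val fromSeq  = snapshot.map(_.sequenceNr).getOrElse(0L)
    val initial  = snapshot.map(_.state).getOrElse(Vector.empty[String])
    val toReplay = journal.collect { case (seq, ev) if seq > fromSeq => ev }
    (toReplay.foldLeft(initial)(applyEvent), toReplay.size)
  }
}

object RecoveryDemo extends App {
  // Journal as assumed after the first run: Clear, then Add 1..10 (sequence numbers 1..11).
  val events: Vector[Event] =
    Vector[Event](Cleared) ++ (1 to 10).map(i => Added(i.toString))
  val journal: Vector[(Long, Event)] =
    events.zipWithIndex.map { case (ev, i) => (i + 1L, ev) }

  // Snapshot saved after the fifth Add, i.e. at sequence number 6.
  val snapshot = SnapshotRecord(6L, (1 to 5).map(_.toString).toVector)

  val (state, replayed) = RecoveryModel.recover(journal, Some(snapshot))
  println(s"state: ${state.mkString("[", ", ", "]")}")
  println(s"replayed: $replayed")
}
```

    Under these assumptions, RecoveryDemo prints the state [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] and a replay count of 5, which matches the second run above. Passing None instead of a snapshot replays all 11 events and arrives at the same state, which is why the recovery handler should also apply Clear events.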

