pjfanning commented on code in PR #180:
URL: https://github.com/apache/pekko-persistence-jdbc/pull/180#discussion_r1580672500
##########
core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala:
##########
@@ -14,138 +14,126 @@ package org.apache.pekko.persistence.jdbc.query
 
-import java.lang.management.ManagementFactory
-import java.lang.management.MemoryMXBean
-import java.util.UUID
-
-import org.apache.pekko
-import pekko.actor.ActorSystem
-import pekko.persistence.{ AtomicWrite, PersistentRepr }
-import pekko.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalDao, JournalTables }
-import pekko.serialization.SerializationExtension
-import pekko.stream.scaladsl.{ Sink, Source }
 import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
+import org.apache.pekko
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.slf4j.LoggerFactory
+import pekko.persistence.jdbc.query.JournalDaoStreamMessagesMemoryTest.fetchSize
+import pekko.persistence.{ AtomicWrite, PersistentRepr }
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.{ Materializer, SystemMaterializer }
+import java.lang.management.{ ManagementFactory, MemoryMXBean }
+import java.util.UUID
 import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 import scala.util.{ Failure, Success }
-import pekko.stream.testkit.scaladsl.TestSink
-import org.scalatest.matchers.should.Matchers
 
 object JournalDaoStreamMessagesMemoryTest {
 
-  val configOverrides: Map[String, ConfigValue] = Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100"))
+  val fetchSize: Int = 100
+  val MB: Int = 1024 * 1024
 
-  val MB = 1024 * 1024
+  val configOverrides: Map[String, ConfigValue] = Map(
+    "jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100"))
 }
 
 abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
-    extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides)
-    with JournalTables
-    with Matchers {
+    extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides) {
+
   import JournalDaoStreamMessagesMemoryTest.MB
 
   private val log = LoggerFactory.getLogger(this.getClass)
 
-  val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration
-  val journalTableCfg = journalConfig.journalTableConfiguration
+  val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
 
-  implicit val askTimeout: FiniteDuration = 50.millis
+  it should "stream events" in withActorSystem { implicit system =>
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val mat: Materializer = SystemMaterializer(system).materializer
 
-  def generateId: Int = 0
+    withDao { dao =>
+      val persistenceId = UUID.randomUUID().toString
 
-  val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
+      val writerUuid = UUID.randomUUID().toString
+
+      val payloadSize = 5000 // 5000 bytes
+      val eventsPerBatch = 1000
+
+      val maxMem = 64 * MB
 
-  behavior.of("Replaying Persistence Actor")
-
-  it should "stream events" in {
-    if (newDao)
-      pending
-    withActorSystem { implicit system: ActorSystem =>
-      withDatabase { db =>
-        implicit val ec: ExecutionContext = system.dispatcher
-
-        val persistenceId = UUID.randomUUID().toString
-        val dao = new ByteArrayJournalDao(db, profile, journalConfig, SerializationExtension(system))
-
-        val payloadSize = 5000 // 5000 bytes
-        val eventsPerBatch = 1000
-
-        val maxMem = 64 * MB
-
-        val numberOfInsertBatches = {
-          // calculate the number of batches using a factor to make sure we go a little bit over the limit
-          (maxMem / (payloadSize * eventsPerBatch) * 1.2).round.toInt
-        }
-        val totalMessages = numberOfInsertBatches * eventsPerBatch
-        val totalMessagePayload = totalMessages * payloadSize
-        log.info(
-          s"batches: $numberOfInsertBatches (with $eventsPerBatch events), total messages: $totalMessages, total msgs size: $totalMessagePayload")
-
-        // payload can be the same when inserting to avoid unnecessary memory usage
-        val payload = Array.fill(payloadSize)('a'.toByte)
-
-        val lastInsert =
-          Source
-            .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
-            .mapAsync(1) { i =>
-              val end = i * eventsPerBatch
-              val start = end - (eventsPerBatch - 1)
-              log.info(s"batch $i - events from $start to $end")
-              val atomicWrites =
-                (start to end).map { j =>
-                  AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
-                }.toSeq
-
-              dao.asyncWriteMessages(atomicWrites).map(_ => i)
-            }
-            .runWith(Sink.last)
-
-        // wait until we write all messages
-        // being very generous, 1 second per message
-        lastInsert.futureValue(Timeout(totalMessages.seconds))
-
-        log.info("Events written, starting replay")
-
-        // sleep and gc to have some kind of stable measurement of current heap usage
-        Thread.sleep(1000)
-        System.gc()
-        Thread.sleep(1000)
-        val usedBefore = memoryMBean.getHeapMemoryUsage.getUsed
-
-        val messagesSrc =
-          dao.messagesWithBatch(persistenceId, 0, totalMessages, batchSize = 100, None)
-        val probe =
-          messagesSrc
-            .map {
-              case Success((repr, _)) =>
-                if (repr.sequenceNr % 100 == 0)
-                  log.info(s"fetched: ${repr.persistenceId} - ${repr.sequenceNr}/$totalMessages")
-              case Failure(exception) =>
-                log.error("Failure when reading messages.", exception)
-            }
-            .runWith(TestSink.probe)
-
-        probe.request(10)
-        probe.within(20.seconds) {
-          probe.expectNextN(10)
-        }
-
-        // sleep and gc to have some kind of stable measurement of current heap usage
-        Thread.sleep(2000)
-        System.gc()
-        Thread.sleep(1000)
-        val usedAfter = memoryMBean.getHeapMemoryUsage.getUsed
-
-        log.info(s"Used heap before ${usedBefore / MB} MB, after ${usedAfter / MB} MB")
-        // actual usage is much less than 10 MB
-        (usedAfter - usedBefore) should be <= (10L * MB)
-
-        probe.cancel()
+      val numberOfInsertBatches = {
+        // calculate the number of batches using a factor to make sure we go a little bit over the limit
+        (maxMem / (payloadSize * eventsPerBatch) * 1.2).round.toInt
       }
+      val totalMessages = numberOfInsertBatches * eventsPerBatch
+      val totalMessagePayload = totalMessages * payloadSize
+      log.info(
+        s"batches: $numberOfInsertBatches (with $eventsPerBatch events), total messages: $totalMessages, total msgs size: $totalMessagePayload")
+
+      // payload can be the same when inserting to avoid unnecessary memory usage
+      val payload = Array.fill(payloadSize)('a'.toByte)
+
+      val lastInsert =
+        Source
+          .fromIterator(() => (1 to numberOfInsertBatches).iterator)
+          .mapAsync(1) { i =>
+            val end = i * eventsPerBatch
+            val start = end - (eventsPerBatch - 1)
+            log.info(s"batch $i - events from $start to $end")
+            val atomicWrites =
+              (start to end).map { j =>
+                AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid)))
+              }
+            dao.asyncWriteMessages(atomicWrites).map(_ => i)
+          }
+          .runWith(Sink.last)
+
+      // wait until we write all messages
+      // being very generous, 1 second per message
+      lastInsert.futureValue(Timeout(totalMessages.seconds))
+
+      log.info("Events written, starting replay")
+
+      // sleep and gc to have some kind of stable measurement of current heap usage
+      Thread.sleep(1000)
+      System.gc()
+      Thread.sleep(1000)
+      val usedBefore = memoryMBean.getHeapMemoryUsage.getUsed
+
+      val messagesSrc =
+        dao.messagesWithBatch(persistenceId, 0, totalMessages, batchSize = fetchSize, None)
+      val probe =
+        messagesSrc
+          .map {
+            case Success((repr, _)) =>
+              if (repr.sequenceNr % 100 == 0)
+                log.info(s"fetched: ${repr.persistenceId} - ${repr.sequenceNr}/$totalMessages")
+            case Failure(exception) =>
+              log.error("Failure when reading messages.", exception)
+          }
+          .runWith(TestSink.probe)
+
+      probe.request(10)
+      probe.within(20.seconds) {
+        probe.expectNextN(10)
+      }
+
+      // sleep and gc to have some kind of stable measurement of current heap usage

Review Comment:
   ok - I see there is an assertion - this does look like it could be problematic - any tests that have magic sleep times are liable to fail occasionally depending on machine performance
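To illustrate the concern: instead of assuming a fixed 1-2 second pause is always long enough for the heap to settle, the measurement could poll until consecutive readings agree. The sketch below is not code from the PR; the `HeapMeasurement` object, the `stableHeapUsed` helper, and its default parameters are all hypothetical names chosen for illustration, and `System.gc()` remains only a hint that the JVM is free to ignore.

import java.lang.management.ManagementFactory
import scala.annotation.tailrec
import scala.concurrent.duration._

object HeapMeasurement {
  private val memoryMBean = ManagementFactory.getMemoryMXBean

  // Samples heap usage after a GC hint until two consecutive readings
  // agree within `tolerance`, or until `maxWait` has elapsed. Returns the
  // last reading either way, so a slow machine degrades to a less precise
  // measurement instead of a test failure.
  def stableHeapUsed(
      tolerance: Long = 1L * 1024 * 1024, // allow 1 MB of jitter between samples
      pollInterval: FiniteDuration = 100.millis,
      maxWait: FiniteDuration = 10.seconds): Long = {
    val deadline = maxWait.fromNow

    @tailrec
    def loop(previous: Long): Long = {
      System.gc() // only a hint; the JVM may ignore it
      Thread.sleep(pollInterval.toMillis)
      val current = memoryMBean.getHeapMemoryUsage.getUsed
      if (math.abs(current - previous) <= tolerance || deadline.isOverdue()) current
      else loop(current)
    }

    loop(memoryMBean.getHeapMemoryUsage.getUsed)
  }
}

With a helper like this, `usedBefore` and `usedAfter` in the test would each become a `HeapMeasurement.stableHeapUsed()` call, and the only remaining timing dependence is the upper bound `maxWait` rather than a sleep duration that has to be exactly right on every machine.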