aglinxinyuan commented on code in PR #5554: URL: https://github.com/apache/texera/pull/5554#discussion_r3408715474
########## amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.logreplay + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.serialization.{Serialization, SerializationExtension} +import org.apache.pekko.testkit.TestKit +import org.apache.texera.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ChannelIdentity, + EmbeddedControlMessageIdentity +} +import org.apache.texera.amber.engine.common.AmberRuntime +import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.common.storage.{SequentialRecordStorage, VFSRecordStorage} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpec + +import java.nio.file.{Files, Path} + +class ReplayLogGeneratorSpec extends AnyFlatSpec with BeforeAndAfterAll { + + // --------------------------------------------------------------------------- + // Suite-local Pekko serde injected into AmberRuntime via reflection + // 
--------------------------------------------------------------------------- + // + // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`, + // so any test that round-trips records through VFSRecordStorage needs + // AmberRuntime initialized. Pattern matches CheckpointSubsystemSpec / + // ClientEventSpec — own a suite-local ActorSystem, inject it into + // AmberRuntime's private vars via reflection, tear down in afterAll. + + private val testSystem: ActorSystem = + ActorSystem("ReplayLogGeneratorSpec-test", AmberRuntime.pekkoConfig) + private val testSerde: Serialization = SerializationExtension(testSystem) + + private def setAmberRuntimeField(name: String, value: AnyRef): Unit = { + val field = AmberRuntime.getClass.getDeclaredField(name) + field.setAccessible(true) + field.set(AmberRuntime, value) + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + setAmberRuntimeField("_actorSystem", testSystem) + setAmberRuntimeField("_serde", testSerde) + } + + override protected def afterAll(): Unit = { + setAmberRuntimeField("_serde", null) + setAmberRuntimeField("_actorSystem", null) + TestKit.shutdownActorSystem(testSystem) + super.afterAll() + } Review Comment: Good catch — now capturing `_actorSystem`/`_serde` in `beforeAll` and restoring those values in `afterAll` instead of nulling them, so the suite no longer clobbers an already-initialized `AmberRuntime` or couples to execution order. Fixed in cdc37ee. ########## amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.logreplay + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.serialization.{Serialization, SerializationExtension} +import org.apache.pekko.testkit.TestKit +import org.apache.texera.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ChannelIdentity, + EmbeddedControlMessageIdentity +} +import org.apache.texera.amber.engine.common.AmberRuntime +import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.common.storage.{SequentialRecordStorage, VFSRecordStorage} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpec + +import java.nio.file.{Files, Path} + +class ReplayLogGeneratorSpec extends AnyFlatSpec with BeforeAndAfterAll { + + // --------------------------------------------------------------------------- + // Suite-local Pekko serde injected into AmberRuntime via reflection + // --------------------------------------------------------------------------- + // + // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`, + // so any test that round-trips records through VFSRecordStorage needs + // AmberRuntime initialized. Pattern matches CheckpointSubsystemSpec / + // ClientEventSpec — own a suite-local ActorSystem, inject it into + // AmberRuntime's private vars via reflection, tear down in afterAll. 
+ + private val testSystem: ActorSystem = + ActorSystem("ReplayLogGeneratorSpec-test", AmberRuntime.pekkoConfig) + private val testSerde: Serialization = SerializationExtension(testSystem) + + private def setAmberRuntimeField(name: String, value: AnyRef): Unit = { + val field = AmberRuntime.getClass.getDeclaredField(name) + field.setAccessible(true) + field.set(AmberRuntime, value) + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + setAmberRuntimeField("_actorSystem", testSystem) + setAmberRuntimeField("_serde", testSerde) + } + + override protected def afterAll(): Unit = { + setAmberRuntimeField("_serde", null) + setAmberRuntimeField("_actorSystem", null) + TestKit.shutdownActorSystem(testSystem) + super.afterAll() + } + + // Best-effort temp-dir cleanup. `Files.walk` returns a closeable Stream + // backed by an open directory handle — wrap in try/finally so the + // handle is released even if traversal throws. + // + // We tolerate `FileSystemException` on `deleteIfExists` because + // `ReplayLogGenerator.generate` short-circuits at `ReplayDestination` + // via a non-local `return`, which leaks the underlying + // `SequentialRecordReader.Input` stream — and on Windows a leaked + // open file handle blocks the temp file from being deleted. That is a + // production bug to fix separately; in-test we just let the OS reap + // the temp files later instead of failing the case. + private def cleanup(sub: Path): Unit = { + val root = sub.getParent + if (root == null || !Files.exists(root)) return + val stream = Files.walk(root) + try { + stream + .sorted(java.util.Comparator.reverseOrder()) + .forEach { child => + try Files.deleteIfExists(child) + catch { case _: java.nio.file.FileSystemException => () } Review Comment: Done — gated the `FileSystemException` suppression behind an `isWindows` check, so on non-Windows platforms (where an open handle does not block deletion) a real cleanup/IO failure still propagates. Fixed in cdc37ee. 
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
