Copilot commented on code in PR #5554: URL: https://github.com/apache/texera/pull/5554#discussion_r3408718634
########## amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala: ########## @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.logreplay + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.serialization.{Serialization, SerializationExtension} +import org.apache.pekko.testkit.TestKit +import org.apache.texera.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ChannelIdentity, + EmbeddedControlMessageIdentity +} +import org.apache.texera.amber.engine.common.AmberRuntime +import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.common.storage.{SequentialRecordStorage, VFSRecordStorage} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpec + +import java.nio.file.{Files, Path} + +class ReplayLogGeneratorSpec extends AnyFlatSpec with BeforeAndAfterAll { + + // --------------------------------------------------------------------------- + // Suite-local Pekko serde injected into AmberRuntime via reflection + // 
--------------------------------------------------------------------------- + // + // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`, + // so any test that round-trips records through VFSRecordStorage needs + // AmberRuntime initialized. Pattern matches CheckpointSubsystemSpec / + // ClientEventSpec — own a suite-local ActorSystem, inject it into + // AmberRuntime's private vars via reflection, tear down in afterAll. + + private val testSystem: ActorSystem = + ActorSystem("ReplayLogGeneratorSpec-test", AmberRuntime.pekkoConfig) + private val testSerde: Serialization = SerializationExtension(testSystem) + + private def getAmberRuntimeField(name: String): AnyRef = { + val field = AmberRuntime.getClass.getDeclaredField(name) + field.setAccessible(true) + field.get(AmberRuntime) + } + + private def setAmberRuntimeField(name: String, value: AnyRef): Unit = { + val field = AmberRuntime.getClass.getDeclaredField(name) + field.setAccessible(true) + field.set(AmberRuntime, value) + } + + // Capture whatever AmberRuntime held before we overwrite it so afterAll can + // restore it. Unconditionally nulling the fields would clobber an already + // initialized AmberRuntime and couple this suite to test execution order. + private var prevActorSystem: AnyRef = _ + private var prevSerde: AnyRef = _ + + override protected def beforeAll(): Unit = { + super.beforeAll() + prevActorSystem = getAmberRuntimeField("_actorSystem") + prevSerde = getAmberRuntimeField("_serde") + setAmberRuntimeField("_actorSystem", testSystem) + setAmberRuntimeField("_serde", testSerde) + } + + override protected def afterAll(): Unit = { + setAmberRuntimeField("_serde", prevSerde) + setAmberRuntimeField("_actorSystem", prevActorSystem) + TestKit.shutdownActorSystem(testSystem) + super.afterAll() + } + + private val isWindows: Boolean = + System.getProperty("os.name", "").toLowerCase.contains("win") + + // Best-effort temp-dir cleanup. 
`Files.walk` returns a closeable Stream + // backed by an open directory handle — wrap in try/finally so the + // handle is released even if traversal throws. + // + // On Windows we tolerate `FileSystemException` on `deleteIfExists` because + // `ReplayLogGenerator.generate` short-circuits at `ReplayDestination` + // via a non-local `return`, which leaks the underlying + // `SequentialRecordReader.Input` stream — and a leaked open file handle + // blocks the temp file from being deleted there. That is a production bug + // to fix separately; in-test we just let the OS reap the temp files later + // instead of failing the case. On other platforms an open handle does not + // block deletion, so a `FileSystemException` signals a real problem and is + // allowed to propagate. + private def cleanup(sub: Path): Unit = { + val root = sub.getParent + if (root == null || !Files.exists(root)) return + val stream = Files.walk(root) + try { + stream + .sorted(java.util.Comparator.reverseOrder()) + .forEach { child => + try Files.deleteIfExists(child) + catch { case _: java.nio.file.FileSystemException if isWindows => () } + } + } finally { + stream.close() + } + } + + // --------------------------------------------------------------------------- + // Fixtures + // --------------------------------------------------------------------------- + + private val cid: ChannelIdentity = + ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), isControl = false) + private val destA: EmbeddedControlMessageIdentity = EmbeddedControlMessageIdentity("dest-A") + private val destB: EmbeddedControlMessageIdentity = EmbeddedControlMessageIdentity("dest-B") + + private def newStorage(): (Path, SequentialRecordStorage[ReplayLogRecord]) = { + val root = Files.createTempDirectory("replay-log-generator-spec-") + val sub = root.resolve("logs") + val storage = new VFSRecordStorage[ReplayLogRecord](sub.toUri) + (sub, storage) + } + + private def writeLog( + storage: 
SequentialRecordStorage[ReplayLogRecord], + records: Seq[ReplayLogRecord] + ): Unit = { + val writer = storage.getWriter("log") + records.foreach(writer.writeRecord) + writer.flush() + writer.close() + } Review Comment: `writeLog` closes the writer only on the happy path. If `writeRecord` (serialization) throws, the underlying output stream can be left open, which can in turn make the temp-dir cleanup flaky (especially on Windows) and leak file handles during failed runs. Wrap the write loop in `try/finally` so `close()` always executes, e.g. `val writer = storage.getWriter("log"); try { records.foreach(writer.writeRecord); writer.flush() } finally writer.close()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
