Copilot commented on code in PR #5554: URL: https://github.com/apache/texera/pull/5554#discussion_r3370318173
########## amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.logreplay + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.serialization.{Serialization, SerializationExtension} +import org.apache.pekko.testkit.TestKit +import org.apache.texera.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ChannelIdentity, + EmbeddedControlMessageIdentity +} +import org.apache.texera.amber.engine.common.AmberRuntime +import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.common.storage.{SequentialRecordStorage, VFSRecordStorage} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpec + +import java.nio.file.{Files, Path} + +class ReplayLogGeneratorSpec extends AnyFlatSpec with BeforeAndAfterAll { + + // --------------------------------------------------------------------------- + // Suite-local Pekko serde injected into AmberRuntime via reflection + // --------------------------------------------------------------------------- + // + // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`, + // so any test that round-trips records through VFSRecordStorage needs + // AmberRuntime initialized. Pattern matches CheckpointSubsystemSpec / + // ClientEventSpec — own a suite-local ActorSystem, inject it into + // AmberRuntime's private vars via reflection, tear down in afterAll. + + private val testSystem: ActorSystem = + ActorSystem("ReplayLogGeneratorSpec-test", AmberRuntime.pekkoConfig) + private val testSerde: Serialization = SerializationExtension(testSystem) + + private def setAmberRuntimeField(name: String, value: AnyRef): Unit = { + val field = AmberRuntime.getClass.getDeclaredField(name) + field.setAccessible(true) + field.set(AmberRuntime, value) + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + setAmberRuntimeField("_actorSystem", testSystem) + setAmberRuntimeField("_serde", testSerde) + } + + override protected def afterAll(): Unit = { + setAmberRuntimeField("_serde", null) + setAmberRuntimeField("_actorSystem", null) + TestKit.shutdownActorSystem(testSystem) + super.afterAll() + } Review Comment: This suite mutates global `AmberRuntime` state via reflection and then resets `_serde` / `_actorSystem` to `null` in `afterAll`. If any other test suite initializes AmberRuntime before this one (or relies on an already-initialized value), this teardown can silently clobber it and cause cross-suite coupling/flakiness. Capture the previous field values in `beforeAll` and restore them in `afterAll` instead of unconditionally nulling them out. ########## amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.logreplay + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.serialization.{Serialization, SerializationExtension} +import org.apache.pekko.testkit.TestKit +import org.apache.texera.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ChannelIdentity, + EmbeddedControlMessageIdentity +} +import org.apache.texera.amber.engine.common.AmberRuntime +import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.common.storage.{SequentialRecordStorage, VFSRecordStorage} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.flatspec.AnyFlatSpec + +import java.nio.file.{Files, Path} + +class ReplayLogGeneratorSpec extends AnyFlatSpec with BeforeAndAfterAll { + + // --------------------------------------------------------------------------- + // Suite-local Pekko serde injected into AmberRuntime via reflection + // --------------------------------------------------------------------------- + // + // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`, + // so any test that round-trips records through VFSRecordStorage needs + // AmberRuntime initialized. Pattern matches CheckpointSubsystemSpec / + // ClientEventSpec — own a suite-local ActorSystem, inject it into + // AmberRuntime's private vars via reflection, tear down in afterAll. + + private val testSystem: ActorSystem = + ActorSystem("ReplayLogGeneratorSpec-test", AmberRuntime.pekkoConfig) + private val testSerde: Serialization = SerializationExtension(testSystem) + + private def setAmberRuntimeField(name: String, value: AnyRef): Unit = { + val field = AmberRuntime.getClass.getDeclaredField(name) + field.setAccessible(true) + field.set(AmberRuntime, value) + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + setAmberRuntimeField("_actorSystem", testSystem) + setAmberRuntimeField("_serde", testSerde) + } + + override protected def afterAll(): Unit = { + setAmberRuntimeField("_serde", null) + setAmberRuntimeField("_actorSystem", null) + TestKit.shutdownActorSystem(testSystem) + super.afterAll() + } + + // Best-effort temp-dir cleanup. `Files.walk` returns a closeable Stream + // backed by an open directory handle — wrap in try/finally so the + // handle is released even if traversal throws. + // + // We tolerate `FileSystemException` on `deleteIfExists` because + // `ReplayLogGenerator.generate` short-circuits at `ReplayDestination` + // via a non-local `return`, which leaks the underlying + // `SequentialRecordReader.Input` stream — and on Windows a leaked + // open file handle blocks the temp file from being deleted. That is a + // production bug to fix separately; in-test we just let the OS reap + // the temp files later instead of failing the case. + private def cleanup(sub: Path): Unit = { + val root = sub.getParent + if (root == null || !Files.exists(root)) return + val stream = Files.walk(root) + try { + stream + .sorted(java.util.Comparator.reverseOrder()) + .forEach { child => + try Files.deleteIfExists(child) + catch { case _: java.nio.file.FileSystemException => () } Review Comment: `cleanup` currently swallows *all* `FileSystemException`s during deletion on every OS, which can mask real cleanup/IO problems on non-Windows platforms. Since the comment indicates this tolerance is only needed due to a Windows file-handle leak, gate the suppression to Windows only. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
