Copilot commented on code in PR #3031:
URL: https://github.com/apache/pekko/pull/3031#discussion_r3330890533


##########
stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala:
##########
@@ -436,5 +436,77 @@ class ActorGraphInterpreterSpec extends StreamSpec {
       done.future.futureValue // would throw on failure
     }
 
+    "release reference to initial interpreter shell after it completes" in {
+      val mat = Materializer(system)
+      val initialShellCompleted = TestLatch(1)
+
+      // Create a stage that tracks when it completes
+      val trackingStage = new SimpleLinearGraphStage[String] {
+        override def createLogic(inheritedAttributes: Attributes): 
GraphStageLogic = new GraphStageLogic(shape) {
+          setHandler(in,
+            new InHandler {
+              override def onPush(): Unit = push(out, grab(in))
+              override def onUpstreamFinish(): Unit = {
+                complete(out)
+                initialShellCompleted.countDown()
+              }
+            })
+          setHandler(out,
+            new OutHandler {
+              override def onPull(): Unit = pull(in)
+            })
+        }
+      }
+
+      val upstream = TestPublisher.probe[String]()
+      val downstream = TestSubscriber.probe[String]()
+
+      
Source.fromPublisher(upstream).via(trackingStage).to(Sink.fromSubscriber(downstream)).run()(mat)
+
+      downstream.request(1)
+      upstream.sendNext("test")
+      downstream.expectNext("test")
+
+      // Complete the upstream to trigger stage completion
+      upstream.sendComplete()
+      downstream.expectComplete()
+
+      // Wait for the initial shell to complete
+      Await.ready(initialShellCompleted, remainingOrDefault)
+
+      // The actor should have stopped since there's only one interpreter shell
+      // and it has completed
+      // This verifies that the initial shell reference is released and the 
actor can stop
+    }

Review Comment:
   This test does not actually verify what its name and trailing comment claim. 
It runs a stream and waits for `initialShellCompleted`, but it never asserts 
that the `_initial` reference was released, nor that the actor was stopped 
(e.g. by watching the underlying `ActorGraphInterpreter` and using 
`expectTerminated`, or by querying the stream supervisor's children). With this 
body, the test would pass even if the fix were reverted. Consider obtaining the 
interpreter actor (e.g. via `StreamSupervisor.GetChildren` as in 
`ActorMaterializerSpec`) and `watch`/`expectTerminated` it, or otherwise assert 
the post-condition the PR is meant to guarantee.



##########
stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala:
##########
@@ -436,5 +436,77 @@ class ActorGraphInterpreterSpec extends StreamSpec {
       done.future.futureValue // would throw on failure
     }
 
+    "release reference to initial interpreter shell after it completes" in {
+      val mat = Materializer(system)
+      val initialShellCompleted = TestLatch(1)
+
+      // Create a stage that tracks when it completes
+      val trackingStage = new SimpleLinearGraphStage[String] {
+        override def createLogic(inheritedAttributes: Attributes): 
GraphStageLogic = new GraphStageLogic(shape) {
+          setHandler(in,
+            new InHandler {
+              override def onPush(): Unit = push(out, grab(in))
+              override def onUpstreamFinish(): Unit = {
+                complete(out)
+                initialShellCompleted.countDown()
+              }
+            })
+          setHandler(out,
+            new OutHandler {
+              override def onPull(): Unit = pull(in)
+            })
+        }
+      }
+
+      val upstream = TestPublisher.probe[String]()
+      val downstream = TestSubscriber.probe[String]()
+
+      
Source.fromPublisher(upstream).via(trackingStage).to(Sink.fromSubscriber(downstream)).run()(mat)
+
+      downstream.request(1)
+      upstream.sendNext("test")
+      downstream.expectNext("test")
+
+      // Complete the upstream to trigger stage completion
+      upstream.sendComplete()
+      downstream.expectComplete()
+
+      // Wait for the initial shell to complete
+      Await.ready(initialShellCompleted, remainingOrDefault)
+
+      // The actor should have stopped since there's only one interpreter shell
+      // and it has completed
+      // This verifies that the initial shell reference is released and the 
actor can stop
+    }
+
+    "not keep initial shell alive when actor hosts additional subfused 
interpreters" in {
+      val mat = Materializer(system)
+      val firstStreamDone = Promise[Done]()
+      val secondStreamDone = Promise[Done]()
+
+      // Create a flow that we'll use twice through the same materializer
+      val identityFlow = Flow[Int].map(identity)
+
+      // Run first stream and complete it
+      val sub1 = TestSubscriber.probe[Int]()
+      Source(1 to 
10).via(identityFlow).to(Sink.fromSubscriber(sub1)).run()(mat)
+      sub1.request(10)
+      sub1.expectNextN(1 to 10)
+      sub1.expectComplete()
+      firstStreamDone.success(Done)
+
+      // Run second stream - this should work even though the first stream's
+      // initial shell has been completed
+      val sub2 = TestSubscriber.probe[Int]()
+      Source(20 to 
30).via(identityFlow).to(Sink.fromSubscriber(sub2)).run()(mat)
+      sub2.request(11)
+      sub2.expectNextN(20 to 30)
+      sub2.expectComplete()
+      secondStreamDone.success(Done)
+
+      Await.result(firstStreamDone.future, remainingOrDefault)
+      Await.result(secondStreamDone.future, remainingOrDefault)
+    }

Review Comment:
   This test does not exercise the subfusing scenario described in the test 
name or the PR motivation. Each `.run()(mat)` materializes an independent 
stream and therefore spawns a separate `ActorGraphInterpreter` with its own 
initial shell — no second shell is ever subfused into the first actor via 
`SubFusingActorMaterializerImpl`/`registerShell`. As a result the test would 
also pass without the fix.
   
   To actually cover the regression, use a stage that materializes a sub-stream 
through `interpreter.subFusingMaterializer` (e.g. `LazySource`, 
`FlatMapPrefix`, `flatMapConcat`) so that an additional shell is registered 
into the same actor after the initial shell terminates, and assert 
behavior/lifecycle from there. The unused `firstStreamDone`/`secondStreamDone` 
promises are also no-ops here since they are completed synchronously 
immediately before being awaited.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to