Copilot commented on code in PR #3030:
URL: https://github.com/apache/pekko/pull/3030#discussion_r3330888788


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala:
##########
@@ -342,10 +342,13 @@ import pekko.stream.stage._
     var i = 0
     while (i < logics.length) {
       val logic = logics(i)
-      if (!isStageCompleted(logic) && !isStageFinalized(logic)) {
+      if ((logic ne null) && !isStageCompleted(logic) && !isStageFinalized(logic)) {
         markStageFinalized(logic)
         finalizeStage(logic)
       }
+      // Release reference to the stage logic so it can be garbage collected
+      // even if the GraphInterpreter is still alive due to other references
+      logics(i) = null

Review Comment:
   Nulling the `logics` slots only in `finish()` does not release stages that 
complete while the interpreter keeps running. Normal stage completion is 
finalized from `afterStageHasRun`, whereas `finish()` is invoked only when the 
interpreter is being aborted or disposed. Completed stage logics are therefore 
still retained in `logics` for the lifetime of a still-active fused 
interpreter, which is the leak this change is intended to address.
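
   To make the lifecycle point concrete, here is a minimal, self-contained 
sketch. It is not Pekko code: `Logic` and `MiniInterpreter` are hypothetical 
stand-ins, and only the method names `afterStageHasRun` and `finish` are 
borrowed from the comment above. It compares clearing a slot on the completion 
path with clearing slots only in `finish()`.

```scala
object FinishOnlySketch {
  // Hypothetical stand-in for a stage logic; NOT Pekko's GraphStageLogic.
  final class Logic(val name: String) {
    var completed = false
    var finalized = false
  }

  // Hypothetical, heavily simplified interpreter model.
  final class MiniInterpreter(stages: Seq[Logic], clearOnCompletion: Boolean) {
    val logics: Array[Logic] = stages.toArray

    // Models the normal path: a stage that completed during a run is finalized here.
    def afterStageHasRun(i: Int): Unit = {
      val logic = logics(i)
      if ((logic ne null) && logic.completed && !logic.finalized) {
        logic.finalized = true
        // Assumption for illustration: the slot could also be released at this point.
        if (clearOnCompletion) logics(i) = null
      }
    }

    // Models the abort/dispose path: finalize what is left, then clear every slot.
    def finish(): Unit = {
      var i = 0
      while (i < logics.length) {
        val logic = logics(i)
        if ((logic ne null) && !logic.completed && !logic.finalized) {
          logic.finalized = true
        }
        logics(i) = null
        i += 1
      }
    }

    // Number of slots that still hold a strong reference.
    def retained: Int = logics.count(_ ne null)
  }

  // Completes one of two stages while the interpreter keeps running
  // (finish() is never called) and reports how many slots are still occupied.
  def retainedAfterOneCompletion(clearOnCompletion: Boolean): Int = {
    val a = new Logic("a")
    val b = new Logic("b")
    val interpreter = new MiniInterpreter(Seq(a, b), clearOnCompletion)
    a.completed = true
    interpreter.afterStageHasRun(0)
    interpreter.retained
  }
}
```

   In this model, clearing only in `finish()` leaves both slots occupied after 
one stage completes, while clearing on the completion path frees that slot 
immediately. Whether the real `afterStageHasRun` path can null the slot safely 
(for example, with respect to later index-based lookups) is not verified here.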



##########
stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpec.scala:
##########
@@ -315,6 +315,80 @@ class GraphInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
       interpreter.isSuspended should be(false)
     }
 
+    "release references to completed stage logics to prevent memory leaks" in new TestSetup {
+      val source = new UpstreamProbe[Int]("source")
+      val sink = new DownstreamProbe[Int]("sink")
+
+      builder(GraphStages.identity[Int])
+        .connect(source, GraphStages.identity[Int].in)
+        .connect(GraphStages.identity[Int].out, sink)

Review Comment:
   This uses three separate `GraphStages.identity[Int]` instances: the stage 
passed to `builder`, the inlet passed to the first `connect`, and the outlet 
passed to the second `connect`. `createConnections` only knows ports from the 
stage instance registered with the builder, so `init()` will fail to find the 
owners for these ports. Reuse the same identity stage instance for the builder 
and both connections.
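
   The lookup problem described above can be modelled with a small, 
self-contained sketch. It assumes, as the comment does, that each call yields 
a distinct stage instance; it does not check how `GraphStages.identity` is 
actually implemented. `Port`, `Stage`, and `ownerOf` are hypothetical names, 
not Pekko APIs.

```scala
object PortOwnerSketch {
  // Hypothetical stand-ins; ports are compared by reference identity.
  final class Port(val name: String)
  final class Stage {
    val in = new Port("in")
    val out = new Port("out")
  }

  // Finds the index of the registered stage that owns `port`, if any.
  // Only ports of registered stage instances can be resolved.
  def ownerOf(port: Port, registered: Seq[Stage]): Option[Int] = {
    val i = registered.indexWhere(s => (s.in eq port) || (s.out eq port))
    if (i >= 0) Some(i) else None
  }

  // Returns (lookup using a different instance, lookup using the registered instance).
  def demo(): (Option[Int], Option[Int]) = {
    val registered = new Stage
    val other = new Stage // a second instance that was never registered
    (ownerOf(other.in, Seq(registered)), ownerOf(registered.in, Seq(registered)))
  }
}
```

   In this model the port taken from the unregistered instance has no owner, 
while the port taken from the registered instance resolves to index 0. Binding 
the stage to a single `val` and using it for the builder and both connections 
avoids the mismatch regardless of how the factory behaves.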



##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala:
##########
@@ -342,10 +342,13 @@ import pekko.stream.stage._
     var i = 0
     while (i < logics.length) {
       val logic = logics(i)
-      if (!isStageCompleted(logic) && !isStageFinalized(logic)) {
+      if ((logic ne null) && !isStageCompleted(logic) && !isStageFinalized(logic)) {
         markStageFinalized(logic)
         finalizeStage(logic)
       }
+      // Release reference to the stage logic so it can be garbage collected
+      // even if the GraphInterpreter is still alive due to other references
+      logics(i) = null

Review Comment:
   Clearing `logics(i)` does not actually make the stage logic collectible 
because every `Connection` still keeps strong references to its `inOwner`, 
`outOwner`, `inHandler`, and `outHandler` (the owners are `GraphStageLogic` 
instances). As a result, an interpreter retained after `finish()` still retains 
the completed logics through `connections`, so the intended memory release is 
incomplete.
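
   A minimal sketch of the reachability argument, using hypothetical stand-in 
classes rather than Pekko's own `Connection` and `GraphStageLogic`. Only the 
owner fields are modelled; handlers are left out for brevity, although they 
would need the same treatment if they capture the logic.

```scala
object ConnectionRetentionSketch {
  // Hypothetical stand-ins; NOT Pekko's classes.
  final class Logic(val name: String)
  final class Connection(var inOwner: Logic, var outOwner: Logic)

  // True if `target` is still strongly referenced from the array or any connection.
  def isReferenced(target: Logic, logics: Array[Logic], connections: Array[Connection]): Boolean =
    logics.exists(_ eq target) ||
      connections.exists(c => (c.inOwner eq target) || (c.outOwner eq target))

  // Returns (referenced after clearing logics, referenced after also clearing connections).
  def demo(): (Boolean, Boolean) = {
    val upstream = new Logic("upstream")
    val downstream = new Logic("downstream")
    val logics = Array(upstream, downstream)
    val connections = Array(new Connection(inOwner = downstream, outOwner = upstream))

    // Step 1: what the patch does, i.e. clear only the logics array.
    var i = 0
    while (i < logics.length) { logics(i) = null; i += 1 }
    val afterClearingLogics = isReferenced(upstream, logics, connections)

    // Step 2: assumed additional cleanup, i.e. drop the owner references as well.
    connections.foreach { c => c.inOwner = null; c.outOwner = null }
    val afterClearingConnections = isReferenced(upstream, logics, connections)

    (afterClearingLogics, afterClearingConnections)
  }
}
```

   In this model the logic stays reachable after step 1 and becomes 
unreferenced only after step 2. Whether the real connection fields can be 
cleared safely after `finish()` is an open question that this sketch does not 
answer.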



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

