Copilot commented on code in PR #3030:
URL: https://github.com/apache/pekko/pull/3030#discussion_r3330888788
##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala:
##########
@@ -342,10 +342,13 @@ import pekko.stream.stage._
var i = 0
while (i < logics.length) {
val logic = logics(i)
- if (!isStageCompleted(logic) && !isStageFinalized(logic)) {
+ if ((logic ne null) && !isStageCompleted(logic) && !isStageFinalized(logic)) {
markStageFinalized(logic)
finalizeStage(logic)
}
+ // Release reference to the stage logic so it can be garbage collected
+ // even if the GraphInterpreter is still alive due to other references
+ logics(i) = null
Review Comment:
Nulling the entries of the `logics` array only in `finish()` does not release
stages that complete while the interpreter keeps running: normal stage
completion is finalized from `afterStageHasRun`, and `finish()` is only invoked
when the interpreter is being aborted or disposed. Completed stage logics are
therefore still retained in `logics` for the lifetime of a still-active fused
interpreter, which is the leak this change is intended to address.
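A minimal sketch of releasing each slot on the normal completion path, as the comment suggests. This is a simplified, hypothetical model: `CompletingLogic`, `CompletingInterpreter`, `onStageCompleted`, and `retainedCount` are illustrative names, not Pekko's `GraphInterpreter` API. `onStageCompleted` only stands in for wherever the real interpreter finalizes a completed stage (the comment points at `afterStageHasRun`), and the model assumes each logic's `stageId` is its index in `logics`.

```scala
// Hypothetical, simplified model; not Pekko's GraphInterpreter API.
final class CompletingLogic(val stageId: Int) {
  var completed: Boolean = false
  var finalized: Boolean = false
}

final class CompletingInterpreter(initial: Seq[CompletingLogic]) {
  // One slot per stage, indexed by stageId (an assumption of this model).
  val logics: Array[CompletingLogic] = initial.toArray

  private def finalizeStage(logic: CompletingLogic): Unit =
    logic.finalized = true

  // Stand-in for the normal completion path (the comment names `afterStageHasRun`):
  // finalize the stage and release its slot while the interpreter keeps running.
  def onStageCompleted(logic: CompletingLogic): Unit = {
    logic.completed = true
    finalizeStage(logic)
    logics(logic.stageId) = null
  }

  // Abort/dispose path: slots may already be null, so the guard is required.
  def finish(): Unit = {
    var i = 0
    while (i < logics.length) {
      val logic = logics(i)
      if ((logic ne null) && !logic.completed && !logic.finalized) finalizeStage(logic)
      logics(i) = null
      i += 1
    }
  }

  // Number of stage logics the interpreter still references.
  def retainedCount: Int = logics.count(_ ne null)
}
```

With this shape a completed stage's slot is released as soon as it finishes, and `finish()` still needs the `(logic ne null)` guard from the PR because some slots may already be empty.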
##########
stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpec.scala:
##########
@@ -315,6 +315,80 @@ class GraphInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
interpreter.isSuspended should be(false)
}
+ "release references to completed stage logics to prevent memory leaks" in new TestSetup {
+ val source = new UpstreamProbe[Int]("source")
+ val sink = new DownstreamProbe[Int]("sink")
+
+ builder(GraphStages.identity[Int])
+ .connect(source, GraphStages.identity[Int].in)
+ .connect(GraphStages.identity[Int].out, sink)
Review Comment:
This uses three separate `GraphStages.identity[Int]` instances: the stage
passed to `builder`, the inlet passed to the first `connect`, and the outlet
passed to the second `connect`. `createConnections` only knows ports from the
stage instance registered with the builder, so `init()` will fail to find the
owners for these ports. Reuse the same identity stage instance for the builder
and both connections.
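To illustrate why the instances must match, here is a simplified, hypothetical model of an owner lookup that, as the comment describes for `createConnections`, only knows the ports of stages registered with the builder. `ToyInlet`, `ToyOutlet`, `ToyIdentityStage`, and `ToyBuilder` are illustrative names, not Pekko's test-kit classes. The model assumes, as the comment does, that each stage construction creates fresh port objects that compare by reference.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: plain classes, so ports compare by reference.
final class ToyInlet(val name: String)
final class ToyOutlet(val name: String)

final class ToyIdentityStage {
  val in = new ToyInlet("identity.in")
  val out = new ToyOutlet("identity.out")
}

// Hypothetical builder: it can only resolve owners for ports of the stages
// passed to it, which is the property the review comment relies on.
final class ToyBuilder(stages: ToyIdentityStage*) {
  private val inOwners = mutable.Map.empty[ToyInlet, ToyIdentityStage]
  private val outOwners = mutable.Map.empty[ToyOutlet, ToyIdentityStage]

  stages.foreach { s =>
    inOwners(s.in) = s
    outOwners(s.out) = s
  }

  def ownerOf(in: ToyInlet): Option[ToyIdentityStage] = inOwners.get(in)
  def ownerOf(out: ToyOutlet): Option[ToyIdentityStage] = outOwners.get(out)
}
```

Applied to the spec, the fix the comment recommends amounts to binding a single instance, for example `val identity = GraphStages.identity[Int]`, and passing `identity`, `identity.in`, and `identity.out` to `builder` and the two `connect` calls.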
##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala:
##########
@@ -342,10 +342,13 @@ import pekko.stream.stage._
var i = 0
while (i < logics.length) {
val logic = logics(i)
- if (!isStageCompleted(logic) && !isStageFinalized(logic)) {
+ if ((logic ne null) && !isStageCompleted(logic) && !isStageFinalized(logic)) {
markStageFinalized(logic)
finalizeStage(logic)
}
+ // Release reference to the stage logic so it can be garbage collected
+ // even if the GraphInterpreter is still alive due to other references
+ logics(i) = null
Review Comment:
Clearing `logics(i)` does not actually make the stage logic collectible
because every `Connection` still keeps strong references to its `inOwner`,
`outOwner`, `inHandler`, and `outHandler` (the owners are `GraphStageLogic`
instances). As a result, an interpreter that is still reachable after `finish()`
keeps the completed logics alive through `connections`, so the intended memory
release is incomplete.
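A minimal sketch of the gap described above: clearing only the logic array leaves the logics reachable through the connections, while also clearing each connection's owner and handler fields drops them. `ToyLogic`, `ToyConnection`, `ConnectedInterpreter`, `clearLogicsOnly`, and `retains` are hypothetical stand-ins; only the field names `inOwner`, `outOwner`, `inHandler`, and `outHandler` come from the comment. The model also assumes that a logic may act as its own handler, a common idiom for stage logics.

```scala
// Hypothetical, simplified model; not Pekko's Connection or GraphInterpreter.
final class ToyLogic(val name: String)

// Field names follow the review comment; each field is a strong reference.
final class ToyConnection(
    var inOwner: ToyLogic,
    var outOwner: ToyLogic,
    var inHandler: AnyRef,
    var outHandler: AnyRef) {

  // Drop every strong reference this connection holds.
  def release(): Unit = {
    inOwner = null
    outOwner = null
    inHandler = null
    outHandler = null
  }

  // True if this connection still points at `logic` as an owner or handler.
  def references(logic: ToyLogic): Boolean =
    (inOwner eq logic) || (outOwner eq logic) ||
      (inHandler eq logic) || (outHandler eq logic)
}

final class ConnectedInterpreter(
    val logics: Array[ToyLogic],
    val connections: Array[ToyConnection]) {

  // Mirrors the PR's approach: clear only the logic slots.
  def clearLogicsOnly(): Unit =
    for (i <- logics.indices) logics(i) = null

  // Also release the references held by every connection.
  def finish(): Unit = {
    clearLogicsOnly()
    connections.foreach(_.release())
  }

  // Whether `logic` is still reachable from this interpreter's arrays.
  def retains(logic: ToyLogic): Boolean =
    logics.exists(_ eq logic) || connections.exists(_.references(logic))
}
```

Reachability is checked by reference here rather than by forcing a garbage collection, because `System.gc()` gives no guarantee that an unreachable object is collected promptly.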
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]