Copilot commented on code in PR #2250:
URL: https://github.com/apache/pekko/pull/2250#discussion_r2365776950
########## stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala: ##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import org.apache.pekko.stream.impl.Stages.DefaultAttributes
+import pekko.NotUsed
+import pekko.annotation.InternalApi
+import pekko.stream.{ ActorAttributes, Attributes, Inlet, SinkShape, StreamSubscriptionTimeoutTerminationMode }
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.scaladsl.Source
+import pekko.stream.stage.{
+  GraphStageLogic,
+  GraphStageWithMaterializedValue,
+  InHandler,
+  OutHandler,
+  TimerGraphStageLogic

Review Comment:
SubSourceOutlet is used but not imported, which will not compile. Add SubSourceOutlet to the import list.
```suggestion
  TimerGraphStageLogic, SubSourceOutlet
```

########## docs/src/main/paradox/stream/operators/index.md: ##########
@@ -80,6 +80,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
 |Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
 |Sink|<a name="reduce"></a>@ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.|
 |Sink|<a name="seq"></a>@ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.|
+|Sink|<a name="source"></a>@ref[source](Sink/source.md)|Always backpressure never cancel and never consume any elements from the stream.|

Review Comment:
This description does not match the behavior of Sink.source. Suggest: “Materializes the sink as a single-subscriber Source; there is an asynchronous boundary between the sink and the materialized source.”

```suggestion
|Sink|<a name="source"></a>@ref[source](Sink/source.md)|Materializes the sink as a single-subscriber Source; there is an asynchronous boundary between the sink and the materialized source.|
```

########## stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala: ##########
@@ -212,6 +212,18 @@ object Sink {
   def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
     new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))
+  /**
+   * A `Sink` that materialize this `Sink` itself as a `Source`.
+   * The returned `Source` is a "live view" onto the `Sink` and only support a single `Subscriber`.
+   *
+   * Note: even the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary
+   * between them, the performance can be improved in the future.
Review Comment:
Same grammar improvements as the Scala doc: “materializes”, “supports”, and “even if the `Source` is directly connected … there is still an asynchronous boundary …; performance may be improved in the future.”

```suggestion
   * A `Sink` that materializes this `Sink` itself as a `Source`.
   * The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`.
   *
   * Note: even if the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary
   * between them; performance may be improved in the future.
```

########## docs/src/main/paradox/stream/operators/Sink/source.md: ##########
@@ -0,0 +1,27 @@
+# Sink.source
+
+Always backpressure never cancel and never consume any elements from the stream.

Review Comment:
Description is inaccurate for Sink.source. Suggested: “Materializes this Sink as a single-subscriber Source. The Source forms a live view onto the Sink with an asynchronous boundary.”

```suggestion
Materializes this Sink as a single-subscriber Source. The Source forms a live view onto the Sink with an asynchronous boundary.
```

########## stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala: ##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import org.apache.pekko.stream.impl.Stages.DefaultAttributes
+import pekko.NotUsed
+import pekko.annotation.InternalApi
+import pekko.stream.{ ActorAttributes, Attributes, Inlet, SinkShape, StreamSubscriptionTimeoutTerminationMode }
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.scaladsl.Source
+import pekko.stream.stage.{
+  GraphStageLogic,
+  GraphStageWithMaterializedValue,
+  InHandler,
+  OutHandler,
+  TimerGraphStageLogic
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object SourceSink
+    extends GraphStageWithMaterializedValue[SinkShape[Any], Source[Any, NotUsed]] {
+  private val SubscriptionTimerKey = "SubstreamSubscriptionTimerKey"
+  private val in = Inlet[Any]("sourceSink.in")
+  override val shape = SinkShape(in)
+
+  override def toString: String = "SourceSink"
+  override protected def initialAttributes: Attributes = DefaultAttributes.sourceSink
+
+  override def createLogicAndMaterializedValue(
+      inheritedAttributes: Attributes): (GraphStageLogic, Source[Any, NotUsed]) = {
+
+    /**
+     * NOTE: in the current implementation of Pekko Stream,
+     * We have to materialization twice to do the piping, which means, even we can treat the Sink as a Source.
+     *
+     * In an idea word this stage should be purged out by the materializer optimization,
+     * and we can directly connect the upstream to the downstream.
+     */
+    object logic extends TimerGraphStageLogic(shape) with InHandler with OutHandler { self =>
+      val sinkSource = new SubSourceOutlet[Any]("sinkSource")
+
+      private def subHandler(): OutHandler = new OutHandler {
+        override def onPull(): Unit = {
+          setKeepGoing(false)
+          cancelTimer(SubscriptionTimerKey)
+          pull(in)
+          sinkSource.setHandler(self)
+        }
+        override def onDownstreamFinish(cause: Throwable): Unit = self.onDownstreamFinish(cause)
+      }
+
+      override def preStart(): Unit = {
+        sinkSource.setHandler(subHandler())
+        setKeepGoing(true)
+        val timeout = inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
+        scheduleOnce(SubscriptionTimerKey, timeout)
+      }
+
+      override protected def onTimer(timerKey: Any): Unit = {
+        val materializer = interpreter.materializer
+        val StreamSubscriptionTimeout(timeout, mode) =
+          inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout]
+
+        mode match {
+          case StreamSubscriptionTimeoutTerminationMode.CancelTermination =>
+            sinkSource.timeout(timeout)
+            if (sinkSource.isClosed)
+              completeStage()
+          case StreamSubscriptionTimeoutTerminationMode.NoopTermination =>
+          // do nothing
+          case StreamSubscriptionTimeoutTerminationMode.WarnTermination =>
+            materializer.logger.warning(
+              "Substream subscription timeout triggered after {} in SourceSink.",
+              timeout)
+        }
+      }
+
+      override def onPush(): Unit = sinkSource.push(grab(in))
+      override def onPull(): Unit = pull(in)
+
+      override def onUpstreamFinish(): Unit = {
+        if (!sinkSource.isClosed) {
+          sinkSource.complete()
+        }
+        completeStage()
+      }
+
+      override def onUpstreamFailure(ex: Throwable): Unit = if (!sinkSource.isClosed) {
+        sinkSource.fail(ex)
+        completeStage()
+      } else failStage(ex)
+
+      override def onDownstreamFinish(cause: Throwable): Unit = {
+        // cancel upstream only if the substream was cancelled
+        if (!isClosed(in)) cancelStage(cause)

Review Comment:
GraphStageLogic does not provide cancelStage(cause); use cancel(in) to
cancel the upstream inlet. For example: if (!isClosed(in)) cancel(in).

```suggestion
        if (!isClosed(in)) cancel(in)
```

########## stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala: ##########
@@ -292,6 +292,19 @@ object Sink {
       if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink"))
       else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")))
+  /**
+   * A `Sink` that materialize this `Sink` itself as a `Source`.
+   * The returned `Source` is a "live view" onto the `Sink` and only support a single `Subscriber`.
+   *
+   * Note: even the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary
+   * between them, the performance can be improved in the future.

Review Comment:
Minor grammar fixes: “materializes”, “supports”, and “even if the `Source` is directly connected … there is still an asynchronous boundary …; performance may be improved in the future.”

```suggestion
   * A `Sink` that materializes this `Sink` itself as a `Source`.
   * The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`.
   *
   * Note: even if the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary
   * between them; performance may be improved in the future.
```

########## docs/src/main/paradox/stream/operators/Sink/source.md: ##########
@@ -0,0 +1,27 @@
+# Sink.source
+
+Always backpressure never cancel and never consume any elements from the stream.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.source](Sink$) { java="#source()" }
+@apidoc[Sink.source](Sink$) { scala="#source()" }

Review Comment:
Scala signature should not include parentheses since the method is defined without them. Use scala="#source".
```suggestion
@apidoc[Sink.source](Sink$) { scala="#source" }
```

########## stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala: ##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.stream.{ Attributes, StreamSubscriptionTimeoutTerminationMode }
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
+
+class SourceSinkSpec extends StreamSpec("""
+    pekko.stream.materializer.initial-input-buffer-size = 2
+  """) {
+
+  "Sink.toSeq" must {
+    "Can be used as a Source with run twice" in {
+      val s = Source(1 to 6).runWith(Sink.source)
+      s.runWith(Sink.seq).futureValue should be(1 to 6)
+    }
+
+    "Can complete when upstream completes without elements" in {
+      val s = Source.empty.runWith(Sink.source)
+      s.runWith(Sink.seq).futureValue should be(Nil)
+    }
+
+    "Can cancel when down stream cancel" in {
+      val (pub, source) = TestSource.probe[Int]
+        .toMat(Sink.source)(Keep.both)
+        .run()
+      val sub = source.runWith(TestSink.probe[Int])
+      pub.ensureSubscription()
+      sub.ensureSubscription()
+      sub.cancel()
+      pub.expectCancellation()
+    }
+
+    "Can timeout when no subscription" in {
+      import scala.concurrent.duration._
+      val (pub, source) = TestSource.probe[Int]
+        .toMat(Sink.source)(Keep.both)
+        .addAttributes(Attributes(
+          StreamSubscriptionTimeout(
+            2.seconds,
+            StreamSubscriptionTimeoutTerminationMode.cancel
+          )
+        ))
+        .run()
+      pub.expectCancellation()
+      Thread.sleep(1000) // wait a bit
+      val sub = source.runWith(TestSink.probe)
+      sub.expectSubscription()
+      sub.expectError()

Review Comment:
Avoid Thread.sleep in tests as it introduces flakiness. Prefer using testkit timing utilities (e.g., within, expectNoMessage, or awaiting specific probe signals) to synchronize on the timeout/cancellation.

```suggestion
      within(3.seconds) {
        val sub = source.runWith(TestSink.probe)
        sub.expectSubscription()
        sub.expectError()
      }
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
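For readers following the `Sink.source` review thread, a minimal standalone sketch of the two-step round trip that the first case in `SourceSinkSpec` exercises: the upstream is materialized into `Sink.source`, and the resulting `Source` is then run exactly once. It assumes `Sink.source` lands in `scaladsl.Sink` with the shape this PR gives it (a `Sink[T, Source[T, NotUsed]]` whose `Source` accepts a single subscriber). The object name `SinkSourceSketch`, the helper `roundTrip`, and the 3-second values are hypothetical and chosen only for illustration. The subscription-timeout attribute mirrors the timeout case in the spec.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{ Attributes, StreamSubscriptionTimeoutTerminationMode }
import org.apache.pekko.stream.ActorAttributes.StreamSubscriptionTimeout
import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; assumes the Sink.source operator from this PR is on the classpath.
object SinkSourceSketch {

  // Runs both materializations and returns what the single subscriber received.
  def roundTrip()(implicit system: ActorSystem): Seq[Int] = {
    // First materialization: the upstream runs into Sink.source, whose
    // materialized value is a Source acting as a "live view" onto that Sink.
    val live = Source(1 to 6)
      .toMat(Sink.source)(Keep.right)
      // As in SourceSinkSpec: if nobody subscribes within the timeout,
      // cancel mode tears the SourceSink stage down.
      .addAttributes(Attributes(StreamSubscriptionTimeout(3.seconds, StreamSubscriptionTimeoutTerminationMode.cancel)))
      .run()

    // Second materialization: attach the one permitted subscriber.
    // Running `live` again would violate the single-subscriber contract.
    Await.result(live.runWith(Sink.seq), 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    // An implicit ActorSystem is enough for run/runWith: Pekko derives the Materializer from it.
    implicit val system: ActorSystem = ActorSystem("sink-source-sketch")
    try println(roundTrip())
    finally system.terminate()
  }
}
```

Setting the timeout attribute explicitly is optional. Without it, `SourceSink` reads the `StreamSubscriptionTimeout` from the inherited attributes, which fall back to the materializer's default subscription-timeout settings.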