Copilot commented on code in PR #2250:
URL: https://github.com/apache/pekko/pull/2250#discussion_r2365776950


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import org.apache.pekko.stream.impl.Stages.DefaultAttributes
+import pekko.NotUsed
+import pekko.annotation.InternalApi
+import pekko.stream.{ ActorAttributes, Attributes, Inlet, SinkShape, StreamSubscriptionTimeoutTerminationMode }
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.scaladsl.Source
+import pekko.stream.stage.{
+  GraphStageLogic,
+  GraphStageWithMaterializedValue,
+  InHandler,
+  OutHandler,
+  TimerGraphStageLogic

Review Comment:
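   SubSourceOutlet is used without an explicit import. It is normally available 
as an inner class of GraphStageLogic, so this may already compile; if it does 
not resolve here, add SubSourceOutlet to the import list.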
   ```suggestion
     TimerGraphStageLogic,
     SubSourceOutlet
   ```



##########
docs/src/main/paradox/stream/operators/index.md:
##########
@@ -80,6 +80,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
 |Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
 |Sink|<a name="reduce"></a>@ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.|
 |Sink|<a name="seq"></a>@ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.|
+|Sink|<a name="source"></a>@ref[source](Sink/source.md)|Always backpressure never cancel and never consume any elements from the stream.|

Review Comment:
   This description does not match the behavior of Sink.source. Suggest: 
“Materializes the sink as a single-subscriber Source; there is an asynchronous 
boundary between the sink and the materialized source.”
   ```suggestion
   |Sink|<a name="source"></a>@ref[source](Sink/source.md)|Materializes the sink as a single-subscriber Source; there is an asynchronous boundary between the sink and the materialized source.|
   ```



##########
stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala:
##########
@@ -212,6 +212,18 @@ object Sink {
   def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
     new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))
 
+  /**
+   * A `Sink` that materialize this `Sink` itself as a `Source`.
+   * The returned `Source` is a "live view" onto the `Sink` and only support a single `Subscriber`.
+   *
+   * Note: even the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary
+   * between them, the performance can be improved in the future.

Review Comment:
   Same grammar improvements as the Scala doc: “materializes”, “supports”, and 
“even if the `Source` is directly connected … there is still an asynchronous 
boundary …; performance may be improved in the future.”
   ```suggestion
      * A `Sink` that materializes this `Sink` itself as a `Source`.
      * The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`.
      *
      * Note: even if the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary
      * between them; performance may be improved in the future.
   ```



##########
docs/src/main/paradox/stream/operators/Sink/source.md:
##########
@@ -0,0 +1,27 @@
+# Sink.source
+
+Always backpressure never cancel and never consume any elements from the stream.

Review Comment:
   Description is inaccurate for Sink.source. Suggested: “Materializes this 
Sink as a single-subscriber Source. The Source forms a live view onto the Sink 
with an asynchronous boundary.”
   ```suggestion
   Materializes this Sink as a single-subscriber Source. The Source forms a live view onto the Sink with an asynchronous boundary.
   ```
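
   For context, a minimal usage sketch of the behavior this description is 
meant to capture. It assumes the `Sink.source` added in this PR has the shape 
`Sink[T, Source[T, NotUsed]]` and can be called as `Sink.source[Int]`; the 
object and method names (`SinkSourceSketch`, `roundTrip`) are hypothetical and 
only serve the illustration.

   ```scala
   import org.apache.pekko.NotUsed
   import org.apache.pekko.actor.ActorSystem
   import org.apache.pekko.stream.scaladsl.{ Sink, Source }

   import scala.concurrent.Await
   import scala.concurrent.duration._

   object SinkSourceSketch {

     // Runs a stream into Sink.source, then consumes the materialized Source.
     // ASSUMPTION: Sink.source[T] exists as proposed in this PR.
     def roundTrip(): Seq[Int] = {
       implicit val system: ActorSystem = ActorSystem("sink-source-sketch")
       try {
         // First materialization: the upstream runs into the sink and
         // yields a Source that is a live view onto that sink.
         val liveView: Source[Int, NotUsed] =
           Source(1 to 6).runWith(Sink.source[Int])

         // Second materialization: attach the single allowed subscriber
         // before the subscription timeout expires.
         Await.result(liveView.runWith(Sink.seq), 5.seconds)
       } finally {
         system.terminate()
       }
     }
   }
   ```

   The two `runWith` calls mirror the first test case in SourceSinkSpec: 
elements only flow once the materialized Source itself has been run.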



##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import org.apache.pekko.stream.impl.Stages.DefaultAttributes
+import pekko.NotUsed
+import pekko.annotation.InternalApi
+import pekko.stream.{ ActorAttributes, Attributes, Inlet, SinkShape, StreamSubscriptionTimeoutTerminationMode }
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.scaladsl.Source
+import pekko.stream.stage.{
+  GraphStageLogic,
+  GraphStageWithMaterializedValue,
+  InHandler,
+  OutHandler,
+  TimerGraphStageLogic
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object SourceSink
+    extends GraphStageWithMaterializedValue[SinkShape[Any], Source[Any, NotUsed]] {
+  private val SubscriptionTimerKey = "SubstreamSubscriptionTimerKey"
+  private val in = Inlet[Any]("sourceSink.in")
+  override val shape = SinkShape(in)
+
+  override def toString: String = "SourceSink"
+  override protected def initialAttributes: Attributes = DefaultAttributes.sourceSink
+
+  override def createLogicAndMaterializedValue(
+      inheritedAttributes: Attributes): (GraphStageLogic, Source[Any, NotUsed]) = {
+
+    /**
+     * NOTE: in the current implementation of Pekko Stream,
+     * We have to materialization twice to do the piping, which means, even we can treat the Sink as a Source.
+     *
+     * In an idea word this stage should be purged out by the materializer optimization,
+     * and we can directly connect the upstream to the downstream.
+     */
+    object logic extends TimerGraphStageLogic(shape) with InHandler with OutHandler { self =>
+      val sinkSource = new SubSourceOutlet[Any]("sinkSource")
+
+      private def subHandler(): OutHandler = new OutHandler {
+        override def onPull(): Unit = {
+          setKeepGoing(false)
+          cancelTimer(SubscriptionTimerKey)
+          pull(in)
+          sinkSource.setHandler(self)
+        }
+        override def onDownstreamFinish(cause: Throwable): Unit = self.onDownstreamFinish(cause)
+      }
+
+      override def preStart(): Unit = {
+        sinkSource.setHandler(subHandler())
+        setKeepGoing(true)
+        val timeout = inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
+        scheduleOnce(SubscriptionTimerKey, timeout)
+      }
+
+      override protected def onTimer(timerKey: Any): Unit = {
+        val materializer = interpreter.materializer
+        val StreamSubscriptionTimeout(timeout, mode) =
+          inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout]
+
+        mode match {
+          case StreamSubscriptionTimeoutTerminationMode.CancelTermination =>
+            sinkSource.timeout(timeout)
+            if (sinkSource.isClosed)
+              completeStage()
+          case StreamSubscriptionTimeoutTerminationMode.NoopTermination =>
+          // do nothing
+          case StreamSubscriptionTimeoutTerminationMode.WarnTermination =>
+            materializer.logger.warning(
+              "Substream subscription timeout triggered after {} in SourceSink.",
+              timeout)
+        }
+      }
+
+      override def onPush(): Unit = sinkSource.push(grab(in))
+      override def onPull(): Unit = pull(in)
+
+      override def onUpstreamFinish(): Unit = {
+        if (!sinkSource.isClosed) {
+          sinkSource.complete()
+        }
+        completeStage()
+      }
+
+      override def onUpstreamFailure(ex: Throwable): Unit = if (!sinkSource.isClosed) {
+        sinkSource.fail(ex)
+        completeStage()
+      } else failStage(ex)
+
+      override def onDownstreamFinish(cause: Throwable): Unit = {
+        // cancel upstream only if the substream was cancelled
+        if (!isClosed(in)) cancelStage(cause)

Review Comment:
   If cancelStage(cause) is not available on GraphStageLogic in the targeted 
version, use cancel(in) to cancel the upstream inlet. For example: 
if (!isClosed(in)) cancel(in).
   ```suggestion
           if (!isClosed(in)) cancel(in)
   ```
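
   A minimal sketch of the same guard in a standalone pass-through stage, for 
comparison. `PassThroughSketch` is a hypothetical name and not part of this PR. 
It uses the two-argument `cancel(in, cause)` overload, which as far as I know 
GraphStageLogic also offers, so that the downstream cancellation cause is 
forwarded upstream instead of being replaced by a default one.

   ```scala
   import org.apache.pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
   import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

   // Hypothetical illustration stage: forwards elements unchanged.
   final class PassThroughSketch[A] extends GraphStage[FlowShape[A, A]] {
     val in: Inlet[A] = Inlet("PassThroughSketch.in")
     val out: Outlet[A] = Outlet("PassThroughSketch.out")
     override val shape: FlowShape[A, A] = FlowShape(in, out)

     override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
       new GraphStageLogic(shape) with InHandler with OutHandler {
         override def onPush(): Unit = push(out, grab(in))
         override def onPull(): Unit = pull(in)

         // Cancel the upstream inlet only if it is still open,
         // and pass the downstream cause along.
         override def onDownstreamFinish(cause: Throwable): Unit =
           if (!isClosed(in)) cancel(in, cause)

         setHandlers(in, out, this)
       }
   }
   ```

   Once the only inlet is cancelled and the outlet is already closed, the stage 
has no open ports left and stops on its own.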



##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala:
##########
@@ -292,6 +292,19 @@ object Sink {
       if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink"))
       else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")))
 
+  /**
+   * A `Sink` that materialize this `Sink` itself as a `Source`.
+   * The returned `Source` is a "live view" onto the `Sink` and only support a single `Subscriber`.
+   *
+   * Note: even the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary
+   * between them, the performance can be improved in the future.

Review Comment:
   Minor grammar fixes: “materializes”, “supports”, and “even if the `Source` 
is directly connected … there is still an asynchronous boundary …; performance 
may be improved in the future.”
   ```suggestion
      * A `Sink` that materializes this `Sink` itself as a `Source`.
      * The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`.
      *
      * Note: even if the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary
      * between them; performance may be improved in the future.
   ```



##########
docs/src/main/paradox/stream/operators/Sink/source.md:
##########
@@ -0,0 +1,27 @@
+# Sink.source
+
+Always backpressure never cancel and never consume any elements from the stream.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.source](Sink$) { java="#source()" }
+@apidoc[Sink.source](Sink$) { scala="#source()" }

Review Comment:
   Scala signature should not include parentheses since the method is defined 
without them. Use scala="#source".
   ```suggestion
   @apidoc[Sink.source](Sink$) { scala="#source" }
   ```



##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.stream.{ Attributes, StreamSubscriptionTimeoutTerminationMode }
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
+
+class SourceSinkSpec extends StreamSpec("""
+    pekko.stream.materializer.initial-input-buffer-size = 2
+  """) {
+
+  "Sink.toSeq" must {
+    "Can be used as a Source with run twice" in {
+      val s = Source(1 to 6).runWith(Sink.source)
+      s.runWith(Sink.seq).futureValue should be(1 to 6)
+    }
+
+    "Can complete when upstream completes without elements" in {
+      val s = Source.empty.runWith(Sink.source)
+      s.runWith(Sink.seq).futureValue should be(Nil)
+    }
+
+    "Can cancel when down stream cancel" in {
+      val (pub, source) = TestSource.probe[Int]
+        .toMat(Sink.source)(Keep.both)
+        .run()
+      val sub = source.runWith(TestSink.probe[Int])
+      pub.ensureSubscription()
+      sub.ensureSubscription()
+      sub.cancel()
+      pub.expectCancellation()
+    }
+
+    "Can timeout when no subscription" in {
+      import scala.concurrent.duration._
+      val (pub, source) = TestSource.probe[Int]
+        .toMat(Sink.source)(Keep.both)
+        .addAttributes(Attributes(
+          StreamSubscriptionTimeout(
+            2.seconds,
+            StreamSubscriptionTimeoutTerminationMode.cancel
+          )
+        ))
+        .run()
+      pub.expectCancellation()
+      Thread.sleep(1000) // wait a bit
+      val sub = source.runWith(TestSink.probe)
+      sub.expectSubscription()
+      sub.expectError()

Review Comment:
   Avoid Thread.sleep in tests as it introduces flakiness. Prefer using testkit 
timing utilities (e.g., within, expectNoMessage, or awaiting specific probe 
signals) to synchronize on the timeout/cancellation.
   ```suggestion
         within(3.seconds) {
           val sub = source.runWith(TestSink.probe)
           sub.expectSubscription()
           sub.expectError()
         }
   ```
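
   To illustrate the idea behind the timing utilities mentioned above, here is 
a stdlib-only sketch of bounded polling. `PollingSketch.awaitCondition` is a 
hypothetical helper written for this comment, not a Pekko testkit API; it only 
shows the poll-until-deadline pattern: sleep in short slices and return as soon 
as the condition holds, instead of sleeping for one fixed duration.

   ```scala
   import scala.annotation.tailrec
   import scala.concurrent.duration._

   object PollingSketch {

     // Re-evaluates `condition` every `interval` until it holds or `max` elapses.
     // Returns true if the condition held before the deadline, false otherwise.
     def awaitCondition(max: FiniteDuration, interval: FiniteDuration = 50.millis)(
         condition: => Boolean): Boolean = {
       val deadline = max.fromNow

       @tailrec def loop(): Boolean =
         if (condition) true
         else if (deadline.isOverdue()) false
         else {
           // Never sleep past the deadline, and never pass a negative value.
           val slice = math.min(interval.toMillis, math.max(deadline.timeLeft.toMillis, 0L))
           Thread.sleep(slice)
           loop()
         }

       loop()
     }
   }
   ```

   A test built on this pattern waits only as long as needed and fails with a 
clear upper bound, which is the property the fixed `Thread.sleep(1000)` lacks.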



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

