He-Pin commented on code in PR #989:
URL: https://github.com/apache/incubator-pekko/pull/989#discussion_r1460420837


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowForallSpec.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko.stream.testkit.StreamSpec
+import org.apache.pekko.stream.testkit.Utils.TE
+import org.apache.pekko.stream.{ ActorAttributes, Supervision }
+import scala.concurrent.{ Await, Future }
+import scala.concurrent.duration.DurationInt
+
+class FlowForallSpec extends StreamSpec {
+
+  "a forall" must {
+    val error = TE("Boom!")
+    val inputSource = Source(1 to 100)
+
+    "work when use source.forall = true" in {
+      // # forall
+      val predicate: Future[Boolean] = Source(1 to 100).forall(_ > 
0).runWith(Sink.head)
+      // # forall
+      Await.result(predicate, 3.seconds) shouldBe true
+    }
+
+    "work when using source.forall = false" in {
+      Await.result(inputSource.forall(_ < 50).runWith(Sink.head), 3.seconds) 
should be(false)
+    }
+
+    "work when using sink.forall = false" in {
+      Await.result(inputSource.runWith(Sink.forall(_ < 50)), 3.second) should 
be(false)
+    }
+
+    "work when using flow.forall = false" in {
+      Await.result(inputSource.via(Flow[Int].forall(_ < 
50)).runWith(Sink.head), 3.second) should be(false)
+    }
+
+    "work when using source.runForall = false" in {
+      Await.result(inputSource.runForall(_ < 50), 3.seconds) should be(false)
+    }
+
+    "work when using source.runForall = true" in {
+      Await.result(inputSource.runForall(_ <= 100), 3.seconds) should be(true)
+    }
+
+    "work when source empty = true" in {
+      Await.result(inputSource.filter(_ < 0).runForall(_ > 0), 3.seconds) 
should be(true)
+    }
+
+    "work when using flow.forallAsync = false" in {
+      Await.result(inputSource.via(Flow[Int].forall(_ < 
50)).runWith(Sink.head), 3.second) should be(false)

Review Comment:
   Split the forallAsync to another pr.



##########
docs/src/main/paradox/stream/operators/Sink/forall.md:
##########
@@ -0,0 +1,34 @@
+# Sink.forall
+
+forall applies a predicate assertion to each element in the stream; it returns 
true if all elements satisfy the assertion, otherwise it returns false.
+
+## Signature
+
+@apidoc[Sink.forall](Sink$) { 
scala="#forall(predicate:T=&gt;U):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]"
 java="#forall(org.apache.pekko.japi.function.Predicate)" }
+
+## Description
+forall, is implemented through `Fold`, applies a predicate assertion to each 
element in the stream; it returns true if all elements satisfy the assertion, 
otherwise it returns false.
+
+It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) 
that completes with the last state when the stream has finished.
+
+This operator enables combining values into a result without relying on global 
mutable state, as it passes the state along between invocations.
+
+## Example
+
+This example reads a stream of positive integers, asserts that all numbers are 
greater than 0, and finally prints the assertion result.
+
+Scala
+:   @@snip 
[snip](/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala)
 { #forall }
+
+Java
+:   @@snip 
[snip](/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java)
 { #forall }
+
+## Reactive Streams Semantics
+
+@@@div { .callout }
+
+**cancels**: never
+
+**backpressures**: when the previous predicate function invocation has not yet 
completed

Review Comment:
   Add the full semantics?



##########
stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java:
##########
@@ -224,4 +225,18 @@ public void sinkForeachMustBeDocumented()
     // #foreach
     assertEquals(Done.done(), done);
   }
+
+  @Test
+  public void sinkForallMustBeDocumented()
+      throws InterruptedException, ExecutionException, TimeoutException {
+    // #forall
+    Sink<Integer, CompletionStage<Boolean>> forallSink = 
Sink.forall((Predicate<Integer>) param -> param > 0);
+    CompletionStage<Boolean> cs = Source.from(Arrays.asList(1, 2, 3, 
4)).runWith(forallSink, system);
+    Object predicate = cs.toCompletableFuture().get(100, 
TimeUnit.MILLISECONDS);

Review Comment:
   Boolean?



##########
stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java:
##########
@@ -1614,4 +1614,15 @@ public void mustBeAbleToConvertToJavaInJava() {
         org.apache.pekko.stream.scaladsl.Flow.apply();
     Flow<Integer, Integer, NotUsed> javaFlow = scalaFlow.asJava();
   }
+
+  @Test
+  public void mustBeAbleToUseForall() throws Exception {
+    final TestKit probe = new TestKit(system);
+    // # forall
+    Flow<Integer, Boolean, NotUsed> forallFlow = 
Flow.of(Integer.class).forall((Predicate<Integer>) s -> s > 0);

Review Comment:
   Ha, java



##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowForallSpec.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko.stream.testkit.StreamSpec
+import org.apache.pekko.stream.testkit.Utils.TE
+import org.apache.pekko.stream.{ ActorAttributes, Supervision }
+import scala.concurrent.{ Await, Future }
+import scala.concurrent.duration.DurationInt
+
+class FlowForallSpec extends StreamSpec {
+
+  "a forall" must {
+    val error = TE("Boom!")
+    val inputSource = Source(1 to 100)
+
+    "work when use source.forall = true" in {
+      // # forall
+      val predicate: Future[Boolean] = Source(1 to 100).forall(_ > 
0).runWith(Sink.head)
+      // # forall
+      Await.result(predicate, 3.seconds) shouldBe true
+    }
+
+    "work when using source.forall = false" in {
+      Await.result(inputSource.forall(_ < 50).runWith(Sink.head), 3.seconds) 
should be(false)
+    }
+
+    "work when using sink.forall = false" in {
+      Await.result(inputSource.runWith(Sink.forall(_ < 50)), 3.second) should 
be(false)
+    }
+
+    "work when using flow.forall = false" in {
+      Await.result(inputSource.via(Flow[Int].forall(_ < 
50)).runWith(Sink.head), 3.second) should be(false)
+    }
+
+    "work when using source.runForall = false" in {
+      Await.result(inputSource.runForall(_ < 50), 3.seconds) should be(false)
+    }
+
+    "work when using source.runForall = true" in {
+      Await.result(inputSource.runForall(_ <= 100), 3.seconds) should be(true)
+    }
+
+    "work when source empty = true" in {
+      Await.result(inputSource.filter(_ < 0).runForall(_ > 0), 3.seconds) 
should be(true)
+    }
+
+    "work when using flow.forallAsync = false" in {
+      Await.result(inputSource.via(Flow[Int].forall(_ < 
50)).runWith(Sink.head), 3.second) should be(false)
+    }
+
+    "function predicate should be short-circuited" in {
+      Await.result(inputSource.runForall(s => if (s < 50) false else throw 
error), 3.seconds) should be(false)
+    }
+
+    "complete future with failure when the predicate function throws and the 
supervisor strategy decides to stop" in {
+      val forall = inputSource.runForall(s => if (s == 50) throw error else 
true)
+      the[Exception] thrownBy Await.result(forall, 3.seconds) should be(error)
+    }
+
+    "resume with the accumulated state when the predicate function throws and 
the supervisor strategy decides to resume" in {
+      val forall = Sink.forall[Int](s => if (s == 50) throw error else true)
+      val future =
+        
inputSource.runWith(forall.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)))
+
+      Await.result(future, 3.seconds) should be(true)
+    }
+
+    "resume and reset the state when the predicate function throws when the 
supervisor strategy decides to restart" in {
+      val forall = Sink.forall[Int](s => if (s == 50) throw error else true)
+      val future =
+        
inputSource.runWith(forall.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)))
+
+      Await.result(future, 3.seconds) should be(true)
+    }

Review Comment:
   Add a spec for empty source



##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowForallSpec.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko.stream.testkit.StreamSpec
+import org.apache.pekko.stream.testkit.Utils.TE
+import org.apache.pekko.stream.{ ActorAttributes, Supervision }
+import scala.concurrent.{ Await, Future }
+import scala.concurrent.duration.DurationInt
+
+class FlowForallSpec extends StreamSpec {
+
+  "a forall" must {
+    val error = TE("Boom!")
+    val inputSource = Source(1 to 100)
+
+    "work when use source.forall = true" in {
+      // # forall
+      val predicate: Future[Boolean] = Source(1 to 100).forall(_ > 
0).runWith(Sink.head)
+      // # forall
+      Await.result(predicate, 3.seconds) shouldBe true
+    }
+
+    "work when using source.forall = false" in {
+      Await.result(inputSource.forall(_ < 50).runWith(Sink.head), 3.seconds) 
should be(false)
+    }
+
+    "work when using sink.forall = false" in {
+      Await.result(inputSource.runWith(Sink.forall(_ < 50)), 3.second) should 
be(false)
+    }
+
+    "work when using flow.forall = false" in {
+      Await.result(inputSource.via(Flow[Int].forall(_ < 
50)).runWith(Sink.head), 3.second) should be(false)

Review Comment:
   Can use futurevalue should be in scala spec



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to