pjfanning commented on code in PR #2365:
URL: https://github.com/apache/pekko/pull/2365#discussion_r2462857655
##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala:
##########
@@ -455,4 +454,86 @@ class FlowStatefulMapSpec extends StreamSpec {
.expectComplete()
}
+ "will not call onComplete twice on cancel when `onComplete` fails" in {
+ val closedCounter = new AtomicInteger(0)
+ val (source, sink) = TestSource()
+ .viaMat(Flow[Int].statefulMap(() => 23)((s, elem) => (s, elem),
+ _ => {
+ closedCounter.incrementAndGet()
+ throw TE("boom")
+ }))(Keep.left)
+ .toMat(TestSink[Int]())(Keep.both)
+ .run()
+
+ EventFilter[TE](occurrences = 1).intercept {
+ sink.request(1)
+ source.sendNext(1)
+ sink.expectNext(1)
+ sink.cancel()
+ source.expectCancellation()
+ }
+ closedCounter.get() should ===(1)
+ }
+
+ "emit onClose return value before restarting" in {
+ val stateCounter = new AtomicInteger(0)
+ val (source, sink) = TestSource[String]()
+ .viaMat(Flow[String].statefulMap(() => stateCounter.incrementAndGet())({
(s, elem) =>
+ if (elem == "boom") throw TE("boom")
+ else (s, elem + s.toString)
+ }, _ => Some("onClose")))(Keep.left)
+
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
+ .toMat(TestSink())(Keep.both)
+ .run()
+
+ sink.request(1)
+ source.sendNext("one")
+ sink.expectNext("one1")
+ sink.request(1)
+ source.sendNext("boom")
+ sink.expectNext("onClose")
+ sink.request(1)
+ source.sendNext("two")
+ sink.expectNext("two2")
+ sink.cancel()
+ source.expectCancellation()
+ }
+
+ "not allow null state" in {
+ EventFilter[NullPointerException](occurrences = 1).intercept {
+ Source
+ .single("one")
+ .statefulMap(() => null: String)((s, t) => (s, t), _ => None)
+ .runWith(Sink.head)
+ .failed
+ .futureValue shouldBe a[NullPointerException]
+ }
+ }
+
+ "not allow null next state" in {
+ EventFilter[NullPointerException](occurrences = 1).intercept {
+ Source
+ .single("one")
+ .statefulMap(() => "state")((_, t) => (null, t), _ => None)
+ .runWith(Sink.seq)
+ .failed
+ .futureValue shouldBe a[NullPointerException]
+ }
+ }
+
+ "not allow null state on restart" in {
Review Comment:
in theory, we can change behaviour in 1.3.0 - but if we truly want to keep
null support, we could try to modify this test and the next one and then update
the code to make the modified tests work
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]