He-Pin commented on code in PR #744: URL: https://github.com/apache/pekko-http/pull/744#discussion_r2326415144
##########
docs/src/main/paradox/common/sse-support.md:
##########
@@ -56,6 +56,49 @@ Scala
 Java
 :   @@snip [EventStreamMarshallingTest.java](/http-tests/src/test/java/org/apache/pekko/http/javadsl/unmarshalling/sse/EventStreamUnmarshallingTest.java) { #event-stream-unmarshalling-example }
+## Configuration
+
+Apache Pekko HTTP provides several configuration options for Server-Sent Events handling:
+
+### Message Size Limits
+
+The SSE client parser has configurable limits to handle various message sizes:
+
+```hocon
+pekko.http.sse {
+  # The maximum size for parsing received server-sent events.
+  # This value must be larger than `max-line-size`. Set to 0 to disable limit entirely (unlimited).
+  max-event-size = 115713
+
+  # The maximum size for parsing received lines of a server-sent event. Set to 0 to disable limit entirely (unlimited).
+  max-line-size = 115712
+}
+```
+
+### Oversized Message Handling
+
+When SSE messages exceed the configured `max-line-size`, Apache Pekko HTTP provides four handling strategies:
+
+- **fail-stream** (default): Fails the stream with a clear error message, maintaining backward compatibility
+- **log-and-skip**: Logs a warning and skips the oversized message, continuing stream processing
+- **truncate**: Logs a warning and truncates the message to the configured limit, continuing processing
+- **dead-letter**: Logs a warning and sends the oversized message to the dead letter queue, continuing processing
+
+```hocon
+pekko.http.sse {
+  # How to handle messages that exceed max-line-size limit
+  # Options:
+  #   "fail-stream" - Fail the stream with a clear error message (default)
+  #   "log-and-skip" - Log a warning and skip the oversized message
+  #   "truncate" - Log a warning and truncate the message to max-line-size
+  #   "dead-letter" - Log a warning, send oversized message to dead letters
+  oversized-message-handling = "fail-stream"

Review Comment:
It seems `dead-letter` is not needed, because we already have `log-and-skip`.

##########
http-tests/src/test/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/EventStreamParserOversizedSpec.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.http
+package scaladsl
+package unmarshalling
+package sse
+
+import org.apache.pekko
+import pekko.http.scaladsl.model.sse.ServerSentEvent
+import pekko.http.scaladsl.settings.OversizedSseStrategy
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.util.ByteString
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AsyncWordSpec
+
+final class EventStreamParserOversizedSpec extends AsyncWordSpec with Matchers with BaseUnmarshallingSpec {
+
+  "An EventStreamParser with oversized message handling" should {
+
+    "parse normal SSE messages correctly with all strategies" in {
+      val normalSseData = ByteString(
+        """data: event1
+          |
+          |data: event2
+          |event: custom
+          |id: 123
+          |
+          |data: event3
+          |
+          |""".stripMargin)
+
+      val expected = Vector(
+        ServerSentEvent("event1"),
+        ServerSentEvent("event2", Some("custom"), Some("123")),
+        ServerSentEvent("event3")
+      )
+
+      for {
+        failStreamResult <- Source.single(normalSseData)
+          .via(EventStreamParser(100, 1000, emitEmptyEvents = false, OversizedSseStrategy.FailStream))
+          .runWith(Sink.seq)
+        logAndSkipResult <- Source.single(normalSseData)
+          .via(EventStreamParser(100, 1000, emitEmptyEvents = false, OversizedSseStrategy.LogAndSkip))
+          .runWith(Sink.seq)
+        truncateResult <- Source.single(normalSseData)
+          .via(EventStreamParser(100, 1000, emitEmptyEvents = false, OversizedSseStrategy.Truncate))
+          .runWith(Sink.seq)
+        deadLetterResult <- Source.single(normalSseData)
+          .via(EventStreamParser(100, 1000, emitEmptyEvents = false, OversizedSseStrategy.DeadLetter))
+          .runWith(Sink.seq)
+      } yield {
+        failStreamResult shouldBe expected
+        logAndSkipResult shouldBe expected
+        truncateResult shouldBe expected
+        deadLetterResult shouldBe expected
+      }
+    }
+
+    "fail the stream when using FailStream strategy with oversized SSE line" in {
+      val oversizedSseData = ByteString(
+        s"""data: before
+           |
+           |data: ${"x" * 200}
+           |
+           |data: after
+           |
+           |""".stripMargin)
+
+      recoverToExceptionIf[IllegalStateException] {
+        Source.single(oversizedSseData)
+          .via(EventStreamParser(50, 1000, emitEmptyEvents = false, OversizedSseStrategy.FailStream))
+          .runWith(Sink.seq)
+      }.map { exception =>
+        exception.getMessage should include("SSE line size")
+        exception.getMessage should include("exceeds max-line-size: 50")
+      }
+    }
+
+    "skip oversized SSE lines and continue processing with LogAndSkip strategy" in {
+      val oversizedSseData = ByteString(
+        s"""data: before
+           |
+           |data: ${"x" * 200}
+           |
+           |data: after
+           |
+           |""".stripMargin)
+
+      Source.single(oversizedSseData)
+        .via(EventStreamParser(50, 1000, emitEmptyEvents = false, OversizedSseStrategy.LogAndSkip))
+        .runWith(Sink.seq)
+        .map { result =>
+          result shouldBe Vector(
+            ServerSentEvent("before"),
+            ServerSentEvent("after")
+          )
+        }
+    }
+
+    "truncate oversized SSE lines and continue processing with Truncate strategy" in {
+      val oversizedLine = "x" * 200
+      val oversizedSseData = ByteString(
+        s"""data: before
+           |
+           |data: $oversizedLine
+           |
+           |data: after
+           |
+           |""".stripMargin)
+
+      Source.single(oversizedSseData)
+        .via(EventStreamParser(50, 1000, emitEmptyEvents = false, OversizedSseStrategy.Truncate))
+        .runWith(Sink.seq)
+        .map { result =>
+          result should have size 3
+          result(0) shouldBe ServerSentEvent("before")
+          result(1).data shouldBe "x" * 44 // truncated to line size limit
+          result(2) shouldBe ServerSentEvent("after")
+        }
+    }
+
+    "send oversized SSE lines to dead letters and continue processing with DeadLetter strategy" in {
+      val oversizedSseData = ByteString(
+        s"""data: before
+           |
+           |data: ${"x" * 200}
+           |
+           |data: after
+           |
+           |""".stripMargin)
+
+      Source.single(oversizedSseData)
+        .via(EventStreamParser(50, 1000, emitEmptyEvents = false, OversizedSseStrategy.DeadLetter))
+        .runWith(Sink.seq)
+        .map { result =>
+          result shouldBe Vector(
+            ServerSentEvent("before"),
+            ServerSentEvent("after")
+          )
+        }
+    }
+
+    "handle multiple oversized lines in complex SSE events with LogAndSkip strategy" in {
+      val oversizedSseData = ByteString(
+        s"""data: event1
+           |
+           |data: ${"x" * 100}
+           |event: ${"y" * 100}
+           |id: normal-id
+           |
+           |data: event2
+           |
+           |event: ${"z" * 100}
+           |data: event3
+           |
+           |""".stripMargin)
+
+      Source.single(oversizedSseData)
+        .via(EventStreamParser(50, 1000, emitEmptyEvents = false, OversizedSseStrategy.LogAndSkip))
+        .runWith(Sink.seq)
+        .map { result =>
+          result shouldBe Vector(
+            ServerSentEvent("event1"),
+            ServerSentEvent("event2"),
+            ServerSentEvent("event3")
+          )
+        }
+    }
+
+    "handle multiline data with some oversized lines using Truncate strategy" in {
+      val oversizedSseData = ByteString(
+        s"""data: line1
+           |data: ${"x" * 100}
+           |data: line3
+           |
+           |data: after
+           |
+           |""".stripMargin)
+
+      Source.single(oversizedSseData)
+        .via(EventStreamParser(50, 1000, emitEmptyEvents = false, OversizedSseStrategy.Truncate))
+        .runWith(Sink.seq)
+        .map { result =>
+          result should have size 2
+          // According to SSE spec, multiple data fields are joined with newlines
+          result(0).data shouldBe s"line1\n${"x" * 44}\nline3" // truncated middle line
+          result(1) shouldBe ServerSentEvent("after")
+        }
+    }
+
+    "handle streaming SSE data with oversized content across chunks" in {
+      val chunk1 = ByteString("data: before\n\ndata: ")
+      val chunk2 = ByteString("x" * 100 + "\n\ndata: after\n\n")
+
+      Source(Vector(chunk1, chunk2))
+        .via(EventStreamParser(50, 1000, emitEmptyEvents = false, OversizedSseStrategy.LogAndSkip))
+        .runWith(Sink.seq)
+        .map { result =>
+          result shouldBe Vector(
+            ServerSentEvent("before"),
+            ServerSentEvent("after")
+          )
+        }
+    }
+
+    "handle event field oversizing with different strategies" in {
+      val oversizedEventType = "x" * 100
+      val oversizedSseData = ByteString(
+        s"""data: before
+           |
+           |data: middle
+           |event: $oversizedEventType
+           |id: 123
+           |
+           |data: after
+           |
+           |""".stripMargin)
+
+      for {
+        logSkipResult <- Source.single(oversizedSseData)
+          .via(EventStreamParser(50, 1000, emitEmptyEvents = false, OversizedSseStrategy.LogAndSkip))
+          .runWith(Sink.seq)
+        truncateResult <- Source.single(oversizedSseData)
+          .via(EventStreamParser(50, 1000, emitEmptyEvents = false, OversizedSseStrategy.Truncate))
+          .runWith(Sink.seq)
+        deadLetterResult <- Source.single(oversizedSseData)
+          .via(EventStreamParser(50, 1000, emitEmptyEvents = false, OversizedSseStrategy.DeadLetter))
+          .runWith(Sink.seq)
+      } yield {
+        // All should preserve the before/after events
+        logSkipResult.map(_.data) should contain allOf ("before", "after")
+        truncateResult.map(_.data) should contain allOf ("before", "after")
+        deadLetterResult.map(_.data) should contain allOf ("before", "after")
+
+        // Middle event behavior differs by strategy
+        logSkipResult.find(_.data == "middle") shouldBe Some(ServerSentEvent("middle", None, Some("123")))
+        truncateResult.find(_.data == "middle").get.eventType.get should have length 43 // truncated event type (50 - "event: ".length)
+        deadLetterResult.find(_.data == "middle") shouldBe Some(ServerSentEvent("middle", None, Some("123")))
+      }
+    }
+
+    "work with unlimited line sizes when maxLineSize is 0" in {
+      val veryLongData = "x" * 10000
+      val oversizedSseData = ByteString(
+        s"""data: before
+           |
+           |data: $veryLongData
+           |
+           |data: after
+           |
+           |""".stripMargin)
+
+      Source.single(oversizedSseData)
+        .via(EventStreamParser(0, 20000, emitEmptyEvents = false, OversizedSseStrategy.FailStream)) // 0 = unlimited
+        .runWith(Sink.seq)
+        .map { result =>
+          result should have size 3
+          result(0) shouldBe ServerSentEvent("before")
+          result(1) shouldBe ServerSentEvent(veryLongData)
+          result(2) shouldBe ServerSentEvent("after")
+        }
+    }
+  }
+}

Review Comment:
Please add a newline at the end of the file.

##########
http/src/main/scala/org/apache/pekko/http/scaladsl/settings/OversizedSseStrategy.scala:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.http.scaladsl.settings
+
+import org.apache.pekko.annotation.InternalApi
+
+sealed trait OversizedSseStrategy
+
+object OversizedSseStrategy {
+  case object FailStream extends OversizedSseStrategy
+  case object LogAndSkip extends OversizedSseStrategy
+  case object Truncate extends OversizedSseStrategy
+  case object DeadLetter extends OversizedSseStrategy
+

Review Comment:
A Java API is needed.
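
For readers following the `OversizedSseStrategy` hunk quoted in this message, here is a minimal, dependency-free sketch of how the documented `oversized-message-handling` values could map onto the four case objects. It is not the PR's code: the quoted hunk ends before the rest of the file, so `fromString` and its error message are hypothetical names chosen for illustration only.

```scala
// Illustrative sketch only. `fromString` is a hypothetical helper and is
// not confirmed by the quoted hunk; only the four case objects are.
sealed trait OversizedSseStrategy

object OversizedSseStrategy {
  case object FailStream extends OversizedSseStrategy
  case object LogAndSkip extends OversizedSseStrategy
  case object Truncate extends OversizedSseStrategy
  case object DeadLetter extends OversizedSseStrategy

  // Maps the values documented for `oversized-message-handling`
  // onto the strategy objects; any other value is rejected.
  def fromString(value: String): OversizedSseStrategy = value match {
    case "fail-stream"  => FailStream
    case "log-and-skip" => LogAndSkip
    case "truncate"     => Truncate
    case "dead-letter"  => DeadLetter
    case other =>
      throw new IllegalArgumentException(
        s"Unknown oversized-message-handling value: '$other'")
  }
}
```

If `dead-letter` were dropped as suggested in the first review comment, a mapping of this shape would lose one case and the remaining three would be unaffected.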