This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 062a0d2d87b [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths. (#22780)
062a0d2d87b is described below

commit 062a0d2d87b82a42d0e3d69e07373276c69e2dfc
Author: Luke Cwik <lc...@google.com>
AuthorDate: Thu Aug 18 14:52:27 2022 -0700

    [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths. (#22780)

    * [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths.

    * Address PR comments
---
 .../apache/beam/runners/dataflow/worker/PubsubSink.java   | 15 ++++++++++-----
 .../apache/beam/runners/dataflow/worker/WindmillSink.java | 10 ++++++++--
 .../runners/dataflow/worker/WindmillStateInternals.java   |  2 +-
 3 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
index b52962559b8..7548cfa2402 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;

 import static org.apache.beam.runners.dataflow.util.Structs.getBytes;
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

 import com.google.auto.service.AutoService;
 import java.io.IOException;
@@ -141,6 +142,7 @@ class PubsubSink<T> extends Sink<WindowedValue<T>> {
   /** The SinkWriter for a PubsubSink. */
   class PubsubWriter implements SinkWriter<WindowedValue<T>> {
     private Windmill.PubSubMessageBundle.Builder outputBuilder;
+    private ByteStringOutputStream stream; // Kept across adds for buffer reuse.

     private PubsubWriter(String topic) {
       outputBuilder =
@@ -149,10 +151,15 @@ class PubsubSink<T> extends Sink<WindowedValue<T>> {
               .setTimestampLabel(timestampLabel)
               .setIdLabel(idLabel)
               .setWithAttributes(withAttributes);
+      stream = new ByteStringOutputStream();
     }

     @Override
     public long add(WindowedValue<T> data) throws IOException {
+      checkState(
+          stream.size() == 0,
+          "Expected output stream to be empty but had %s",
+          stream.toByteString());
       ByteString byteString = null;
       if (formatFn != null) {
         PubsubMessage formatted = formatFn.apply(data.getValue());
@@ -161,13 +168,11 @@ class PubsubSink<T> extends Sink<WindowedValue<T>> {
         if (formatted.getAttributeMap() != null) {
           pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
         }
-        ByteStringOutputStream output = new ByteStringOutputStream();
-        pubsubMessageBuilder.build().writeTo(output);
-        byteString = output.toByteString();
+        pubsubMessageBuilder.build().writeTo(stream);
+        byteString = stream.toByteStringAndReset();
       } else {
-        ByteStringOutputStream stream = new ByteStringOutputStream();
         coder.encode(data.getValue(), stream, Coder.Context.OUTER);
-        byteString = stream.toByteString();
+        byteString = stream.toByteStringAndReset();
       }

       outputBuilder.addMessages(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index a669700fce3..2f5a4e78434 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker;

 import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

 import com.google.auto.service.AutoService;
 import java.io.IOException;
@@ -129,16 +130,21 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
   class WindmillStreamWriter implements SinkWriter<WindowedValue<T>> {
     private Map<ByteString, Windmill.KeyedMessageBundle.Builder> productionMap;
     private final String destinationName;
+    private final ByteStringOutputStream stream; // Kept across encodes for buffer reuse.

     private WindmillStreamWriter(String destinationName) {
       this.destinationName = destinationName;
       productionMap = new HashMap<>();
+      stream = new ByteStringOutputStream();
     }

     private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws IOException {
-      ByteStringOutputStream stream = new ByteStringOutputStream();
+      checkState(
+          stream.size() == 0,
+          "Expected output stream to be empty but had %s",
+          stream.toByteString());
       coder.encode(object, stream, Coder.Context.OUTER);
-      return stream.toByteString();
+      return stream.toByteStringAndReset();
     }

     @Override
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
index 512f081b5d9..f5d771545a4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
@@ -350,7 +350,7 @@ class WindmillStateInternals<K> implements StateInternals {
   @VisibleForTesting
   static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
     try {
-      // Use ByteString.Output rather than concatenation and String.format. We build these keys
+      // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys
       // a lot, and this leads to better performance results. See associated benchmarks.
       ByteStringOutputStream stream = new ByteStringOutputStream();
       OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8);