This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 062a0d2d87b [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths. (#22780)
062a0d2d87b is described below

commit 062a0d2d87b82a42d0e3d69e07373276c69e2dfc
Author: Luke Cwik <lc...@google.com>
AuthorDate: Thu Aug 18 14:52:27 2022 -0700

    [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths. (#22780)
    
    * [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths.
    
    * Address PR comments
---
 .../apache/beam/runners/dataflow/worker/PubsubSink.java   | 15 ++++++++++-----
 .../apache/beam/runners/dataflow/worker/WindmillSink.java | 10 ++++++++--
 .../runners/dataflow/worker/WindmillStateInternals.java   |  2 +-
 3 files changed, 19 insertions(+), 8 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
index b52962559b8..7548cfa2402 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static org.apache.beam.runners.dataflow.util.Structs.getBytes;
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.service.AutoService;
 import java.io.IOException;
@@ -141,6 +142,7 @@ class PubsubSink<T> extends Sink<WindowedValue<T>> {
   /** The SinkWriter for a PubsubSink. */
   class PubsubWriter implements SinkWriter<WindowedValue<T>> {
     private Windmill.PubSubMessageBundle.Builder outputBuilder;
+    private ByteStringOutputStream stream; // Kept across adds for buffer 
reuse.
 
     private PubsubWriter(String topic) {
       outputBuilder =
@@ -149,10 +151,15 @@ class PubsubSink<T> extends Sink<WindowedValue<T>> {
               .setTimestampLabel(timestampLabel)
               .setIdLabel(idLabel)
               .setWithAttributes(withAttributes);
+      stream = new ByteStringOutputStream();
     }
 
     @Override
     public long add(WindowedValue<T> data) throws IOException {
+      checkState(
+          stream.size() == 0,
+          "Expected output stream to be empty but had %s",
+          stream.toByteString());
       ByteString byteString = null;
       if (formatFn != null) {
         PubsubMessage formatted = formatFn.apply(data.getValue());
@@ -161,13 +168,11 @@ class PubsubSink<T> extends Sink<WindowedValue<T>> {
         if (formatted.getAttributeMap() != null) {
           pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
         }
-        ByteStringOutputStream output = new ByteStringOutputStream();
-        pubsubMessageBuilder.build().writeTo(output);
-        byteString = output.toByteString();
+        pubsubMessageBuilder.build().writeTo(stream);
+        byteString = stream.toByteStringAndReset();
       } else {
-        ByteStringOutputStream stream = new ByteStringOutputStream();
         coder.encode(data.getValue(), stream, Coder.Context.OUTER);
-        byteString = stream.toByteString();
+        byteString = stream.toByteStringAndReset();
       }
 
       outputBuilder.addMessages(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index a669700fce3..2f5a4e78434 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.service.AutoService;
 import java.io.IOException;
@@ -129,16 +130,21 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
   class WindmillStreamWriter implements SinkWriter<WindowedValue<T>> {
     private Map<ByteString, Windmill.KeyedMessageBundle.Builder> productionMap;
     private final String destinationName;
+    private final ByteStringOutputStream stream; // Kept across encodes for 
buffer reuse.
 
     private WindmillStreamWriter(String destinationName) {
       this.destinationName = destinationName;
       productionMap = new HashMap<>();
+      stream = new ByteStringOutputStream();
     }
 
     private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) 
throws IOException {
-      ByteStringOutputStream stream = new ByteStringOutputStream();
+      checkState(
+          stream.size() == 0,
+          "Expected output stream to be empty but had %s",
+          stream.toByteString());
       coder.encode(object, stream, Coder.Context.OUTER);
-      return stream.toByteString();
+      return stream.toByteStringAndReset();
     }
 
     @Override
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
index 512f081b5d9..f5d771545a4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
@@ -350,7 +350,7 @@ class WindmillStateInternals<K> implements StateInternals {
   @VisibleForTesting
   static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
     try {
-      // Use ByteString.Output rather than concatenation and String.format. We 
build these keys
+      // Use ByteStringOutputStream rather than concatenation and 
String.format. We build these keys
       // a lot, and this leads to better performance results. See associated 
benchmarks.
       ByteStringOutputStream stream = new ByteStringOutputStream();
       OutputStreamWriter writer = new OutputStreamWriter(stream, 
StandardCharsets.UTF_8);

Reply via email to