Repository: beam
Updated Branches:
  refs/heads/master 006fde46c -> dc70383cb


[TRIVIAL] InstantCoder: stop boxing Longs unnecessarily

In encode, and similarly in decode, the existing path goes
  Instant->long (inside of converter)
  long->Long (returned from converter)
  Long->long (inside of LongCoder).

This is a relatively small improvement, but as we encode timestamps
for every single element, this is likely to make a difference in
lightweight stages of pipelines.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/11bf8253
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/11bf8253
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/11bf8253

Branch: refs/heads/master
Commit: 11bf82537a000f74b9690598cc5fb8e9173904d4
Parents: 006fde4
Author: Dan Halperin <dhalp...@google.com>
Authored: Fri May 26 16:39:13 2017 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Wed May 31 10:10:43 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/InstantCoder.java    | 81 +++++++++-----------
 1 file changed, 38 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/11bf8253/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 648493e..e4fadef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -17,11 +17,13 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.google.common.base.Converter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import java.io.UTFDataFormatException;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Instant;
 
@@ -39,53 +41,46 @@ public class InstantCoder extends AtomicCoder<Instant> {
   private static final InstantCoder INSTANCE = new InstantCoder();
   private static final TypeDescriptor<Instant> TYPE_DESCRIPTOR = new 
TypeDescriptor<Instant>() {};
 
-  private static final BigEndianLongCoder LONG_CODER = BigEndianLongCoder.of();
-
   private InstantCoder() {}
 
-  private static final Converter<Instant, Long> ORDER_PRESERVING_CONVERTER =
-      new LexicographicLongConverter();
-
-  /**
-   * Converts {@link Instant} to a {@code Long} representing its 
millis-since-epoch,
-   * but shifted so that the byte representation of negative values are 
lexicographically
-   * ordered before the byte representation of positive values.
-   *
-   * <p>This deliberately utilizes the well-defined overflow for {@code Long} 
values.
-   * See 
http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2
-   */
-  private static class LexicographicLongConverter extends Converter<Instant, 
Long> {
-
-    @Override
-    protected Long doForward(Instant instant) {
-      return instant.getMillis() - Long.MIN_VALUE;
-    }
-
-    @Override
-    protected Instant doBackward(Long shiftedMillis) {
-      return new Instant(shiftedMillis + Long.MIN_VALUE);
-    }
-  }
-
   @Override
-  public void encode(Instant value, OutputStream outStream)
-      throws CoderException, IOException {
+  public void encode(Instant value, OutputStream outStream) throws 
CoderException, IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null Instant");
     }
-    LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream);
+
+    // Converts {@link Instant} to a {@code long} representing its 
millis-since-epoch,
+    // but shifted so that the byte representation of negative values are 
lexicographically
+    // ordered before the byte representation of positive values.
+    //
+    // This deliberately utilizes the well-defined underflow for {@code long} 
values.
+    // See 
http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2
+    long shiftedMillis = value.getMillis() - Long.MIN_VALUE;
+    new DataOutputStream(outStream).writeLong(shiftedMillis);
   }
 
   @Override
-  public Instant decode(InputStream inStream)
-      throws CoderException, IOException {
-    return 
ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream));
+  public Instant decode(InputStream inStream) throws CoderException, 
IOException {
+    long shiftedMillis;
+    try {
+      shiftedMillis = new DataInputStream(inStream).readLong();
+    } catch (EOFException | UTFDataFormatException exn) {
+      // These exceptions correspond to decoding problems, so change
+      // what kind of exception they're branded as.
+      throw new CoderException(exn);
+    }
+
+    // Produces an {@link Instant} from a {@code long} representing its 
millis-since-epoch,
+    // but shifted so that the byte representation of negative values are 
lexicographically
+    // ordered before the byte representation of positive values.
+    //
+    // This deliberately utilizes the well-defined overflow for {@code long} 
values.
+    // See 
http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2
+    return new Instant(shiftedMillis + Long.MIN_VALUE);
   }
 
   @Override
-  public void verifyDeterministic() {
-    LONG_CODER.verifyDeterministic();
-  }
+  public void verifyDeterministic() {}
 
   /**
    * {@inheritDoc}
@@ -104,15 +99,15 @@ public class InstantCoder extends AtomicCoder<Instant> {
    */
   @Override
   public boolean isRegisterByteSizeObserverCheap(Instant value) {
-    return LONG_CODER.isRegisterByteSizeObserverCheap(
-        ORDER_PRESERVING_CONVERTER.convert(value));
+    return true;
   }
 
   @Override
-  public void registerByteSizeObserver(
-      Instant value, ElementByteSizeObserver observer) throws Exception {
-    LONG_CODER.registerByteSizeObserver(
-        ORDER_PRESERVING_CONVERTER.convert(value), observer);
+  protected long getEncodedElementByteSize(Instant value) throws Exception {
+    if (value == null) {
+      throw new CoderException("cannot encode a null Instant");
+    }
+    return 8;
   }
 
   @Override

Reply via email to