Repository: beam Updated Branches: refs/heads/master 006fde46c -> dc70383cb
[TRIVIAL] InstantCoder: stop boxing Longs unnecessarily In encode, and similar in decode, the existing path goes Instant->long (inside of converter) long->Long (returned from converter) Long->long (inside of LongCoder). This is a relatively small improvement, but as we encode timestamps for every single element, this is likely to make a difference in lightweight stages of pipelines. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/11bf8253 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/11bf8253 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/11bf8253 Branch: refs/heads/master Commit: 11bf82537a000f74b9690598cc5fb8e9173904d4 Parents: 006fde4 Author: Dan Halperin <dhalp...@google.com> Authored: Fri May 26 16:39:13 2017 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed May 31 10:10:43 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/InstantCoder.java | 81 +++++++++----------- 1 file changed, 38 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/11bf8253/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java index 648493e..e4fadef 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java @@ -17,11 +17,13 @@ */ package org.apache.beam.sdk.coders; -import com.google.common.base.Converter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import java.io.UTFDataFormatException; import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Instant; @@ -39,53 +41,46 @@ public class InstantCoder extends AtomicCoder<Instant> { private static final InstantCoder INSTANCE = new InstantCoder(); private static final TypeDescriptor<Instant> TYPE_DESCRIPTOR = new TypeDescriptor<Instant>() {}; - private static final BigEndianLongCoder LONG_CODER = BigEndianLongCoder.of(); - private InstantCoder() {} - private static final Converter<Instant, Long> ORDER_PRESERVING_CONVERTER = - new LexicographicLongConverter(); - - /** - * Converts {@link Instant} to a {@code Long} representing its millis-since-epoch, - * but shifted so that the byte representation of negative values are lexicographically - * ordered before the byte representation of positive values. - * - * <p>This deliberately utilizes the well-defined overflow for {@code Long} values. - * See http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2 - */ - private static class LexicographicLongConverter extends Converter<Instant, Long> { - - @Override - protected Long doForward(Instant instant) { - return instant.getMillis() - Long.MIN_VALUE; - } - - @Override - protected Instant doBackward(Long shiftedMillis) { - return new Instant(shiftedMillis + Long.MIN_VALUE); - } - } - @Override - public void encode(Instant value, OutputStream outStream) - throws CoderException, IOException { + public void encode(Instant value, OutputStream outStream) throws CoderException, IOException { if (value == null) { throw new CoderException("cannot encode a null Instant"); } - LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream); + + // Converts {@link Instant} to a {@code long} representing its millis-since-epoch, + // but shifted so that the byte representation of negative values are lexicographically + // ordered before the byte representation of positive values. + // + // This deliberately utilizes the well-defined underflow for {@code long} values. + // See http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2 + long shiftedMillis = value.getMillis() - Long.MIN_VALUE; + new DataOutputStream(outStream).writeLong(shiftedMillis); } @Override - public Instant decode(InputStream inStream) - throws CoderException, IOException { - return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream)); + public Instant decode(InputStream inStream) throws CoderException, IOException { + long shiftedMillis; + try { + shiftedMillis = new DataInputStream(inStream).readLong(); + } catch (EOFException | UTFDataFormatException exn) { + // These exceptions correspond to decoding problems, so change + // what kind of exception they're branded as. + throw new CoderException(exn); + } + + // Produces an {@link Instant} from a {@code long} representing its millis-since-epoch, + // but shifted so that the byte representation of negative values are lexicographically + // ordered before the byte representation of positive values. + // + // This deliberately utilizes the well-defined overflow for {@code long} values. + // See http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.18.2 + return new Instant(shiftedMillis + Long.MIN_VALUE); } @Override - public void verifyDeterministic() { - LONG_CODER.verifyDeterministic(); - } + public void verifyDeterministic() {} /** * {@inheritDoc} @@ -104,15 +99,15 @@ public class InstantCoder extends AtomicCoder<Instant> { */ @Override public boolean isRegisterByteSizeObserverCheap(Instant value) { - return LONG_CODER.isRegisterByteSizeObserverCheap( - ORDER_PRESERVING_CONVERTER.convert(value)); + return true; } @Override - public void registerByteSizeObserver( - Instant value, ElementByteSizeObserver observer) throws Exception { - LONG_CODER.registerByteSizeObserver( - ORDER_PRESERVING_CONVERTER.convert(value), observer); + protected long getEncodedElementByteSize(Instant value) throws Exception { + if (value == null) { + throw new CoderException("cannot encode a null Instant"); + } + return 8; } @Override