[BEAM-1231] Use well known coder types in Java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3de4108e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3de4108e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3de4108e Branch: refs/heads/master Commit: 3de4108e3ab96ec0f860d3d40160c7b7e4f5d0f7 Parents: d86db15 Author: Luke Cwik <lc...@google.com> Authored: Thu Dec 29 12:03:31 2016 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Tue Jan 3 16:35:37 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/coders/LengthPrefixCoder.java | 139 +++++++++++++++++++ .../org/apache/beam/sdk/util/CoderUtils.java | 28 ++-- .../beam/sdk/coders/LengthPrefixCoderTest.java | 116 ++++++++++++++++ 3 files changed, 270 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java new file mode 100644 index 0000000..dd9af32 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteStreams; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.util.VarInt; + +/** + * A {@link Coder} which is able to take any existing coder and wrap it such that it is only + * invoked in the {@link org.apache.beam.sdk.coders.Coder.Context#OUTER outer context}. The data + * representing the element is prefixed with a length using a variable integer encoding. 
+ * + * @param <T> the type of the values being transcoded + */ +public class LengthPrefixCoder<T> extends StandardCoder<T> { + + public static <T> LengthPrefixCoder<T> of( + Coder<T> valueCoder) { + checkNotNull(valueCoder, "Coder not expected to be null"); + return new LengthPrefixCoder<>(valueCoder); + } + + @JsonCreator + public static LengthPrefixCoder<?> of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List<Coder<?>> components) { + checkArgument(components.size() == 1, + "Expecting 1 components, got " + components.size()); + return of(components.get(0)); + } + + ///////////////////////////////////////////////////////////////////////////// + + private final Coder<T> valueCoder; + + private LengthPrefixCoder(Coder<T> valueCoder) { + this.valueCoder = valueCoder; + } + + @Override + public void encode(T value, OutputStream outStream, Context context) + throws CoderException, IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + valueCoder.encode(value, bos, Context.OUTER); + VarInt.encode(bos.size(), outStream); + bos.writeTo(outStream); + } + + @Override + public T decode(InputStream inStream, Context context) throws CoderException, IOException { + long size = VarInt.decodeLong(inStream); + return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return ImmutableList.of(valueCoder); + } + + /** + * {@code LengthPrefixCoder} is deterministic if the nested {@code Coder} is. + * + * {@inheritDoc} + */ + @Override + public void verifyDeterministic() throws NonDeterministicException { + valueCoder.verifyDeterministic(); + } + + /** + * {@code LengthPrefixCoder} is consistent with equals if the nested {@code Coder} is. 
+ * + * {@inheritDoc} + */ + @Override + public boolean consistentWithEquals() { + return valueCoder.consistentWithEquals(); + } + + /** + * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and + * counting the bytes. The size is known to be the size of the value plus the number of bytes + * required to prefix the length. + * + * {@inheritDoc} + */ + @Override + protected long getEncodedElementByteSize(T value, Context context) throws Exception { + if (valueCoder instanceof StandardCoder) { + // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of + // the value, adding the number of bytes to represent the length. + long valueSize = ((StandardCoder<T>) valueCoder).getEncodedElementByteSize( + value, Context.OUTER); + return VarInt.getLength(valueSize) + valueSize; + } + + // If valueCoder is not a StandardCoder then fall back to the default StandardCoder behavior + // of encoding and counting the bytes. The encoding will include the length prefix. + return super.getEncodedElementByteSize(value, context); + } + + /** + * {@code LengthPrefixCoder} is cheap if {@code valueCoder} is cheap. 
+ * + * {@inheritDoc} + */ + @Override + public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) { + return valueCoder.isRegisterByteSizeObserverCheap(value, Context.OUTER); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java index 36bf789..7b93b59 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java @@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.api.client.util.Base64; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -39,10 +40,13 @@ import java.io.OutputStream; import java.lang.ref.SoftReference; import java.lang.reflect.ParameterizedType; import java.lang.reflect.TypeVariable; +import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -51,15 +55,15 @@ import org.apache.beam.sdk.values.TypeDescriptor; public final class CoderUtils { private CoderUtils() {} // Non-instantiable - /** - * Coder class-name alias for a key-value type. - */ - public static final String KIND_PAIR = "kind:pair"; - - /** - * Coder class-name alias for a stream type. 
- */ - public static final String KIND_STREAM = "kind:stream"; + /** A mapping from well known coder types to their implementing classes. */ + private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES = + ImmutableMap.<String, Class<?>>builder() + .put("kind:pair", KvCoder.class) + .put("kind:stream", IterableCoder.class) + .put("kind:global_window", GlobalWindow.Coder.class) + .put("kind:length_prefix", LengthPrefixCoder.class) + .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class) + .build(); private static ThreadLocal<SoftReference<ExposedByteArrayOutputStream>> threadLocalOutputStream = new ThreadLocal<>(); @@ -266,10 +270,8 @@ public final class CoderUtils { return Class.forName(id); } - if (id.equals(KIND_STREAM)) { - return IterableCoder.class; - } else if (id.equals(KIND_PAIR)) { - return KvCoder.class; + if (WELL_KNOWN_CODER_TYPES.containsKey(id)) { + return WELL_KNOWN_CODER_TYPES.get(id); } // Otherwise, see if the ID is the name of a class in http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java new file mode 100644 index 0000000..e31c561 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.ImmutableList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.testing.CoderProperties; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link LengthPrefixCoder}. */ +@RunWith(JUnit4.class) +public class LengthPrefixCoderTest { + private static final StandardCoder<byte[]> TEST_CODER = LengthPrefixCoder.of(ByteArrayCoder.of()); + + private static final List<byte[]> TEST_VALUES = Arrays.asList( + new byte[]{ 0xa, 0xb, 0xc }, + new byte[]{ 0xd, 0x3 }, + new byte[]{ 0xd, 0xe }, + new byte[]{ }); + + @Test + public void testCoderSerializable() throws Exception { + CoderProperties.coderSerializable(TEST_CODER); + } + + @Test + public void testEncodedSize() throws Exception { + assertEquals(4L, + TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.NESTED)); + assertEquals(4L, + TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.OUTER)); + } + + @Test + public void testObserverIsCheap() throws Exception { + NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of()); + assertTrue(coder.isRegisterByteSizeObserverCheap(5.0, Coder.Context.OUTER)); + } + + @Test + public void testObserverIsNotCheap() throws Exception { + NullableCoder<List<String>> coder = NullableCoder.of(ListCoder.of(StringUtf8Coder.of())); + 
assertFalse(coder.isRegisterByteSizeObserverCheap( + ImmutableList.of("hi", "test"), Coder.Context.OUTER)); + } + + @Test + public void testDecodeEncodeEquals() throws Exception { + for (byte[] value : TEST_VALUES) { + CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value); + } + } + + @Test + public void testRegisterByteSizeObserver() throws Exception { + CoderProperties.testByteCount(TEST_CODER, Coder.Context.OUTER, + new byte[][]{{ 0xa, 0xb, 0xc }}); + + CoderProperties.testByteCount(TEST_CODER, Coder.Context.NESTED, + new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}}); + } + + @Test + public void testStructuralValueConsistentWithEquals() throws Exception { + for (byte[] value1 : TEST_VALUES) { + for (byte[] value2 : TEST_VALUES) { + CoderProperties.structuralValueConsistentWithEquals(TEST_CODER, value1, value2); + } + } + } + + // If this changes, it implies the binary format has changed. + private static final String EXPECTED_ENCODING_ID = ""; + + @Test + public void testEncodingId() throws Exception { + CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID); + } + + /** + * Generated data to check that the wire format has not changed. To regenerate, see + * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}. + */ + private static final List<String> TEST_ENCODINGS = Arrays.asList( + "AwoLDA", + "Ag0D", + "Ag0O", + "AA"); + + @Test + public void testWireFormatEncode() throws Exception { + CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS); + } +}