Repository: beam Updated Branches: refs/heads/master 2d22485c1 -> dc0fdcb7e
Add additional CloudObjectTranslators. Add IterableLikeCoder and MapCoder translators. Add UnionCoder, CoGbkResultCoder, and NullableCoder translators. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/73cdd994 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/73cdd994 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/73cdd994 Branch: refs/heads/master Commit: 73cdd99466bef0c35158d4dd89ac10e9cb056782 Parents: 2d22485 Author: Thomas Groh <tg...@google.com> Authored: Mon May 1 22:29:34 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue May 2 18:07:00 2017 -0700 ---------------------------------------------------------------------- .../dataflow/util/CloudObjectTranslators.java | 199 +++++++++++++++++++ ...aultCoderCloudObjectTranslatorRegistrar.java | 21 +- .../runners/dataflow/util/CloudObjectsTest.java | 33 ++- .../beam/sdk/transforms/join/CoGbkResult.java | 8 + 4 files changed, 256 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java index c27bee7..f3e3312 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.dataflow.util; +import static com.google.common.base.Preconditions.checkArgument; + import 
com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Collections; @@ -27,9 +29,15 @@ import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.IterableLikeCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder; +import org.apache.beam.sdk.transforms.join.CoGbkResultSchema; +import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.CloudObject; @@ -39,6 +47,7 @@ import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.StringUtils; import org.apache.beam.sdk.util.Structs; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.values.TupleTag; /** Utilities for creating {@link CloudObjectTranslator} instances for {@link Coder Coders}. */ class CloudObjectTranslators { @@ -373,4 +382,194 @@ class CloudObjectTranslators { } }; } + public static CloudObjectTranslator<IterableLikeCoder> iterableLike( + final Class<? 
extends IterableLikeCoder> clazz) { + return new CloudObjectTranslator<IterableLikeCoder>() { + @Override + public CloudObject toCloudObject(IterableLikeCoder target) { + CloudObject base = CloudObject.forClass(clazz); + return addComponents(base, Collections.<Coder<?>>singletonList(target.getElemCoder())); + } + + @Override + public IterableLikeCoder<?, ?> fromCloudObject(CloudObject cloudObject) { + List<Coder<?>> elemCoderList = getComponents(cloudObject); + checkArgument( + elemCoderList.size() == 1, + "Expected 1 component for %s, got %s", + cloudObject.getClassName(), + elemCoderList.size()); + return InstanceBuilder.ofType(clazz) + .fromFactoryMethod("of") + .withArg(Coder.class, elemCoderList.get(0)) + .build(); + } + + @Override + public Class<? extends IterableLikeCoder> getSupportedClass() { + return clazz; + } + + @Override + public String cloudObjectClassName() { + return CloudObject.forClass(clazz).getClassName(); + } + }; + } + + public static CloudObjectTranslator<MapCoder> map() { + return new CloudObjectTranslator<MapCoder>() { + @Override + public CloudObject toCloudObject(MapCoder target) { + CloudObject base = CloudObject.forClass(MapCoder.class); + return addComponents( + base, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder())); + } + + @Override + public MapCoder<?, ?> fromCloudObject(CloudObject cloudObject) { + List<Coder<?>> components = getComponents(cloudObject); + checkArgument( + components.size() == 2, + "Expected 2 components for %s, got %s", + MapCoder.class.getSimpleName(), + components.size()); + return MapCoder.of(components.get(0), components.get(1)); + } + + @Override + public Class<? 
extends MapCoder> getSupportedClass() { + return MapCoder.class; + } + + @Override + public String cloudObjectClassName() { + return CloudObject.forClass(MapCoder.class).getClassName(); + } + }; + } + + public static CloudObjectTranslator<NullableCoder> nullable() { + return new CloudObjectTranslator<NullableCoder>() { + @Override + public CloudObject toCloudObject(NullableCoder target) { + CloudObject base = CloudObject.forClass(NullableCoder.class); + return addComponents(base, Collections.<Coder<?>>singletonList(target.getValueCoder())); + } + + @Override + public NullableCoder<?> fromCloudObject(CloudObject cloudObject) { + List<Coder<?>> componentList = getComponents(cloudObject); + checkArgument( + componentList.size() == 1, + "Expected 1 component for %s, got %s", + NullableCoder.class.getSimpleName(), + componentList.size()); + return NullableCoder.of(componentList.get(0)); + } + + @Override + public Class<? extends NullableCoder> getSupportedClass() { + return NullableCoder.class; + } + + @Override + public String cloudObjectClassName() { + return CloudObject.forClass(NullableCoder.class).getClassName(); + } + }; + } + + public static CloudObjectTranslator<UnionCoder> union() { + return new CloudObjectTranslator<UnionCoder>() { + @Override + public CloudObject toCloudObject(UnionCoder target) { + return addComponents(CloudObject.forClass(UnionCoder.class), target.getElementCoders()); + } + + @Override + public UnionCoder fromCloudObject(CloudObject cloudObject) { + List<Coder<?>> elementCoders = getComponents(cloudObject); + return UnionCoder.of(elementCoders); + } + + @Override + public Class<? 
extends UnionCoder> getSupportedClass() { + return UnionCoder.class; + } + + @Override + public String cloudObjectClassName() { + return CloudObject.forClass(UnionCoder.class).getClassName(); + } + }; + } + + public static CloudObjectTranslator<CoGbkResultCoder> coGroupByKeyResult() { + return new CloudObjectTranslator<CoGbkResultCoder>() { + @Override + public CloudObject toCloudObject(CoGbkResultCoder target) { + CloudObject base = CloudObject.forClass(CoGbkResultCoder.class); + Structs.addObject( + base, PropertyNames.CO_GBK_RESULT_SCHEMA, toCloudObject(target.getSchema())); + return addComponents(base, Collections.singletonList(target.getUnionCoder())); + } + + private CloudObject toCloudObject(CoGbkResultSchema schema) { + CloudObject result = CloudObject.forClass(CoGbkResultSchema.class); + List<CloudObject> tags = new ArrayList<>(schema.getTupleTagList().size()); + for (TupleTag<?> tag : schema.getTupleTagList().getAll()) { + CloudObject tagCloudObject = CloudObject.forClass(TupleTag.class); + Structs.addString(tagCloudObject, PropertyNames.VALUE, tag.getId()); + tags.add(tagCloudObject); + } + Structs.addList(result, PropertyNames.TUPLE_TAGS, tags); + return result; + } + + @Override + public CoGbkResultCoder fromCloudObject(CloudObject cloudObject) { + List<Coder<?>> components = getComponents(cloudObject); + checkArgument( + components.size() == 1, + "Expected 1 component for %s, got %s", + CoGbkResultCoder.class.getSimpleName(), + components.size()); + checkArgument( + components.get(0) instanceof UnionCoder, + "Expected only component to be a %s, got %s", + UnionCoder.class.getSimpleName(), + components.get(0).getClass().getName()); + return CoGbkResultCoder.of( + schemaFromCloudObject( + CloudObject.fromSpec( + Structs.getObject(cloudObject, PropertyNames.CO_GBK_RESULT_SCHEMA))), + (UnionCoder) components.get(0)); + } + + @Override + public Class<? 
extends CoGbkResultCoder> getSupportedClass() { + return CoGbkResultCoder.class; + } + + private CoGbkResultSchema schemaFromCloudObject(CloudObject cloudObject) { + List<TupleTag<?>> tags = new ArrayList<>(); + List<Map<String, Object>> serializedTags = + Structs.getListOfMaps( + cloudObject, + PropertyNames.TUPLE_TAGS, + Collections.<Map<String, Object>>emptyList()); + for (Map<String, Object> serializedTag : serializedTags) { + TupleTag<?> tag = new TupleTag<>(Structs.getString(serializedTag, PropertyNames.VALUE)); + tags.add(tag); + } + return CoGbkResultSchema.of(tags); + } + + @Override + public String cloudObjectClassName() { + return CloudObject.forClass(CoGbkResultCoder.class).getClassName(); + } + }; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java index 5cae13f..4567098 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java @@ -37,9 +37,12 @@ import org.apache.beam.sdk.coders.BigIntegerCoder; import org.apache.beam.sdk.coders.BitSetCoder; import org.apache.beam.sdk.coders.ByteCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CollectionCoder; import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.DurationCoder; import org.apache.beam.sdk.coders.InstantCoder; +import 
org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TextualIntegerCoder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -56,7 +59,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; public class DefaultCoderCloudObjectTranslatorRegistrar implements CoderCloudObjectTranslatorRegistrar { private static final List<CloudObjectTranslator<? extends Coder>> DEFAULT_TRANSLATORS = - ImmutableList.<CloudObjectTranslator<? extends Coder>>of( + ImmutableList.of( CloudObjectTranslators.globalWindow(), CloudObjectTranslators.intervalWindow(), CloudObjectTranslators.bytes(), @@ -67,7 +70,16 @@ public class DefaultCoderCloudObjectTranslatorRegistrar CloudObjectTranslators.windowedValue(), new AvroCoderCloudObjectTranslator(), new SerializableCoderCloudObjectTranslator(), + CloudObjectTranslators.iterableLike(CollectionCoder.class), + CloudObjectTranslators.iterableLike(ListCoder.class), + CloudObjectTranslators.iterableLike(SetCoder.class), + CloudObjectTranslators.map(), + CloudObjectTranslators.nullable(), + CloudObjectTranslators.union(), + CloudObjectTranslators.coGroupByKeyResult(), CloudObjectTranslators.javaSerialized()); + // TODO: ElementAndRestrictionCoder. This is in runners-core, but probably needs to be + // in core-construction @VisibleForTesting static final ImmutableSet<Class<? extends Coder>> KNOWN_ATOMIC_CODERS = ImmutableSet.<Class<? extends Coder>>of( @@ -91,6 +103,11 @@ public class DefaultCoderCloudObjectTranslatorRegistrar TextualIntegerCoder.class, VarIntCoder.class, VoidCoder.class); + // TODO: WriteBundlesToFiles.ResultCoder.class); + // TODO: Atomic, GCPIO Coders: + // TableRowInfoCoder.class + // PubsubUnboundedSink.OutgoingMessageCoder.class, + // PubsubUnboundedSource.PubsubCheckpointCoder.class, @Override public Map<String, CloudObjectTranslator<? 
extends Coder>> classNamesToTranslators() { @@ -106,7 +123,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar public Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> classesToTranslators() { Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder = - ImmutableMap.<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>builder(); + ImmutableMap.builder(); for (CloudObjectTranslator<? extends Coder> defaultTranslator : DEFAULT_TRANSLATORS) { builder.put(defaultTranslator.getSupportedClass(), defaultTranslator); } http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java index b670268..2e66d43 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java @@ -37,18 +37,27 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CollectionCoder; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.SetCoder; 
import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder; +import org.apache.beam.sdk.transforms.join.CoGbkResultSchema; +import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; import org.junit.Test; import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; @@ -84,7 +93,7 @@ public class CloudObjectsTest { Set<Class<? extends Coder>> missing = new HashSet<>(); missing.addAll(defaultCoderTranslators); missing.removeAll(testedClasses); - assertThat(missing, emptyIterable()); + assertThat("Coders with custom serializers should all be tested", missing, emptyIterable()); } @Test @@ -117,10 +126,28 @@ public class CloudObjectsTest { WindowedValue.getFullCoder( KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()), IntervalWindow.getCoder())) - .add(VarLongCoder.of()) .add(ByteArrayCoder.of()) + .add(VarLongCoder.of()) .add(SerializableCoder.of(Record.class)) - .add(AvroCoder.of(Record.class)); + .add(AvroCoder.of(Record.class)) + .add(CollectionCoder.of(VarLongCoder.of())) + .add(ListCoder.of(VarLongCoder.of())) + .add(SetCoder.of(VarLongCoder.of())) + .add(MapCoder.of(VarLongCoder.of(), ByteArrayCoder.of())) + .add(NullableCoder.of(IntervalWindow.getCoder())) + .add( + UnionCoder.of( + ImmutableList.<Coder<?>>of( + VarLongCoder.of(), + ByteArrayCoder.of(), + KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of())))) + .add( + CoGbkResultCoder.of( + CoGbkResultSchema.of( + ImmutableList.<TupleTag<?>>of( + new TupleTag<Long>(), new TupleTag<byte[]>())), + UnionCoder.of( + ImmutableList.<Coder<?>>of(VarLongCoder.of(), 
ByteArrayCoder.of())))); for (Class<? extends Coder> atomicCoder : DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) { dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build()); http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 6c62cbe..02e1185 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -230,6 +230,14 @@ public class CoGbkResult { return ImmutableList.of(unionCoder); } + public CoGbkResultSchema getSchema() { + return schema; + } + + public UnionCoder getUnionCoder() { + return unionCoder; + } + @Override @SuppressWarnings("unchecked") public void encode(