[35/50] incubator-beam git commit: [BEAM-813] support metadata in Avro sink
[BEAM-813] support metadata in Avro sink Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba099f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba099f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba099f5 Branch: refs/heads/apex-runner Commit: eba099f564dba3dfbba30ae3533496b9e14f57a7 Parents: 25102f7 Author: Neville LiAuthored: Mon Oct 24 18:56:36 2016 -0400 Committer: Luke Cwik Committed: Wed Oct 26 15:30:50 2016 -0700 -- .../java/org/apache/beam/sdk/io/AvroIO.java | 143 --- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 29 2 files changed, 154 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index d912ff7..6deca7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -21,11 +21,16 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.io.BaseEncoding; import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; + import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; @@ -39,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import 
org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -455,6 +461,15 @@ public class AvroIO { } /** + * Returns a {@link PTransform} that writes Avro file(s) with the specified metadata. + * + * <p>Supported value types are String, Long, and byte[]. + */ +public static Bound<GenericRecord> withMetadata(Map<String, Object> metadata) { + return new Bound<>(GenericRecord.class).withMetadata(metadata); +} + +/** * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or * multiple Avro files matching a sharding pattern). * @@ -464,6 +479,8 @@ public class AvroIO { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; private static final SerializableAvroCodecFactory DEFAULT_CODEC = new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6)); + // This should be a multiple of 4 to not get a partial encoded byte. + private static final int METADATA_BYTES_MAX_LENGTH = 40; /** The filename to write to. */ @Nullable @@ -486,6 +503,8 @@ public class AvroIO { * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html */ final SerializableAvroCodecFactory codec; + /** Avro file metadata. */ + final ImmutableMap<String, Object> metadata; Bound(Class<T> type) { this( @@ -497,7 +516,8 @@ public class AvroIO { type, null, true, -DEFAULT_CODEC); +DEFAULT_CODEC, +ImmutableMap.<String, Object>
of()); } Bound( @@ -509,7 +529,8 @@ public class AvroIO { Class<T> type, Schema schema, boolean validate, - SerializableAvroCodecFactory codec) { + SerializableAvroCodecFactory codec, + Map<String, Object> metadata) { super(name); this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; @@ -519,6 +540,18 @@ public class AvroIO { this.schema = schema; this.validate = validate; this.codec = codec; + +Map<String, String> badKeys = Maps.newLinkedHashMap(); +for (Map.Entry<String, Object> entry : metadata.entrySet()) { + Object v = entry.getValue(); + if (!(v instanceof String || v instanceof Long || v instanceof byte[])) { +badKeys.put(entry.getKey(), v.getClass().getSimpleName()); + } +
[47/50] [abbrv] incubator-beam git commit: [BEAM-813] support metadata in Avro sink
[BEAM-813] support metadata in Avro sink Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba099f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba099f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba099f5 Branch: refs/heads/python-sdk Commit: eba099f564dba3dfbba30ae3533496b9e14f57a7 Parents: 25102f7 Author: Neville LiAuthored: Mon Oct 24 18:56:36 2016 -0400 Committer: Luke Cwik Committed: Wed Oct 26 15:30:50 2016 -0700 -- .../java/org/apache/beam/sdk/io/AvroIO.java | 143 --- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 29 2 files changed, 154 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index d912ff7..6deca7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -21,11 +21,16 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.io.BaseEncoding; import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; + import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; @@ -39,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import 
org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -455,6 +461,15 @@ public class AvroIO { } /** + * Returns a {@link PTransform} that writes Avro file(s) with the specified metadata. + * + * <p>Supported value types are String, Long, and byte[]. + */ +public static Bound<GenericRecord> withMetadata(Map<String, Object> metadata) { + return new Bound<>(GenericRecord.class).withMetadata(metadata); +} + +/** * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or * multiple Avro files matching a sharding pattern). * @@ -464,6 +479,8 @@ public class AvroIO { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; private static final SerializableAvroCodecFactory DEFAULT_CODEC = new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6)); + // This should be a multiple of 4 to not get a partial encoded byte. + private static final int METADATA_BYTES_MAX_LENGTH = 40; /** The filename to write to. */ @Nullable @@ -486,6 +503,8 @@ public class AvroIO { * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html */ final SerializableAvroCodecFactory codec; + /** Avro file metadata. */ + final ImmutableMap<String, Object> metadata; Bound(Class<T> type) { this( @@ -497,7 +516,8 @@ public class AvroIO { type, null, true, -DEFAULT_CODEC); +DEFAULT_CODEC, +ImmutableMap.<String, Object>
of()); } Bound( @@ -509,7 +529,8 @@ public class AvroIO { Class<T> type, Schema schema, boolean validate, - SerializableAvroCodecFactory codec) { + SerializableAvroCodecFactory codec, + Map<String, Object> metadata) { super(name); this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; @@ -519,6 +540,18 @@ public class AvroIO { this.schema = schema; this.validate = validate; this.codec = codec; + +Map<String, String> badKeys = Maps.newLinkedHashMap(); +for (Map.Entry<String, Object> entry : metadata.entrySet()) { + Object v = entry.getValue(); + if (!(v instanceof String || v instanceof Long || v instanceof byte[])) { +badKeys.put(entry.getKey(), v.getClass().getSimpleName()); + } +
incubator-beam git commit: [BEAM-813] support metadata in Avro sink
Repository: incubator-beam Updated Branches: refs/heads/master 25102f798 -> eba099f56 [BEAM-813] support metadata in Avro sink Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba099f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba099f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba099f5 Branch: refs/heads/master Commit: eba099f564dba3dfbba30ae3533496b9e14f57a7 Parents: 25102f7 Author: Neville LiAuthored: Mon Oct 24 18:56:36 2016 -0400 Committer: Luke Cwik Committed: Wed Oct 26 15:30:50 2016 -0700 -- .../java/org/apache/beam/sdk/io/AvroIO.java | 143 --- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 29 2 files changed, 154 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index d912ff7..6deca7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -21,11 +21,16 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.io.BaseEncoding; import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; + import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; @@ -39,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -455,6 +461,15 @@ public class AvroIO { } /** + * Returns a {@link PTransform} that writes Avro file(s) with the specified metadata. + * + * <p>Supported value types are String, Long, and byte[]. + */ +public static Bound<GenericRecord> withMetadata(Map<String, Object> metadata) { + return new Bound<>(GenericRecord.class).withMetadata(metadata); +} + +/** * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or * multiple Avro files matching a sharding pattern). * @@ -464,6 +479,8 @@ public class AvroIO { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; private static final SerializableAvroCodecFactory DEFAULT_CODEC = new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6)); + // This should be a multiple of 4 to not get a partial encoded byte. + private static final int METADATA_BYTES_MAX_LENGTH = 40; /** The filename to write to. */ @Nullable @@ -486,6 +503,8 @@ public class AvroIO { * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html */ final SerializableAvroCodecFactory codec; + /** Avro file metadata. */ + final ImmutableMap<String, Object> metadata; Bound(Class<T> type) { this( @@ -497,7 +516,8 @@ public class AvroIO { type, null, true, -DEFAULT_CODEC); +DEFAULT_CODEC, +ImmutableMap.<String, Object>
of()); } Bound( @@ -509,7 +529,8 @@ public class AvroIO { Class<T> type, Schema schema, boolean validate, - SerializableAvroCodecFactory codec) { + SerializableAvroCodecFactory codec, + Map<String, Object> metadata) { super(name); this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; @@ -519,6 +540,18 @@ public class AvroIO { this.schema = schema; this.validate = validate; this.codec = codec; + +Map<String, String> badKeys = Maps.newLinkedHashMap(); +for (Map.Entry<String, Object> entry : metadata.entrySet()) { + Object v = entry.getValue(); + if (!(v instanceof String || v instanceof Long || v instanceof byte[])) { +