[BEAM-813] support metadata in Avro sink
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba099f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba099f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba099f5 Branch: refs/heads/python-sdk Commit: eba099f564dba3dfbba30ae3533496b9e14f57a7 Parents: 25102f7 Author: Neville Li <nevi...@spotify.com> Authored: Mon Oct 24 18:56:36 2016 -0400 Committer: Luke Cwik <lc...@google.com> Committed: Wed Oct 26 15:30:50 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 143 ++++++++++++++++--- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 29 ++++ 2 files changed, 154 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index d912ff7..6deca7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -21,11 +21,16 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.io.BaseEncoding; import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; + import org.apache.avro.Schema; import 
org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; @@ -39,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -455,6 +461,15 @@ public class AvroIO { } /** + * Returns a {@link PTransform} that writes Avro file(s) with the specified metadata. + * + * <p>Supported value types are String, Long, and byte[]. + */ + public static Bound<GenericRecord> withMetadata(Map<String, Object> metadata) { + return new Bound<>(GenericRecord.class).withMetadata(metadata); + } + + /** * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or * multiple Avro files matching a sharding pattern). * @@ -464,6 +479,8 @@ public class AvroIO { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; private static final SerializableAvroCodecFactory DEFAULT_CODEC = new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6)); + // This should be a multiple of 4 to not get a partial encoded byte. + private static final int METADATA_BYTES_MAX_LENGTH = 40; /** The filename to write to. */ @Nullable @@ -486,6 +503,8 @@ public class AvroIO { * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html */ final SerializableAvroCodecFactory codec; + /** Avro file metadata. 
*/ + final ImmutableMap<String, Object> metadata; Bound(Class<T> type) { this( @@ -497,7 +516,8 @@ public class AvroIO { type, null, true, - DEFAULT_CODEC); + DEFAULT_CODEC, + ImmutableMap.<String, Object>of()); } Bound( @@ -509,7 +529,8 @@ public class AvroIO { Class<T> type, Schema schema, boolean validate, - SerializableAvroCodecFactory codec) { + SerializableAvroCodecFactory codec, + Map<String, Object> metadata) { super(name); this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; @@ -519,6 +540,18 @@ public class AvroIO { this.schema = schema; this.validate = validate; this.codec = codec; + + Map<String, String> badKeys = Maps.newLinkedHashMap(); + for (Map.Entry<String, Object> entry : metadata.entrySet()) { + Object v = entry.getValue(); + if (!(v instanceof String || v instanceof Long || v instanceof byte[])) { + badKeys.put(entry.getKey(), v.getClass().getSimpleName()); + } + } + checkArgument( + badKeys.isEmpty(), + "Metadata value type must be one of String, Long, or byte[]. 
Found %s", badKeys); + this.metadata = ImmutableMap.copyOf(metadata); } /** @@ -541,7 +574,8 @@ public class AvroIO { type, schema, validate, - codec); + codec, + metadata); } /** @@ -563,7 +597,8 @@ public class AvroIO { type, schema, validate, - codec); + codec, + metadata); } /** @@ -591,7 +626,8 @@ public class AvroIO { type, schema, validate, - codec); + codec, + metadata); } /** @@ -612,7 +648,8 @@ public class AvroIO { type, schema, validate, - codec); + codec, + metadata); } /** @@ -634,7 +671,8 @@ public class AvroIO { type, schema, validate, - codec); + codec, + metadata); } /** @@ -656,7 +694,8 @@ public class AvroIO { type, ReflectData.get().getSchema(type), validate, - codec); + codec, + metadata); } /** @@ -676,7 +715,8 @@ public class AvroIO { GenericRecord.class, schema, validate, - codec); + codec, + metadata); } /** @@ -710,7 +750,8 @@ public class AvroIO { type, schema, false, - codec); + codec, + metadata); } /** @@ -729,7 +770,28 @@ public class AvroIO { type, schema, validate, - new SerializableAvroCodecFactory(codec)); + new SerializableAvroCodecFactory(codec), + metadata); + } + + /** + * Returns a new {@link PTransform} that's like this one but + * that writes to Avro file(s) with the specified metadata. + * + * <p>Does not modify this object. 
+ */ + public Bound<T> withMetadata(Map<String, Object> metadata) { + return new Bound<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + schema, + validate, + codec, + metadata); } @Override @@ -749,7 +811,8 @@ public class AvroIO { filenameSuffix, shardTemplate, AvroCoder.of(type, schema), - codec)); + codec, + metadata)); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -779,6 +842,24 @@ public class AvroIO { .addIfNotDefault(DisplayData.item("codec", codec.toString()) .withLabel("Avro Compression Codec"), DEFAULT_CODEC.toString()); + builder.include("Metadata", new Metadata()); + } + + private class Metadata implements HasDisplayData { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + for (Map.Entry<String, Object> entry : metadata.entrySet()) { + DisplayData.Type type = DisplayData.inferType(entry.getValue()); + if (type != null) { + builder.add(DisplayData.item(entry.getKey(), type, entry.getValue())); + } else { + String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue()); + String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH + ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "..."; + builder.add(DisplayData.item(entry.getKey(), repr)); + } + } + } } /** @@ -824,6 +905,10 @@ public class AvroIO { public CodecFactory getCodec() { return codec.getCodec(); } + + public Map<String, Object> getMetadata() { + return metadata; + } } /** Disallow construction of utility class. 
*/ @@ -853,6 +938,7 @@ public class AvroIO { static class AvroSink<T> extends FileBasedSink<T> { private final AvroCoder<T> coder; private final SerializableAvroCodecFactory codec; + private final ImmutableMap<String, Object> metadata; @VisibleForTesting AvroSink( @@ -860,16 +946,17 @@ public class AvroIO { String extension, String fileNameTemplate, AvroCoder<T> coder, - SerializableAvroCodecFactory codec) { + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { super(baseOutputFilename, extension, fileNameTemplate); this.coder = coder; this.codec = codec; - + this.metadata = metadata; } @Override public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) { - return new AvroWriteOperation<>(this, coder, codec); + return new AvroWriteOperation<>(this, coder, codec, metadata); } /** @@ -879,18 +966,21 @@ public class AvroIO { private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> { private final AvroCoder<T> coder; private final SerializableAvroCodecFactory codec; + private final ImmutableMap<String, Object> metadata; private AvroWriteOperation(AvroSink<T> sink, AvroCoder<T> coder, - SerializableAvroCodecFactory codec) { + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> metadata) { super(sink); this.coder = coder; this.codec = codec; + this.metadata = metadata; } @Override public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception { - return new AvroWriter<>(this, coder, codec); + return new AvroWriter<>(this, coder, codec, metadata); } } @@ -902,20 +992,37 @@ public class AvroIO { private final AvroCoder<T> coder; private DataFileWriter<T> dataFileWriter; private SerializableAvroCodecFactory codec; + private final ImmutableMap<String, Object> metadata; public AvroWriter(FileBasedWriteOperation<T> writeOperation, AvroCoder<T> coder, - SerializableAvroCodecFactory codec) { + SerializableAvroCodecFactory codec, + ImmutableMap<String, Object> 
metadata) { super(writeOperation); this.mimeType = MimeTypes.BINARY; this.coder = coder; this.codec = codec; + this.metadata = metadata; } @SuppressWarnings("deprecation") // uses internal test functionality. @Override protected void prepareWrite(WritableByteChannel channel) throws Exception { dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()).setCodec(codec.getCodec()); + for (Map.Entry<String, Object> entry : metadata.entrySet()) { + Object v = entry.getValue(); + if (v instanceof String) { + dataFileWriter.setMeta(entry.getKey(), (String) v); + } else if (v instanceof Long) { + dataFileWriter.setMeta(entry.getKey(), (Long) v); + } else if (v instanceof byte[]) { + dataFileWriter.setMeta(entry.getKey(), (byte[]) v); + } else { + throw new IllegalStateException( + "Metadata value type must be one of String, Long, or byte[]. Found " + + v.getClass().getSimpleName()); + } + } dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 4825875..1a07177 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -21,6 +21,7 @@ import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -28,6 +29,7 @@ import 
static org.junit.Assert.assertTrue; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import java.io.File; import java.io.FileInputStream; @@ -315,6 +317,33 @@ public class AvroIOTest { assertEquals(CodecFactory.xzCodec(9).toString(), serdeWrite.getCodec().toString()); } + @Test + @SuppressWarnings("unchecked") + @Category(NeedsRunner.class) + public void testMetadata() throws Exception { + TestPipeline p = TestPipeline.create(); + List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), + new GenericClass(5, "bar")); + File outputFile = tmpFolder.newFile("output.avro"); + + p.apply(Create.of(values)) + .apply(AvroIO.Write.to(outputFile.getAbsolutePath()) + .withoutSharding() + .withSchema(GenericClass.class) + .withMetadata(ImmutableMap.<String, Object>of( + "stringKey", "stringValue", + "longKey", 100L, + "bytesKey", "bytesValue".getBytes()))); + p.run(); + + DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile), + new GenericDatumReader()); + assertEquals("stringValue", dataFileStream.getMetaString("stringKey")); + assertEquals(100L, dataFileStream.getMetaLong("longKey")); + assertArrayEquals("bytesValue".getBytes(), dataFileStream.getMeta("bytesKey")); + } + + @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests. private void runTestWrite(String[] expectedElements, int numShards) throws IOException { File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");