[35/50] incubator-beam git commit: [BEAM-813] support metadata in Avro sink

2016-10-28 Thread thw
[BEAM-813] support metadata in Avro sink


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba099f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba099f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba099f5

Branch: refs/heads/apex-runner
Commit: eba099f564dba3dfbba30ae3533496b9e14f57a7
Parents: 25102f7
Author: Neville Li 
Authored: Mon Oct 24 18:56:36 2016 -0400
Committer: Luke Cwik 
Committed: Wed Oct 26 15:30:50 2016 -0700

--
 .../java/org/apache/beam/sdk/io/AvroIO.java | 143 ---
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  29 
 2 files changed, 154 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d912ff7..6deca7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -21,11 +21,16 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.io.BaseEncoding;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.Map;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
@@ -39,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -455,6 +461,15 @@ public class AvroIO {
 }
 
 /**
+ * Returns a {@link PTransform} that writes Avro file(s) with the 
specified metadata.
+ *
+ * Supported value types are String, Long, and byte[].
+ */
+public static Bound withMetadata(Map 
metadata) {
+  return new Bound<>(GenericRecord.class).withMetadata(metadata);
+}
+
+/**
  * A {@link PTransform} that writes a bounded {@link PCollection} to an 
Avro file (or
  * multiple Avro files matching a sharding pattern).
  *
@@ -464,6 +479,8 @@ public class AvroIO {
   private static final String DEFAULT_SHARD_TEMPLATE = 
ShardNameTemplate.INDEX_OF_MAX;
   private static final SerializableAvroCodecFactory DEFAULT_CODEC =
   new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
+  // This should be a multiple of 4 to not get a partial encoded byte.
+  private static final int METADATA_BYTES_MAX_LENGTH = 40;
 
   /** The filename to write to. */
   @Nullable
@@ -486,6 +503,8 @@ public class AvroIO {
* 
https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
*/
   final SerializableAvroCodecFactory codec;
+  /** Avro file metadata. */
+  final ImmutableMap metadata;
 
   Bound(Class type) {
 this(
@@ -497,7 +516,8 @@ public class AvroIO {
 type,
 null,
 true,
-DEFAULT_CODEC);
+DEFAULT_CODEC,
+ImmutableMap.of());
   }
 
   Bound(
@@ -509,7 +529,8 @@ public class AvroIO {
   Class type,
   Schema schema,
   boolean validate,
-  SerializableAvroCodecFactory codec) {
+  SerializableAvroCodecFactory codec,
+  Map metadata) {
 super(name);
 this.filenamePrefix = filenamePrefix;
 this.filenameSuffix = filenameSuffix;
@@ -519,6 +540,18 @@ public class AvroIO {
 this.schema = schema;
 this.validate = validate;
 this.codec = codec;
+
+Map badKeys = Maps.newLinkedHashMap();
+for (Map.Entry entry : metadata.entrySet()) {
+  Object v = entry.getValue();
+  if (!(v instanceof String || v instanceof Long || v instanceof 
byte[])) {
+badKeys.put(entry.getKey(), v.getClass().getSimpleName());
+  }
+

[47/50] [abbrv] incubator-beam git commit: [BEAM-813] support metadata in Avro sink

2016-10-27 Thread robertwb
[BEAM-813] support metadata in Avro sink


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba099f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba099f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba099f5

Branch: refs/heads/python-sdk
Commit: eba099f564dba3dfbba30ae3533496b9e14f57a7
Parents: 25102f7
Author: Neville Li 
Authored: Mon Oct 24 18:56:36 2016 -0400
Committer: Luke Cwik 
Committed: Wed Oct 26 15:30:50 2016 -0700

--
 .../java/org/apache/beam/sdk/io/AvroIO.java | 143 ---
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  29 
 2 files changed, 154 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d912ff7..6deca7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -21,11 +21,16 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.io.BaseEncoding;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.Map;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
@@ -39,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -455,6 +461,15 @@ public class AvroIO {
 }
 
 /**
+ * Returns a {@link PTransform} that writes Avro file(s) with the 
specified metadata.
+ *
+ * Supported value types are String, Long, and byte[].
+ */
+public static Bound withMetadata(Map 
metadata) {
+  return new Bound<>(GenericRecord.class).withMetadata(metadata);
+}
+
+/**
  * A {@link PTransform} that writes a bounded {@link PCollection} to an 
Avro file (or
  * multiple Avro files matching a sharding pattern).
  *
@@ -464,6 +479,8 @@ public class AvroIO {
   private static final String DEFAULT_SHARD_TEMPLATE = 
ShardNameTemplate.INDEX_OF_MAX;
   private static final SerializableAvroCodecFactory DEFAULT_CODEC =
   new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
+  // This should be a multiple of 4 to not get a partial encoded byte.
+  private static final int METADATA_BYTES_MAX_LENGTH = 40;
 
   /** The filename to write to. */
   @Nullable
@@ -486,6 +503,8 @@ public class AvroIO {
* 
https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
*/
   final SerializableAvroCodecFactory codec;
+  /** Avro file metadata. */
+  final ImmutableMap metadata;
 
   Bound(Class type) {
 this(
@@ -497,7 +516,8 @@ public class AvroIO {
 type,
 null,
 true,
-DEFAULT_CODEC);
+DEFAULT_CODEC,
+ImmutableMap.of());
   }
 
   Bound(
@@ -509,7 +529,8 @@ public class AvroIO {
   Class type,
   Schema schema,
   boolean validate,
-  SerializableAvroCodecFactory codec) {
+  SerializableAvroCodecFactory codec,
+  Map metadata) {
 super(name);
 this.filenamePrefix = filenamePrefix;
 this.filenameSuffix = filenameSuffix;
@@ -519,6 +540,18 @@ public class AvroIO {
 this.schema = schema;
 this.validate = validate;
 this.codec = codec;
+
+Map badKeys = Maps.newLinkedHashMap();
+for (Map.Entry entry : metadata.entrySet()) {
+  Object v = entry.getValue();
+  if (!(v instanceof String || v instanceof Long || v instanceof 
byte[])) {
+badKeys.put(entry.getKey(), v.getClass().getSimpleName());
+  }
+ 

incubator-beam git commit: [BEAM-813] support metadata in Avro sink

2016-10-26 Thread lcwik
Repository: incubator-beam
Updated Branches:
  refs/heads/master 25102f798 -> eba099f56


[BEAM-813] support metadata in Avro sink


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba099f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba099f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba099f5

Branch: refs/heads/master
Commit: eba099f564dba3dfbba30ae3533496b9e14f57a7
Parents: 25102f7
Author: Neville Li 
Authored: Mon Oct 24 18:56:36 2016 -0400
Committer: Luke Cwik 
Committed: Wed Oct 26 15:30:50 2016 -0700

--
 .../java/org/apache/beam/sdk/io/AvroIO.java | 143 ---
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  29 
 2 files changed, 154 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d912ff7..6deca7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -21,11 +21,16 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.io.BaseEncoding;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.Map;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
@@ -39,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -455,6 +461,15 @@ public class AvroIO {
 }
 
 /**
+ * Returns a {@link PTransform} that writes Avro file(s) with the 
specified metadata.
+ *
+ * Supported value types are String, Long, and byte[].
+ */
+public static Bound withMetadata(Map 
metadata) {
+  return new Bound<>(GenericRecord.class).withMetadata(metadata);
+}
+
+/**
  * A {@link PTransform} that writes a bounded {@link PCollection} to an 
Avro file (or
  * multiple Avro files matching a sharding pattern).
  *
@@ -464,6 +479,8 @@ public class AvroIO {
   private static final String DEFAULT_SHARD_TEMPLATE = 
ShardNameTemplate.INDEX_OF_MAX;
   private static final SerializableAvroCodecFactory DEFAULT_CODEC =
   new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
+  // This should be a multiple of 4 to not get a partial encoded byte.
+  private static final int METADATA_BYTES_MAX_LENGTH = 40;
 
   /** The filename to write to. */
   @Nullable
@@ -486,6 +503,8 @@ public class AvroIO {
* 
https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
*/
   final SerializableAvroCodecFactory codec;
+  /** Avro file metadata. */
+  final ImmutableMap metadata;
 
   Bound(Class type) {
 this(
@@ -497,7 +516,8 @@ public class AvroIO {
 type,
 null,
 true,
-DEFAULT_CODEC);
+DEFAULT_CODEC,
+ImmutableMap.of());
   }
 
   Bound(
@@ -509,7 +529,8 @@ public class AvroIO {
   Class type,
   Schema schema,
   boolean validate,
-  SerializableAvroCodecFactory codec) {
+  SerializableAvroCodecFactory codec,
+  Map metadata) {
 super(name);
 this.filenamePrefix = filenamePrefix;
 this.filenameSuffix = filenameSuffix;
@@ -519,6 +540,18 @@ public class AvroIO {
 this.schema = schema;
 this.validate = validate;
 this.codec = codec;
+
+Map badKeys = Maps.newLinkedHashMap();
+for (Map.Entry entry : metadata.entrySet()) {
+  Object v = entry.getValue();
+  if (!(v instanceof String || v instanceof Long || v instanceof 
byte[])) {
+