Repository: nifi
Updated Branches:
  refs/heads/master 67819e501 -> 9c4fdd4ef


NIFI-4192: Add more Avro/metadata merge options to MergeContent

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #2067.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9c4fdd4e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9c4fdd4e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9c4fdd4e

Branch: refs/heads/master
Commit: 9c4fdd4ef3589d64e4e15822fc9fdb79a2315535
Parents: 67819e5
Author: Matt Burgess <mattyb...@apache.org>
Authored: Tue Aug 8 13:32:17 2017 -0400
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Wed Aug 9 12:55:12 2017 +0200

----------------------------------------------------------------------
 .../nifi/processors/standard/MergeContent.java  |  69 +++--
 .../processors/standard/TestMergeContent.java   | 254 ++++++++++++++++++-
 2 files changed, 306 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
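
In short: MergeContent gains a 'Metadata Strategy' property that controls how
embedded metadata (such as Avro file metadata) is reconciled when FlowFiles are
binned together. A minimal configuration sketch using the NiFi mock framework,
mirroring the new tests below (the MergeContent constants come from this
commit; the wrapper class is illustrative only):

    import org.apache.nifi.processors.standard.MergeContent;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class MetadataStrategySketch {
        public static void main(String[] args) {
            // Merge Avro datafiles rather than plain binary concatenation
            final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
            runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
            // New in NIFI-4192: pick how embedded metadata is reconciled;
            // the default is 'Do Not Merge Uncommon Metadata'.
            runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_USE_FIRST);
        }
    }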


http://git-wip-us.apache.org/repos/asf/nifi/blob/9c4fdd4e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index edbc033..5ebe524 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -142,6 +142,33 @@ public class MergeContent extends BinFiles {
     public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count";
     public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
 
+
+    public static final AllowableValue METADATA_STRATEGY_USE_FIRST = new AllowableValue("Use First Metadata", "Use First Metadata",
+            "For any input format that supports metadata (e.g., Avro), the metadata for the first FlowFile in the bin will be set on the output FlowFile.");
+
+    public static final AllowableValue METADATA_STRATEGY_ALL_COMMON = new AllowableValue("Keep Only Common Metadata", "Keep Only Common Metadata",
+            "For any input format that supports metadata (e.g., Avro), a FlowFile whose metadata values match those of the first FlowFile will be merged, and any additional metadata "
+                    + "it carries will be dropped. Any FlowFile whose metadata values conflict with those of the first FlowFile in the bin will not be merged.");
+
+    public static final AllowableValue METADATA_STRATEGY_IGNORE = new AllowableValue("Ignore Metadata", "Ignore Metadata",
+            "Ignores (does not transfer or compare) any metadata from a FlowFile whose content supports embedded metadata.");
+
+    public static final AllowableValue METADATA_STRATEGY_DO_NOT_MERGE = new AllowableValue("Do Not Merge Uncommon Metadata", "Do Not Merge Uncommon Metadata",
+            "For any input format that supports metadata (e.g., Avro), any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged.");
+
+    public static final PropertyDescriptor METADATA_STRATEGY = new PropertyDescriptor.Builder()
+            .required(true)
+            .name("mergecontent-metadata-strategy")
+            .displayName("Metadata Strategy")
+            .description("For FlowFiles whose input format supports metadata (e.g., Avro), this property determines which metadata should be added to the bundle. "
+                    + "If 'Use First Metadata' is selected, the metadata keys/values from the first FlowFile to be bundled will be used. If 'Keep Only Common Metadata' is selected, "
+                    + "only the metadata that exists on all FlowFiles in the bundle, with the same value, will be preserved. If 'Ignore Metadata' is selected, no metadata is transferred to "
+                    + "the outgoing bundled FlowFile. If 'Do Not Merge Uncommon Metadata' is selected, any FlowFile whose metadata values do not match those of the first bundled FlowFile "
+                    + "will not be merged.")
+            .allowableValues(METADATA_STRATEGY_USE_FIRST, METADATA_STRATEGY_ALL_COMMON, METADATA_STRATEGY_DO_NOT_MERGE, METADATA_STRATEGY_IGNORE)
+            .defaultValue(METADATA_STRATEGY_DO_NOT_MERGE.getValue())
+            .build();
+
     public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
             "Bin-Packing Algorithm",
             "Bin-Packing Algorithm",
@@ -307,6 +334,7 @@ public class MergeContent extends BinFiles {
         descriptors.add(MERGE_FORMAT);
         descriptors.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY);
         descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+        descriptors.add(METADATA_STRATEGY);
         descriptors.add(MIN_ENTRIES);
         descriptors.add(MAX_ENTRIES);
         descriptors.add(MIN_SIZE);
@@ -861,6 +889,7 @@ public class MergeContent extends BinFiles {
             final ProcessSession session = bin.getSession();
             final List<FlowFile> contents = bin.getContents();
 
+            final String metadataStrategy = context.getProperty(METADATA_STRATEGY).getValue();
             final Map<String, byte[]> metadata = new TreeMap<>();
             final AtomicReference<Schema> schema = new AtomicReference<>(null);
             final AtomicReference<String> inputCodec = new AtomicReference<>(null);
@@ -883,11 +912,13 @@ public class MergeContent extends BinFiles {
                                             // this is the first file - set up the writer, and store the
                                             // Schema & metadata we'll use.
                                             schema.set(reader.getSchema());
-                                            for (String key : reader.getMetaKeys()) {
-                                                if (!DataFileWriter.isReservedMeta(key)) {
-                                                    byte[] metadatum = reader.getMeta(key);
-                                                    metadata.put(key, metadatum);
-                                                    writer.setMeta(key, metadatum);
+                                            if (!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy)) {
+                                                for (String key : reader.getMetaKeys()) {
+                                                    if (!DataFileWriter.isReservedMeta(key)) {
+                                                        byte[] metadatum = reader.getMeta(key);
+                                                        metadata.put(key, metadatum);
+                                                        writer.setMeta(key, metadatum);
+                                                    }
                                                 }
                                             }
                                             inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
@@ -905,19 +936,25 @@ public class MergeContent extends BinFiles {
                                                 unmerged.add(flowFile);
                                             }
 
-                                            // check that we're appending to the same metadata
-                                            for (String key : reader.getMetaKeys()) {
-                                                if (!DataFileWriter.isReservedMeta(key)) {
-                                                    byte[] metadatum = reader.getMeta(key);
-                                                    byte[] writersMetadatum = metadata.get(key);
-                                                    if (!Arrays.equals(metadatum, writersMetadatum)) {
-                                                        getLogger().debug("Input file {} has different non-reserved metadata, not merging",
-                                                                new Object[]{flowFile.getId()});
-                                                        canMerge = false;
-                                                        unmerged.add(flowFile);
+                                            if (METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy)
+                                                    || METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy)) {
+                                                // check that we're appending to the same metadata
+                                                for (String key : reader.getMetaKeys()) {
+                                                    if (!DataFileWriter.isReservedMeta(key)) {
+                                                        byte[] metadatum = reader.getMeta(key);
+                                                        byte[] writersMetadatum = metadata.get(key);
+                                                        if (!Arrays.equals(metadatum, writersMetadatum)) {
+                                                            // Ignore additional metadata if ALL_COMMON is the strategy, otherwise don't merge
+                                                            if (!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy) || writersMetadatum != null) {
+                                                                getLogger().debug("Input file {} has different non-reserved metadata, not merging",
+                                                                        new Object[]{flowFile.getId()});
+                                                                canMerge = false;
+                                                                unmerged.add(flowFile);
+                                                            }
+                                                        }
                                                     }
                                                 }
-                                            }
+                                            } // else the metadata in the first FlowFile was either ignored or retained in the if-clause above
 
                                             // check that we're appending to the same codec
                                             String thisCodec = reader.getMetaString(DataFileConstants.CODEC);

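Distilled from the hunk above: the comparison loop only runs for the 'Do Not
Merge Uncommon Metadata' and 'Keep Only Common Metadata' strategies, and under
the latter a value mismatch only blocks the merge when the first FlowFile
actually had a value for that key. A standalone sketch of that per-key rule
(the class and helper name are hypothetical; the real logic is inlined in the
processor's bin-processing loop):

    import java.util.Arrays;

    final class MetadataDecisionSketch {
        // 'first' is the value recorded from the first FlowFile in the bin,
        // or null if the key was absent there.
        static boolean keyAllowsMerge(String strategy, byte[] incoming, byte[] first) {
            if ("Ignore Metadata".equals(strategy) || "Use First Metadata".equals(strategy)) {
                return true; // metadata is never compared under these strategies
            }
            if (Arrays.equals(incoming, first)) {
                return true; // identical values always merge
            }
            // 'Keep Only Common Metadata' tolerates keys the first FlowFile lacked
            // (they are simply dropped); 'Do Not Merge Uncommon Metadata' does not.
            return "Keep Only Common Metadata".equals(strategy) && first == null;
        }
    }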
http://git-wip-us.apache.org/repos/asf/nifi/blob/9c4fdd4e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 4cd4460..d461712 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -197,6 +197,251 @@ public class TestMergeContent {
         Assert.assertTrue(places.containsKey("Some Place"));
     }
 
+    @Test
+    public void testAvroConcatWithDifferentMetadataDoNotMerge() throws IOException, InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
+        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
+        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
+        runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_DO_NOT_MERGE);
+
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+        final Map<String, String> userMeta1 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 1");
+        }};
+
+        final GenericRecord user2 = new GenericData.Record(schema);
+        user2.put("name", "Ben");
+        user2.put("favorite_number", 7);
+        user2.put("favorite_color", "red");
+        final Map<String, String> userMeta2 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 2"); // Test non-matching values
+        }};
+
+        final GenericRecord user3 = new GenericData.Record(schema);
+        user3.put("name", "John");
+        user3.put("favorite_number", 5);
+        user3.put("favorite_color", "blue");
+        final Map<String, String> userMeta3 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 1");
+            put("test_metadata2", "Test"); // Test unique
+        }};
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
+        final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
+        final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
+
+        runner.enqueue(out1.toByteArray());
+        runner.enqueue(out2.toByteArray());
+        runner.enqueue(out3.toByteArray());
+
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 2);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
+
+        // create a reader for the merged content
+        byte[] data = runner.getContentAsByteArray(bundle);
+        final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
+
+        Assert.assertEquals(1, users.size());
+        Assert.assertTrue(users.containsKey("Alyssa"));
+    }
+
+    @Test
+    public void testAvroConcatWithDifferentMetadataIgnore() throws IOException, InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
+        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
+        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
+        runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_IGNORE);
+
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+        final Map<String, String> userMeta1 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 1");
+        }};
+
+        final GenericRecord user2 = new GenericData.Record(schema);
+        user2.put("name", "Ben");
+        user2.put("favorite_number", 7);
+        user2.put("favorite_color", "red");
+        final Map<String, String> userMeta2 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 2"); // Test non-matching values
+        }};
+
+        final GenericRecord user3 = new GenericData.Record(schema);
+        user3.put("name", "John");
+        user3.put("favorite_number", 5);
+        user3.put("favorite_color", "blue");
+        final Map<String, String> userMeta3 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 1");
+            put("test_metadata2", "Test"); // Test unique
+        }};
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
+        final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
+        final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
+
+        runner.enqueue(out1.toByteArray());
+        runner.enqueue(out2.toByteArray());
+        runner.enqueue(out3.toByteArray());
+
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
+
+        // create a reader for the merged content
+        byte[] data = runner.getContentAsByteArray(bundle);
+        final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
+
+        Assert.assertEquals(3, users.size());
+        Assert.assertTrue(users.containsKey("Alyssa"));
+        Assert.assertTrue(users.containsKey("Ben"));
+        Assert.assertTrue(users.containsKey("John"));
+    }
+
+    @Test
+    public void testAvroConcatWithDifferentMetadataUseFirst() throws IOException, InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
+        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
+        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
+        runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_USE_FIRST);
+
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+        final Map<String, String> userMeta1 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 1");
+        }};
+
+        final GenericRecord user2 = new GenericData.Record(schema);
+        user2.put("name", "Ben");
+        user2.put("favorite_number", 7);
+        user2.put("favorite_color", "red");
+        final Map<String, String> userMeta2 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 2"); // Test non-matching values
+        }};
+
+        final GenericRecord user3 = new GenericData.Record(schema);
+        user3.put("name", "John");
+        user3.put("favorite_number", 5);
+        user3.put("favorite_color", "blue");
+        final Map<String, String> userMeta3 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 1");
+            put("test_metadata2", "Test"); // Test unique
+        }};
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
+        final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
+        final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
+
+        runner.enqueue(out1.toByteArray());
+        runner.enqueue(out2.toByteArray());
+        runner.enqueue(out3.toByteArray());
+
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
+
+        // create a reader for the merged content
+        byte[] data = runner.getContentAsByteArray(bundle);
+        final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
+
+        Assert.assertEquals(3, users.size());
+        Assert.assertTrue(users.containsKey("Alyssa"));
+        Assert.assertTrue(users.containsKey("Ben"));
+        Assert.assertTrue(users.containsKey("John"));
+    }
+
+    @Test
+    public void testAvroConcatWithDifferentMetadataKeepCommon() throws IOException, InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
+        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
+        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
+        runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_ALL_COMMON);
+
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+        final Map<String, String> userMeta1 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 1");
+        }};
+
+        final GenericRecord user2 = new GenericData.Record(schema);
+        user2.put("name", "Ben");
+        user2.put("favorite_number", 7);
+        user2.put("favorite_color", "red");
+        final Map<String, String> userMeta2 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 2"); // Test non-matching values
+        }};
+
+        final GenericRecord user3 = new GenericData.Record(schema);
+        user3.put("name", "John");
+        user3.put("favorite_number", 5);
+        user3.put("favorite_color", "blue");
+        final Map<String, String> userMeta3 = new HashMap<String, String>() {{
+            put("test_metadata1", "Test 1");
+            put("test_metadata2", "Test"); // Test unique
+        }};
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
+        final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
+        final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
+
+        runner.enqueue(out1.toByteArray());
+        runner.enqueue(out2.toByteArray());
+        runner.enqueue(out3.toByteArray());
+
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 1);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
+
+        // create a reader for the merged content
+        byte[] data = runner.getContentAsByteArray(bundle);
+        final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
+
+        Assert.assertEquals(2, users.size());
+        Assert.assertTrue(users.containsKey("Alyssa"));
+        Assert.assertTrue(users.containsKey("John"));
+    }
+
     private Map<String, GenericRecord> getGenericRecordMap(byte[] data, Schema schema, String key) throws IOException {
         // create a reader for the merged content
         DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
@@ -213,8 +458,15 @@ public class TestMergeContent {
     }
 
     private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord user2, DatumWriter<GenericRecord> datumWriter) throws IOException {
+        return serializeAvroRecord(schema, user2, datumWriter, null);
+    }
+
+    private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord user2, DatumWriter<GenericRecord> datumWriter, Map<String, String> metadata) throws IOException {
         ByteArrayOutputStream out2 = new ByteArrayOutputStream();
-        DataFileWriter<GenericRecord> dataFileWriter2 = new DataFileWriter<GenericRecord>(datumWriter);
+        DataFileWriter<GenericRecord> dataFileWriter2 = new DataFileWriter<>(datumWriter);
+        if (metadata != null) {
+            metadata.forEach(dataFileWriter2::setMeta);
+        }
         dataFileWriter2.create(schema, out2);
         dataFileWriter2.append(user2);
         dataFileWriter2.close();

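The tests assert on record counts and relationships; if one also wanted to
verify the metadata carried by the merged output, Avro's DataFileStream
exposes it directly. A hedged sketch (variable names mirror the tests above;
imports follow the test class, and the expected values are illustrative):

    // Read metadata back from the merged content ('data' is the merged byte[]).
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    try (DataFileStream<GenericRecord> stream =
            new DataFileStream<>(new ByteArrayInputStream(data), datumReader)) {
        // Under 'Use First Metadata' this echoes the first FlowFile's value
        // ("Test 1"); under 'Ignore Metadata' it is null.
        String value = stream.getMetaString("test_metadata1");
    }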