Repository: incubator-gobblin Updated Branches: refs/heads/master 067d42233 -> 6c14efe76
[GOBBLIN-194] Fix a NullPointerException that can occur if a partitioned writer is used and a filename for metadata output has not been set Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2f9ac6cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2f9ac6cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2f9ac6cf Branch: refs/heads/master Commit: 2f9ac6cfbf403930b69fa17770fa908a1bc607bb Parents: 067d422 Author: Eric Ogren <eog...@linkedin.com> Authored: Tue Aug 8 12:13:56 2017 -0700 Committer: Eric Ogren <eog...@linkedin.com> Committed: Tue Aug 8 12:16:56 2017 -0700 ---------------------------------------------------------------------- .../gobblin/publisher/BaseDataPublisher.java | 8 ++++- .../publisher/BaseDataPublisherTest.java | 32 ++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2f9ac6cf/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java index 5e1a731..f0d0e32 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java @@ -465,10 +465,16 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { branchId, getMetadataOutputFileForBranch(anyState, branchId)); } else { + String metadataFilename = getMetadataFileNameForBranch(anyState, branchId); + if (mdOutputPath == null || metadataFilename == null) { + LOG.info("Metadata filename not set for branch " + String.valueOf(branchId) + ": not publishing metadata."); + continue; + } + for (String partition : partitions) { publishMetadata(getMergedMetadataForPartitionAndBranch(partition, branchId), branchId, - new Path(new Path(mdOutputPath, partition), getMetadataFileNameForBranch(anyState, branchId))); + new Path(new Path(mdOutputPath, partition), metadataFilename)); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2f9ac6cf/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java index ef68fd1..09bc0c8 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java @@ -198,6 +198,38 @@ public class BaseDataPublisherTest { } @Test + public void testNoOutputWhenDisabledWithPartitions() + throws IOException { + + File publishPath = Files.createTempDir(); + + State s = buildDefaultState(1); + s.removeProp(ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_DIR); + s.removeProp(ConfigurationKeys.DATA_PUBLISHER_METADATA_OUTPUT_FILE); + s.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, publishPath.getAbsolutePath()); + + WorkUnitState wuState = new WorkUnitState(); + addStateToWorkunit(s, wuState); + + wuState.setProp(ConfigurationKeys.WRITER_METADATA_KEY, "abcdefg"); + + FsWriterMetrics metrics1 = buildWriterMetrics("foo1.json", "1-2-3-4", 0, 10); + FsWriterMetrics metrics2 = buildWriterMetrics("foo1.json", "5-6-7-8",10, 20); + wuState.setProp(ConfigurationKeys.WRITER_PARTITION_PATH_KEY, "1-2-3-4"); + wuState.setProp(FsDataWriter.FS_WRITER_METRICS_KEY, metrics1.toJson()); + wuState.setProp(ConfigurationKeys.WRITER_PARTITION_PATH_KEY + "_0", "1-2-3-4"); + wuState.setProp(FsDataWriter.FS_WRITER_METRICS_KEY + " _0", metrics2.toJson()); + wuState.setProp(ConfigurationKeys.WRITER_PARTITION_PATH_KEY + "_1", "5-6-7-8"); + wuState.setProp(FsDataWriter.FS_WRITER_METRICS_KEY + " _1", metrics2.toJson()); + + BaseDataPublisher publisher = new BaseDataPublisher(s); + publisher.publishMetadata(Collections.singletonList(wuState)); + + String[] filesInPublishDir = publishPath.list(); + Assert.assertEquals(0, filesInPublishDir.length, "Expected 0 files to be output to publish path"); + } + + @Test public void testMergesExistingMetadata() throws IOException { File publishPath = Files.createTempDir(); try {