Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f89505702 -> 0791ee992


[GOBBLIN-407] fix job output path for full snapshot

Closes #2284 from arjun4084346/fixOutputDirPath


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0791ee99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0791ee99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0791ee99

Branch: refs/heads/master
Commit: 0791ee992e533957654ee7dedb33358b312acf26
Parents: f895057
Author: Arjun <ab...@linkedin.com>
Authored: Mon Feb 12 18:20:33 2018 -0800
Committer: Hung Tran <hut...@linkedin.com>
Committed: Mon Feb 12 18:20:33 2018 -0800

----------------------------------------------------------------------
 .../gobblin/configuration/WorkUnitState.java    |  4 ++++
 .../apache/gobblin/source/workunit/Extract.java |  2 ++
 .../gobblin/source/workunit/WorkUnit.java       | 21 +++++++++++++++++++-
 .../org/apache/gobblin/util/WriterUtils.java    |  5 ++---
 4 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0791ee99/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
index 0b40399..45bf807 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
@@ -62,6 +62,10 @@ public class WorkUnitState extends State {
 
   private static final Gson GSON = new Gson();
 
+  public String getOutputFilePath() {
+    return this.workUnit.getOutputFilePath();
+  }
+
   /**
    * Runtime state of the {@link WorkUnit}.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0791ee99/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
index 5161427..7a3735b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
@@ -137,7 +137,9 @@ public class Extract extends State {
    * Get the writer output file path corresponding to this {@link Extract}.
    *
    * @return writer output file path corresponding to this {@link Extract}
+   * @deprecated As {@code this.getIsFull} is deprecated.
    */
+  @Deprecated
   public String getOutputFilePath() {
     return this.getNamespace().replaceAll("\\.", "/") + "/" + this.getTable() + "/" + this.getExtractId() + "_"
         + (this.getIsFull() ? "full" : "append");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0791ee99/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
index 38aabcb..7d3f5d3 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
@@ -311,9 +311,14 @@ public class WorkUnit extends State {
 
   @Override
   public String getProp(String key) {
+    return getProp(key, null);
+  }
+
+  @Override
+  public String getProp(String key, String def) {
     String value = super.getProp(key);
     if (value == null) {
-      value = this.extract.getProp(key);
+      value = this.extract.getProp(key, def);
     }
     return value;
   }
@@ -359,4 +364,18 @@ public class WorkUnit extends State {
     result = prime * result + ((this.extract == null) ? 0 : this.extract.hashCode());
     return result;
   }
+
+  public String getOutputFilePath() {
+    // Search for the properties in the workunit.
+    // This search for the property first in State and then in the Extract of this workunit.
+    String namespace = getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "");
+    String table = getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "");
+    String extractId = getProp(ConfigurationKeys.EXTRACT_EXTRACT_ID_KEY, "");
+    // getPropAsBoolean and other similar methods are not overridden in WorkUnit class
+    // Thus, to enable searching in WorkUnit's Extract, we use getProp, and not getPropAsBoolean
+    boolean isFull = Boolean.parseBoolean(getProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY));
+
+    return namespace.replaceAll("\\.", "/") + "/" + table + "/" + extractId + "_"
+        + (isFull ? "full" : "append");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0791ee99/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
index f5658ea..9a628d0 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
@@ -209,12 +209,11 @@ public class WriterUtils {
   public static Path getDefaultWriterFilePath(State state, int numBranches, int branchId) {
     if (state instanceof WorkUnitState) {
       WorkUnitState workUnitState = (WorkUnitState) state;
-      return new Path(ForkOperatorUtils.getPathForBranch(workUnitState, workUnitState.getExtract().getOutputFilePath(),
+      return new Path(ForkOperatorUtils.getPathForBranch(workUnitState, workUnitState.getOutputFilePath(),
           numBranches, branchId));
-
     } else if (state instanceof WorkUnit) {
       WorkUnit workUnit = (WorkUnit) state;
-      return new Path(ForkOperatorUtils.getPathForBranch(workUnit, workUnit.getExtract().getOutputFilePath(),
+      return new Path(ForkOperatorUtils.getPathForBranch(workUnit, workUnit.getOutputFilePath(),
           numBranches, branchId));
     }