Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f89505702 -> 0791ee992


[GOBBLIN-407] fix job output path for full snapshot

Closes #2284 from arjun4084346/fixOutputDirPath


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0791ee99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0791ee99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0791ee99

Branch: refs/heads/master
Commit: 0791ee992e533957654ee7dedb33358b312acf26
Parents: f895057
Author: Arjun <ab...@linkedin.com>
Authored: Mon Feb 12 18:20:33 2018 -0800
Committer: Hung Tran <hut...@linkedin.com>
Committed: Mon Feb 12 18:20:33 2018 -0800

----------------------------------------------------------------------
 .../gobblin/configuration/WorkUnitState.java    |  4 ++++
 .../apache/gobblin/source/workunit/Extract.java |  2 ++
 .../gobblin/source/workunit/WorkUnit.java       | 21 +++++++++++++++++++-
 .../org/apache/gobblin/util/WriterUtils.java    |  5 ++---
 4 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0791ee99/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
index 0b40399..45bf807 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
@@ -62,6 +62,10 @@ public class WorkUnitState extends State {
 
   private static final Gson GSON = new Gson();
 
+  public String getOutputFilePath() {
+    return this.workUnit.getOutputFilePath();
+  }
+
   /**
    * Runtime state of the {@link WorkUnit}.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0791ee99/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java 
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
index 5161427..7a3735b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
@@ -137,7 +137,9 @@ public class Extract extends State {
    * Get the writer output file path corresponding to this {@link Extract}.
    *
    * @return writer output file path corresponding to this {@link Extract}
+   * @deprecated As {@code this.getIsFull} is deprecated.
    */
+  @Deprecated
   public String getOutputFilePath() {
     return this.getNamespace().replaceAll("\\.", "/") + "/" + this.getTable() 
+ "/" + this.getExtractId() + "_"
         + (this.getIsFull() ? "full" : "append");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0791ee99/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java 
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
index 38aabcb..7d3f5d3 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
@@ -311,9 +311,14 @@ public class WorkUnit extends State {
 
   @Override
   public String getProp(String key) {
+    return getProp(key, null);
+  }
+
+  @Override
+  public String getProp(String key, String def) {
     String value = super.getProp(key);
     if (value == null) {
-      value = this.extract.getProp(key);
+      value = this.extract.getProp(key, def);
     }
     return value;
   }
@@ -359,4 +364,18 @@ public class WorkUnit extends State {
     result = prime * result + ((this.extract == null) ? 0 : 
this.extract.hashCode());
     return result;
   }
+
+  public String getOutputFilePath() {
+    // Search for the properties in the workunit.
+    // This search for the property first in State and then in the Extract of 
this workunit.
+    String namespace = getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, 
"");
+    String table = getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "");
+    String extractId = getProp(ConfigurationKeys.EXTRACT_EXTRACT_ID_KEY, "");
+    // getPropAsBoolean and other similar methods are not overridden in 
WorkUnit class
+    // Thus, to enable searching in WorkUnit's Extract, we use getProp, and 
not getPropAsBoolean
+    boolean isFull =  
Boolean.parseBoolean(getProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY));
+
+    return namespace.replaceAll("\\.", "/") + "/" + table + "/" + extractId + 
"_"
+        + (isFull ? "full" : "append");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0791ee99/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
index f5658ea..9a628d0 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
@@ -209,12 +209,11 @@ public class WriterUtils {
   public static Path getDefaultWriterFilePath(State state, int numBranches, 
int branchId) {
     if (state instanceof WorkUnitState) {
       WorkUnitState workUnitState = (WorkUnitState) state;
-      return new Path(ForkOperatorUtils.getPathForBranch(workUnitState, 
workUnitState.getExtract().getOutputFilePath(),
+      return new Path(ForkOperatorUtils.getPathForBranch(workUnitState, 
workUnitState.getOutputFilePath(),
           numBranches, branchId));
-
     } else if (state instanceof WorkUnit) {
       WorkUnit workUnit = (WorkUnit) state;
-      return new Path(ForkOperatorUtils.getPathForBranch(workUnit, 
workUnit.getExtract().getOutputFilePath(),
+      return new Path(ForkOperatorUtils.getPathForBranch(workUnit, 
workUnit.getOutputFilePath(),
           numBranches, branchId));
     }
 

Reply via email to