[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-23 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327343275
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -869,7 +862,7 @@ public SalesforceBulkJobId 
getQueryResultIdsPkChunking(String entity, List

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-23 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327275396
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -598,6 +595,8 @@ public String getTimestampPredicateCondition(String 
column, long value, String v
 String[] batchIdResultIdArray = 
partitionPkChunkingBatchIdResultIterator.next().split(":");
 String batchId = batchIdResultIdArray[0];
 String resultId = batchIdResultIdArray[1];
+log.info(String.format("PK-Chunking work unit: fetching file for 
(jobId=%s, batchId=%s, resultId=%s) ",
 
 Review comment:
   thanks for offline talk, fixed. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-23 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327250904
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -775,17 +775,15 @@ public SalesforceBulkJobId 
getQueryResultIdsPkChunking(String entity, List

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-23 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327251198
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -1130,35 +1120,28 @@ public void closeConnection() throws Exception {
 
   /**
* Waits for the PK batches to complete. The wait will stop after all 
batches are complete or on the first failed batch
-   * @param batchInfoList list of batch info
-   * @param waitInterval the polling interval
-   * @return the last {@link BatchInfo} processed
-   * @throws InterruptedException
-   * @throws AsyncApiException
*/
-  private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int 
waitInterval)
-  throws InterruptedException, AsyncApiException {
-BatchInfo batchInfo = null;
+  private void waitForPkBatches(String jobId, BatchInfoList batchInfoList, int 
waitInterval)  {
+long toWait = (long)waitInterval * 1000;
 BatchInfo[] batchInfos = batchInfoList.getBatchInfo();
-
+log.info(String.format("Waiting for bulk (jobId=%s)", jobId));
 
 Review comment:
   fixed. thanks!




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-23 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327250834
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -598,6 +595,8 @@ public String getTimestampPredicateCondition(String 
column, long value, String v
 String[] batchIdResultIdArray = 
partitionPkChunkingBatchIdResultIterator.next().split(":");
 String batchId = batchIdResultIdArray[0];
 String resultId = batchIdResultIdArray[1];
+log.info(String.format("PK-Chunking work unit: fetching file for 
(jobId=%s, batchId=%s, resultId=%s) ",
 
 Review comment:
   thanks for online talk, fixed.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-23 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327247892
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -598,6 +595,8 @@ public String getTimestampPredicateCondition(String 
column, long value, String v
 String[] batchIdResultIdArray = 
partitionPkChunkingBatchIdResultIterator.next().split(":");
 String batchId = batchIdResultIdArray[0];
 String resultId = batchIdResultIdArray[1];
+log.info(String.format("PK-Chunking work unit: fetching file for 
(jobId=%s, batchId=%s, resultId=%s) ",
 
 Review comment:
   BTW, we use this pattern a lot elsewhere; I copied the code. :)
   I searched our code but didn't find a good example. Can you point me to 
some sample code? Are you talking about `MessageFormat`? 
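
For readers following the question above, here is a minimal sketch of the two JDK formatting styles being compared. The reviewer's actual suggestion is not visible in this thread, so this is only an illustration; the class and method names (`LogMessageFormats`, `withStringFormat`, `withMessageFormat`) are hypothetical and not part of Gobblin. If `log` is an SLF4J logger (an assumption here), a third option is its `{}` placeholder style, e.g. `log.info("jobId={}", jobId)`, which defers formatting until the level is enabled.

```java
import java.text.MessageFormat;

// Hypothetical helper, for illustration only.
class LogMessageFormats {
    // String.format uses printf-style %s placeholders.
    static String withStringFormat(String jobId, String batchId, String resultId) {
        return String.format(
            "PK-Chunking work unit: fetching file for (jobId=%s, batchId=%s, resultId=%s)",
            jobId, batchId, resultId);
    }

    // MessageFormat uses indexed {0}, {1}, {2} placeholders.
    static String withMessageFormat(String jobId, String batchId, String resultId) {
        return MessageFormat.format(
            "PK-Chunking work unit: fetching file for (jobId={0}, batchId={1}, resultId={2})",
            jobId, batchId, resultId);
    }
}
```

Both methods build the message eagerly, whether or not the log level is enabled; that cost is the usual argument for logger-native placeholders.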




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-23 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327243183
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -857,7 +850,7 @@ public SalesforceBulkJobId 
getQueryResultIdsPkChunking(String entity, List

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-23 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r327241495
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -869,7 +862,7 @@ public SalesforceBulkJobId 
getQueryResultIdsPkChunking(String entity, List

[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-18 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325803379
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -144,28 +142,18 @@
   private final boolean pkChunkingSkipCountCheck;
   private final boolean bulkApiUseQueryAll;
 
+  private WorkUnitState workUnitState;
+
   public SalesforceExtractor(WorkUnitState state) {
 super(state);
-this.sfConnector = (SalesforceConnector) this.connector;
-
-// don't allow pk chunking if max partitions too high or have user 
specified partitions
-if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, 
false)
-|| 
state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
-ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS) > 
PK_CHUNKING_MAX_PARTITIONS_LIMIT) {
-  if (state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false)) {
-log.warn("Max partitions too high, so PK chunking is not enabled");
-  }
-
-  this.pkChunking = false;
-} else {
-  this.pkChunking = state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false);
-}
+this.workUnitState = state;
 
+this.sfConnector = (SalesforceConnector) this.connector;
 this.pkChunkingSize =
 Math.max(MIN_PK_CHUNKING_SIZE,
-Math.min(MAX_PK_CHUNKING_SIZE, 
state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE)));
+Math.min(MAX_PK_CHUNKING_SIZE, 
state.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE)));
 
-this.pkChunkingSkipCountCheck = 
state.getPropAsBoolean(PK_CHUNKING_SKIP_COUNT_CHECK, 
DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK);
+this.pkChunkingSkipCountCheck = true;// won't be able to get count
 
 Review comment:
   removed. thanks!




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-17 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325346830
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -979,39 +1118,33 @@ private void 
fetchResultBatchWithRetry(RecordSetList rs)
   @Override
   public void closeConnection() throws Exception {
 if (this.bulkConnection != null
-&& 
!this.bulkConnection.getJobStatus(this.bulkJob.getId()).getState().toString().equals("Closed"))
 {
+&& 
!this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed"))
 {
   log.info("Closing salesforce bulk job connection");
-  this.bulkConnection.closeJob(this.bulkJob.getId());
+  this.bulkConnection.closeJob(this.getBulkJobId());
 }
   }
 
-  public static List constructGetCommand(String restQuery) {
+  private static List constructGetCommand(String restQuery) {
 return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), 
RestApiCommandType.GET));
   }
 
   /**
* Waits for the PK batches to complete. The wait will stop after all 
batches are complete or on the first failed batch
* @param batchInfoList list of batch info
-   * @param retryInterval the polling interval
+   * @param waitInterval the polling interval
* @return the last {@link BatchInfo} processed
* @throws InterruptedException
* @throws AsyncApiException
*/
-  private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int 
retryInterval)
+  private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int 
waitInterval)
   throws InterruptedException, AsyncApiException {
 BatchInfo batchInfo = null;
 BatchInfo[] batchInfos = batchInfoList.getBatchInfo();
 
-// Wait for all batches other than the first one. The first one is not 
processed in PK chunking mode
-for (int i = 1; i < batchInfos.length; i++) {
-  BatchInfo bi = batchInfos[i];
-
-  // get refreshed job status
-  bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
-
-  while ((bi.getState() != BatchStateEnum.Completed)
-  && (bi.getState() != BatchStateEnum.Failed)) {
-Thread.sleep(retryInterval * 1000);
+for (BatchInfo bi: batchInfos) {
 
 Review comment:
   I had a ticket for this - 
https://jira01.corp.linkedin.com:8443/browse/DSS-1
   pkchunking is using this function.
   Sometimes, the parent may not be the first element in the list. (see 
screenshot in ticket)
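
A minimal sketch of the point above: rather than skipping index 0, the parent batch can be identified by its `NotProcessed` state wherever it appears in the list. The `State` enum and `Batch` class below are hypothetical stand-ins for the Salesforce `BatchStateEnum` and `BatchInfo` types, and this is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, for illustration only.
class ParentBatchFilter {
    enum State { Queued, InProgress, Completed, Failed, NotProcessed }

    static final class Batch {
        final String id;
        final State state;

        Batch(String id, State state) {
            this.id = id;
            this.state = state;
        }
    }

    // Returns the ids of the child batches, skipping the parent batch by
    // state (NotProcessed) instead of assuming it is the first element.
    static List<String> childBatchIds(List<Batch> batches) {
        List<String> ids = new ArrayList<>();
        for (Batch b : batches) {
            if (b.state != State.NotProcessed) {
                ids.add(b.id);
            }
        }
        return ids;
    }
}
```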
   




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-17 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325344285
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -779,7 +880,7 @@ public boolean bulkApiLogin() throws Exception {
   BatchInfoList batchInfoList = 
this.bulkConnection.getBatchInfoList(this.bulkJob.getId());
 
   if (usingPkChunking && bulkBatchInfo.getState() == 
BatchStateEnum.NotProcessed) {
 
 Review comment:
   Thanks! removed.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-17 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325354169
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -979,39 +1118,33 @@ private void 
fetchResultBatchWithRetry(RecordSetList rs)
   @Override
   public void closeConnection() throws Exception {
 if (this.bulkConnection != null
-&& 
!this.bulkConnection.getJobStatus(this.bulkJob.getId()).getState().toString().equals("Closed"))
 {
+&& 
!this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed"))
 {
   log.info("Closing salesforce bulk job connection");
-  this.bulkConnection.closeJob(this.bulkJob.getId());
+  this.bulkConnection.closeJob(this.getBulkJobId());
 }
   }
 
-  public static List constructGetCommand(String restQuery) {
+  private static List constructGetCommand(String restQuery) {
 return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), 
RestApiCommandType.GET));
   }
 
   /**
* Waits for the PK batches to complete. The wait will stop after all 
batches are complete or on the first failed batch
* @param batchInfoList list of batch info
-   * @param retryInterval the polling interval
+   * @param waitInterval the polling interval
* @return the last {@link BatchInfo} processed
* @throws InterruptedException
* @throws AsyncApiException
*/
-  private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int 
retryInterval)
+  private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int 
waitInterval)
   throws InterruptedException, AsyncApiException {
 BatchInfo batchInfo = null;
 BatchInfo[] batchInfos = batchInfoList.getBatchInfo();
 
-// Wait for all batches other than the first one. The first one is not 
processed in PK chunking mode
-for (int i = 1; i < batchInfos.length; i++) {
-  BatchInfo bi = batchInfos[i];
-
-  // get refreshed job status
-  bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
-
-  while ((bi.getState() != BatchStateEnum.Completed)
-  && (bi.getState() != BatchStateEnum.Failed)) {
-Thread.sleep(retryInterval * 1000);
+for (BatchInfo bi: batchInfos) {
+  BatchStateEnum state = bi.getState();
+  while (state != BatchStateEnum.Completed && state != 
BatchStateEnum.Failed && state != BatchStateEnum.NotProcessed) {
+Thread.sleep(waitInterval * 1000);
 bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), 
bi.getId());
 
 Review comment:
   Good catch! Thanks!
   I should not use the state variable. My test passed only because I was 
stepping through with a breakpoint, which gave SFDC enough time to finish.
   
   I did more refactoring for this part and am pushing the code in advance. I 
will test it today and update you. Thanks!
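
The bug acknowledged above is that the loop condition tests a `state` variable that is never refreshed after the sleep. A minimal sketch of a polling loop that re-reads the state on every iteration follows; `BatchPoller`, its `State` enum, and the `fetchState` supplier are hypothetical stand-ins (the supplier plays the role of `bulkConnection.getBatchInfo(...).getState()`), not the refactored code from the PR.

```java
import java.util.function.Supplier;

// Hypothetical sketch, for illustration only.
class BatchPoller {
    enum State { Queued, InProgress, Completed, Failed, NotProcessed }

    static boolean isTerminal(State s) {
        return s == State.Completed || s == State.Failed || s == State.NotProcessed;
    }

    // Polls until the batch reaches a terminal state and returns that state.
    static State waitForBatch(Supplier<State> fetchState, long waitMillis) {
        State state = fetchState.get();
        while (!isTerminal(state)) {
            try {
                Thread.sleep(waitMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for batch", e);
            }
            // Refresh the state; without this line the loop never observes progress.
            state = fetchState.get();
        }
        return state;
    }
}
```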




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-17 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325343123
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -779,7 +880,7 @@ public boolean bulkApiLogin() throws Exception {
   BatchInfoList batchInfoList = 
this.bulkConnection.getBatchInfoList(this.bulkJob.getId());
 
 Review comment:
   >>   BatchInfoList batchInfoList = 
this.bulkConnection.getBatchInfoList(this.bulkJob.getId());
   I tried not to change the existing code in **getQueryResultIds**, and this 
line is necessary. The **getBatchInfoList** function's return value is 
**BatchIdAndResultId**, which means a list of resultIds with their 
corresponding batch IDs.
   One bulkJobId could have multiple results even when it is not a 
PK-chunking job.
   
   I am not using **getQueryResultIds** for PK-chunking. If we want to do further 
code refactoring, let's do it in another ticket.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-17 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r325340740
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -144,28 +142,18 @@
   private final boolean pkChunkingSkipCountCheck;
   private final boolean bulkApiUseQueryAll;
 
+  private WorkUnitState workUnitState;
+
   public SalesforceExtractor(WorkUnitState state) {
 super(state);
-this.sfConnector = (SalesforceConnector) this.connector;
-
-// don't allow pk chunking if max partitions too high or have user 
specified partitions
-if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, 
false)
 
 Review comment:
   This part of the code was only for **PK chunking**. We don't need second-level 
PK chunking any more. (If we already have normal pre-partitioning, we should not 
use PK chunking; it burns through the request quota.)
   
   I created a separate function, getQueryResultIdsPkChunking, for PK chunking and 
made it independent of the class member this.pkChunking, so I removed 
pkChunking.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-13 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r324393059
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 ##
 @@ -146,12 +156,101 @@ protected void addLineageSourceInfo(SourceState 
sourceState, SourceEntity entity
 
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
+String partitionType = state.getProp(PARTITION_TYPE, "");
+if (partitionType.equals("PK_CHUNKING")) {
+  // pk-chunking only supports start-time by 
source.querybased.start.value, and does not support end-time.
+  // always ingest data later than or equal source.querybased.start.value.
+  // we should only pk chunking based work units only in case of 
snapshot/full ingestion
+  return generateWorkUnitsPkChunking(sourceEntity, state, 
previousWatermark);
+} else {
+  return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
+}
+  }
+
+  /**
+   * generate workUnit with noQuery=true
+   */
+  private List generateWorkUnitsPkChunking(SourceEntity 
sourceEntity, SourceState state, long previousWatermark) {
+  SalesforceBulkJobId salesforceBulkJobId = 
executeQueryWithPkChunking(state, previousWatermark);
+  List ret = createWorkUnits(sourceEntity, state, 
salesforceBulkJobId);
+  return ret;
+  }
+
+  private SalesforceBulkJobId executeQueryWithPkChunking(
+  SourceState sourceState,
+  long previousWatermark
+  ) throws RuntimeException {
+State state = new State(sourceState);
+WorkUnit workUnit = WorkUnit.createEmpty();
+try {
+  WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
+  workUnitState.setId("Execute pk-chunking");
 
 Review comment:
   Hi @zxcware 
   Is this OK? I am trying to set an ID for the workUnit.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-13 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r324385831
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 ##
 @@ -146,12 +156,98 @@ protected void addLineageSourceInfo(SourceState 
sourceState, SourceEntity entity
 
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
+String partitionType = state.getProp(PARTITION_TYPE, "");
+if (partitionType.equals("PK_CHUNKING")) {
+  return generateWorkUnitsPkChunking(sourceEntity, state, 
previousWatermark);
 
 Review comment:
   Thanks, will add it!




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-13 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r324385070
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -588,11 +576,41 @@ public String getTimestampPredicateCondition(String 
column, long value, String v
 return dataTypeMap;
   }
 
+
+  private String partitionPkChunkingJobId = null;
+  private Iterator partitionPkChunkingBatchIdResultIterator = null;
+
+  private Iterator getRecordSetPkchunking(WorkUnit workUnit) 
throws RuntimeException {
+if (partitionPkChunkingBatchIdResultIterator == null) {
+  partitionPkChunkingJobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+  partitionPkChunkingBatchIdResultIterator = 
Arrays.stream(workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS).split(",")).iterator();
+}
+if (!partitionPkChunkingBatchIdResultIterator.hasNext()) {
+  return null;
+}
+try {
+  if (!bulkApiLogin()) {
+throw new IllegalArgumentException("Invalid Login");
+  }
+} catch (Exception e) {
+  throw new RuntimeException(e);
+}
+String[] batchIdResultIdArray = 
partitionPkChunkingBatchIdResultIterator.next().split(":");
+String batchId = batchIdResultIdArray[0];
+String resultId = batchIdResultIdArray[1];
+List rs = fetchPkChunkingResultSetWithRetry(bulkConnection, 
partitionPkChunkingJobId, batchId, resultId, fetchRetryLimit);
+return rs.iterator();
+  }
+
   @Override
   public Iterator getRecordSetFromSourceApi(String schema, String 
entity, WorkUnit workUnit,
   List predicateList) throws IOException {
 log.debug("Getting salesforce data using bulk api");
-RecordSet rs = null;
+
+// new version of extractor: bulk api with pk-chunking in pre-partitioning 
of SalesforceSource
+if (!workUnit.getProp(PK_CHUNKING_JOB_ID, "").equals("")) {
 
 Review comment:
   Thanks! will do.
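
The hunk above reads the batch/result IDs from a comma-separated property whose tokens have the form `batchId:resultId`, and indexes the split array without checking its length. A minimal sketch of a defensive parser for that format follows; `BatchResultIdParser` is a hypothetical name, and the validation is an illustrative assumption, not something the PR is known to include.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
class BatchResultIdParser {
    // Parses "b1:r1,b2:r2" into [["b1","r1"], ["b2","r2"]].
    static List<String[]> parse(String prop) {
        List<String[]> pairs = new ArrayList<>();
        if (prop == null || prop.isEmpty()) {
            return pairs;
        }
        for (String token : prop.split(",")) {
            String[] parts = token.split(":");
            if (parts.length != 2) {
                // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException("Expected batchId:resultId but got: " + token);
            }
            pairs.add(parts);
        }
        return pairs;
    }
}
```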




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-06 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321884734
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 ##
 @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState 
sourceState, SourceEntity entity
 
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
+String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING");
+if (partitionType.equals("PK_CHUNKING")) {
+  return generateWorkUnitsPkChunking(sourceEntity, state, 
previousWatermark);
+} else {
+  return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
+}
+  }
+
+  /**
+   * generate workUnit with noQuery=true
+   */
+  private List generateWorkUnitsPkChunking(SourceEntity 
sourceEntity, SourceState state, long previousWatermark) {
+  List batchIdAndResultIds = 
executeQueryWithPkChunking(state, previousWatermark);
+  List ret = createWorkUnits(sourceEntity, state, 
batchIdAndResultIds);
+  return ret;
+  }
+
+  private List 
executeQueryWithPkChunking(
+  SourceState sourceState,
+  long previousWatermark
+  ) throws RuntimeException {
+Properties commonProperties = sourceState.getCommonProperties();
+Properties specProperties = sourceState.getSpecProperties();
+State state = new State();
+state.setProps(commonProperties, specProperties);
+WorkUnit workUnit = WorkUnit.createEmpty();
+try {
+  WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
+  workUnitState.setId("test" + new Random().nextInt());
+  workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // set extractor 
enable pk chunking
+  int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, 
DEFAULT_PK_CHUNKING_SIZE);
+  workUnitState.setProp(PK_CHUNKING_SIZE_KEY, chunkSize); // set extractor 
pk chunking size
+  workUnitState.setProp(PK_CHUNKING_SKIP_COUNT_CHECK, true); // don't use 
count check for we couldn't get count
+  SalesforceExtractor salesforceExtractor = (SalesforceExtractor) 
this.getExtractor(workUnitState);
+  String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY);
+  Partitioner partitioner = new Partitioner(sourceState);
+  if (isEarlyStopEnabled(state) && partitioner.isFullDump()) {
+throw new UnsupportedOperationException("Early stop mode cannot work 
with full dump mode.");
+  }
+  Partition partition = partitioner.getGlobalPartition(previousWatermark);
+  String condition = "";
 
 Review comment:
   We are using **>=**. It is the start time, and it should be inclusive. We don’t 
need an end time in our case.
   We usually fetch the whole table. If users want to start from a specific 
time, they can set **source.querybased.start.value**.
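
A minimal sketch of the inclusive start-time predicate described above, with no end bound. `StartTimeCondition` and its parameters are hypothetical; the column name and timestamp literal in the usage note are illustrative assumptions, not values taken from the PR.

```java
// Hypothetical helper, for illustration only.
class StartTimeCondition {
    // Builds "<column> >= <start>"; an empty result means "fetch the whole table".
    static String build(String column, String startTimestamp) {
        if (startTimestamp == null || startTimestamp.isEmpty()) {
            return "";
        }
        // ">=" keeps rows stamped exactly at the start time (inclusive bound).
        return column + " >= " + startTimestamp;
    }
}
```

For example, `StartTimeCondition.build("SystemModstamp", "2019-01-01T00:00:00.000Z")` yields `SystemModstamp >= 2019-01-01T00:00:00.000Z`.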




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-06 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321880074
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -896,6 +927,43 @@ private void reinitializeBufferedReader() throws 
IOException, AsyncApiException
 }
   }
 
+  private List fetchPkChunkingResultSetWithRetry(
 
 Review comment:
   You are right! There is no wait! I copied the retry logic from 
**fetchResultBatchWithRetry** and didn't look into it.
   Another thought: since Gobblin re-executes the workUnit 
anyway when it fails, we could just remove the retry. 
   What do you think?
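
If the retry were kept rather than removed, the missing piece discussed above is a pause between attempts. A minimal sketch under that assumption follows; `RetryWithWait` is a hypothetical helper, not the logic of `fetchResultBatchWithRetry`.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, for illustration only.
class RetryWithWait {
    // Runs the task up to maxAttempts times, sleeping waitMillis between failures.
    static <T> T call(Callable<T> task, int maxAttempts, long waitMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(waitMillis); // the wait that a bare retry loop lacks
                }
            }
        }
        throw new RuntimeException("Failed after " + maxAttempts + " attempts", last);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting to retry", e);
        }
    }
}
```

Whether a wrapper like this is worth keeping depends on how quickly a failed work unit is re-executed; that trade-off is the open question in the comment.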
   




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-05 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321539964
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 ##
 @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState 
sourceState, SourceEntity entity
 
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
+String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING");
+if (partitionType.equals("PK_CHUNKING")) {
+  return generateWorkUnitsPkChunking(sourceEntity, state, 
previousWatermark);
+} else {
+  return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
+}
+  }
+
+  /**
+   * generate workUnit with noQuery=true
+   */
+  private List generateWorkUnitsPkChunking(SourceEntity 
sourceEntity, SourceState state, long previousWatermark) {
+  List batchIdAndResultIds = 
executeQueryWithPkChunking(state, previousWatermark);
+  List ret = createWorkUnits(sourceEntity, state, 
batchIdAndResultIds);
+  return ret;
+  }
+
+  private List 
executeQueryWithPkChunking(
+  SourceState sourceState,
+  long previousWatermark
+  ) throws RuntimeException {
+Properties commonProperties = sourceState.getCommonProperties();
+Properties specProperties = sourceState.getSpecProperties();
+State state = new State();
+state.setProps(commonProperties, specProperties);
+WorkUnit workUnit = WorkUnit.createEmpty();
+try {
+  WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
+  workUnitState.setId("test" + new Random().nextInt());
+  workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // set extractor 
enable pk chunking
+  int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, 
DEFAULT_PK_CHUNKING_SIZE);
+  workUnitState.setProp(PK_CHUNKING_SIZE_KEY, chunkSize); // set extractor 
pk chunking size
+  workUnitState.setProp(PK_CHUNKING_SKIP_COUNT_CHECK, true); // don't use 
count check for we couldn't get count
+  SalesforceExtractor salesforceExtractor = (SalesforceExtractor) 
this.getExtractor(workUnitState);
+  String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY);
+  Partitioner partitioner = new Partitioner(sourceState);
+  if (isEarlyStopEnabled(state) && partitioner.isFullDump()) {
+throw new UnsupportedOperationException("Early stop mode cannot work 
with full dump mode.");
+  }
+  Partition partition = partitioner.getGlobalPartition(previousWatermark);
+  String condition = "";
 
 Review comment:
   How about we leave this refactoring for later?
   It doesn't look like I can use the function directly. In the PK-chunking case, 
we only have a start-time, not an end-time; the end-time is always the 
current time.
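
   To illustrate the start-time-only case, here is a minimal sketch of building a 
lower-bound-only condition. The class name and the timestamp pattern are 
assumptions for illustration; the real SALESFORCE_TIMESTAMP_FORMAT may differ.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of SalesforceSource.
class PkChunkingConditionSketch {
  // Assumed timestamp layout; the actual constant in SalesforceExtractor may differ.
  private static final DateTimeFormatter ASSUMED_FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'").withZone(ZoneOffset.UTC);

  /** Builds a lower-bound-only predicate; returns "" when either input is missing. */
  static String lowerBoundCondition(String deltaField, Instant lowWatermark) {
    if (deltaField == null || lowWatermark == null) {
      return ""; // no filter: behaves like a full dump
    }
    // No upper bound is added, because the end-time is implicitly "now".
    return deltaField + " >= " + ASSUMED_FORMAT.format(lowWatermark);
  }
}
```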
   




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-05 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r321538410
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -896,6 +928,43 @@ private void reinitializeBufferedReader() throws 
IOException, AsyncApiException
 }
   }
 
+  private List fetchWithRetry(
 
 Review comment:
   Closing the thread. Please reopen it, if necessary.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-04 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r320968187
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -896,6 +928,43 @@ private void reinitializeBufferedReader() throws 
IOException, AsyncApiException
 }
   }
 
+  private List fetchWithRetry(
 
 Review comment:
   The sfdc retry is `to prepare and create result set files from query data`. 
   After sfdc creates the result set files, we request to fetch/download the 
files.
   Our retry is for fetching/downloading the files.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-04 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r320967132
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 ##
 @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState 
sourceState, SourceEntity entity
 
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
+String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING");
+if (partitionType.equals("PK_CHUNKING")) {
+  return generateWorkUnitsPkChunking(sourceEntity, state, 
previousWatermark);
+} else {
+  return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
+}
+  }
+
+  /**
+   * generate workUnit with noQuery=true
+   */
+  private List generateWorkUnitsPkChunking(SourceEntity 
sourceEntity, SourceState state, long previousWatermark) {
+  List batchIdAndResultIds = 
executeQueryWithPkChunking(state, previousWatermark);
+  List ret = createWorkUnits(sourceEntity, state, 
batchIdAndResultIds);
+  return ret;
+  }
+
+  private List 
executeQueryWithPkChunking(
+  SourceState sourceState,
+  long previousWatermark
+  ) throws RuntimeException {
+Properties commonProperties = sourceState.getCommonProperties();
+Properties specProperties = sourceState.getSpecProperties();
+State state = new State();
+state.setProps(commonProperties, specProperties);
+WorkUnit workUnit = WorkUnit.createEmpty();
+try {
+  WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
+  workUnitState.setId("test" + new Random().nextInt());
+  workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // set extractor 
enable pk chunking
+  int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, 
DEFAULT_PK_CHUNKING_SIZE);
+  workUnitState.setProp(PK_CHUNKING_SIZE_KEY, chunkSize); // set extractor 
pk chunking size
+  workUnitState.setProp(PK_CHUNKING_SKIP_COUNT_CHECK, true); // don't use 
count check for we couldn't get count
+  SalesforceExtractor salesforceExtractor = (SalesforceExtractor) 
this.getExtractor(workUnitState);
+  String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY);
+  Partitioner partitioner = new Partitioner(sourceState);
+  if (isEarlyStopEnabled(state) && partitioner.isFullDump()) {
+throw new UnsupportedOperationException("Early stop mode cannot work 
with full dump mode.");
+  }
+  Partition partition = partitioner.getGlobalPartition(previousWatermark);
+  String condition = "";
+  Date startDate = Utils.toDate(partition.getLowWatermark(), 
Partitioner.WATERMARKTIMEFORMAT);
+  String field = 
sourceState.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
+  if (startDate != null && field != null) {
+String lowWatermarkDate = Utils.dateToString(startDate, 
SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+condition = field + " >= " + lowWatermarkDate;
+  }
+  Predicate predicate = new Predicate(null, 0, condition, "", null);
+  List predicateList = Arrays.asList(predicate);
+  List ids = 
salesforceExtractor.getQueryResultIds(entity, predicateList);
+  return ids;
+} catch (Exception e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  private List createWorkUnits(
+  SourceEntity sourceEntity,
+  SourceState state,
+  List batchResultIds
+  ) {
+String nameSpaceName = 
state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY);
+Extract.TableType tableType = 
Extract.TableType.valueOf(state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY).toUpperCase());
+String outputTableName = sourceEntity.getDestTableName();
+Extract extract = createExtract(tableType, nameSpaceName, outputTableName);
+
+List workUnits = Lists.newArrayList();
+int partitionNumber = 
state.getPropAsInt(SOURCE_SOURCE_MAX_NUMBER_OF_PARTITIONS, 1);
+int maxPartition = (batchResultIds.size() + partitionNumber - 
1)/partitionNumber;
+List> partitionedResultIds = 
Lists.partition(batchResultIds, maxPartition);
+String bulkJobId = batchResultIds.get(0).getBulkJobId();
 
 Review comment:
   Made it 
   ```
   class SalesforceBulkJob {
 String jobId;
 List batchIdAndResultIdList;
   }
   ```




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-09-03 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319713269
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 ##
 @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState 
sourceState, SourceEntity entity
 
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
+String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING");
+if (partitionType.equals("PK_CHUNKING")) {
+  return generateWorkUnitsPkChunking(sourceEntity, state, 
previousWatermark);
+} else {
+  return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
+}
+  }
+
+  /**
+   * generate workUnit with noQuery=true
+   */
+  private List generateWorkUnitsPkChunking(SourceEntity 
sourceEntity, SourceState state, long previousWatermark) {
+  List batchIdAndResultIds = 
executeQueryWithPkChunking(state, previousWatermark);
+  List ret = createWorkUnits(sourceEntity, state, 
batchIdAndResultIds);
+  return ret;
+  }
+
+  private List 
executeQueryWithPkChunking(
+  SourceState sourceState,
+  long previousWatermark
+  ) throws RuntimeException {
+Properties commonProperties = sourceState.getCommonProperties();
+Properties specProperties = sourceState.getSpecProperties();
+State state = new State();
+state.setProps(commonProperties, specProperties);
+WorkUnit workUnit = WorkUnit.createEmpty();
+try {
+  WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
+  workUnitState.setId("test" + new Random().nextInt());
+  workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // set extractor 
enable pk chunking
+  int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, 
DEFAULT_PK_CHUNKING_SIZE);
 
 Review comment:
   I was thinking we might want to keep 2nd-level PK-chunking, in which case it 
would be better to have a separate property for it.
   As we discussed, 2nd-level PK-chunking doesn't make sense. Will 
remove this property.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-08-31 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319713269
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 ##
 @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState 
sourceState, SourceEntity entity
 
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
+String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING");
+if (partitionType.equals("PK_CHUNKING")) {
+  return generateWorkUnitsPkChunking(sourceEntity, state, 
previousWatermark);
+} else {
+  return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
+}
+  }
+
+  /**
+   * generate workUnit with noQuery=true
+   */
+  private List generateWorkUnitsPkChunking(SourceEntity 
sourceEntity, SourceState state, long previousWatermark) {
+  List batchIdAndResultIds = 
executeQueryWithPkChunking(state, previousWatermark);
+  List ret = createWorkUnits(sourceEntity, state, 
batchIdAndResultIds);
+  return ret;
+  }
+
+  private List 
executeQueryWithPkChunking(
+  SourceState sourceState,
+  long previousWatermark
+  ) throws RuntimeException {
+Properties commonProperties = sourceState.getCommonProperties();
+Properties specProperties = sourceState.getSpecProperties();
+State state = new State();
+state.setProps(commonProperties, specProperties);
+WorkUnit workUnit = WorkUnit.createEmpty();
+try {
+  WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
+  workUnitState.setId("test" + new Random().nextInt());
+  workUnitState.setProp(ENABLE_PK_CHUNKING_KEY, true); // set extractor 
enable pk chunking
+  int chunkSize = workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, 
DEFAULT_PK_CHUNKING_SIZE);
 
 Review comment:
   I was thinking we might want to keep 2nd-level PK-chunking, in which case it 
would be better to have a separate property for it.
   As we discussed, 2nd-level PK-chunking doesn't make sense. Will 
remove this property.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-08-31 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319713206
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 ##
 @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState 
sourceState, SourceEntity entity
 
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
+String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING");
+if (partitionType.equals("PK_CHUNKING")) {
+  return generateWorkUnitsPkChunking(sourceEntity, state, 
previousWatermark);
+} else {
+  return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
+}
+  }
+
+  /**
+   * generate workUnit with noQuery=true
+   */
+  private List generateWorkUnitsPkChunking(SourceEntity 
sourceEntity, SourceState state, long previousWatermark) {
+  List batchIdAndResultIds = 
executeQueryWithPkChunking(state, previousWatermark);
+  List ret = createWorkUnits(sourceEntity, state, 
batchIdAndResultIds);
+  return ret;
+  }
+
+  private List 
executeQueryWithPkChunking(
+  SourceState sourceState,
+  long previousWatermark
+  ) throws RuntimeException {
+Properties commonProperties = sourceState.getCommonProperties();
 
 Review comment:
   Good to know. Will fix them.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-08-31 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319713206
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 ##
 @@ -146,6 +156,95 @@ protected void addLineageSourceInfo(SourceState 
sourceState, SourceEntity entity
 
   @Override
   protected List generateWorkUnits(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
+String partitionType = state.getProp(PARTITION_TYPE, "PK_CHUNKING");
+if (partitionType.equals("PK_CHUNKING")) {
+  return generateWorkUnitsPkChunking(sourceEntity, state, 
previousWatermark);
+} else {
+  return generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
+}
+  }
+
+  /**
+   * generate workUnit with noQuery=true
+   */
+  private List generateWorkUnitsPkChunking(SourceEntity 
sourceEntity, SourceState state, long previousWatermark) {
+  List batchIdAndResultIds = 
executeQueryWithPkChunking(state, previousWatermark);
+  List ret = createWorkUnits(sourceEntity, state, 
batchIdAndResultIds);
+  return ret;
+  }
+
+  private List 
executeQueryWithPkChunking(
+  SourceState sourceState,
+  long previousWatermark
+  ) throws RuntimeException {
+Properties commonProperties = sourceState.getCommonProperties();
 
 Review comment:
   Thanks! Good to know. Will fix them.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-08-31 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319713198
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##
 @@ -896,6 +928,43 @@ private void reinitializeBufferedReader() throws 
IOException, AsyncApiException
 }
   }
 
+  private List fetchWithRetry(
 
 Review comment:
   The Bulk API has its own retry: if the result set is too big, it retries 
creating/caching the result set 15 times.
   This retry is our own logic. When we fetch the result set file and there is a 
failure, we want to retry a specified number of times. It won't hurt: if the 
result file can be fetched the first time, the function returns the result set.




[GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition

2019-08-29 Thread GitBox
arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature 
that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r319273784
 
 

 ##
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceWorkUnit.java
 ##
 @@ -0,0 +1,15 @@
+package org.apache.gobblin.salesforce;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+public class SalesforceWorkUnit extends WorkUnit {
 
 Review comment:
   Thanks, Hung! Will remove this class. Will also test with 
`launcher.type=MAPREDUCE`. I only tested with `type=LOCAL`.

