[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

kasakrisz merged PR #3362:
URL: https://github.com/apache/hive/pull/3362

Worklog entry: 10m logged by ASF GitHub Bot
Change By: ASF GitHub Bot
Time Spent: 8h 10m

This message was sent by Atlassian Jira (v8.20.10#820010-sha1:ace47f9)

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908335271


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -162,53 +168,102 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
-    Map<String, HiveIcebergWriter> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
+    Map<String, List<HiveIcebergWriter>> writerMap = WriterRegistry.removeWriters(context.getTaskAttemptID());
 
     // Remove files if it was not done already
-    if (writers != null) {
-      for (HiveIcebergWriter writer : writers.values()) {
-        writer.close(true);
+    if (writerMap != null) {
+      for (List<HiveIcebergWriter> writerList : writerMap.values()) {
+        for (HiveIcebergWriter writer : writerList) {
+          writer.close(true);
+        }
       }
     }
   }
 
+  @Override
+  public void commitJob(JobContext originalContext) throws IOException {
+    commitJobs(Collections.singletonList(originalContext));
+  }
+
+  /**
+   * Wrapper class for storing output {@link Table} and it's context for committing changes:
+   * JobContext, CommitInfo.
+   */
+  private static class OutputTable {
+    private final String catalogName;
+    private final String tableName;
+    private final Table table;
+    private final JobContext jobContext;
+    private final SessionStateUtil.CommitInfo commitInfo;
+
+    private OutputTable(String catalogName, String tableName, Table table, JobContext jobContext,
+        SessionStateUtil.CommitInfo commitInfo) {
+      this.catalogName = catalogName;
+      this.tableName = tableName;
+      this.table = table;
+      this.jobContext = jobContext;
+      this.commitInfo = commitInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      OutputTable output1 = (OutputTable) o;
+      return Objects.equals(tableName, output1.tableName) &&
+          Objects.equals(jobContext.getJobID(), output1.jobContext.getJobID());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tableName, jobContext.getJobID());
+    }
+
+    public Optional<SessionStateUtil.CommitInfo> getCommitInfo() {
+      return Optional.ofNullable(commitInfo);
+    }
+  }
+
   /**
    * Reads the commit files stored in the temp directories and collects the generated committed data files.
    * Appends the data files to the tables. At the end removes the temporary directories.
-   * @param originalContext The job context
+   * @param originalContextList The job context list
    * @throws IOException if there is a failure accessing the files
    */
-  @Override
-  public void commitJob(JobContext

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908334326


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -231,25 +314,47 @@ public void commitJob(JobContext originalContext) throws IOException {
    */
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+    abortJobs(Collections.singletonList(originalContext));
+  }
+
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+      throw new IOException("Invalid job run state : " + runState.name());
+    } else {
+      this.abortJobs(jobContexts);
+    }
+  }
 
-    LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
-    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+  public void abortJobs(List<JobContext> originalContextList) throws IOException {
+    if (originalContextList.isEmpty()) {
+      return;

Review Comment:
   Removed the empty list check since it is checked in the caller.



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -231,25 +314,47 @@ public void commitJob(JobContext originalContext) throws IOException {
    */
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+    abortJobs(Collections.singletonList(originalContext));
+  }
+
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+      throw new IOException("Invalid job run state : " + runState.name());
+    } else {
+      this.abortJobs(jobContexts);
+    }
+  }
 
-    LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
-    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+  public void abortJobs(List<JobContext> originalContextList) throws IOException {
+    if (originalContextList.isEmpty()) {
+      return;
+    }
+
+    List<JobContext> jobContextList = originalContextList.stream()
+        .map(TezUtil::enrichContextWithVertexId)
+        .collect(Collectors.toList());
+    String ids = jobContextList.stream()
+        .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
+    Set<OutputTable> outputs = collectOutputs(jobContextList);
+
+    LOG.info("Job(s) {} are aborted. Data file cleaning started", ids);
     Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
 
-    ExecutorService fileExecutor = fileExecutor(jobConf);
-    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
     try {
       // Cleans up the changes for the output tables in parallel
       Tasks.foreach(outputs)
           .suppressFailureWhenFinished()
           .executeWith(tableExecutor)
           .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
           .run(output -> {
+            JobContext jobContext = output.jobContext;
+            JobConf jobConf = output.jobContext.getJobConf();

Review Comment:
   fixed
 
  

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908302321


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -231,25 +314,47 @@ public void commitJob(JobContext originalContext) throws IOException {
    */
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+    abortJobs(Collections.singletonList(originalContext));
+  }
+
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {

Review Comment:
   Isn't it only us who is calling this method, and we only call it with `FAILED`.


Worklog entry: 10m logged by ASF GitHub Bot
Change By: ASF GitHub Bot
Time Spent: 7h 40m (was: 7.5h)

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908296153


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -231,25 +314,47 @@ public void commitJob(JobContext originalContext) throws IOException {
    */
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+    abortJobs(Collections.singletonList(originalContext));
+  }
+
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {

Review Comment:
   This logic comes from hadoop.
   https://github.com/apache/hadoop/blob/a177232ebc3be826774701aefe63ed30a6b36fe7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java#L304
   
   I think only a running job can be aborted.
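
For readers following this review thread, the guard being discussed can be looked at in isolation. The snippet below is only an illustrative sketch: `RunState` and `AbortGuardSketch` are hypothetical stand-ins (they are not Hadoop or Hive classes), and job IDs are plain strings. It mirrors the quoted check, where any state other than FAILED or KILLED is rejected before the cleanup overload runs.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hadoop's JobStatus.State; not the real class.
enum RunState { RUNNING, SUCCEEDED, FAILED, KILLED }

class AbortGuardSketch {
  // Records which jobs were cleaned up, so the behaviour is observable.
  final List<String> aborted = new ArrayList<>();

  // Mirrors the guard under review: only FAILED or KILLED are accepted.
  void abortJobs(List<String> jobIds, RunState runState) throws IOException {
    if (runState != RunState.FAILED && runState != RunState.KILLED) {
      throw new IOException("Invalid job run state : " + runState.name());
    }
    abortJobs(jobIds);
  }

  // The cleanup overload; here it only remembers the job ids it was given.
  void abortJobs(List<String> jobIds) {
    aborted.addAll(jobIds);
  }
}
```

If the only caller ever passes `FAILED`, as the earlier review question suggests, the check never fires and serves purely as a defensive guard.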
 

  
 
 
 
 

 
Worklog entry: 10m logged by ASF GitHub Bot
Change By: ASF GitHub Bot
Time Spent: 7.5h (was: 7h 20m)


[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908295358


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -162,53 +168,102 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
 

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908287057


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -162,53 +168,102 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
 

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908277556


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -231,25 +314,47 @@ public void commitJob(JobContext originalContext) throws IOException {
    */
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+    abortJobs(Collections.singletonList(originalContext));
+  }
+
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+      throw new IOException("Invalid job run state : " + runState.name());
+    } else {
+      this.abortJobs(jobContexts);
+    }
+  }
 
-    LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
-    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+  public void abortJobs(List<JobContext> originalContextList) throws IOException {
+    if (originalContextList.isEmpty()) {
+      return;
+    }
+
+    List<JobContext> jobContextList = originalContextList.stream()
+        .map(TezUtil::enrichContextWithVertexId)
+        .collect(Collectors.toList());
+    String ids = jobContextList.stream()
+        .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
+    Set<OutputTable> outputs = collectOutputs(jobContextList);
+
+    LOG.info("Job(s) {} are aborted. Data file cleaning started", ids);
     Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
 
-    ExecutorService fileExecutor = fileExecutor(jobConf);
-    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
     try {
       // Cleans up the changes for the output tables in parallel
       Tasks.foreach(outputs)
           .suppressFailureWhenFinished()
           .executeWith(tableExecutor)
           .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
           .run(output -> {
+            JobContext jobContext = output.jobContext;
+            JobConf jobConf = output.jobContext.getJobConf();

Review Comment:
   nit?:
   ```
   JobConf jobConf = jobContext.getJobConf();
   ```


Worklog entry: 10m logged by ASF GitHub Bot
Change By: ASF GitHub Bot
Time Spent: 6h 50m (was: 6h 40m)
 

  
 
 
 

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908278352


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -231,25 +314,47 @@ public void commitJob(JobContext originalContext) throws IOException {
    */
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+    abortJobs(Collections.singletonList(originalContext));
+  }
+
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+      throw new IOException("Invalid job run state : " + runState.name());
+    } else {
+      this.abortJobs(jobContexts);
+    }
+  }
 
-    LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
-    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+  public void abortJobs(List<JobContext> originalContextList) throws IOException {
+    if (originalContextList.isEmpty()) {
+      return;

Review Comment:
   maybe debug log here?


Worklog entry: 10m logged by ASF GitHub Bot
Change By: ASF GitHub Bot
Time Spent: 7h (was: 6h 50m)

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908275092


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -162,53 +168,102 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
 

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908273788


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -231,25 +314,47 @@ public void commitJob(JobContext originalContext) throws IOException {
    */
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+    abortJobs(Collections.singletonList(originalContext));
+  }
+
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {

Review Comment:
   Why is this invalid?


Worklog entry: 10m logged by ASF GitHub Bot
Change By: ASF GitHub Bot
Time Spent: 6.5h (was: 6h 20m)

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-28 Thread ASF GitHub Bot (Jira)

ASF GitHub Bot added a worklog on HIVE-26319

pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908269817


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -162,53 +168,102 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
 

[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785165&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785165
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 16:04
Start Date: 27/Jun/22 16:04
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907549481


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
             .run(output -> {
               Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
               if (table != null) {
-                HiveIcebergWriter writer = writers.get(output);
+                Collection<DataFile> dataFiles = Lists.newArrayList();
+                Collection<DeleteFile> deleteFiles = Lists.newArrayList();
                 String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                    attemptID.getJobID(), attemptID.getTaskID().getId());
-                if (writer != null) {
-                  createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-                } else {
+                        attemptID.getJobID(), attemptID.getTaskID().getId());
+                if (writers.get(output) != null) {
+                  for (HiveIcebergWriter writer : writers.get(output)) {
+                    if (writer != null) {
+                      dataFiles.addAll(writer.files().dataFiles());
+                      deleteFiles.addAll(writer.files().deleteFiles());

Review Comment:
   Reverted this.



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -162,53 +177,96 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
-    Map<String, HiveIcebergWriter> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
+    Map<String, List<HiveIcebergWriter>> writerMap = WriterRegistry.removeWriters(context.getTaskAttemptID());
 
     // Remove files if it was not done already
-    if (writers != null) {
-      for (HiveIcebergWriter writer : writers.values()) {
-        writer.close(true);
+    if (writerMap != null) {
+      for (List<HiveIcebergWriter> writerList : writerMap.values()) {
+        for (HiveIcebergWriter writer : writerList) {
+          writer.close(true);
+        }
       }
     }
   }
 
+  @Override
+  public void commitJob(JobContext originalContext) throws IOException {
+    commitJobs(Collections.singletonList(originalContext));
+  }
+
+  private static class OutputTable {

Review Comment:
   Added.
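
As background on the `OutputTable` wrapper discussed here: the diff quoted elsewhere in this thread defines its equality on the table name and the job ID. A minimal sketch of that idea, using hypothetical names (`OutputKeySketch`, and string job IDs in place of `JobContext`), shows how such keys collapse duplicates when collected into a `Set`:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for the OutputTable wrapper; job IDs are plain strings here.
final class OutputKeySketch {
  private final String tableName;
  private final String jobId;

  OutputKeySketch(String tableName, String jobId) {
    this.tableName = tableName;
    this.jobId = jobId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    OutputKeySketch other = (OutputKeySketch) o;
    return Objects.equals(tableName, other.tableName) && Objects.equals(jobId, other.jobId);
  }

  @Override
  public int hashCode() {
    // Must use the same fields as equals() so equal keys land in the same bucket.
    return Objects.hash(tableName, jobId);
  }

  // Counts the distinct (table, job) pairs among the given keys.
  static int distinctCount(OutputKeySketch... keys) {
    Set<OutputKeySketch> unique = new HashSet<>(Arrays.asList(keys));
    return unique.size();
  }
}
```

Under this assumption the same table written by two different jobs stays as two entries, while a repeated (table, job) pair is counted once.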





Issue Time Tracking
---

Worklog Id: (was: 785165)
Time Spent: 6h  (was: 5h 50m)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785167
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 16:05
Start Date: 27/Jun/22 16:05
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907550930


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -305,13 +414,14 @@ private Set listForCommits(JobConf jobConf, String jobLocation) thro
    * Collects the additions to a single table and adds/commits the new files to the Iceberg table.
    * @param io The io to read the forCommit files
    * @param executor The executor used to read the forCommit files
-   * @param jobContext The job context
-   * @param name The name of the table used for loading from the catalog
+   * @param outputTable The table used for loading from the catalog
    * @param location The location of the table used for loading from the catalog
    * @param catalogName The name of the catalog that contains the table
    */
-  private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location,
+  private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable, String location,

Review Comment:
   Moved `catalogName` into `OutputTable`, and `location` can now be accessed like
   ```
   outputTable.table.location()
   ```
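
The idea in this comment can be sketched as a small wrapper that carries the catalog name next to the table and derives the location from the table itself. This is only an illustration: `TableLike`, `OutputTableSketch`, `OutputTableDemo` and `describe` are hypothetical names, and `TableLike` stands in for the Iceberg `Table` type with only `location()` modeled.

```java
// Hypothetical stand-in for the Iceberg Table type; only location() is modeled.
interface TableLike {
  String location();
}

// Sketch of a wrapper that keeps the catalog name together with the table,
// so callers do not need separate catalogName and location arguments.
final class OutputTableSketch {
  final String catalogName;
  final String tableName;
  final TableLike table;

  OutputTableSketch(String catalogName, String tableName, TableLike table) {
    this.catalogName = catalogName;
    this.tableName = tableName;
    this.table = table;
  }
}

final class OutputTableDemo {
  // Stand-in for a method such as commitTable: it needs a single argument
  // and reads the location through outputTable.table.location().
  static String describe(OutputTableSketch outputTable) {
    return outputTable.catalogName + "." + outputTable.tableName
        + " @ " + outputTable.table.location();
  }

  public static void main(String[] args) {
    OutputTableSketch target =
        new OutputTableSketch("hive", "db.target", () -> "/warehouse/db/target");
    System.out.println(describe(target));
  }
}
```

Deriving the location from the wrapped table avoids storing the same value in two places.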





Issue Time Tracking
---

Worklog Id: (was: 785167)
Time Spent: 6h 10m  (was: 6h)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785164&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785164
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 16:04
Start Date: 27/Jun/22 16:04
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907549301


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext 
originalContext) throws IOException {
   .run(output -> {
 Table table = 
HiveIcebergStorageHandler.table(context.getJobConf(), output);
 if (table != null) {
-  HiveIcebergWriter writer = writers.get(output);
+  Collection dataFiles = Lists.newArrayList();
+  Collection deleteFiles = Lists.newArrayList();
   String fileForCommitLocation = 
generateFileForCommitLocation(table.location(), jobConf,
-  attemptID.getJobID(), attemptID.getTaskID().getId());
-  if (writer != null) {
-createFileForCommit(writer.files(), fileForCommitLocation, 
table.io());
-  } else {
+  attemptID.getJobID(), attemptID.getTaskID().getId());
+  if (writers.get(output) != null) {

Review Comment:
   The writer can be null if we don't write anything, for example
   ```
   INSERT OVERWRITE TABLE target SELECT * FROM source WHERE FALSE
   ```
   In this case `output` is not null, but the writers map does not have an
   entry for the key.
   
   I moved the null check before the for loop that iterates through the writers.
   
   I don't think it is worth converting this loop to the streaming API:
   `createFileForCommit` can throw `IOException`, and IMHO the try-catch block
   needed inside the lambda expression would add complexity.
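
To make the trade-off concrete, here is a minimal sketch comparing a plain loop with a stream when the per-element call declares `IOException`. The class `CheckedLambdaSketch` and the method `writeCommitFile` are hypothetical stand-ins, not code from the PR; `writeCommitFile` only plays the role of a method like `createFileForCommit` that throws a checked exception.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class CheckedLambdaSketch {
  // Hypothetical stand-in for a method that declares IOException.
  static void writeCommitFile(String file, List<String> sink) throws IOException {
    if (file.isEmpty()) {
      throw new IOException("empty file name");
    }
    sink.add(file);
  }

  // Plain loop: the checked exception propagates through the method signature.
  static List<String> withLoop(List<String> files) throws IOException {
    List<String> written = new ArrayList<>();
    for (String file : files) {
      writeCommitFile(file, written);
    }
    return written;
  }

  // Stream version: the lambda may not throw IOException, so it is wrapped in
  // UncheckedIOException inside the lambda and unwrapped again outside.
  static List<String> withStream(List<String> files) throws IOException {
    List<String> written = new ArrayList<>();
    try {
      files.stream().forEach(file -> {
        try {
          writeCommitFile(file, written);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (UncheckedIOException e) {
      throw e.getCause();
    }
    return written;
  }
}
```

Both methods behave the same for callers, but the stream version needs two extra try-catch blocks, which is the added complexity the comment refers to.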





Issue Time Tracking
---

Worklog Id: (was: 785164)
Time Spent: 5h 50m  (was: 5h 40m)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785160&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785160
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 15:58
Start Date: 27/Jun/22 15:58
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907544276


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext 
originalContext) throws IOException {
   .run(output -> {
 Table table = 
HiveIcebergStorageHandler.table(context.getJobConf(), output);
 if (table != null) {
-  HiveIcebergWriter writer = writers.get(output);
+  Collection dataFiles = Lists.newArrayList();
+  Collection deleteFiles = Lists.newArrayList();
   String fileForCommitLocation = 
generateFileForCommitLocation(table.location(), jobConf,

Review Comment:
   This was used when the sorting of the delete branch was missing. Since
   sorting is a must, we no longer generate a plan that handles both branches
   in the mapper, and commit files are generated by separate jobs, so I
   reverted this change.





Issue Time Tracking
---

Worklog Id: (was: 785160)
Time Spent: 5h 40m  (was: 5.5h)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785159&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785159
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 15:56
Start Date: 27/Jun/22 15:56
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907542152


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -109,8 +115,8 @@ public void commitTask(TaskAttemptContext originalContext) 
throws IOException {
 
 TaskAttemptID attemptID = context.getTaskAttemptID();
 JobConf jobConf = context.getJobConf();
-Collection outputs = 
HiveIcebergStorageHandler.outputTables(context.getJobConf());
-Map writers = 
Optional.ofNullable(WriterRegistry.writers(attemptID))
+Set outputs = 
Sets.newHashSet(HiveIcebergStorageHandler.outputTables(context.getJobConf()));

Review Comment:
   Changed





Issue Time Tracking
---

Worklog Id: (was: 785159)
Time Spent: 5.5h  (was: 5h 20m)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785158&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785158
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 15:55
Start Date: 27/Jun/22 15:55
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907541930


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java:
##
@@ -19,26 +19,29 @@
 
 package org.apache.iceberg.mr.hive.writer;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 public class WriterRegistry {
-  private static final Map> 
writers = Maps.newConcurrentMap();
+  private static final Map>> writers = Maps.newConcurrentMap();
 
   private WriterRegistry() {
   }
 
-  public static Map removeWriters(TaskAttemptID 
taskAttemptID) {
+  public static Map> 
removeWriters(TaskAttemptID taskAttemptID) {
 return writers.remove(taskAttemptID);
   }
 
   public static void registerWriter(TaskAttemptID taskAttemptID, String 
tableName, HiveIcebergWriter writer) {
 writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
-writers.get(taskAttemptID).put(tableName, writer);
+writers.get(taskAttemptID).putIfAbsent(tableName, Lists.newArrayList());

Review Comment:
   Done





Issue Time Tracking
---

Worklog Id: (was: 785158)
Time Spent: 5h 20m  (was: 5h 10m)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785157&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785157
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 15:55
Start Date: 27/Jun/22 15:55
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907541673


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -858,19 +861,25 @@ private static boolean 
hasParquetListColumnSupport(Properties tableProps, Schema
* @param overwrite If we have to overwrite the existing table or just add 
the new data
* @return The generated JobContext
*/
-  private Optional generateJobContext(Configuration configuration, 
String tableName, boolean overwrite) {
+  private Optional> generateJobContext(Configuration 
configuration, String tableName,
+  boolean overwrite) {
 JobConf jobConf = new JobConf(configuration);
-Optional commitInfo = 
SessionStateUtil.getCommitInfo(jobConf, tableName);
-if (commitInfo.isPresent()) {
-  JobID jobID = JobID.forName(commitInfo.get().getJobIdStr());
-  commitInfo.get().getProps().forEach(jobConf::set);
-  jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
-
-  // we should only commit this current table because
-  // for multi-table inserts, this hook method will be called sequentially 
for each target table
-  jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
-
-  return Optional.of(new JobContextImpl(jobConf, jobID, null));
+Optional> commitInfoMap =
+SessionStateUtil.getCommitInfo(jobConf, tableName);
+if (commitInfoMap.isPresent()) {
+  List jobContextList = Lists.newLinkedList();
+  for (SessionStateUtil.CommitInfo commitInfo : 
commitInfoMap.get().values()) {
+JobID jobID = JobID.forName(commitInfo.getJobIdStr());
+commitInfo.getProps().forEach(jobConf::set);
+jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
+
+// we should only commit this current table because
+// for multi-table inserts, this hook method will be called 
sequentially for each target table
+jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+
+jobContextList.add(new JobContextImpl(jobConf, jobID, null));
+  }
+  return Optional.of(jobContextList);

Review Comment:
   Changed the return type from `Optional` to `List`.
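
The shape of such a change can be sketched as below: instead of wrapping a list in `Optional`, the method returns an empty list when nothing is present, so callers iterate without an `isPresent()` check. This is an assumption-laden illustration, not the PR code: `JobContextListSketch` and `generateJobContexts` are hypothetical names, and plain strings stand in for `CommitInfo` and `JobContext`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class JobContextListSketch {
  // Strings stand in for CommitInfo (map values) and JobContext (list items).
  static List<String> generateJobContexts(Optional<Map<String, String>> commitInfoMap) {
    if (!commitInfoMap.isPresent()) {
      // An empty list replaces Optional.empty(): "no contexts" needs no wrapper.
      return Collections.emptyList();
    }
    List<String> contexts = new ArrayList<>();
    for (String jobId : commitInfoMap.get().values()) {
      contexts.add("context-for-" + jobId);
    }
    return contexts;
  }
}
```

A caller can then write `for (String ctx : generateJobContexts(...))` directly, whether or not any commit info exists.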





Issue Time Tracking
---

Worklog Id: (was: 785157)
Time Spent: 5h 10m  (was: 5h)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785156&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785156
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 15:54
Start Date: 27/Jun/22 15:54
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907540855


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -858,19 +861,25 @@ private static boolean 
hasParquetListColumnSupport(Properties tableProps, Schema
* @param overwrite If we have to overwrite the existing table or just add 
the new data
* @return The generated JobContext

Review Comment:
   Done





Issue Time Tracking
---

Worklog Id: (was: 785156)
Time Spent: 5h  (was: 4h 50m)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785071&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785071
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 12:34
Start Date: 27/Jun/22 12:34
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907336065


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -305,13 +414,14 @@ private Set listForCommits(JobConf jobConf, 
String jobLocation) thro
* Collects the additions to a single table and adds/commits the new files 
to the Iceberg table.
* @param io The io to read the forCommit files
* @param executor The executor used to read the forCommit files
-   * @param jobContext The job context
-   * @param name The name of the table used for loading from the catalog
+   * @param outputTable The table used for loading from the catalog
* @param location The location of the table used for loading from the 
catalog
* @param catalogName The name of the catalog that contains the table
*/
-  private void commitTable(FileIO io, ExecutorService executor, JobContext 
jobContext, String name, String location,
+  private void commitTable(FileIO io, ExecutorService executor, OutputTable 
outputTable, String location,

Review Comment:
   Would it make sense to move `location` and `catalogName` into the
   `OutputTable` object?





Issue Time Tracking
---

Worklog Id: (was: 785071)
Time Spent: 4h 50m  (was: 4h 40m)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785069&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785069
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 12:31
Start Date: 27/Jun/22 12:31
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907332483


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -162,53 +177,96 @@ public void abortTask(TaskAttemptContext originalContext) 
throws IOException {
 TaskAttemptContext context = 
TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
 // Clean up writer data from the local store
-Map writers = 
WriterRegistry.removeWriters(context.getTaskAttemptID());
+Map> writerMap = 
WriterRegistry.removeWriters(context.getTaskAttemptID());
 
 // Remove files if it was not done already
-if (writers != null) {
-  for (HiveIcebergWriter writer : writers.values()) {
-writer.close(true);
+if (writerMap != null) {
+  for (List writerList : writerMap.values()) {
+for (HiveIcebergWriter writer : writerList) {
+  writer.close(true);
+}
   }
 }
   }
 
+  @Override
+  public void commitJob(JobContext originalContext) throws IOException {
+commitJobs(Collections.singletonList(originalContext));
+  }
+
+  private static class OutputTable {

Review Comment:
   Some javadoc would be nice





Issue Time Tracking
---

Worklog Id: (was: 785069)
Time Spent: 4h 40m  (was: 4.5h)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785068&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785068
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 12:29
Start Date: 27/Jun/22 12:29
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r90733


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext 
originalContext) throws IOException {
   .run(output -> {
 Table table = 
HiveIcebergStorageHandler.table(context.getJobConf(), output);
 if (table != null) {
-  HiveIcebergWriter writer = writers.get(output);
+  Collection dataFiles = Lists.newArrayList();
+  Collection deleteFiles = Lists.newArrayList();
   String fileForCommitLocation = 
generateFileForCommitLocation(table.location(), jobConf,
-  attemptID.getJobID(), attemptID.getTaskID().getId());
-  if (writer != null) {
-createFileForCommit(writer.files(), fileForCommitLocation, 
table.io());
-  } else {
+  attemptID.getJobID(), attemptID.getTaskID().getId());
+  if (writers.get(output) != null) {

Review Comment:
   Maybe if we drop the null checks, we can use the streaming API to
   simplify the code.





Issue Time Tracking
---

Worklog Id: (was: 785068)
Time Spent: 4.5h  (was: 4h 20m)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785066&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785066
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 12:29
Start Date: 27/Jun/22 12:29
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907330452


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext 
originalContext) throws IOException {
   .run(output -> {
 Table table = 
HiveIcebergStorageHandler.table(context.getJobConf(), output);
 if (table != null) {
-  HiveIcebergWriter writer = writers.get(output);
+  Collection dataFiles = Lists.newArrayList();
+  Collection deleteFiles = Lists.newArrayList();
   String fileForCommitLocation = 
generateFileForCommitLocation(table.location(), jobConf,
-  attemptID.getJobID(), attemptID.getTaskID().getId());
-  if (writer != null) {
-createFileForCommit(writer.files(), fileForCommitLocation, 
table.io());
-  } else {
+  attemptID.getJobID(), attemptID.getTaskID().getId());
+  if (writers.get(output) != null) {
+for (HiveIcebergWriter writer : writers.get(output)) {
+  if (writer != null) {
+dataFiles.addAll(writer.files().dataFiles());
+deleteFiles.addAll(writer.files().deleteFiles());

Review Comment:
   `writer.files()` might create an object every time. We should keep the 
object and reuse it.
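
A minimal sketch of the suggestion, assuming that `files()` builds a new result object on every call. `ReuseFilesSketch`, `FilesResult` and `CountingWriter` are hypothetical stand-ins for the writer and its result type; the counter exists only to show that the method is called once.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

final class ReuseFilesSketch {
  // Hypothetical stand-in for the object returned by writer.files().
  static final class FilesResult {
    final List<String> dataFiles;
    final List<String> deleteFiles;

    FilesResult(List<String> dataFiles, List<String> deleteFiles) {
      this.dataFiles = dataFiles;
      this.deleteFiles = deleteFiles;
    }
  }

  // Hypothetical stand-in for the writer; counts how often files() is called.
  static final class CountingWriter {
    int calls = 0;

    FilesResult files() {
      calls++;
      return new FilesResult(Arrays.asList("data-1"), Arrays.asList("delete-1"));
    }
  }

  static void collect(CountingWriter writer,
      Collection<String> dataFiles, Collection<String> deleteFiles) {
    // Call files() once and reuse the result for both collections.
    FilesResult files = writer.files();
    dataFiles.addAll(files.dataFiles);
    deleteFiles.addAll(files.deleteFiles);
  }
}
```

Holding the result in a local variable also guarantees that both collections are filled from the same snapshot.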





Issue Time Tracking
---

Worklog Id: (was: 785066)
Time Spent: 4h 20m  (was: 4h 10m)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785063&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785063
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 12:26
Start Date: 27/Jun/22 12:26
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907328344


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext 
originalContext) throws IOException {
   .run(output -> {
 Table table = 
HiveIcebergStorageHandler.table(context.getJobConf(), output);
 if (table != null) {
-  HiveIcebergWriter writer = writers.get(output);
+  Collection dataFiles = Lists.newArrayList();
+  Collection deleteFiles = Lists.newArrayList();
   String fileForCommitLocation = 
generateFileForCommitLocation(table.location(), jobConf,
-  attemptID.getJobID(), attemptID.getTaskID().getId());
-  if (writer != null) {
-createFileForCommit(writer.files(), fileForCommitLocation, 
table.io());
-  } else {
+  attemptID.getJobID(), attemptID.getTaskID().getId());
+  if (writers.get(output) != null) {

Review Comment:
   Do we put `null` values to the map? Do we really need the null check?





Issue Time Tracking
---

Worklog Id: (was: 785063)
Time Spent: 4h 10m  (was: 4h)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785062&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785062
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 12:25
Start Date: 27/Jun/22 12:25
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907327310


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext 
originalContext) throws IOException {
   .run(output -> {
 Table table = 
HiveIcebergStorageHandler.table(context.getJobConf(), output);
 if (table != null) {
-  HiveIcebergWriter writer = writers.get(output);
+  Collection dataFiles = Lists.newArrayList();
+  Collection deleteFiles = Lists.newArrayList();
   String fileForCommitLocation = 
generateFileForCommitLocation(table.location(), jobConf,

Review Comment:
   Is this used?





Issue Time Tracking
---

Worklog Id: (was: 785062)
Time Spent: 4h  (was: 3h 50m)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785059
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 12:22
Start Date: 27/Jun/22 12:22
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907324564


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -109,8 +115,8 @@ public void commitTask(TaskAttemptContext originalContext) 
throws IOException {
 
 TaskAttemptID attemptID = context.getTaskAttemptID();
 JobConf jobConf = context.getJobConf();
-Collection outputs = 
HiveIcebergStorageHandler.outputTables(context.getJobConf());
-Map writers = 
Optional.ofNullable(WriterRegistry.writers(attemptID))
+Set outputs = 
Sets.newHashSet(HiveIcebergStorageHandler.outputTables(context.getJobConf()));

Review Comment:
   Could we change the return type for `outputTables` to `Set`?





Issue Time Tracking
---

Worklog Id: (was: 785059)
Time Spent: 3h 50m  (was: 3h 40m)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785058
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 12:19
Start Date: 27/Jun/22 12:19
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907321359


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java:
##
@@ -19,26 +19,29 @@
 
 package org.apache.iceberg.mr.hive.writer;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 public class WriterRegistry {
-  private static final Map> 
writers = Maps.newConcurrentMap();
+  private static final Map>> writers = Maps.newConcurrentMap();
 
   private WriterRegistry() {
   }
 
-  public static Map removeWriters(TaskAttemptID 
taskAttemptID) {
+  public static Map> 
removeWriters(TaskAttemptID taskAttemptID) {
 return writers.remove(taskAttemptID);
   }
 
   public static void registerWriter(TaskAttemptID taskAttemptID, String 
tableName, HiveIcebergWriter writer) {
 writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
-writers.get(taskAttemptID).put(tableName, writer);
+writers.get(taskAttemptID).putIfAbsent(tableName, Lists.newArrayList());

Review Comment:
   nit: would it be worth declaring a variable for this?
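
One way to read this nit is sketched below: hold the per-task map and the per-table list in local variables instead of repeating the lookups. The sketch uses `computeIfAbsent`, which is a swapped-in alternative to the `putIfAbsent` followed by `get` shown in the diff, not what the PR does. `WriterRegistrySketch` is a hypothetical name, and plain strings stand in for `TaskAttemptID` and `HiveIcebergWriter`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class WriterRegistrySketch {
  // Strings stand in for TaskAttemptID (outer key) and HiveIcebergWriter (list items).
  private static final Map<String, Map<String, List<String>>> WRITERS =
      new ConcurrentHashMap<>();

  private WriterRegistrySketch() {
  }

  static void registerWriter(String taskAttemptId, String tableName, String writer) {
    // computeIfAbsent returns the existing or newly created value, so each
    // level is looked up once and kept in a named local variable.
    Map<String, List<String>> writersOfTask =
        WRITERS.computeIfAbsent(taskAttemptId, id -> new ConcurrentHashMap<>());
    // ArrayList mirrors Lists.newArrayList() from the diff; it assumes the
    // writers of one task attempt are registered from a single thread.
    List<String> writersOfTable =
        writersOfTask.computeIfAbsent(tableName, name -> new ArrayList<>());
    writersOfTable.add(writer);
  }

  static Map<String, List<String>> removeWriters(String taskAttemptId) {
    return WRITERS.remove(taskAttemptId);
  }
}
```

As in the diff, `removeWriters` returns `null` when nothing was registered for the task attempt, so callers still need a null check.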





Issue Time Tracking
---

Worklog Id: (was: 785058)
Time Spent: 3h 40m  (was: 3.5h)

> Iceberg integration: Perform update split early
> ---
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
>  Issue Type: Improvement
>  Components: File Formats
>Reporter: Krisztian Kasa
>Assignee: Krisztian Kasa
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native 
> acid tables





[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785057
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 12:17
Start Date: 27/Jun/22 12:17
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907320335


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -858,19 +861,25 @@ private static boolean hasParquetListColumnSupport(Properties tableProps, Schema
    * @param overwrite If we have to overwrite the existing table or just add the new data
    * @return The generated JobContext
    */
-  private Optional<JobContext> generateJobContext(Configuration configuration, String tableName, boolean overwrite) {
+  private Optional<List<JobContext>> generateJobContext(Configuration configuration, String tableName,
+      boolean overwrite) {
     JobConf jobConf = new JobConf(configuration);
-    Optional<SessionStateUtil.CommitInfo> commitInfo = SessionStateUtil.getCommitInfo(jobConf, tableName);
-    if (commitInfo.isPresent()) {
-      JobID jobID = JobID.forName(commitInfo.get().getJobIdStr());
-      commitInfo.get().getProps().forEach(jobConf::set);
-      jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
-
-      // we should only commit this current table because
-      // for multi-table inserts, this hook method will be called sequentially for each target table
-      jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
-
-      return Optional.of(new JobContextImpl(jobConf, jobID, null));
+    Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
+        SessionStateUtil.getCommitInfo(jobConf, tableName);
+    if (commitInfoMap.isPresent()) {
+      List<JobContext> jobContextList = Lists.newLinkedList();
+      for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) {
+        JobID jobID = JobID.forName(commitInfo.getJobIdStr());
+        commitInfo.getProps().forEach(jobConf::set);
+        jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
+
+        // we should only commit this current table because
+        // for multi-table inserts, this hook method will be called sequentially for each target table
+        jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+
+        jobContextList.add(new JobContextImpl(jobConf, jobID, null));
+      }
+      return Optional.of(jobContextList);

Review Comment:
   Why not return an empty list instead of an `Optional`?
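
The alternative raised here could look like the following minimal sketch. It is not the PR code: `JobContextListSketch` and the string stand-ins for `CommitInfo` and `JobContext` are hypothetical, chosen only to show a method that returns an empty list when there is nothing to commit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: String stands in for both CommitInfo and JobContext.
final class JobContextListSketch {
  private JobContextListSketch() {
  }

  // commitInfoByJobId may be null when nothing was recorded for the table.
  static List<String> generateJobContexts(Map<String, String> commitInfoByJobId) {
    if (commitInfoByJobId == null || commitInfoByJobId.isEmpty()) {
      // Nothing to commit: an empty list lets callers iterate without a presence check.
      return Collections.emptyList();
    }
    List<String> jobContexts = new ArrayList<>();
    for (String jobId : commitInfoByJobId.keySet()) {
      jobContexts.add("JobContext[" + jobId + "]");
    }
    return jobContexts;
  }
}
```

With this shape the caller can loop over the result directly, so no `isPresent()` early return is required.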





Issue Time Tracking
---

Worklog Id: (was: 785057)
Time Spent: 3.5h  (was: 3h 20m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785056&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785056
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 12:17
Start Date: 27/Jun/22 12:17
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907319537


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -858,19 +861,25 @@ private static boolean hasParquetListColumnSupport(Properties tableProps, Schema
    * @param overwrite If we have to overwrite the existing table or just add the new data
    * @return The generated JobContext

Review Comment:
   Please update the javadoc





Issue Time Tracking
---

Worklog Id: (was: 785056)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785051&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785051
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 11:54
Start Date: 27/Jun/22 11:54
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907299669


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);

Review Comment:
   Discussed this offline: the current implementation does not roll back jobs 
that were already committed successfully when a later commit fails. This should be 
implemented in a follow-up patch, for example by adding an `onFailure` handler to the `Tasks` call:
   ```
Tasks.foreach(outputs)
 .onFailure( ... )
   ```
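
The follow-up idea, rolling back the commits that already succeeded when a later one fails, can be sketched in plain Java. This sketch does not use `org.apache.iceberg.util.Tasks`; `RollbackSketch.commitAll` and its `commit` and `rollback` callbacks are hypothetical names, and the sequential loop is an assumption made to keep the example small.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical all-or-nothing commit helper; not part of Hive or Iceberg.
final class RollbackSketch {
  private RollbackSketch() {
  }

  static <T> void commitAll(List<T> outputs, Consumer<T> commit, Consumer<T> rollback) {
    Deque<T> committed = new ArrayDeque<>();
    try {
      for (T output : outputs) {
        commit.accept(output);
        committed.push(output);
      }
    } catch (RuntimeException e) {
      // Undo the commits that already succeeded, newest first, then rethrow the original failure.
      while (!committed.isEmpty()) {
        try {
          rollback.accept(committed.pop());
        } catch (RuntimeException rollbackFailure) {
          e.addSuppressed(rollbackFailure);
        }
      }
      throw e;
    }
  }
}
```

The output whose commit failed is not passed to `rollback`; only the earlier, successful ones are.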
   





Issue Time Tracking
---

Worklog Id: (was: 785051)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785050&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785050
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 27/Jun/22 11:49
Start Date: 27/Jun/22 11:49
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907295916


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -325,7 +339,15 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
     LOG.info("Committing job has started for table: {}, using location: {}",
         table, generateJobLocation(location, conf, jobContext.getJobID()));
 
-    int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.getTaskNum()).orElseGet(() -> {
+    Optional<SessionStateUtil.CommitInfo> commitInfo;
+    if (SessionStateUtil.getCommitInfo(conf, name).isPresent()) {
+      commitInfo = SessionStateUtil.getCommitInfo(conf, name).get()
+          .stream().filter(ci -> ci.getJobIdStr().equals(jobContext.getJobID().toString())).findFirst();

Review Comment:
   Replaced the `List` with a `Map` whose key is the string representation of the 
`jobId` associated with the `CommitInfo`.
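
A minimal sketch of the lookup after this change, under the assumption that the map is keyed by the job ID string as described. `CommitInfoLookupSketch` and its reduced `CommitInfo` class are hypothetical; the real `SessionStateUtil.CommitInfo` carries more state.

```java
import java.util.Map;
import java.util.Optional;

final class CommitInfoLookupSketch {
  // Hypothetical, reduced CommitInfo carrying only the fields used in this example.
  static final class CommitInfo {
    final String jobIdStr;
    final int taskNum;

    CommitInfo(String jobIdStr, int taskNum) {
      this.jobIdStr = jobIdStr;
      this.taskNum = taskNum;
    }
  }

  private CommitInfoLookupSketch() {
  }

  // A direct key lookup replaces the stream().filter(...).findFirst() scan over a list.
  static Optional<CommitInfo> find(Map<String, CommitInfo> commitInfoByJobId, String jobId) {
    return Optional.ofNullable(commitInfoByJobId).map(byJobId -> byJobId.get(jobId));
  }
}
```

Because the key is unique per job, the question of which match to pick no longer arises.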





Issue Time Tracking
---

Worklog Id: (was: 785050)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=783293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783293
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 21/Jun/22 09:44
Start Date: 21/Jun/22 09:44
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r902401629


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);

Review Comment:
   I do not think we have tests where the two inserts write into the same 
table. I am not even sure that doing so is correct.





Issue Time Tracking
---

Worklog Id: (was: 783293)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=783291&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783291
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 21/Jun/22 09:44
Start Date: 21/Jun/22 09:44
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r902401629


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);

Review Comment:
   I do not think we have tests where the two inserts write into the same 
table.





Issue Time Tracking
---

Worklog Id: (was: 783291)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=783290&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783290
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 21/Jun/22 09:43
Start Date: 21/Jun/22 09:43
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r902401099


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);

Review Comment:
   `TestHiveIcebergInserts.testMultiTableInsert` is the test that should cover 
multi-table inserts:
   ```
   // simple insert: should create a single vertex writing to both target tables
   shell.executeStatement("FROM customers " +
       "INSERT INTO target1 SELECT customer_id, first_name " +
       "INSERT INTO target2 SELECT last_name, customer_id");
   ```





Issue Time Tracking
---

Worklog Id: (was: 783290)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=783286&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783286
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 21/Jun/22 09:41
Start Date: 21/Jun/22 09:41
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r902399038


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -325,7 +339,15 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
     LOG.info("Committing job has started for table: {}, using location: {}",
         table, generateJobLocation(location, conf, jobContext.getJobID()));
 
-    int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.getTaskNum()).orElseGet(() -> {
+    Optional<SessionStateUtil.CommitInfo> commitInfo;
+    if (SessionStateUtil.getCommitInfo(conf, name).isPresent()) {
+      commitInfo = SessionStateUtil.getCommitInfo(conf, name).get()
+          .stream().filter(ci -> ci.getJobIdStr().equals(jobContext.getJobID().toString())).findFirst();

Review Comment:
   Then why `.findFirst()`?





Issue Time Tracking
---

Worklog Id: (was: 783286)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782339&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782339
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 17/Jun/22 09:26
Start Date: 17/Jun/22 09:26
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899942979


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -858,19 +862,24 @@ private static boolean hasParquetListColumnSupport(Properties tableProps, Schema
    * @param overwrite If we have to overwrite the existing table or just add the new data
    * @return The generated JobContext
    */
-  private Optional<JobContext> generateJobContext(Configuration configuration, String tableName, boolean overwrite) {
+  private Optional<List<JobContext>> generateJobContext(

Review Comment:
   rewritten.





Issue Time Tracking
---

Worklog Id: (was: 782339)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782338&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782338
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 17/Jun/22 09:26
Start Date: 17/Jun/22 09:26
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899942578


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);

Review Comment:
   I think all jobs should be rolled back if committing any of them fails. To 
achieve this we are using `org.apache.iceberg.util.Tasks`:
   ```
   Tasks.foreach(outputs)
       .throwFailureWhenFinished()
       .stopOnFailure()
       .run(output -> {
         ...
   ```
   which can revert all tasks in case of an error, even if some of them have 
already succeeded.
   
   The initial implementation committed each job independently: every job 
launched a separate batch of tasks.
   I refactored this part to collect the outputs of all jobs and launch them in 
one batch.
   I also found that the tasks run in parallel and that the data needed for the 
commit is looked up in the `SessionState`, which is thread-local. In my 
experience this works only when a single output exists, because then only one 
worker thread is used, and that is the main thread where the `SessionState` is 
initialized. If a batch contains more than one output, the threads other than 
the main thread do not have the data needed for the commit in their 
`SessionState`.
   So I moved the collection of this data ahead of launching the tasks.
   
   This affects multi-inserts, split updates, and merge statements. I haven't 
found any tests for multi-inserting into an Iceberg table (please share some if 
any exist), so I assume this issue hasn't come up before.
   
   Please share your thoughts.
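
The thread-local problem described above can be reproduced with a small, self-contained sketch. `ThreadLocalCommitSketch` and its `SESSION` field are hypothetical stand-ins for `SessionState`, and a plain `ExecutorService` is used instead of the Iceberg `Tasks` utility. A value set on the calling thread is not visible in pool threads, so it has to be read before the tasks are submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class ThreadLocalCommitSketch {
  // Hypothetical stand-in for the thread-local SessionState.
  static final ThreadLocal<String> SESSION = new ThreadLocal<>();

  private ThreadLocalCommitSketch() {
  }

  // Reads the thread-local inside each worker thread, where it was never set.
  static List<String> lookupInsideWorkers(List<String> outputs) {
    return runAll(outputs, output -> output + ":" + SESSION.get());
  }

  // Reads the thread-local once on the calling thread and hands the plain value to the workers.
  static List<String> collectBeforeLaunch(List<String> outputs) {
    String session = SESSION.get();
    return runAll(outputs, output -> output + ":" + session);
  }

  private static List<String> runAll(List<String> outputs, Function<String, String> task) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (String output : outputs) {
        Callable<String> call = () -> task.apply(output);
        futures.add(pool.submit(call));
      }
      List<String> results = new ArrayList<>();
      for (Future<String> future : futures) {
        results.add(future.get());
      }
      return results;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```

If the calling thread has set `SESSION`, `lookupInsideWorkers` still sees `null` in every worker, while `collectBeforeLaunch` carries the value across.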
   





Issue Time Tracking
---

Worklog Id: (was: 782338)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782328
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 17/Jun/22 09:08
Start Date: 17/Jun/22 09:08
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899928173


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);
         } catch (IOException ioe) {
           LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", ioe);
           // no throwing here because the original exception should be propagated
         }
         throw new HiveException(
-            "Error committing job: " + jobContext.get().getJobID() + " for table: " + tableName, e);
+            "Error committing job: " + jobContext.getJobID() + " for table: " + tableName, e);

Review Comment:
   removed



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);

Review Comment:
   removed





Issue Time Tracking
---

Worklog Id: (was: 782328)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782322&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782322
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 17/Jun/22 09:04
Start Date: 17/Jun/22 09:04
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899924819


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +130,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
           .run(output -> {
             Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
             if (table != null) {
-              HiveIcebergWriter writer = writers.get(output);
+              Collection<DataFile> dataFiles = Lists.newArrayList();
+              Collection<DeleteFile> deleteFiles = Lists.newArrayList();
               String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                  attemptID.getJobID(), attemptID.getTaskID().getId());
-              if (writer != null) {
-                createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-              } else {
+                      attemptID.getJobID(), attemptID.getTaskID().getId());
+              if (writers.get(output) != null) {
+                for (HiveIcebergWriter writer : writers.get(output)) {
+                  if (writer != null) {
+                    dataFiles.addAll(writer.files().dataFiles());

Review Comment:
   I wouldn't change this code until it turns out to be a bottleneck.





Issue Time Tracking
---

Worklog Id: (was: 782322)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782321&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782321
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 17/Jun/22 09:02
Start Date: 17/Jun/22 09:02
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899923444


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -325,7 +339,15 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
     LOG.info("Committing job has started for table: {}, using location: {}",
         table, generateJobLocation(location, conf, jobContext.getJobID()));
 
-    int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.getTaskNum()).orElseGet(() -> {
+    Optional<SessionStateUtil.CommitInfo> commitInfo;
+    if (SessionStateUtil.getCommitInfo(conf, name).isPresent()) {
+      commitInfo = SessionStateUtil.getCommitInfo(conf, name).get()
+          .stream().filter(ci -> ci.getJobIdStr().equals(jobContext.getJobID().toString())).findFirst();

Review Comment:
   AFAIK only one `CommitInfo` object should be associated with a `JobContext`.





Issue Time Tracking
---

Worklog Id: (was: 782321)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782025&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782025
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 16/Jun/22 11:52
Start Date: 16/Jun/22 11:52
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899000415


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +130,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
   .run(output -> {
 Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
 if (table != null) {
-  HiveIcebergWriter writer = writers.get(output);
+  Collection<DataFile> dataFiles = Lists.newArrayList();
+  Collection<DeleteFile> deleteFiles = Lists.newArrayList();
   String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-  attemptID.getJobID(), attemptID.getTaskID().getId());
-  if (writer != null) {
-createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-  } else {
+  attemptID.getJobID(), attemptID.getTaskID().getId());
+  if (writers.get(output) != null) {
+for (HiveIcebergWriter writer : writers.get(output)) {
+  if (writer != null) {
+dataFiles.addAll(writer.files().dataFiles());

Review Comment:
   I'll leave this decision to you.
   I do not have a clear preference.





Issue Time Tracking
---

Worklog Id: (was: 782025)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782016&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782016
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 16/Jun/22 11:11
Start Date: 16/Jun/22 11:11
Worklog Time Spent: 10m 
  Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r898969956


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +130,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
   .run(output -> {
 Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
 if (table != null) {
-  HiveIcebergWriter writer = writers.get(output);
+  Collection<DataFile> dataFiles = Lists.newArrayList();
+  Collection<DeleteFile> deleteFiles = Lists.newArrayList();
   String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-  attemptID.getJobID(), attemptID.getTaskID().getId());
-  if (writer != null) {
-createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-  } else {
+  attemptID.getJobID(), attemptID.getTaskID().getId());
+  if (writers.get(output) != null) {
+for (HiveIcebergWriter writer : writers.get(output)) {
+  if (writer != null) {
+dataFiles.addAll(writer.files().dataFiles());

Review Comment:
   I found this usage:
   
https://github.com/apache/hive/blob/67c2d4910ff17c694653eb8bd9c9ed2405cec38b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java#L59
   
   `HiveIcebergWriter` does not have `dataFiles()` and `deleteFiles()` methods,
   and it can be a `HiveIcebergRecordWriter`, a `HiveIcebergDeleteWriter`, etc.,
   each of which treats data and delete files differently.
   
   If we want to avoid creating the `FilesForCommit` object, we could replace
   `HiveIcebergWriter.files()` in one of two ways:
   * create a method like `HiveIcebergWriter.collectFiles(List<> dataFiles, 
   List<> deleteFiles)`
   or 
   * create `dataFiles()` and `deleteFiles()` methods. I would prefer wrapping the
   returned lists in `unmodifiableList`, which is also a new object creation.
   
   Which do you prefer? 
   
   On the other hand, I don't think creating the `FilesForCommit` objects is
   critical. They are created only when the result of a statement is committed
   or aborted, not once per record.
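
   The two alternatives could look roughly like the sketch below. This is only an
   illustration under stated assumptions: `WriterSketch` and `DeleteWriterSketch`
   are hypothetical names, and plain `String` paths stand in for Iceberg's data
   and delete file types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical writer interface; String paths stand in for Iceberg's
// data file and delete file types.
interface WriterSketch {
  // Option 1: the writer appends its files to caller-owned lists,
  // so no wrapper object is created.
  void collectFiles(List<String> dataFiles, List<String> deleteFiles);

  // Option 2: separate accessors that return read-only views.
  List<String> dataFiles();

  List<String> deleteFiles();
}

// A writer that only produces delete files, loosely analogous to a delete writer.
final class DeleteWriterSketch implements WriterSketch {
  private final List<String> written = new ArrayList<>();

  void write(String path) {
    written.add(path);
  }

  @Override
  public void collectFiles(List<String> dataFiles, List<String> deleteFiles) {
    deleteFiles.addAll(written);
  }

  @Override
  public List<String> dataFiles() {
    return Collections.emptyList();
  }

  @Override
  public List<String> deleteFiles() {
    // The wrapper is itself a new object, as noted in the comment above.
    return Collections.unmodifiableList(written);
  }
}
```

   Option 1 avoids any allocation per call but lets the writer mutate the
   caller's lists; option 2 keeps the writer's state read-only at the cost of
   one small wrapper per call.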





Issue Time Tracking
---

Worklog Id: (was: 782016)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=781982&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781982
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 16/Jun/22 09:31
Start Date: 16/Jun/22 09:31
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r898890734


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -858,19 +862,24 @@ private static boolean hasParquetListColumnSupport(Properties tableProps, Schema
* @param overwrite If we have to overwrite the existing table or just add the new data
* @return The generated JobContext
*/
-  private Optional<JobContext> generateJobContext(Configuration configuration, String tableName, boolean overwrite) {
+  private Optional<List<JobContext>> generateJobContext(

Review Comment:
   nit: In Iceberg code we usually break lines like this:
   ```
   private Optional<List<JobContext>> generateJobContext(Configuration configuration, String tableName,
       boolean overwrite) {
   ```





Issue Time Tracking
---

Worklog Id: (was: 781982)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=781980&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781980
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 16/Jun/22 09:30
Start Date: 16/Jun/22 09:30
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r898889313


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
 String tableName = commitProperties.getProperty(Catalogs.NAME);
 Configuration configuration = SessionState.getSessionConf();
-Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-if (jobContext.isPresent()) {
+Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+if (!jobContextList.isPresent()) {
+  return;
+}
+
+for (JobContext jobContext : jobContextList.get()) {
   OutputCommitter committer = new HiveIcebergOutputCommitter();
   try {
-committer.commitJob(jobContext.get());
+committer.commitJob(jobContext);
   } catch (Throwable e) {
 // Aborting the job if the commit has failed
 LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-jobContext.get().getJobID(), tableName, e);
+jobContext.getJobID(), tableName, e);
 try {
-  committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+  committer.abortJob(jobContext, JobStatus.State.FAILED);

Review Comment:
   Shall we abort all of the other jobs as well?
   What is our strategy here?
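
   One possible answer, sketched under assumptions and not taken from the PR:
   commit the jobs in order and, on the first failure, abort the failed job and
   every job that has not been committed yet, then rethrow the original
   exception. `JobCommitterSketch` and `AbortRemainingOnFailure` are hypothetical
   names; job ids are plain strings here.

```java
import java.util.List;

// Hypothetical minimal committer; stands in for the commitJob/abortJob pair
// of an output committer.
interface JobCommitterSketch {
  void commit(String jobId) throws Exception;

  void abort(String jobId);
}

final class AbortRemainingOnFailure {
  private AbortRemainingOnFailure() {
  }

  // Commits jobs in order. On the first failure, aborts the failed job and every
  // job that has not been committed yet, then rethrows the original exception.
  // Jobs committed before the failure are left untouched.
  static void commitAll(List<String> jobIds, JobCommitterSketch committer) throws Exception {
    for (int i = 0; i < jobIds.size(); i++) {
      try {
        committer.commit(jobIds.get(i));
      } catch (Exception e) {
        for (String pending : jobIds.subList(i, jobIds.size())) {
          try {
            committer.abort(pending);
          } catch (RuntimeException abortFailure) {
            // Keep the commit failure as the primary exception.
            e.addSuppressed(abortFailure);
          }
        }
        throw e;
      }
    }
  }
}
```

   This sketch does not roll back jobs that were already committed; whether that
   is acceptable is exactly the strategy question raised above.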





Issue Time Tracking
---

Worklog Id: (was: 781980)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=781978&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781978
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 16/Jun/22 09:28
Start Date: 16/Jun/22 09:28
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r898887564


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
 String tableName = commitProperties.getProperty(Catalogs.NAME);
 Configuration configuration = SessionState.getSessionConf();
-Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-if (jobContext.isPresent()) {
+Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+if (!jobContextList.isPresent()) {
+  return;
+}
+
+for (JobContext jobContext : jobContextList.get()) {
   OutputCommitter committer = new HiveIcebergOutputCommitter();
   try {
-committer.commitJob(jobContext.get());
+committer.commitJob(jobContext);
   } catch (Throwable e) {
 // Aborting the job if the commit has failed
 LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-jobContext.get().getJobID(), tableName, e);
+jobContext.getJobID(), tableName, e);

Review Comment:
   please remove the extra spaces



##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
 String tableName = commitProperties.getProperty(Catalogs.NAME);
 Configuration configuration = SessionState.getSessionConf();
-Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-if (jobContext.isPresent()) {
+Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+if (!jobContextList.isPresent()) {
+  return;
+}
+
+for (JobContext jobContext : jobContextList.get()) {
   OutputCommitter committer = new HiveIcebergOutputCommitter();
   try {
-committer.commitJob(jobContext.get());
+committer.commitJob(jobContext);
   } catch (Throwable e) {
 // Aborting the job if the commit has failed
 LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-jobContext.get().getJobID(), tableName, e);
+jobContext.getJobID(), tableName, e);
 try {
-  committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+  committer.abortJob(jobContext, JobStatus.State.FAILED);
 } catch (IOException ioe) {
   LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", ioe);
   // no throwing here because the original exception should be propagated
 }
 throw new HiveException(
-"Error committing job: " + jobContext.get().getJobID() + " for table: " + tableName, e);
+"Error committing job: " + jobContext.getJobID() + " for table: " + tableName, e);

Review Comment:
   please remove the extra spaces





Issue Time Tracking
---

Worklog Id: (was: 781978)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=781977&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781977
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 16/Jun/22 09:27
Start Date: 16/Jun/22 09:27
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r898886482


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -325,7 +339,15 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
 LOG.info("Committing job has started for table: {}, using location: {}",
 table, generateJobLocation(location, conf, jobContext.getJobID()));
 
-int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.getTaskNum()).orElseGet(() -> {
+Optional<SessionStateUtil.CommitInfo> commitInfo;
+if (SessionStateUtil.getCommitInfo(conf, name).isPresent()) {
+  commitInfo = SessionStateUtil.getCommitInfo(conf, name).get()
+  .stream().filter(ci -> ci.getJobIdStr().equals(jobContext.getJobID().toString())).findFirst();

Review Comment:
   Are we sure that the first `commitInfo` has the same number of tasks as all 
of them?





Issue Time Tracking
---

Worklog Id: (was: 781977)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=781973&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781973
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 16/Jun/22 09:22
Start Date: 16/Jun/22 09:22
Worklog Time Spent: 10m 
  Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r898882678


##
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##
@@ -127,14 +130,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
   .run(output -> {
 Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
 if (table != null) {
-  HiveIcebergWriter writer = writers.get(output);
+  Collection<DataFile> dataFiles = Lists.newArrayList();
+  Collection<DeleteFile> deleteFiles = Lists.newArrayList();
   String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-  attemptID.getJobID(), attemptID.getTaskID().getId());
-  if (writer != null) {
-createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-  } else {
+  attemptID.getJobID(), attemptID.getTaskID().getId());
+  if (writers.get(output) != null) {
+for (HiveIcebergWriter writer : writers.get(output)) {
+  if (writer != null) {
+dataFiles.addAll(writer.files().dataFiles());

Review Comment:
   Is there any place where we use `writer.files()` directly instead of
   calling `dataFiles()` or `deleteFiles()` on it? If not, we might want to remove
   the unnecessary object creation.





Issue Time Tracking
---

Worklog Id: (was: 781973)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early

2022-06-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=780788&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-780788
 ]

ASF GitHub Bot logged work on HIVE-26319:
-

Author: ASF GitHub Bot
Created on: 13/Jun/22 11:46
Start Date: 13/Jun/22 11:46
Worklog Time Spent: 10m 
  Work Description: kasakrisz opened a new pull request, #3362:
URL: https://github.com/apache/hive/pull/3362

   ### What changes were proposed in this pull request?
   Rewrite update statements on Iceberg tables into a multi-insert statement,
   as is already done for native ACID tables.
   
   When generating the rewritten statement:
   * Get the virtual columns from the table's storage handler in the case of
   non-native ACID tables.
   * Include the old values in the select clause of the delete branch of the
   multi-insert statement.
   
   When executing the multi-insert:
   * Two Iceberg writers are used, which produce a data delta file and a delete
   delta file. The results of these writers should be merged into one
   `FilesForCommit` if both writers run in the same task.
   * For more complex statements (e.g. partitioned and/or bucketed tables), more
   than one Tez task produces commit info, so this patch enables storing all of
   them.
   * Every `FileSinkOperator` creates its own jobConf instance because the
   Iceberg write operation is stored in it, and the operation differs between
   the two instances.
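
   The merge of the two writers' results described above can be pictured with
   the following sketch. It is an assumption-laden illustration, not the patch
   itself: `FilesForCommitSketch` is a hypothetical stand-in for
   `FilesForCommit`, and `String` paths replace Iceberg's data and delete file
   objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for FilesForCommit; String paths replace the real
// data file and delete file objects.
final class FilesForCommitSketch {
  private final List<String> dataFiles;
  private final List<String> deleteFiles;

  FilesForCommitSketch(List<String> dataFiles, List<String> deleteFiles) {
    // Defensive copies so later changes to the inputs do not leak in.
    this.dataFiles = Collections.unmodifiableList(new ArrayList<>(dataFiles));
    this.deleteFiles = Collections.unmodifiableList(new ArrayList<>(deleteFiles));
  }

  List<String> dataFiles() {
    return dataFiles;
  }

  List<String> deleteFiles() {
    return deleteFiles;
  }

  // Merges the per-writer results of one task into a single object, keeping
  // data files and delete files in separate lists.
  static FilesForCommitSketch merge(List<FilesForCommitSketch> perWriter) {
    List<String> data = new ArrayList<>();
    List<String> deletes = new ArrayList<>();
    for (FilesForCommitSketch files : perWriter) {
      data.addAll(files.dataFiles());
      deletes.addAll(files.deleteFiles());
    }
    return new FilesForCommitSketch(data, deletes);
  }
}
```

   For an update split into an insert branch and a delete branch, the task would
   pass one result per writer to `merge` and write the combined object to its
   commit file.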
   
   
   ### Why are the changes needed?
   See #2855
   + Preparation for iceberg Merge implementation.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   ```
   mvn test -Dtest.output.overwrite -DskipSparkTests -Dtest=TestIcebergLlapLocalCliDriver -Dqfile=update_iceberg_partitioned_orc2.q -pl itests/qtest-iceberg -Piceberg -Pitests
   ```




Issue Time Tracking
---

Worklog Id: (was: 780788)
Remaining Estimate: 0h
Time Spent: 10m
