[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: kasakrisz merged PR #3362: URL: https://github.com/apache/hive/pull/3362
Worklog entry: 10m logged by ASF GitHub Bot. Change By: ASF GitHub Bot. Time Spent: 8h 10m.
This message was sent by Atlassian Jira (v8.20.10#820010-sha1:ace47f9)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: kasakrisz commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908335271

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -162,53 +168,102 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);

     // Clean up writer data from the local store
-    Map<String, HiveIcebergWriter> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
+    Map<String, List<HiveIcebergWriter>> writerMap = WriterRegistry.removeWriters(context.getTaskAttemptID());

     // Remove files if it was not done already
-    if (writers != null) {
-      for (HiveIcebergWriter writer : writers.values()) {
-        writer.close(true);
+    if (writerMap != null) {
+      for (List<HiveIcebergWriter> writerList : writerMap.values()) {
+        for (HiveIcebergWriter writer : writerList) {
+          writer.close(true);
+        }
       }
     }
   }

+  @Override
+  public void commitJob(JobContext originalContext) throws IOException {
+    commitJobs(Collections.singletonList(originalContext));
+  }
+
+  /**
+   * Wrapper class for storing the output {@link Table} and its context for committing changes:
+   * JobContext, CommitInfo.
+   */
+  private static class OutputTable {
+    private final String catalogName;
+    private final String tableName;
+    private final Table table;
+    private final JobContext jobContext;
+    private final SessionStateUtil.CommitInfo commitInfo;
+
+    private OutputTable(String catalogName, String tableName, Table table, JobContext jobContext,
+        SessionStateUtil.CommitInfo commitInfo) {
+      this.catalogName = catalogName;
+      this.tableName = tableName;
+      this.table = table;
+      this.jobContext = jobContext;
+      this.commitInfo = commitInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      OutputTable output1 = (OutputTable) o;
+      return Objects.equals(tableName, output1.tableName) &&
+          Objects.equals(jobContext.getJobID(), output1.jobContext.getJobID());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tableName, jobContext.getJobID());
+    }
+
+    public Optional<SessionStateUtil.CommitInfo> getCommitInfo() {
+      return Optional.ofNullable(commitInfo);
+    }
+  }
+
   /**
    * Reads the commit files stored in the temp directories and collects the generated committed data files.
    * Appends the data files to the tables. At the end removes the temporary directories.
-   * @param originalContext The job context
+   * @param originalContextList The job context list
    * @throws IOException if there is a failure accessing the files
    */
-  @Override
-  public void commitJob(JobContext
```
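The `equals`/`hashCode` pair in the `OutputTable` wrapper above keys an output on table name plus job ID only, so the same table written by the same job collapses to a single entry when the outputs are collected into a `Set`. A minimal standalone sketch of that identity contract (the class and field names here are simplified stand-ins, not the real Hive types):

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Simplified stand-in for the OutputTable wrapper: identity is
// (tableName, jobId) only, mirroring the equals/hashCode above.
class OutputKey {
    final String tableName;
    final String jobId;

    OutputKey(String tableName, String jobId) {
        this.tableName = tableName;
        this.jobId = jobId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        OutputKey other = (OutputKey) o;
        return Objects.equals(tableName, other.tableName) && Objects.equals(jobId, other.jobId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tableName, jobId);
    }
}

public class OutputKeyDemo {
    public static void main(String[] args) {
        Set<OutputKey> outputs = new HashSet<>();
        outputs.add(new OutputKey("db.target", "job_1"));
        outputs.add(new OutputKey("db.target", "job_1")); // duplicate of the first entry, ignored by the Set
        outputs.add(new OutputKey("db.target", "job_2")); // same table, different job: kept
        System.out.println(outputs.size()); // 2
    }
}
```

This is why `collectOutputs` can safely gather outputs from several job contexts without committing the same table twice for one job.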
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: kasakrisz commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908334326

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -231,25 +314,47 @@ public void commitJob(JobContext originalContext) throws IOException {
    */
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+    abortJobs(Collections.singletonList(originalContext));
+  }
+
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+      throw new IOException("Invalid job run state : " + runState.name());
+    } else {
+      this.abortJobs(jobContexts);
+    }
+  }
+
+  public void abortJobs(List<JobContext> originalContextList) throws IOException {
+    if (originalContextList.isEmpty()) {
+      return;
```

Review Comment: Removed the empty list check since it is checked in the caller.

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
+    }
+
+    List<JobContext> jobContextList = originalContextList.stream()
+        .map(TezUtil::enrichContextWithVertexId)
+        .collect(Collectors.toList());
+    String ids = jobContextList.stream()
+        .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
+    Set<OutputTable> outputs = collectOutputs(jobContextList);

-    LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
-    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+    LOG.info("Job(s) {} are aborted. Data file cleaning started", ids);
     Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
-    ExecutorService fileExecutor = fileExecutor(jobConf);
-    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
     try {
       // Cleans up the changes for the output tables in parallel
       Tasks.foreach(outputs)
           .suppressFailureWhenFinished()
           .executeWith(tableExecutor)
           .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
           .run(output -> {
+            JobContext jobContext = output.jobContext;
+            JobConf jobConf = output.jobContext.getJobConf();
```

Review Comment: fixed
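The hunk above cleans up each output table in parallel on a table executor while suppressing per-table failures so one bad table does not stop the rest. A hedged sketch of that pattern using plain `java.util.concurrent` instead of Iceberg's `Tasks` utility (the table names and cleanup body are illustrative only):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelCleanup {
    public static void main(String[] args) throws InterruptedException {
        // Stand-ins for the job's output tables.
        List<String> outputs = List.of("db.t1", "db.t2", "db.t3");

        // Analogous to tableExecutor(jobConf, outputs.size()): bounded pool sized by outputs.
        ExecutorService tableExecutor = Executors.newFixedThreadPool(Math.min(outputs.size(), 4));

        for (String output : outputs) {
            tableExecutor.submit(() -> {
                try {
                    // Real code would delete the table's uncommitted data files here.
                    System.out.println("cleaning " + output);
                } catch (Exception exc) {
                    // Suppress the failure so the remaining tables still get cleaned,
                    // like suppressFailureWhenFinished() + onFailure(...) above.
                    System.err.println("Failed cleanup table " + output + ": " + exc);
                }
            });
        }

        tableExecutor.shutdown();
        tableExecutor.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```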
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908302321

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
```

Review Comment: Isn't it only us who is calling this method, and we only call it with `FAILED`?

Worklog entry: 10m logged by ASF GitHub Bot. Change By: ASF GitHub Bot. Time Spent: 7h 40m (was: 7.5h).
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: kasakrisz commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908296153

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
```

Review Comment: This logic comes from Hadoop: https://github.com/apache/hadoop/blob/a177232ebc3be826774701aefe63ed30a6b36fe7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java#L304 I think only a running job can be aborted.

Worklog entry: 10m logged by ASF GitHub Bot. Change By: ASF GitHub Bot. Time Spent: 7.5h (was: 7h 20m).
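The discussion above is about rejecting an abort unless the job's run state is FAILED or KILLED, mirroring Hadoop's `OutputCommitter#abortJob` semantics. A self-contained sketch of that guard (the numeric constant values below are placeholders, not Hadoop's actual `JobStatus` values):

```java
import java.io.IOException;

public class AbortStateGuard {
    // Placeholder values for a self-contained example; Hadoop's
    // org.apache.hadoop.mapred.JobStatus defines its own constants.
    static final int FAILED = 3;
    static final int KILLED = 5;

    // Only a job that failed or was killed can be aborted; anything
    // else (e.g. RUNNING, SUCCEEDED) is rejected with an IOException.
    static void checkAbortable(int state, String stateName) throws IOException {
        if (state != FAILED && state != KILLED) {
            throw new IOException("Invalid job run state : " + stateName);
        }
    }

    public static void main(String[] args) throws IOException {
        checkAbortable(FAILED, "FAILED"); // accepted, no exception
        try {
            checkAbortable(1, "RUNNING"); // rejected
        } catch (IOException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```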
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908295358 (on iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: kasakrisz commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908287057 (on iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908277556

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
           .run(output -> {
+            JobContext jobContext = output.jobContext;
+            JobConf jobConf = output.jobContext.getJobConf();
```

Review Comment: nit?:

```java
JobConf jobConf = jobContext.getJobConf();
```

Worklog entry: 10m logged by ASF GitHub Bot. Change By: ASF GitHub Bot. Time Spent: 6h 50m (was: 6h 40m).
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908278352

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
+  public void abortJobs(List<JobContext> originalContextList) throws IOException {
+    if (originalContextList.isEmpty()) {
+      return;
```

Review Comment: maybe debug log here?

Worklog entry: 10m logged by ASF GitHub Bot. Change By: ASF GitHub Bot. Time Spent: 7h (was: 6h 50m).
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908275092 (on iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908273788

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
+  public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+    int state = runState.getValue();
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
```

Review Comment: Why is this invalid?

Worklog entry: 10m logged by ASF GitHub Bot. Change By: ASF GitHub Bot. Time Spent: 6.5h (was: 6h 20m).
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
ASF GitHub Bot added a worklog on HIVE-26319: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r908269817 (on iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785165&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785165 ]

ASF GitHub Bot logged work on HIVE-26319:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jun/22 16:04
Start Date: 27/Jun/22 16:04
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r907549481

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                attemptID.getJobID(), attemptID.getTaskID().getId());
-            if (writer != null) {
-              createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-            } else {
+                attemptID.getJobID(), attemptID.getTaskID().getId());
+            if (writers.get(output) != null) {
+              for (HiveIcebergWriter writer : writers.get(output)) {
+                if (writer != null) {
+                  dataFiles.addAll(writer.files().dataFiles());
+                  deleteFiles.addAll(writer.files().deleteFiles());
```

Review Comment: Reverted this.

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -162,53 +177,96 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);

     // Clean up writer data from the local store
-    Map<String, HiveIcebergWriter> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
+    Map<String, List<HiveIcebergWriter>> writerMap = WriterRegistry.removeWriters(context.getTaskAttemptID());

     // Remove files if it was not done already
-    if (writers != null) {
-      for (HiveIcebergWriter writer : writers.values()) {
-        writer.close(true);
+    if (writerMap != null) {
+      for (List<HiveIcebergWriter> writerList : writerMap.values()) {
+        for (HiveIcebergWriter writer : writerList) {
+          writer.close(true);
+        }
       }
     }
   }

+  @Override
+  public void commitJob(JobContext originalContext) throws IOException {
+    commitJobs(Collections.singletonList(originalContext));
+  }
+
+  private static class OutputTable {
```

Review Comment: Added.

Issue Time Tracking
-------------------
Worklog Id: (was: 785165)
Time Spent: 6h (was: 5h 50m)

> Iceberg integration: Perform update split early
> -----------------------------------------------
>
>                 Key: HIVE-26319
>                 URL: https://issues.apache.org/jira/browse/HIVE-26319
>             Project: Hive
>          Issue Type: Improvement
>          Components: File Formats
>            Reporter: Krisztian Kasa
>            Assignee: Krisztian Kasa
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>          Time Spent: 6h
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native acid tables

This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785167 ]

ASF GitHub Bot logged work on HIVE-26319:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jun/22 16:05
Start Date: 27/Jun/22 16:05
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r907550930

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -305,13 +414,14 @@ private Set listForCommits(JobConf jobConf, String jobLocation) throws IOException {
    * Collects the additions to a single table and adds/commits the new files to the Iceberg table.
    * @param io The io to read the forCommit files
    * @param executor The executor used to read the forCommit files
-   * @param jobContext The job context
-   * @param name The name of the table used for loading from the catalog
+   * @param outputTable The table used for loading from the catalog
    * @param location The location of the table used for loading from the catalog
    * @param catalogName The name of the catalog that contains the table
    */
-  private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location,
+  private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable, String location,
```

Review Comment: Moved `catalogName` into `OutputTable`, and `location` can be accessed like

```java
outputTable.table.location()
```

Issue Time Tracking
-------------------
Worklog Id: (was: 785167)
Time Spent: 6h 10m (was: 6h)
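The refactor discussed above collapses several parallel parameters (`jobContext`, `name`, `catalogName`) into the `OutputTable` wrapper, with the table location derived from the wrapped table. A simplified standalone sketch of that shape (the types here are illustrative stand-ins, not the real Iceberg/Hive classes):

```java
public class CommitTableRefactor {
    // Stand-in for an Iceberg Table that knows its own location.
    static class Table {
        private final String location;
        Table(String location) { this.location = location; }
        String location() { return location; }
    }

    // Stand-in for the OutputTable wrapper: carries the catalog name
    // and the table, so callers no longer pass them separately.
    static class OutputTable {
        final String catalogName;
        final Table table;
        OutputTable(String catalogName, Table table) {
            this.catalogName = catalogName;
            this.table = table;
        }
    }

    // Before: commitTable(io, executor, jobContext, name, location, catalogName)
    // After: one wrapper argument carries everything it needs.
    static String commitTable(OutputTable outputTable) {
        return "committing to " + outputTable.table.location()
            + " via catalog " + outputTable.catalogName;
    }

    public static void main(String[] args) {
        OutputTable out = new OutputTable("hive", new Table("s3://bucket/tbl"));
        System.out.println(commitTable(out));
    }
}
```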
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785164&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785164 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 16:04
Start Date: 27/Jun/22 16:04
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907549301

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                attemptID.getJobID(), attemptID.getTaskID().getId());
-            if (writer != null) {
-              createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-            } else {
+                attemptID.getJobID(), attemptID.getTaskID().getId());
+            if (writers.get(output) != null) {
```

Review Comment: The writer can be null if we don't write anything, e.g.
```
INSERT OVERWRITE TABLE target SELECT * FROM source WHERE FALSE
```
In this case `output` is not null, but the writers map does not have an entry for the key. I moved the null check before the for loop that iterates through the writers. I don't think it is worth converting this loop to the streaming API, because `createFileForCommit` can throw `IOException`, and IMHO the try-catch block needed in the lambda expression would add extra complexity.

Issue Time Tracking: Worklog Id: (was: 785164) Time Spent: 5h 50m (was: 5h 40m)
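The checked-exception concern in the comment above can be shown generically. This is a sketch, not the Hive code: `createFileForCommit` here is a local stand-in that merely declares `IOException`, to contrast a plain loop against the stream version that must wrap and unwrap the exception:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

public class LambdaIoSketch {
    // Stand-in for createFileForCommit: any method declaring IOException.
    static void createFileForCommit(String payload) throws IOException {
        if (payload == null) {
            throw new IOException("nothing to commit");
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> payloads = List.of("a", "b");

        // Plain loop: the checked IOException propagates naturally.
        for (String p : payloads) {
            createFileForCommit(p);
        }

        // Stream version: the lambda cannot throw a checked exception, so it
        // must wrap it and the caller must unwrap it -- the extra complexity
        // the review comment refers to.
        try {
            payloads.forEach(p -> {
                try {
                    createFileForCommit(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (UncheckedIOException e) {
            throw e.getCause();
        }
        System.out.println("ok"); // prints ok
    }
}
```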
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785160&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785160 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 15:58
Start Date: 27/Jun/22 15:58
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907544276

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
```

Review Comment: This was used when the sorting of the delete branch was missing. Since sorting is a must, we no longer generate a plan that handles both branches in the mapper; commit files are generated by separate jobs, so I reverted this change.

Issue Time Tracking: Worklog Id: (was: 785160) Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785159&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785159 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 15:56
Start Date: 27/Jun/22 15:56
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907542152

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -109,8 +115,8 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptID attemptID = context.getTaskAttemptID();
     JobConf jobConf = context.getJobConf();
-    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
-    Map<String, HiveIcebergWriter> writers = Optional.ofNullable(WriterRegistry.writers(attemptID))
+    Set<String> outputs = Sets.newHashSet(HiveIcebergStorageHandler.outputTables(context.getJobConf()));
```

Review Comment: Changed.

Issue Time Tracking: Worklog Id: (was: 785159) Time Spent: 5.5h (was: 5h 20m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785158&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785158 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 15:55
Start Date: 27/Jun/22 15:55
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907541930

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java:

```diff
@@ -19,26 +19,29 @@
 package org.apache.iceberg.mr.hive.writer;

+import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;

 public class WriterRegistry {
-  private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
+  private static final Map<TaskAttemptID, Map<String, List<HiveIcebergWriter>>> writers = Maps.newConcurrentMap();

   private WriterRegistry() {
   }

-  public static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
+  public static Map<String, List<HiveIcebergWriter>> removeWriters(TaskAttemptID taskAttemptID) {
     return writers.remove(taskAttemptID);
   }

   public static void registerWriter(TaskAttemptID taskAttemptID, String tableName, HiveIcebergWriter writer) {
     writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
-    writers.get(taskAttemptID).put(tableName, writer);
+    writers.get(taskAttemptID).putIfAbsent(tableName, Lists.newArrayList());
```

Review Comment: Done.

Issue Time Tracking: Worklog Id: (was: 785158) Time Spent: 5h 20m (was: 5h 10m)
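The putIfAbsent-then-get pattern in the `registerWriter` diff above can be collapsed with `Map.computeIfAbsent`, which also removes the need for the intermediate variable a later review comment asks about. A minimal sketch, with plain `String`s standing in for `TaskAttemptID` and `HiveIcebergWriter`:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class RegistrySketch {
    // Strings stand in for TaskAttemptID (outer key) and HiveIcebergWriter
    // (list element); the real registry uses those types.
    static final Map<String, Map<String, List<String>>> writers = new ConcurrentHashMap<>();

    // computeIfAbsent creates the nested map/list only when missing and
    // returns it in one call, so registration becomes a single chain.
    static void registerWriter(String attemptId, String tableName, String writer) {
        writers.computeIfAbsent(attemptId, id -> new ConcurrentHashMap<>())
               .computeIfAbsent(tableName, t -> new CopyOnWriteArrayList<>())
               .add(writer);
    }

    public static void main(String[] args) {
        registerWriter("attempt_1", "db.tbl", "writerA");
        registerWriter("attempt_1", "db.tbl", "writerB");
        System.out.println(writers.get("attempt_1").get("db.tbl").size()); // prints 2
    }
}
```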
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785157&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785157 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 15:55
Start Date: 27/Jun/22 15:55
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907541673

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```diff
@@ -858,19 +861,25 @@ private static boolean hasParquetListColumnSupport(Properties tableProps, Schema
    * @param overwrite If we have to overwrite the existing table or just add the new data
    * @return The generated JobContext
    */
-  private Optional<JobContext> generateJobContext(Configuration configuration, String tableName, boolean overwrite) {
+  private Optional<List<JobContext>> generateJobContext(Configuration configuration, String tableName,
+      boolean overwrite) {
     JobConf jobConf = new JobConf(configuration);
-    Optional<SessionStateUtil.CommitInfo> commitInfo = SessionStateUtil.getCommitInfo(jobConf, tableName);
-    if (commitInfo.isPresent()) {
-      JobID jobID = JobID.forName(commitInfo.get().getJobIdStr());
-      commitInfo.get().getProps().forEach(jobConf::set);
-      jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
-
-      // we should only commit this current table because
-      // for multi-table inserts, this hook method will be called sequentially for each target table
-      jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
-
-      return Optional.of(new JobContextImpl(jobConf, jobID, null));
+    Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
+        SessionStateUtil.getCommitInfo(jobConf, tableName);
+    if (commitInfoMap.isPresent()) {
+      List<JobContext> jobContextList = Lists.newLinkedList();
+      for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) {
+        JobID jobID = JobID.forName(commitInfo.getJobIdStr());
+        commitInfo.getProps().forEach(jobConf::set);
+        jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
+
+        // we should only commit this current table because
+        // for multi-table inserts, this hook method will be called sequentially for each target table
+        jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+
+        jobContextList.add(new JobContextImpl(jobConf, jobID, null));
+      }
+      return Optional.of(jobContextList);
```

Review Comment: changed return type from `Optional` to `List`.

Issue Time Tracking: Worklog Id: (was: 785157) Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785156&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785156 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 15:54
Start Date: 27/Jun/22 15:54
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907540855

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```diff
@@ -858,19 +861,25 @@ private static boolean hasParquetListColumnSupport(Properties tableProps, Schema
    * @param overwrite If we have to overwrite the existing table or just add the new data
    * @return The generated JobContext
```

Review Comment: Done.

Issue Time Tracking: Worklog Id: (was: 785156) Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785071&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785071 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 12:34
Start Date: 27/Jun/22 12:34
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907336065

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -305,13 +414,14 @@ private Set<FileStatus> listForCommits(JobConf jobConf, String jobLocation) throws
    * Collects the additions to a single table and adds/commits the new files to the Iceberg table.
    * @param io The io to read the forCommit files
    * @param executor The executor used to read the forCommit files
-   * @param jobContext The job context
-   * @param name The name of the table used for loading from the catalog
+   * @param outputTable The table used for loading from the catalog
    * @param location The location of the table used for loading from the catalog
    * @param catalogName The name of the catalog that contains the table
    */
-  private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location,
+  private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable, String location,
```

Review Comment: Would it make sense to put the `location` and the `catalogName` into the `OutputTable` object?

Issue Time Tracking: Worklog Id: (was: 785071) Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785069&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785069 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 12:31
Start Date: 27/Jun/22 12:31
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907332483

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -162,53 +177,96 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);

     // Clean up writer data from the local store
-    Map<String, HiveIcebergWriter> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
+    Map<String, List<HiveIcebergWriter>> writerMap = WriterRegistry.removeWriters(context.getTaskAttemptID());

     // Remove files if it was not done already
-    if (writers != null) {
-      for (HiveIcebergWriter writer : writers.values()) {
-        writer.close(true);
+    if (writerMap != null) {
+      for (List<HiveIcebergWriter> writerList : writerMap.values()) {
+        for (HiveIcebergWriter writer : writerList) {
+          writer.close(true);
+        }
       }
     }
   }

+  @Override
+  public void commitJob(JobContext originalContext) throws IOException {
+    commitJobs(Collections.singletonList(originalContext));
+  }
+
+  private static class OutputTable {
```

Review Comment: Some javadoc would be nice.

Issue Time Tracking: Worklog Id: (was: 785069) Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785068&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785068 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 12:29
Start Date: 27/Jun/22 12:29
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r90733

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                attemptID.getJobID(), attemptID.getTaskID().getId());
-            if (writer != null) {
-              createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-            } else {
+                attemptID.getJobID(), attemptID.getTaskID().getId());
+            if (writers.get(output) != null) {
```

Review Comment: Maybe when we do not do the null checks we can use the streaming API to simplify the code.

Issue Time Tracking: Worklog Id: (was: 785068) Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785066&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785066 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 12:29
Start Date: 27/Jun/22 12:29
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907330452

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                attemptID.getJobID(), attemptID.getTaskID().getId());
-            if (writer != null) {
-              createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-            } else {
+                attemptID.getJobID(), attemptID.getTaskID().getId());
+            if (writers.get(output) != null) {
+              for (HiveIcebergWriter writer : writers.get(output)) {
+                if (writer != null) {
+                  dataFiles.addAll(writer.files().dataFiles());
+                  deleteFiles.addAll(writer.files().deleteFiles());
```

Review Comment: `writer.files()` might create a new object on every call. We should keep the object and reuse it.

Issue Time Tracking: Worklog Id: (was: 785066) Time Spent: 4h 20m (was: 4h 10m)
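The object-reuse point above can be demonstrated with a hypothetical stand-in for `writer.files()` that counts its invocations; the real `HiveIcebergWriter` and its result type are not reproduced here:

```java
import java.util.ArrayList;
import java.util.List;

public class CacheResultSketch {
    // Counts how many times files() is invoked, to make the cost visible.
    static int callCount = 0;

    // Hypothetical stand-in for the writer's per-call result object.
    record FilesForCommit(List<String> dataFiles, List<String> deleteFiles) { }

    // Stand-in for writer.files(): builds a fresh result object every call.
    static FilesForCommit files() {
        callCount++;
        return new FilesForCommit(List.of("data-1"), List.of("delete-1"));
    }

    public static void main(String[] args) {
        List<String> dataFiles = new ArrayList<>();
        List<String> deleteFiles = new ArrayList<>();

        // Call files() once and reuse the result, instead of calling it
        // separately for dataFiles() and deleteFiles().
        FilesForCommit files = files();
        dataFiles.addAll(files.dataFiles());
        deleteFiles.addAll(files.deleteFiles());

        System.out.println(callCount); // prints 1
    }
}
```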
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785063&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785063 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 12:26
Start Date: 27/Jun/22 12:26
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907328344

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                attemptID.getJobID(), attemptID.getTaskID().getId());
-            if (writer != null) {
-              createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-            } else {
+                attemptID.getJobID(), attemptID.getTaskID().getId());
+            if (writers.get(output) != null) {
```

Review Comment: Do we put `null` values into the map? Do we really need the null check?

Issue Time Tracking: Worklog Id: (was: 785063) Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785062&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785062 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 12:25
Start Date: 27/Jun/22 12:25
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907327310

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -127,14 +133,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
```

Review Comment: Is this used?

Issue Time Tracking: Worklog Id: (was: 785062) Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785059 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 12:22
Start Date: 27/Jun/22 12:22
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907324564

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```diff
@@ -109,8 +115,8 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptID attemptID = context.getTaskAttemptID();
     JobConf jobConf = context.getJobConf();
-    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
-    Map<String, HiveIcebergWriter> writers = Optional.ofNullable(WriterRegistry.writers(attemptID))
+    Set<String> outputs = Sets.newHashSet(HiveIcebergStorageHandler.outputTables(context.getJobConf()));
```

Review Comment: Could we change the return type of `outputTables` to `Set`?

Issue Time Tracking: Worklog Id: (was: 785059) Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785058 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 12:19
Start Date: 27/Jun/22 12:19
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907321359

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java:

```diff
@@ -19,26 +19,29 @@
 package org.apache.iceberg.mr.hive.writer;

+import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;

 public class WriterRegistry {
-  private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
+  private static final Map<TaskAttemptID, Map<String, List<HiveIcebergWriter>>> writers = Maps.newConcurrentMap();

   private WriterRegistry() {
   }

-  public static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
+  public static Map<String, List<HiveIcebergWriter>> removeWriters(TaskAttemptID taskAttemptID) {
     return writers.remove(taskAttemptID);
   }

   public static void registerWriter(TaskAttemptID taskAttemptID, String tableName, HiveIcebergWriter writer) {
     writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
-    writers.get(taskAttemptID).put(tableName, writer);
+    writers.get(taskAttemptID).putIfAbsent(tableName, Lists.newArrayList());
```

Review Comment: nit: would it be worth declaring a variable for this?

Issue Time Tracking: Worklog Id: (was: 785058) Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785057 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 12:17
Start Date: 27/Jun/22 12:17
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907320335

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```diff
@@ -858,19 +861,25 @@ private static boolean hasParquetListColumnSupport(Properties tableProps, Schema
    * @param overwrite If we have to overwrite the existing table or just add the new data
    * @return The generated JobContext
    */
-  private Optional<JobContext> generateJobContext(Configuration configuration, String tableName, boolean overwrite) {
+  private Optional<List<JobContext>> generateJobContext(Configuration configuration, String tableName,
+      boolean overwrite) {
     JobConf jobConf = new JobConf(configuration);
-    Optional<SessionStateUtil.CommitInfo> commitInfo = SessionStateUtil.getCommitInfo(jobConf, tableName);
-    if (commitInfo.isPresent()) {
-      JobID jobID = JobID.forName(commitInfo.get().getJobIdStr());
-      commitInfo.get().getProps().forEach(jobConf::set);
-      jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
-
-      // we should only commit this current table because
-      // for multi-table inserts, this hook method will be called sequentially for each target table
-      jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
-
-      return Optional.of(new JobContextImpl(jobConf, jobID, null));
+    Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
+        SessionStateUtil.getCommitInfo(jobConf, tableName);
+    if (commitInfoMap.isPresent()) {
+      List<JobContext> jobContextList = Lists.newLinkedList();
+      for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) {
+        JobID jobID = JobID.forName(commitInfo.getJobIdStr());
+        commitInfo.getProps().forEach(jobConf::set);
+        jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
+
+        // we should only commit this current table because
+        // for multi-table inserts, this hook method will be called sequentially for each target table
+        jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+
+        jobContextList.add(new JobContextImpl(jobConf, jobID, null));
+      }
+      return Optional.of(jobContextList);
```

Review Comment: Why not an empty list instead of `Optional`?

Issue Time Tracking: Worklog Id: (was: 785057) Time Spent: 3.5h (was: 3h 20m)
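The empty-list suggestion above is a common API choice: callers can iterate unconditionally instead of branching on `Optional.isPresent()`. A minimal sketch, with `String` standing in for `JobContext` and the commit-info lookup reduced to a boolean:

```java
import java.util.Collections;
import java.util.List;

public class EmptyListSketch {
    // Returns an empty list when there is nothing to commit, so the caller
    // needs no isPresent()/null check before looping.
    static List<String> generateJobContexts(boolean hasCommitInfo) {
        if (!hasCommitInfo) {
            return Collections.emptyList();
        }
        return List.of("jobContext-1");
    }

    public static void main(String[] args) {
        int committed = 0;
        // The loop body simply never runs for the empty case.
        for (String ctx : generateJobContexts(false)) {
            committed++;
        }
        System.out.println(committed); // prints 0
    }
}
```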
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785056=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785056 ] ASF GitHub Bot logged work on HIVE-26319: - Author: ASF GitHub Bot Created on: 27/Jun/22 12:17 Start Date: 27/Jun/22 12:17 Worklog Time Spent: 10m Work Description: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r907319537 ## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java: ## @@ -858,19 +861,25 @@ private static boolean hasParquetListColumnSupport(Properties tableProps, Schema * @param overwrite If we have to overwrite the existing table or just add the new data * @return The generated JobContext Review Comment: Please update the javadoc Issue Time Tracking --- Worklog Id: (was: 785056) Time Spent: 3h 20m (was: 3h 10m) > Iceberg integration: Perform update split early > --- > > Key: HIVE-26319 > URL: https://issues.apache.org/jira/browse/HIVE-26319 > Project: Hive > Issue Type: Improvement > Components: File Formats >Reporter: Krisztian Kasa >Assignee: Krisztian Kasa >Priority: Major > Labels: pull-request-available > Fix For: 4.0.0 > > Time Spent: 3h 20m > Remaining Estimate: 0h > > Extend update split early to iceberg tables like in HIVE-21160 for native > acid tables -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785051&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785051 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 11:54
Start Date: 27/Jun/22 11:54
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907299669

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);
```

Review Comment: Discussed this offline: the current implementation does not handle rollback of already-successful jobs when one commit fails. This should be implemented in a follow-up patch, for example by adding an `onFailure` handler to the Tasks:
```
Tasks.foreach(outputs)
    .onFailure( ...
    )
```

Issue Time Tracking: Worklog Id: (was: 785051); Time Spent: 3h 10m (was: 3h)

> Iceberg integration: Perform update split early
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
> Issue Type: Improvement
> Components: File Formats
> Reporter: Krisztian Kasa
> Assignee: Krisztian Kasa
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.0.0
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native acid tables

This message was sent by Atlassian Jira (v8.20.7#820007)
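The rollback idea in the comment above can be sketched without any Hive or Iceberg dependencies. The snippet below is a minimal, self-contained illustration of "commit every output, and on the first failure roll back the ones that already committed"; it only mimics the shape of an `onFailure` handler on Iceberg's `Tasks` builder, and all names (`commitAll`, the `Consumer` callbacks) are illustrative, not the real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CommitAllOrRollback {

    /** Commits outputs in order; on failure, rolls back the already-committed
     *  outputs in reverse order and rethrows the original error. */
    public static <T> List<T> commitAll(List<T> outputs, Consumer<T> commit, Consumer<T> rollback) {
        List<T> committed = new ArrayList<>();
        for (T output : outputs) {
            try {
                commit.accept(output);
                committed.add(output);
            } catch (RuntimeException e) {
                // onFailure-style handler: undo the successful commits first
                for (int i = committed.size() - 1; i >= 0; i--) {
                    rollback.accept(committed.get(i));
                }
                throw e;
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> rolledBack = new ArrayList<>();
        try {
            commitAll(List.of("t1", "t2", "bad"),
                o -> { if (o.equals("bad")) throw new RuntimeException("commit failed: " + o); },
                rolledBack::add);
        } catch (RuntimeException e) {
            System.out.println("rolled back: " + rolledBack); // [t2, t1]
        }
    }
}
```

The reverse iteration mirrors the usual transactional convention of undoing work in the opposite order it was done.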
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=785050&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-785050 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 27/Jun/22 11:49
Start Date: 27/Jun/22 11:49
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r907295916

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```
@@ -325,7 +339,15 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
     LOG.info("Committing job has started for table: {}, using location: {}",
         table, generateJobLocation(location, conf, jobContext.getJobID()));
-    int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.getTaskNum()).orElseGet(() -> {
+    Optional<SessionStateUtil.CommitInfo> commitInfo;
+    if (SessionStateUtil.getCommitInfo(conf, name).isPresent()) {
+      commitInfo = SessionStateUtil.getCommitInfo(conf, name).get()
+          .stream().filter(ci -> ci.getJobIdStr().equals(jobContext.getJobID().toString())).findFirst();
```

Review Comment: Replaced the `List` with a `Map` where the key is the string representation of the `jobId` associated with the `CommitInfo`.

Issue Time Tracking: Worklog Id: (was: 785050); Time Spent: 3h (was: 2h 50m)
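The `Map`-based bookkeeping described in the comment above can be sketched in isolation. In this hypothetical stand-in (the `CommitInfo` class here is not the real `SessionStateUtil.CommitInfo`), keying the entries by the job ID string turns the per-job lookup into a direct `get()` instead of a linear scan:

```java
import java.util.Map;
import java.util.Optional;

public class CommitInfoByJobId {

    // Minimal stand-in for SessionStateUtil.CommitInfo.
    static final class CommitInfo {
        final String jobIdStr;
        final int taskNum;
        CommitInfo(String jobIdStr, int taskNum) {
            this.jobIdStr = jobIdStr;
            this.taskNum = taskNum;
        }
    }

    // The map key is the job ID string, which also encodes the invariant
    // that each job ID has at most one CommitInfo.
    static Optional<CommitInfo> forJob(Map<String, CommitInfo> byJobId, String jobId) {
        return Optional.ofNullable(byJobId.get(jobId));
    }

    public static void main(String[] args) {
        Map<String, CommitInfo> byJobId = Map.of(
            "job_1", new CommitInfo("job_1", 4),
            "job_2", new CommitInfo("job_2", 2));
        System.out.println(forJob(byJobId, "job_1").map(ci -> ci.taskNum).orElse(-1)); // 4
    }
}
```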
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=783293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783293 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 21/Jun/22 09:44
Start Date: 21/Jun/22 09:44
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r902401629

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);
```

Review Comment: I do not think we have tests where the two inserts insert into the same table; not even sure that it would be correct.

Issue Time Tracking: Worklog Id: (was: 783293); Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=783291&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783291 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 21/Jun/22 09:44
Start Date: 21/Jun/22 09:44
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r902401629

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);
```

Review Comment: I do not think we have tests where the two inserts insert into the same table.

Issue Time Tracking: Worklog Id: (was: 783291); Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=783290&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783290 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 21/Jun/22 09:43
Start Date: 21/Jun/22 09:43
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r902401099

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);
```

Review Comment: `TestHiveIcebergInserts.testMultiTableInsert` - this should check multi-table inserts:
```
// simple insert: should create a single vertex writing to both target tables
shell.executeStatement("FROM customers " +
    "INSERT INTO target1 SELECT customer_id, first_name " +
    "INSERT INTO target2 SELECT last_name, customer_id");
```

Issue Time Tracking: Worklog Id: (was: 783290); Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=783286&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783286 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 21/Jun/22 09:41
Start Date: 21/Jun/22 09:41
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r902399038

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```
@@ -325,7 +339,15 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
     LOG.info("Committing job has started for table: {}, using location: {}",
         table, generateJobLocation(location, conf, jobContext.getJobID()));
-    int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.getTaskNum()).orElseGet(() -> {
+    Optional<SessionStateUtil.CommitInfo> commitInfo;
+    if (SessionStateUtil.getCommitInfo(conf, name).isPresent()) {
+      commitInfo = SessionStateUtil.getCommitInfo(conf, name).get()
+          .stream().filter(ci -> ci.getJobIdStr().equals(jobContext.getJobID().toString())).findFirst();
```

Review Comment: Then why `.findFirst()`?

Issue Time Tracking: Worklog Id: (was: 783286); Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782339&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782339 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 17/Jun/22 09:26
Start Date: 17/Jun/22 09:26
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899942979

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```
@@ -858,19 +862,24 @@ private static boolean hasParquetListColumnSupport(Properties tableProps, Schema
    * @param overwrite If we have to overwrite the existing table or just add the new data
    * @return The generated JobContext
    */
-  private Optional<JobContext> generateJobContext(Configuration configuration, String tableName, boolean overwrite) {
+  private Optional<List<JobContext>> generateJobContext(
```

Review Comment: Rewritten.

Issue Time Tracking: Worklog Id: (was: 782339); Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782338&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782338 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 17/Jun/22 09:26
Start Date: 17/Jun/22 09:26
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899942578

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);
```

Review Comment: I think all jobs should be rolled back if committing any of them fails. To achieve this we are using `org.apache.iceberg.util.Tasks`:
```
Tasks.foreach(outputs)
    .throwFailureWhenFinished()
    .stopOnFailure()
    .run(output -> {
      ...
```
which can revert all tasks on error, even if some of them have already succeeded.

The initial implementation committed each job independently: every job launched a separate batch of tasks. I refactored this part to collect the outputs from all jobs and launch them in one batch. I also found that this runs in parallel, and we look up the data needed for the commit in the `SessionState`, which is stored thread-locally. This works only when a single output exists, because then the only worker thread is the main thread, where the `SessionState` is initialized. However, when a batch contains more than one output, threads other than the main thread do not have the commit data in their `SessionState`. So I extracted the collection of this data to before the tasks are launched. This affects multi-inserts, split updates and merge statements. I haven't found any tests for multi-inserting into an Iceberg table (please share some if any exist), so I assume this issue hasn't come up before. Please share your thoughts.

Issue Time Tracking: Worklog Id: (was: 782338); Time Spent: 2h (was: 1h 50m)
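The thread-locality problem described in the comment above can be demonstrated with plain JDK classes. The snippet below is a self-contained illustration, not the real Hive `SessionState` API: a value stored in a `ThreadLocal` on the main thread is invisible to pool threads, so it has to be collected up front and handed to the task explicitly.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalPitfall {

    // Stand-in for the session state Hive keeps per thread.
    static final ThreadLocal<String> SESSION = new ThreadLocal<>();

    // Reads the ThreadLocal from a pool thread: the worker never set it,
    // so this returns null.
    static String readOnWorker(ExecutorService pool) throws Exception {
        return pool.submit(() -> SESSION.get()).get();
    }

    // The fix sketched in the comment: read the value on the calling
    // thread first, then pass it to the task explicitly.
    static String readCollected(ExecutorService pool) throws Exception {
        String collected = SESSION.get();
        return pool.submit(() -> collected).get();
    }

    public static void main(String[] args) throws Exception {
        SESSION.set("commit-info");
        ExecutorService pool = Executors.newFixedThreadPool(1);
        System.out.println("worker sees: " + readOnWorker(pool));  // null
        System.out.println("with fix:    " + readCollected(pool)); // commit-info
        pool.shutdown();
    }
}
```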
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782328 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 17/Jun/22 09:08
Start Date: 17/Jun/22 09:08
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899928173

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
     ...
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);
         } catch (IOException ioe) {
           LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", ioe);
           // no throwing here because the original exception should be propagated
         }
         throw new HiveException(
-            "Error committing job: " + jobContext.get().getJobID() + " for table: " + tableName, e);
+            "Error committing job: " + jobContext.getJobID() + " for table: " + tableName, e);
```

Review Comment: removed

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
     ...
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
```

Review Comment: removed

Issue Time Tracking: Worklog Id: (was: 782328); Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782322&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782322 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 17/Jun/22 09:04
Start Date: 17/Jun/22 09:04
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899924819

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```
@@ -127,14 +130,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                attemptID.getJobID(), attemptID.getTaskID().getId());
-            if (writer != null) {
-              createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-            } else {
+                attemptID.getJobID(), attemptID.getTaskID().getId());
+            if (writers.get(output) != null) {
+              for (HiveIcebergWriter writer : writers.get(output)) {
+                if (writer != null) {
+                  dataFiles.addAll(writer.files().dataFiles());
```

Review Comment: I wouldn't change this code until it turns out to be a bottleneck.

Issue Time Tracking: Worklog Id: (was: 782322); Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782321&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782321 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 17/Jun/22 09:02
Start Date: 17/Jun/22 09:02
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899923444

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```
@@ -325,7 +339,15 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont
     LOG.info("Committing job has started for table: {}, using location: {}",
         table, generateJobLocation(location, conf, jobContext.getJobID()));
-    int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.getTaskNum()).orElseGet(() -> {
+    Optional<SessionStateUtil.CommitInfo> commitInfo;
+    if (SessionStateUtil.getCommitInfo(conf, name).isPresent()) {
+      commitInfo = SessionStateUtil.getCommitInfo(conf, name).get()
+          .stream().filter(ci -> ci.getJobIdStr().equals(jobContext.getJobID().toString())).findFirst();
```

Review Comment: AFAIK only one `CommitInfo` object should be associated with a jobContext.

Issue Time Tracking: Worklog Id: (was: 782321); Time Spent: 1.5h (was: 1h 20m)
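The `stream().filter().findFirst()` lookup debated above can be shown in a minimal, self-contained form. The `CommitInfo` class here is a hypothetical stand-in for `SessionStateUtil.CommitInfo`, not the real Hive class:

```java
import java.util.List;
import java.util.Optional;

public class CommitInfoLookup {

    static final class CommitInfo {
        final String jobIdStr;
        final int taskNum;
        CommitInfo(String jobIdStr, int taskNum) {
            this.jobIdStr = jobIdStr;
            this.taskNum = taskNum;
        }
    }

    // findFirst() relies on the invariant stated in the comment: at most one
    // CommitInfo exists per job ID, so "first match" means "the only match".
    static Optional<CommitInfo> forJob(List<CommitInfo> infos, String jobId) {
        return infos.stream()
            .filter(ci -> ci.jobIdStr.equals(jobId))
            .findFirst();
    }

    public static void main(String[] args) {
        List<CommitInfo> infos = List.of(new CommitInfo("job_1", 3), new CommitInfo("job_2", 5));
        System.out.println(forJob(infos, "job_2").map(ci -> ci.taskNum).orElse(-1)); // 5
    }
}
```

If that one-per-job invariant held by construction, a map keyed by job ID (as the thread later settled on) would make the lookup both cheaper and self-documenting.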
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782025&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782025 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 16/Jun/22 11:52
Start Date: 16/Jun/22 11:52
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899000415

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```
@@ -127,14 +130,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                attemptID.getJobID(), attemptID.getTaskID().getId());
-            if (writer != null) {
-              createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-            } else {
+                attemptID.getJobID(), attemptID.getTaskID().getId());
+            if (writers.get(output) != null) {
+              for (HiveIcebergWriter writer : writers.get(output)) {
+                if (writer != null) {
+                  dataFiles.addAll(writer.files().dataFiles());
```

Review Comment: I leave this decision to you; I do not have a clear preference.

Issue Time Tracking: Worklog Id: (was: 782025); Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782016&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782016 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 16/Jun/22 11:11
Start Date: 16/Jun/22 11:11
Worklog Time Spent: 10m

Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r898969956

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:

```
@@ -127,14 +130,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
-                attemptID.getJobID(), attemptID.getTaskID().getId());
-            if (writer != null) {
-              createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-            } else {
+                attemptID.getJobID(), attemptID.getTaskID().getId());
+            if (writers.get(output) != null) {
+              for (HiveIcebergWriter writer : writers.get(output)) {
+                if (writer != null) {
+                  dataFiles.addAll(writer.files().dataFiles());
```

Review Comment: I found this usage: https://github.com/apache/hive/blob/67c2d4910ff17c694653eb8bd9c9ed2405cec38b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java#L59

`HiveIcebergWriter` does not have `dataFiles()`/`deleteFiles()` methods, and it can be a `HiveIcebergRecordWriter`, `HiveIcebergDeleteWriter`, etc., which treat data and delete files differently.

If we want to avoid creating the `FilesForCommit` object to replace `HiveIcebergWriter.files()`, we could either:
* create a method like `HiveIcebergWriter.collectFiles(List<> dataFiles, List<> deleteFiles)`, or
* create `dataFiles()` and `deleteFiles()` methods. I prefer wrapping the returned lists into an unmodifiableList, which is also a new object creation.

Which do you prefer? On the other hand, I don't think creating the `FilesForCommit` objects is critical: they are only created when the result of a statement is committed/aborted, not per record.

Issue Time Tracking: Worklog Id: (was: 782016); Time Spent: 1h 10m (was: 1h)
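The "`dataFiles()`/`deleteFiles()` getters wrapped in an unmodifiable list" option discussed above can be sketched in miniature. The writer class and the `String` file paths below are hypothetical stand-ins for the real `HiveIcebergWriter` hierarchy and its `DataFile`/`DeleteFile` types:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FileCollectingWriter {
    private final List<String> dataFiles = new ArrayList<>();
    private final List<String> deleteFiles = new ArrayList<>();

    void recordData(String path) { dataFiles.add(path); }
    void recordDelete(String path) { deleteFiles.add(path); }

    // Unmodifiable *views*: callers can read but not mutate, and the list
    // contents are not copied (only a small wrapper object is allocated,
    // which is the "also a new object creation" trade-off noted above).
    List<String> dataFiles() { return Collections.unmodifiableList(dataFiles); }
    List<String> deleteFiles() { return Collections.unmodifiableList(deleteFiles); }

    public static void main(String[] args) {
        FileCollectingWriter w = new FileCollectingWriter();
        w.recordData("data-00001.parquet");
        w.recordDelete("delete-00001.parquet");
        System.out.println(w.dataFiles().size() + " data, " + w.deleteFiles().size() + " delete");
    }
}
```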
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=781982&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781982 ]

ASF GitHub Bot logged work on HIVE-26319:
Author: ASF GitHub Bot
Created on: 16/Jun/22 09:31
Start Date: 16/Jun/22 09:31
Worklog Time Spent: 10m

Work Description: pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r898890734

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:

```
@@ -858,19 +862,24 @@ private static boolean hasParquetListColumnSupport(Properties tableProps, Schema
    * @param overwrite If we have to overwrite the existing table or just add the new data
    * @return The generated JobContext
    */
-  private Optional<JobContext> generateJobContext(Configuration configuration, String tableName, boolean overwrite) {
+  private Optional<List<JobContext>> generateJobContext(
```

Review Comment: nit: In Iceberg code we usually break lines like this:
```
private Optional<List<JobContext>> generateJobContext(Configuration configuration, String tableName,
    boolean overwrite) {
```

Issue Time Tracking: Worklog Id: (was: 781982); Time Spent: 1h (was: 50m)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=781980&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781980 ] ASF GitHub Bot logged work on HIVE-26319: - Author: ASF GitHub Bot Created on: 16/Jun/22 09:30 Start Date: 16/Jun/22 09:30 Worklog Time Spent: 10m Work Description: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r898889313

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {

```
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);
```

Review Comment: Shall we abort all of the other jobs as well? What is our strategy here?
Issue Time Tracking --- Worklog Id: (was: 781980) Time Spent: 50m (was: 40m)
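One possible policy for the reviewer's abort-strategy question is sketched below: commit the job contexts in order, and on the first failure abort the failing job together with every job not yet committed. The `Committer` interface and `commitAll` method are illustrative names for this sketch, not Hive's actual `OutputCommitter` API.

```java
import java.io.IOException;
import java.util.List;

// Illustrative committer abstraction, keyed by job id for simplicity.
interface Committer {
    void commitJob(String jobId) throws IOException;
    void abortJob(String jobId);
}

class MultiJobCommit {
    // Commits jobs in order; on the first failure aborts the failed job and
    // all remaining (not yet committed) jobs, then rethrows the exception.
    static void commitAll(Committer committer, List<String> jobIds) throws IOException {
        for (int i = 0; i < jobIds.size(); i++) {
            try {
                committer.commitJob(jobIds.get(i));
            } catch (IOException e) {
                for (int j = i; j < jobIds.size(); j++) {
                    try {
                        committer.abortJob(jobIds.get(j)); // best-effort cleanup
                    } catch (RuntimeException ignored) {
                        // keep aborting the rest even if one abort fails
                    }
                }
                throw e;
            }
        }
    }
}
```

Jobs committed before the failure are left alone here, since an Iceberg commit that already succeeded cannot simply be un-done by an abort; rolling those back would need a snapshot revert, which is a separate design decision.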
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=781978&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781978 ] ASF GitHub Bot logged work on HIVE-26319: - Author: ASF GitHub Bot Created on: 16/Jun/22 09:28 Start Date: 16/Jun/22 09:28 Worklog Time Spent: 10m Work Description: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r898887564

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {

```
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
```

Review Comment: please remove the extra spaces

```
         throw new HiveException(
-            "Error committing job: " + jobContext.get().getJobID() + " for table: " + tableName, e);
+            "Error committing job: " + jobContext.getJobID() + " for table: " + tableName, e);
```

Review Comment: please remove the extra spaces

Issue Time Tracking --- Worklog Id: (was: 781978) Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=781977&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781977 ] ASF GitHub Bot logged work on HIVE-26319: - Author: ASF GitHub Bot Created on: 16/Jun/22 09:27 Start Date: 16/Jun/22 09:27 Worklog Time Spent: 10m Work Description: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r898886482

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
@@ -325,7 +339,15 @@ private void commitTable(FileIO io, ExecutorService executor, JobContext jobCont

```
     LOG.info("Committing job has started for table: {}, using location: {}", table,
         generateJobLocation(location, conf, jobContext.getJobID()));
-    int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.getTaskNum()).orElseGet(() -> {
+    Optional<SessionStateUtil.CommitInfo> commitInfo;
+    if (SessionStateUtil.getCommitInfo(conf, name).isPresent()) {
+      commitInfo = SessionStateUtil.getCommitInfo(conf, name).get()
+          .stream().filter(ci -> ci.getJobIdStr().equals(jobContext.getJobID().toString())).findFirst();
```

Review Comment: Are we sure that the first `commitInfo` has the same number of tasks as all of them?

Issue Time Tracking --- Worklog Id: (was: 781977) Time Spent: 0.5h (was: 20m)
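The stream lookup in the hunk above reduces to the following pattern; `CommitInfo` here is a simplified stand-in for `SessionStateUtil.CommitInfo` with only the fields needed for the illustration.

```java
import java.util.List;
import java.util.Optional;

// Simplified stand-in for SessionStateUtil.CommitInfo, illustration only.
class CommitInfo {
    private final String jobIdStr;
    private final int taskNum;

    CommitInfo(String jobIdStr, int taskNum) {
        this.jobIdStr = jobIdStr;
        this.taskNum = taskNum;
    }

    String getJobIdStr() { return jobIdStr; }
    int getTaskNum() { return taskNum; }
}

class CommitInfoLookup {
    // Selects the commit info recorded for one particular job id. findFirst()
    // is only safe if each job id appears at most once in the collection,
    // which is the assumption the review comment questions.
    static Optional<CommitInfo> forJob(List<CommitInfo> infos, String jobId) {
        return infos.stream()
            .filter(ci -> ci.getJobIdStr().equals(jobId))
            .findFirst();
    }
}
```

Because several Tez tasks can now record commit info for the same table, filtering by job id (rather than taking an arbitrary entry) is what keeps each job's task count correct.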
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=781973&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781973 ] ASF GitHub Bot logged work on HIVE-26319: - Author: ASF GitHub Bot Created on: 16/Jun/22 09:22 Start Date: 16/Jun/22 09:22 Worklog Time Spent: 10m Work Description: pvary commented on code in PR #3362: URL: https://github.com/apache/hive/pull/3362#discussion_r898882678

## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
@@ -127,14 +130,23 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {

```
         .run(output -> {
           Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
           if (table != null) {
-            HiveIcebergWriter writer = writers.get(output);
+            Collection<DataFile> dataFiles = Lists.newArrayList();
+            Collection<DeleteFile> deleteFiles = Lists.newArrayList();
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
                 attemptID.getJobID(), attemptID.getTaskID().getId());
-            if (writer != null) {
-              createFileForCommit(writer.files(), fileForCommitLocation, table.io());
-            } else {
+            if (writers.get(output) != null) {
+              for (HiveIcebergWriter writer : writers.get(output)) {
+                if (writer != null) {
+                  dataFiles.addAll(writer.files().dataFiles());
```

Review Comment: Is there any place where we use the `writer.files()` directly instead of calling `dataFiles()`, `deleteFiles()`? We might want to remove the unnecessary object creation then.
Issue Time Tracking --- Worklog Id: (was: 781973) Time Spent: 20m (was: 10m)
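The merge that `commitTask` performs in the hunk above, collecting every writer's data and delete files into one pair of collections before the commit file is written, can be sketched as follows. The `SketchWriter` type and plain `String` file names are illustrative stand-ins for `HiveIcebergWriter` and Iceberg's file objects.

```java
import java.util.Collection;
import java.util.List;

// Illustrative writer: holds the file names it produced during the task.
class SketchWriter {
    final List<String> dataFiles;
    final List<String> deleteFiles;

    SketchWriter(List<String> dataFiles, List<String> deleteFiles) {
        this.dataFiles = dataFiles;
        this.deleteFiles = deleteFiles;
    }
}

class TaskFileMerge {
    // Merges the outputs of all writers run in the same task into one pair
    // of collections, mirroring how a data writer and a delete writer from
    // an update-split-early plan are combined before the commit file is
    // written. Null writers are skipped, as in the patched commitTask.
    static void merge(List<SketchWriter> writers,
                      Collection<String> dataOut, Collection<String> deleteOut) {
        for (SketchWriter writer : writers) {
            if (writer != null) {
                dataOut.addAll(writer.dataFiles);
                deleteOut.addAll(writer.deleteFiles);
            }
        }
    }
}
```

With update split early, one task typically runs two writers (the insert branch producing data files and the delete branch producing delete files), so the merged pair is what a single commit file per task must carry.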
[jira] [Work logged] (HIVE-26319) Iceberg integration: Perform update split early
[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=780788&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-780788 ] ASF GitHub Bot logged work on HIVE-26319: - Author: ASF GitHub Bot Created on: 13/Jun/22 11:46 Start Date: 13/Jun/22 11:46 Worklog Time Spent: 10m Work Description: kasakrisz opened a new pull request, #3362: URL: https://github.com/apache/hive/pull/3362

### What changes were proposed in this pull request?
Rewrite update statements on Iceberg tables to a multi-insert statement, similarly to native acid tables.

When generating the rewritten statement:
* Get the virtual columns from the table's storage handler in case of non-native acid tables.
* Include the old values in the select clause of the delete branch of the multi-insert statement.

When executing the multi insert:
* Two Iceberg writers are used, which produce a data delta file and a delete delta file. The results of these writers are merged into one `FilesForCommit` if both writers run in the same task.
* In case of more complex statements (e.g. partitioned and/or bucketed tables), more than one Tez task produces commit info, so this patch enables storing all of them.
* Every `FileSinkOperator` creates its own jobConf instance, because the Iceberg write operation is stored in it and differs per instance.

### Why are the changes needed?
See #2855. This is also preparation for the Iceberg MERGE implementation.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
```
mvn test -Dtest.output.overwrite -DskipSparkTests -Dtest=TestIcebergLlapLocalCliDriver -Dqfile=update_iceberg_partitioned_orc2.q -pl itests/qtest-iceberg -Piceberg -Pitests
```

Issue Time Tracking --- Worklog Id: (was: 780788) Remaining Estimate: 0h Time Spent: 10m