[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=568132&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-568132
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 18/Mar/21 07:05
Start Date: 18/Mar/21 07:05
Worklog Time Spent: 10m 
  Work Description: pkumarsinha merged pull request #1936:
URL: https://github.com/apache/hive/pull/1936


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 568132)
Time Spent: 6h 40m  (was: 6.5h)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch, HIVE-24718.05.patch, HIVE-24718.06.patch
>
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=566051&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-566051
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 15/Mar/21 07:39
Start Date: 15/Mar/21 07:39
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r594106033



##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##
@@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path 
identityUdfLocalPath, Path identityUdfHdfsPa
   /*
* Method used from TestReplicationScenariosExclusiveReplica
*/
-  private void assertExternalFileInfo(List<String> expected, String 
dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List<String> expected, String 
dumplocation,
   WarehouseInstance warehouseInstance)
   throws IOException {
 Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-Path externalTableInfoFile;
-if (isIncremental) {
-  externalTableInfoFile = new Path(hivePath, FILE_NAME);
-} else {
-  externalTableInfoFile = new Path(metadataPath, 
primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-}
-ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, 
externalTableInfoFile);
+Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
   Done







Issue Time Tracking
---

Worklog Id: (was: 566051)
Time Spent: 6.5h  (was: 6h 20m)



[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=566050&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-566050
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 15/Mar/21 07:39
Start Date: 15/Mar/21 07:39
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r594105835



##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##
@@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws 
Throwable {
 .verifyResult(inc2Tuple.lastReplicationId);
   }
 
-  private void assertFalseExternalFileList(Path externalTableFileList)
-  throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)

Review comment:
   Done







Issue Time Tracking
---

Worklog Id: (was: 566050)
Time Spent: 6h 20m  (was: 6h 10m)



[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=566049&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-566049
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 15/Mar/21 07:38
Start Date: 15/Mar/21 07:38
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r594105742



##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##
@@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws 
Throwable {
 .verifyResult(inc2Tuple.lastReplicationId);
   }
 
-  private void assertFalseExternalFileList(Path externalTableFileList)
-  throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)
+  throws IOException {

Review comment:
   getFileSystem() call throws IOException.







Issue Time Tracking
---

Worklog Id: (was: 566049)
Time Spent: 6h 10m  (was: 6h)



[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=564628&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-564628
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 11/Mar/21 14:25
Start Date: 11/Mar/21 14:25
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r592395172



##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##
@@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws 
Throwable {
 .verifyResult(inc2Tuple.lastReplicationId);
   }
 
-  private void assertFalseExternalFileList(Path externalTableFileList)
-  throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)

Review comment:
   Can you please move this method to ReplicationTestUtils itself? We have 
duplicate code for this method in two classes.

##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##
@@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws 
Throwable {
 .verifyResult(inc2Tuple.lastReplicationId);
   }
 
-  private void assertFalseExternalFileList(Path externalTableFileList)
-  throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)
+  throws IOException {

Review comment:
   I think it doesn't throw an IOException.

##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##
@@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path 
identityUdfLocalPath, Path identityUdfHdfsPa
   /*
* Method used from TestReplicationScenariosExclusiveReplica
*/
-  private void assertExternalFileInfo(List<String> expected, String 
dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List<String> expected, String 
dumplocation,
   WarehouseInstance warehouseInstance)
   throws IOException {
 Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-Path externalTableInfoFile;
-if (isIncremental) {
-  externalTableInfoFile = new Path(hivePath, FILE_NAME);
-} else {
-  externalTableInfoFile = new Path(metadataPath, 
primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-}
-ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, 
externalTableInfoFile);
+Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
   This is still not addressed. The same method (and code), 
assertExternalFileList, is defined in three classes, and essentially it does no 
more than form the path. As discussed, can we not use 
ReplicationTestUtils.assertExternalFileList directly by modifying the signature 
a bit?
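
   As a sketch of the consolidation suggested above: since the per-class copies do little more than path formation, a single shared helper can take the dump location and form the path itself. The snippet below is only an illustration, using java.nio.file in place of Hadoop's Path, with assumed directory and file names standing in for the values of ReplUtils.REPL_HIVE_BASE_DIR and EximUtil.FILE_LIST_EXTERNAL; it is not Hive's actual ReplicationTestUtils code.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical consolidation sketch: path formation lives in one shared
// helper instead of being repeated in three test classes.
public class ReplTestUtilSketch {
  // Assumed stand-ins for ReplUtils.REPL_HIVE_BASE_DIR / EximUtil.FILE_LIST_EXTERNAL.
  static final String REPL_HIVE_BASE_DIR = "hive";
  static final String FILE_LIST_EXTERNAL = "_file_list_external";

  // Callers pass only the dump location; the helper builds the path itself.
  public static Path externalFileListPath(String dumpLocation) {
    return Paths.get(dumpLocation, REPL_HIVE_BASE_DIR, FILE_LIST_EXTERNAL);
  }
}
```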







Issue Time Tracking
---

Worklog Id: (was: 564628)
Time Spent: 6h  (was: 5h 50m)



[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=563490&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-563490
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 10/Mar/21 04:33
Start Date: 10/Mar/21 04:33
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r591024045



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+  writeWithRetry(entry);
 } else {
-  thresholdHit = true;
+  writeEntry(entry);
 }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue<String> cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+//retry only during creating the file, no retry during writes
+if (backingFileWriter == null) {
+  try {
+Retryable retryable = buildRetryable();
+retryable.executeCallable((Callable<Void>) () -> {
+  if(this.abortOperation) {
+return null;
+  }
+  backingFileWriter = getWriterCreateMode();
+  return null;
+});
+  } catch (Exception e) {
+this.abortOperation = true;
+throw new 
IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+  }
+}
+if(this.abortOperation) {
+  return;
+}
+try {
+  backingFileWriter.writeBytes(getEntryWithNewline(entry));
+  LOG.info("Writing entry {} to file list backed by {}", entry, 
backingFile);
+} catch (IOException e) {
+  this.abortOperation = true;
+  LOG.error("Writing entry {} to file list {} failed.", entry, 
backingFile, e);
+  throw e;
+}
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
+  priv
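
The quoted diff (cut off above) retries only while creating the backing-file writer and aborts on any write failure. A minimal self-contained sketch of that retry-on-create-only pattern, with a plain loop and injected failures standing in for Hive's Retryable (both assumptions, not Hive's implementation):

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Toy sketch: retries apply only to creating the writer; a failure during an
// actual write aborts the operation with no retry, mirroring the diff above.
public class RetryOnCreateSketch {
  private boolean writerCreated = false;
  private volatile boolean abortOperation = false;
  private int createAttempts = 0;

  private void createWriter(int transientFailures) throws IOException {
    createAttempts++;
    if (createAttempts <= transientFailures) {
      throw new IOException("transient create failure"); // injected for the sketch
    }
    writerCreated = true;
  }

  public synchronized void add(String entry, int createFailures, boolean writeFails) {
    if (!writerCreated) {
      int maxAttempts = 3; // assumed retry budget, stands in for buildRetryable()
      for (int i = 0; i < maxAttempts; i++) {
        try {
          createWriter(createFailures);
          break;
        } catch (IOException e) {
          if (i == maxAttempts - 1) { // budget exhausted: abort and surface the error
            abortOperation = true;
            throw new UncheckedIOException(e);
          }
        }
      }
    }
    if (abortOperation) {
      return; // once aborted, further entries are dropped
    }
    if (writeFails) { // writes themselves are never retried
      abortOperation = true;
      throw new UncheckedIOException(new IOException("write failed"));
    }
  }

  public int createAttempts() { return createAttempts; }
  public boolean aborted() { return abortOperation; }
}
```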

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559852&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559852
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 02/Mar/21 08:02
Start Date: 02/Mar/21 08:02
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r585302812



##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##
@@ -1836,6 +1837,64 @@ public void testHdfsNameserviceWithDataCopy() throws 
Throwable {
 .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+List<String> clause = new ArrayList<>();
+//NS replacement parameters has no effect when data is also copied to 
staging
+clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + 
"'='false'");
+clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+.run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+"clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+.run("create table table1 (i String)")
+.run("insert into table1 values (1)")
+.run("insert into table1 values (2)")
+.dump(primaryDbName, clause);
+assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation,

Review comment:
   nit: If you just pass dumpLocation to the method and do the path creation 
inside the method, this would look cleaner. Anyway, the method 
assertFalseExternalFileList isn't doing much, so alternatively you can do the 
fs.exists() check right there and get rid of the method.
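
A hedged sketch of the reviewer's alternative: do the exists() check directly, forming the path inside the helper. Here java.nio.file stands in for Hadoop's FileSystem/Path, and the directory and file names are assumptions mirroring the diff, not Hive's actual constants.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustration only: build the path inside the helper and check existence
// directly, instead of constructing the path at every call site.
public class AssertNoFileListSketch {
  public static boolean externalFileListExists(String dumpLocation) {
    // Assumed layout: <dumpLocation>/hive/_file_list_external
    Path p = Paths.get(dumpLocation, "hive", "_file_list_external");
    return Files.exists(p);
  }
}
```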

##
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##
@@ -653,6 +649,8 @@ private static void populateLlapDaemonVarsSet(Set<String> 
llapDaemonVarsSetLocal
   new TimeValidator(TimeUnit.HOURS),
   "Total allowed retry duration in hours inclusive of all retries. Once 
this is exhausted, " +
 "the policy instance will be marked as failed and will need manual 
intervention to restart."),
+REPL_COPY_ITERATOR_RETRY("hive.repl.copy.iterator.retry", true,

Review comment:
   REPL_COPY_FILE_LIST_ITERATOR_RETRY ?

##
File path: 
ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
##
@@ -18,147 +18,266 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-
 /**
  * Tests the File List implementation.
  */
 
-@RunWith(PowerMockRunner.class)
+@RunWith(MockitoJUnitRunner.class)
 @PrepareForTest({LoggerFactory.class})
 public class TestFileList {
 
-  @Mock
-  private BufferedWriter bufferedWriter;
-
-
-  @Test
-  public void testNoStreaming() throws Exception {
-Object tuple[] =  setupAndGetTuple(100, false);
-FileList fileList = (FileList) tuple[0];
-FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-fileList.add("Entry1");
-fileList.add("Entry2");
-assertFalse(isStreamingToFile(fileListStreamer));
-  }
+  HiveConf conf = new HiveConf();
+  private FSDataOutputStream outStream;
+  private FSDataOutputStream testFileStream;
+  final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+  File.separator + TestFileList.class.getCanonicalName() + "-" + 
System.currentTimeMillis()
  ).getPath().replaceAll("\\\\", "/");
+  private Exception testException = new IOException("test");
 
   @Test
-  public void testAlwaysStreaming() throws Exception {
-Object tuple[] =  setupAndGetTuple(100, true);
-Fil

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559727&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559727
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 02/Mar/21 03:27
Start Date: 02/Mar/21 03:27
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r585220794



##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws 
Throwable {
 .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+List<String> clause = new ArrayList<>();
+//NS replacement parameters has no effect when data is also copied to 
staging
+clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + 
"'='false'");
+clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+primary.run("use " + primaryDbName)
+.run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+"clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+.run("create table table1 (i String)")
+.run("insert into table1 values (1)")
+.run("insert into table1 values (2)")
+.dump(primaryDbName, clause);

Review comment:
   Done.

##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws 
Throwable {
 .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+List<String> clause = new ArrayList<>();
+//NS replacement parameters has no effect when data is also copied to 
staging
+clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + 
"'='false'");
+clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+primary.run("use " + primaryDbName)
+.run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+"clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+.run("create table table1 (i String)")
+.run("insert into table1 values (1)")
+.run("insert into table1 values (2)")
+.dump(primaryDbName, clause);
+replica.load(replicatedDbName, primaryDbName, clause)
+.run("use " + replicatedDbName)
+.run("show tables")
+.verifyResults(new String[] {"acid_table", "table1"})
+.run("select * from table1")
+.verifyResults(new String[] {"1", "2"});
+
+primary.run("use " + primaryDbName)
+.run("insert into table1 values (3)")
+.dump(primaryDbName, clause);
+replica.load(replicatedDbName, primaryDbName, clause)
+.run("use " + replicatedDbName)
+.run("show tables")
+.verifyResults(new String[]{"acid_table", "table1"})
+.run("select * from table1")
+.verifyResults(new String[]{"1", "2", "3"});
+
+clause.add("'" + 
HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + 
"'='false'");
+primary.run("use " + primaryDbName)
+.run("create external table ext_table1 (id int)")
+.run("insert into ext_table1 values (3)")
+.run("insert into ext_table1 values (4)")
+.run("create external table  ext_table2 (key int, value int) 
partitioned by (load_time timestamp)")
+.run("insert into ext_table2 partition(load_time = '2012-02-21 
07:08:09.123') values(1,2)")
+.run("insert into ext_table2 partition(load_time = '2012-02-21 
07:08:09.124') values(1,3)")
+.run("show partitions ext_table2")
+.verifyResults(new String[]{
+"load_time=2012-02-21 07%3A08%3A09.123",
+"load_time=2012-02-21 07%3A08%3A09.124"})
+.dump(primaryDbName, clause);

Review comment:
   Done.







Issue Time Tracking
---

Worklog Id: (was: 559727)
Time Spent: 5.5h  (was: 5h 20m)


[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559689
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 02/Mar/21 01:37
Start Date: 02/Mar/21 01:37
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r584584623




[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559293
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 01/Mar/21 10:07
Start Date: 01/Mar/21 10:07
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r584585551



##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws 
Throwable {
 .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {

Review comment:
   No, this is just to have a case where replication works with this conf set 
to false. To confirm that retry is not happening, the test "testWriteNoRetry" 
is added in the TestFileList class.
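
To illustrate the idea such a test checks (this is a sketch under assumed names, not Hive's actual TestFileList code): with the retry flag off, a write that always fails should be attempted exactly once.

```java
// Sketch of a "no retry" check: a real test would inject a failing writer and
// count its invocations; here the counting loop itself models that behavior.
public class NoRetrySketch {
  // Returns how many attempts are made before giving up, given the retry flag.
  public static int failingWriteAttempts(boolean retryEnabled, int maxAttempts) {
    int budget = retryEnabled ? maxAttempts : 1; // retry disabled => single attempt
    int attempts = 0;
    while (attempts < budget) {
      attempts++; // every attempt "fails" in this sketch
    }
    return attempts;
  }
}
```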







Issue Time Tracking
---

Worklog Id: (was: 559293)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559292&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559292
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 01/Mar/21 10:05
Start Date: 01/Mar/21 10:05
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r584584623



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+  writeWithRetry(entry);
 } else {
-  thresholdHit = true;
+  writeEntry(entry);
 }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue<String> cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+//retry only during creating the file, no retry during writes
+if (backingFileWriter == null) {
+  try {
+Retryable retryable = buildRetryable();
+retryable.executeCallable((Callable<Void>) () -> {
+  if(this.abortOperation) {
+return null;
+  }
+  backingFileWriter = getWriterCreateMode();
+  return null;
+});
+  } catch (Exception e) {
+this.abortOperation = true;
+throw new 
IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+  }
+}
+if(this.abortOperation) {
+  return;
+}
+try {
+  backingFileWriter.writeBytes(getEntryWithNewline(entry));
+  LOG.info("Writing entry {} to file list backed by {}", entry, 
backingFile);
+} catch (IOException e) {
+  this.abortOperation = true;
+  LOG.error("Writing entry {} to file list {} failed.", entry, 
backingFile, e);
+  throw e;
+}
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
+  priv

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-03-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559254&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559254
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 01/Mar/21 08:16
Start Date: 01/Mar/21 08:16
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r583233006



##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws 
Throwable {
 .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+List<String> clause = new ArrayList<>();
+//NS replacement parameters has no effect when data is also copied to 
staging
+clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + 
"'='false'");
+clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+primary.run("use " + primaryDbName)
+.run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+"clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+.run("create table table1 (i String)")
+.run("insert into table1 values (1)")
+.run("insert into table1 values (2)")
+.dump(primaryDbName, clause);

Review comment:
   At this point the entries are written to the _file_list and _file_list_external.
   Please add an assertion on the file content here.
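An assertion on the dump's file-list content, as requested above, could be sketched as follows. This is a hypothetical standalone helper using plain java.nio, not the test utilities from the Hive test suite; the paths and entries are illustrative:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FileListAssert {
    // Hypothetical helper: read a dump's _file_list_external and check that
    // it contains exactly the expected entries (order-insensitive).
    static void assertFileList(Path fileList, Set<String> expected) throws IOException {
        Set<String> actual = new HashSet<>(Files.readAllLines(fileList));
        if (!actual.equals(expected)) {
            throw new AssertionError("expected " + expected + " but found " + actual);
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate a backing file with two entries written by the dump.
        Path tmp = Files.createTempFile("_file_list_external", "");
        Files.write(tmp, List.of("hdfs://ns1/db/t1", "hdfs://ns1/db/t2"));
        assertFileList(tmp, Set.of("hdfs://ns1/db/t1", "hdfs://ns1/db/t2"));
        System.out.println("file list matches");
        Files.delete(tmp);
    }
}
```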

##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##
@@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws 
Throwable {
 .verifyResults(new String[]{"2", "3"});
   }
 
+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+List<String> clause = new ArrayList<>();
+//NS replacement parameters has no effect when data is also copied to 
staging
+clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + 
"'='false'");
+clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+primary.run("use " + primaryDbName)
+.run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+"clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+.run("create table table1 (i String)")
+.run("insert into table1 values (1)")
+.run("insert into table1 values (2)")
+.dump(primaryDbName, clause);
+replica.load(replicatedDbName, primaryDbName, clause)
+.run("use " + replicatedDbName)
+.run("show tables")
+.verifyResults(new String[] {"acid_table", "table1"})
+.run("select * from table1")
+.verifyResults(new String[] {"1", "2"});
+
+primary.run("use " + primaryDbName)
+.run("insert into table1 values (3)")
+.dump(primaryDbName, clause);
+replica.load(replicatedDbName, primaryDbName, clause)
+.run("use " + replicatedDbName)
+.run("show tables")
+.verifyResults(new String[]{"acid_table", "table1"})
+.run("select * from table1")
+.verifyResults(new String[]{"1", "2", "3"});
+
+clause.add("'" + 
HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + 
"'='false'");
+primary.run("use " + primaryDbName)
+.run("create external table ext_table1 (id int)")
+.run("insert into ext_table1 values (3)")
+.run("insert into ext_table1 values (4)")
+.run("create external table  ext_table2 (key int, value int) 
partitioned by (load_time timestamp)")
+.run("insert into ext_table2 partition(load_time = '2012-02-21 
07:08:09.123') values(1,2)")
+.run("insert into ext_table2 partition(load_time = '2012-02-21 
07:08:09.124') values(1,3)")
+.run("show partitions ext_table2")
+.verifyResults(new String[]{
+"load_time=2012-02-21 07%3A08%3A09.123",
+"load_time=2012-02-21 07%3A08%3A09.124"})
+.dump(primaryDbName, clause);

Review comment:
   At this point the entries are written to the _file_list and _file_list_external.
   Please add an assertion on the file content here.

##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,225 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=557666&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557666
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 25/Feb/21 01:47
Start Date: 25/Feb/21 01:47
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r582431522



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {

Review comment:
   Dump operation fails if any thread encounters an exception.
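The behaviour described above (any writer thread hitting an exception fails the whole dump) can be sketched with a volatile abort flag, as the patch's FileList does with abortOperation. The sketch below is a standalone illustration with hypothetical names and an in-memory sink standing in for the backing file, not Hive's actual code:

```java
import java.io.IOException;

public class FileListWriter {
    // Volatile so concurrent writer threads all observe the first failure.
    private volatile boolean abortOperation = false;
    private final StringBuilder sink = new StringBuilder(); // stands in for the backing file

    // Mimics the patch's writeEntry: once any write fails, later writes are
    // skipped, and the thread that saw the exception fails the dump.
    public void writeEntry(String entry) throws IOException {
        if (abortOperation) {
            return; // another writer already failed; stop writing
        }
        if (entry.isEmpty()) {        // hypothetical stand-in for an I/O failure
            abortOperation = true;    // signal all concurrent writers
            throw new IOException("write failed");
        }
        sink.append(entry).append('\n');
    }

    public boolean isAborted() { return abortOperation; }
    public String contents()   { return sink.toString(); }

    public static void main(String[] args) {
        FileListWriter list = new FileListWriter();
        for (String e : new String[]{"f1", "", "f2"}) {
            try {
                list.writeEntry(e);
            } catch (IOException ex) {
                System.out.println("dump aborted on entry '" + e + "'");
            }
        }
        System.out.print(list.contents()); // only "f1" was persisted
    }
}
```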







Issue Time Tracking
---

Worklog Id: (was: 557666)
Time Spent: 4h 40m  (was: 4.5h)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=557663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557663
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 25/Feb/21 01:46
Start Date: 25/Feb/21 01:46
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r582431043



##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##
@@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path 
identityUdfLocalPath, Path identityUdfHdfsPa
   /*
* Method used from TestReplicationScenariosExclusiveReplica
*/
-  private void assertExternalFileInfo(List expected, String 
dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List expected, String 
dumplocation,
   WarehouseInstance warehouseInstance)
   throws IOException {
 Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-Path externalTableInfoFile;
-if (isIncremental) {
-  externalTableInfoFile = new Path(hivePath, FILE_NAME);
-} else {
-  externalTableInfoFile = new Path(metadataPath, 
primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-}
-ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, 
externalTableInfoFile);
+Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
   Removing it would cause code duplication in most places, except in TestReplicationScenariosExternalTablesMetaDataOnly.java, where the code has been refactored.







Issue Time Tracking
---

Worklog Id: (was: 557663)
Time Spent: 4h 20m  (was: 4h 10m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=557665&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557665
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 25/Feb/21 01:46
Start Date: 25/Feb/21 01:46
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r582431197



##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##
@@ -103,9 +102,9 @@ public void replicationWithoutExternalTables() throws 
Throwable {
 .run("insert into table t2 partition(country='france') values 
('paris')")
 .dump(primaryDbName, dumpWithClause);
 
-// the _external_tables_file info only should be created if external 
tables are to be replicated not otherwise
-assertFalse(primary.miniDFSCluster.getFileSystem()
-.exists(new Path(new Path(tuple.dumpLocation, 
primaryDbName.toLowerCase()), FILE_NAME)));
+// the _file_list_external only should be created if external tables are 
to be replicated not otherwise
+assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation,

Review comment:
   Removing it would cause code duplication in most places, except in TestReplicationScenariosExternalTablesMetaDataOnly.java, where the code has been refactored.







Issue Time Tracking
---

Worklog Id: (was: 557665)
Time Spent: 4.5h  (was: 4h 20m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=557661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557661
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 25/Feb/21 01:43
Start Date: 25/Feb/21 01:43
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r582430148



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {
+synchronized (backingFile) {
+  try{
+if (backingFileWriter == null) {
+  backingFileWriter = initWriter();
+}
+backingFileWriter.writeBytes(getEntryWithNewline(entry));
+backingFileWriter.hflush();

Review comment:
   Currently we are checking for duplicate entries.







Issue Time Tracking
---

Worklog Id: (was: 557661)
Time Spent: 4h 10m  (was: 4h)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555925&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555925
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 18:02
Start Date: 22/Feb/21 18:02
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580465458



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {
+synchronized (backingFile) {
+  try{
+if (backingFileWriter == null) {
+  backingFileWriter = initWriter();
+}
+backingFileWriter.writeBytes(getEntryWithNewline(entry));
+backingFileWriter.hflush();

Review comment:
   This patch assumes that each flush operation either writes the data completely (including the ack) or does not write it at all. If this assumption changes, we would need further investigation to determine the boundaries of each entry written to the file. So checks for duplicate and invalid entries are currently not done.
   
   hflush() guarantees that after a successful return, every new reader will see the flushed data. This guarantee seems sufficient, as we do not read during writes.
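For context, the Retryable.executeCallable pattern discussed throughout this thread retries a Callable until it succeeds or attempts are exhausted, then surfaces a "retry exhausted" error. A minimal standalone sketch of that pattern follows; the attempt count and exponential backoff policy here are illustrative assumptions, not Hive's actual Retryable implementation:

```java
import java.util.concurrent.Callable;

public class RetryDemo {
    // Retry a Callable with exponential backoff until it succeeds
    // or the attempt budget is exhausted.
    static <T> T executeWithRetry(Callable<T> task, int maxAttempts, long initialDelayMs)
            throws Exception {
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2; // exponential backoff between attempts
                }
            }
        }
        throw new Exception("retry exhausted: " + last.getMessage(), last);
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds: mirrors a transient filesystem hiccup.
        String result = executeWithRetry(() -> {
            if (++calls[0] < 3) throw new java.io.IOException("transient");
            return "ok";
        }, 5, 1);
        System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
    }
}
```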







Issue Time Tracking
---

Worklog Id: (was: 555925)
Time Spent: 4h  (was: 3h 50m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 4h
>  Remaini

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555850&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555850
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 15:05
Start Date: 22/Feb/21 15:05
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580321113



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {
+synchronized (backingFile) {
+  try{
+if (backingFileWriter == null) {
+  backingFileWriter = initWriter();
+}
+backingFileWriter.writeBytes(getEntryWithNewline(entry));
+backingFileWriter.hflush();

Review comment:
   On hflush():
   a) AFAIK, this doesn't guarantee that the data is written to disk. What happens if all the data nodes go down simultaneously? How does the retry handle that case?
   b) What happens when the data is written but the connection breaks while receiving the ACK from the data node(s)? Wouldn't we have duplicate entries in that case?
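If a retried write could replay an entry (the broken-ACK scenario raised above), one possible mitigation would be to deduplicate entries while iterating the backing file on the read side. This is purely illustrative, it is not what the patch does (the thread elsewhere notes that duplicate-entry checks were dropped), and the entries shown are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DedupReaderDemo {
    // Tolerate entries duplicated by a write retry: drop repeats while
    // iterating the backing file, preserving first-seen order.
    static List<String> readDistinct(Iterable<String> lines) {
        Set<String> seen = new LinkedHashSet<>();
        for (String line : lines) {
            seen.add(line); // a line replayed by a retry is dropped here
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        // "hdfs://ns1/t1" written twice: once before the broken ACK, once on retry
        List<String> raw = List.of("hdfs://ns1/t1", "hdfs://ns1/t1", "hdfs://ns1/t2");
        System.out.println(readDistinct(raw)); // [hdfs://ns1/t1, hdfs://ns1/t2]
    }
}
```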







Issue Time Tracking
---

Worklog Id: (was: 555850)
Time Spent: 3h 50m  (was: 3h 40m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555763&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555763
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 11:55
Start Date: 22/Feb/21 11:55
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580192079



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {
+synchronized (backingFile) {
+  try{
+if (backingFileWriter == null) {
+  backingFileWriter = initWriter();
+}
+backingFileWriter.writeBytes(getEntryWithNewline(entry));
+backingFileWriter.hflush();
+LOG.info("Writing entry {} to file list backed by {}", entry, 
backingFile);
+  } catch (IOException e) {
+LOG.error("Writing entry {} to file list {} failed, attempting 
retry.", entry, backingFile, e);
+this.retryMode = true;
+close();
+throw e;
+  }
+}
+return null;
+  });
+} catch (Exception e) {
+  throw new 
IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
 }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue<String> cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+return Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+return new StringWriter()
+.append(entry)
+.append(System.lineSeparator())
+  

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555762&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555762
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 11:53
Start Date: 22/Feb/21 11:53
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580191067



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {

Review comment:
   What I meant is that a failure (the retry-exhausted case) in one thread 
means the entire dump operation has to fail. So the success of each thread 
matters here, no? Are the other running tasks that are doing add operations 
interrupted if one of them fails?
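A generic sketch of the coordination being asked about (not the Hive code; class and message names are made up): the first worker whose simulated `add()` exhausts its retries surfaces through the coordinator, which then calls `shutdownNow()` to interrupt the remaining workers instead of letting them run on.

```java
import java.io.IOException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FailFastAdds {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Void> done = new ExecutorCompletionService<>(pool);
        for (int i = 0; i < 4; i++) {
            final int id = i;
            done.submit(() -> {
                if (id == 2) {
                    // stands in for an add() whose retries are exhausted
                    throw new IOException("retry exhausted");
                }
                Thread.sleep(200);  // stands in for a slow add()
                return null;
            });
        }
        String aborted = null;
        try {
            // fail fast: surface the first failure instead of waiting for all
            for (int i = 0; i < 4; i++) {
                done.take().get();
            }
        } catch (ExecutionException e) {
            pool.shutdownNow();  // interrupt the still-running workers
            aborted = e.getCause().getMessage();
        }
        pool.shutdown();
        System.out.println("aborted: " + aborted); // prints: aborted: retry exhausted
    }
}
```

Without a coordinator consuming results like this, a failed worker only fails its own future and the siblings keep writing.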





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 555762)
Time Spent: 3.5h  (was: 3h 20m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555728&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555728
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 09:38
Start Date: 22/Feb/21 09:38
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580104313



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {
+synchronized (backingFile) {
+  try{
+if (backingFileWriter == null) {
+  backingFileWriter = initWriter();
+}
+backingFileWriter.writeBytes(getEntryWithNewline(entry));
+backingFileWriter.hflush();
+LOG.info("Writing entry {} to file list backed by {}", entry, 
backingFile);
+  } catch (IOException e) {
+LOG.error("Writing entry {} to file list {} failed, attempting 
retry.", entry, backingFile, e);
+this.retryMode = true;
+close();
+throw e;
+  }
+}
+return null;
+  });
+} catch (Exception e) {
+  throw new 
IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
 }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue<String> cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+return Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+return new StringWriter()
+.append(entry)
+.append(System.lineSeparator())
+   

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555727&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555727
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 09:37
Start Date: 22/Feb/21 09:37
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580049385



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {
+synchronized (backingFile) {
+  try{
+if (backingFileWriter == null) {
+  backingFileWriter = initWriter();
+}
+backingFileWriter.writeBytes(getEntryWithNewline(entry));
+backingFileWriter.hflush();
+LOG.info("Writing entry {} to file list backed by {}", entry, 
backingFile);
+  } catch (IOException e) {
+LOG.error("Writing entry {} to file list {} failed, attempting 
retry.", entry, backingFile, e);
+this.retryMode = true;
+close();
+throw e;
+  }
+}
+return null;
+  });
+} catch (Exception e) {
+  throw new 
IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
 }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue<String> cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+return Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+return new StringWriter()
+.append(entry)
+.append(System.lineSeparator())
+   

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555670&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555670
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 08:12
Start Date: 22/Feb/21 08:12
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580051218



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##
@@ -184,42 +190,86 @@ public void setResultValues(List resultValues) {
 this.resultValues = resultValues;
   }
 
-  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf 
conf) {
+  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf 
conf) throws IOException {
 if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) 
{
   return Collections.emptyList();
 }
List<Task<?>> tasks = new ArrayList<>();
-while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) 
{
-  DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, 
currentDumpPath.toString());
-  dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
-  Task task = TaskFactory.get(dirCopyWork, conf);
-  tasks.add(task);
-  tracker.addTask(task);
-  LOG.debug("added task for {}", dirCopyWork);
+Retryable retryable = Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(UncheckedIOException.class).build();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {
+try{
+  int numEntriesToSkip = tasks == null ? 0 : tasks.size();
+  while (externalTblCopyPathIterator.hasNext() &&  
tracker.canAddMoreTasks()) {
+if(numEntriesToSkip > 0) {
+  //skip tasks added in previous attempts of this retryable block
+  externalTblCopyPathIterator.next();
+  numEntriesToSkip--;
+  continue;
+}
+DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, 
currentDumpPath.toString());
+dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
+Task task = TaskFactory.get(dirCopyWork, conf);
+tasks.add(task);
+tracker.addTask(task);

Review comment:
   Also, they are not added again during retries; this is determined by 
the size of the 'tasks' list.
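The bookkeeping described here can be sketched in isolation (hypothetical names, not the Hive code; it assumes each retry attempt restarts iteration from the beginning of the entry list, so the skip counter prevents entries added in an earlier attempt from being added twice):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SkipOnRetry {
    // Re-runs the loop body after a transient failure, skipping the entries
    // that were already turned into tasks in earlier attempts; the skip count
    // is simply the size of the surviving 'tasks' list.
    public static List<String> buildTasks(Iterable<String> paths) {
        List<String> tasks = new ArrayList<>();
        boolean failedOnce = false;
        while (true) {
            Iterator<String> it = paths.iterator();  // fresh pass per attempt
            int numEntriesToSkip = tasks.size();     // added in earlier attempts
            try {
                while (it.hasNext()) {
                    String p = it.next();
                    if (numEntriesToSkip > 0) { numEntriesToSkip--; continue; }
                    if (!failedOnce && tasks.size() == 1) {
                        failedOnce = true;           // inject one transient failure
                        throw new IllegalStateException("transient failure");
                    }
                    tasks.add("task:" + p);
                }
                return tasks;
            } catch (IllegalStateException e) {
                // retry: 'tasks' keeps what was added, so nothing is duplicated
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(buildTasks(List.of("a", "b", "c"))); // prints: [task:a, task:b, task:c]
    }
}
```

Despite the injected failure between "a" and "b", each entry yields exactly one task.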





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 555670)
Time Spent: 3h  (was: 2h 50m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 3h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555669&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555669
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 08:11
Start Date: 22/Feb/21 08:11
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580050588



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
##
@@ -184,42 +190,86 @@ public void setResultValues(List resultValues) {
 this.resultValues = resultValues;
   }
 
-  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf 
conf) {
+  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf 
conf) throws IOException {
 if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) 
{
   return Collections.emptyList();
 }
List<Task<?>> tasks = new ArrayList<>();
-while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) 
{
-  DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, 
currentDumpPath.toString());
-  dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
-  Task task = TaskFactory.get(dirCopyWork, conf);
-  tasks.add(task);
-  tracker.addTask(task);
-  LOG.debug("added task for {}", dirCopyWork);
+Retryable retryable = Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(UncheckedIOException.class).build();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {
+try{
+  int numEntriesToSkip = tasks == null ? 0 : tasks.size();
+  while (externalTblCopyPathIterator.hasNext() &&  
tracker.canAddMoreTasks()) {
+if(numEntriesToSkip > 0) {
+  //skip tasks added in previous attempts of this retryable block
+  externalTblCopyPathIterator.next();
+  numEntriesToSkip--;
+  continue;
+}
+DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, 
currentDumpPath.toString());
+dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
+Task task = TaskFactory.get(dirCopyWork, conf);
+tasks.add(task);
+tracker.addTask(task);

Review comment:
   They remain in memory in the same list: 'tasks'.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 555669)
Time Spent: 2h 50m  (was: 2h 40m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555668&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555668
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 08:10
Start Date: 22/Feb/21 08:10
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580050377



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {
+synchronized (backingFile) {
+  try{
+if (backingFileWriter == null) {
+  backingFileWriter = initWriter();
+}
+backingFileWriter.writeBytes(getEntryWithNewline(entry));
+backingFileWriter.hflush();
+LOG.info("Writing entry {} to file list backed by {}", entry, 
backingFile);
+  } catch (IOException e) {
+LOG.error("Writing entry {} to file list {} failed, attempting 
retry.", entry, backingFile, e);
+this.retryMode = true;
+close();
+throw e;
+  }
+}
+return null;
+  });
+} catch (Exception e) {
+  throw new 
IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
 }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue<String> cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+return Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+return new StringWriter()
+.append(entry)
+.append(System.lineSeparator())
+   

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555666&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555666
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 08:08
Start Date: 22/Feb/21 08:08
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580049385



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable<Void>) ()-> {
+synchronized (backingFile) {
+  try{
+if (backingFileWriter == null) {
+  backingFileWriter = initWriter();
+}
+backingFileWriter.writeBytes(getEntryWithNewline(entry));
+backingFileWriter.hflush();
+LOG.info("Writing entry {} to file list backed by {}", entry, 
backingFile);
+  } catch (IOException e) {
+LOG.error("Writing entry {} to file list {} failed, attempting 
retry.", entry, backingFile, e);
+this.retryMode = true;
+close();
+throw e;
+  }
+}
+return null;
+  });
+} catch (Exception e) {
+  throw new 
IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
 }
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue<String> cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
+  Retryable buildRetryable() {
+return Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(IOException.class).build();
   }
 
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
+  // Return the entry ensuring it ends with newline.
+  private String getEntryWithNewline(String entry) {
+return new StringWriter()
+.append(entry)
+.append(System.lineSeparator())
+   

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555663
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 08:06
Start Date: 22/Feb/21 08:06
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580048118



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
   private HiveConf conf;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
+this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+Retryable retryable = buildRetryable();
+try {
+  retryable.executeCallable((Callable) ()-> {

Review comment:
   Yes, other threads are allowed to write.
   Failure in one thread doesn't itself imply failure in the other threads: after 
a failure, we close the output stream and reopen it for whichever thread 
attempts the next write.
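The close-and-reopen-on-failure behaviour described here can be sketched in plain, self-contained Java. Everything below is hypothetical stand-in code, not the actual Hive classes: `StringBuilder` stands in for the HDFS-backed `FSDataOutputStream`, and a fixed attempt count stands in for the `Retryable` policy.

```java
// Self-contained sketch (hypothetical names) of the behaviour described above:
// a failed write closes the shared writer, and whichever thread performs the
// next write lazily reopens it before appending.
class RetryingFileList {
    private final StringBuilder backing = new StringBuilder(); // stands in for the HDFS-backed file
    private StringBuilder writer;     // stands in for FSDataOutputStream; null means closed
    private int failuresToInject;     // test hook simulating transient IO failures
    int reopens = 0;                  // observable: how many times the writer was (re)opened

    RetryingFileList(int failuresToInject) {
        this.failuresToInject = failuresToInject;
    }

    void add(String entry) {
        final int maxAttempts = 3;    // stands in for the Retryable policy
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            synchronized (backing) {  // same idea as synchronized (backingFile)
                try {
                    if (writer == null) {        // lazily (re)open after a failure
                        writer = backing;
                        reopens++;
                    }
                    if (failuresToInject > 0) {  // simulated transient write failure
                        failuresToInject--;
                        throw new RuntimeException("transient write failure");
                    }
                    writer.append(entry).append(System.lineSeparator());
                    return;
                } catch (RuntimeException e) {
                    last = e;
                    writer = null;               // close(); the next attempt reopens
                }
            }
        }
        throw new RuntimeException("retries exhausted", last);
    }

    String contents() {
        return backing.toString();
    }
}
```

With one injected failure, the first add() fails once, reopens, and succeeds on retry; subsequent writes reuse the already-open writer.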
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 555663)
Time Spent: 2h 20m  (was: 2h 10m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555661
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 08:00
Start Date: 22/Feb/21 08:00
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580045609



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
##
@@ -75,7 +77,9 @@ public String convertToString() {
 StringBuilder objInStr = new StringBuilder();
 objInStr.append(fullyQualifiedSourcePath)
 .append(URI_SEPARATOR)
-.append(fullyQualifiedTargetPath);
+.append(fullyQualifiedTargetPath)
+.append(URI_SEPARATOR)
+.append(tableName);

Review comment:
   In the earlier version, source and target were the first and second entries, 
so appending tableName at the end remains consistent with any dumps produced 
by that version.
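Why appending the new field last stays backward compatible can be shown with a minimal sketch. This is illustrative only: the separator value and class name are assumptions, not the real DirCopyWork.

```java
// Hypothetical sketch: appending tableName as the last field keeps source and
// target at positions 0 and 1, so a reader of the new format can still parse
// two-field lines written by the earlier version.
class DirCopyEntry {
    static final String URI_SEPARATOR = "#"; // assumed separator value, for illustration only

    static String encode(String source, String target, String tableName) {
        return source + URI_SEPARATOR + target + URI_SEPARATOR + tableName;
    }

    static String[] decode(String line) {
        return line.split(URI_SEPARATOR, -1); // -1 keeps trailing empty fields
    }
}
```

A two-field line from an old dump decodes to two fields, a three-field line to three; positions 0 and 1 mean the same thing in both.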







Issue Time Tracking
---

Worklog Id: (was: 555661)
Time Spent: 2h 10m  (was: 2h)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, 
> HIVE-24718.04.patch
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555623&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555623
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 22/Feb/21 06:34
Start Date: 22/Feb/21 06:34
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r579994523



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
##
@@ -75,7 +77,9 @@ public String convertToString() {
 StringBuilder objInStr = new StringBuilder();
 objInStr.append(fullyQualifiedSourcePath)
 .append(URI_SEPARATOR)
-.append(fullyQualifiedTargetPath);
+.append(fullyQualifiedTargetPath)
+.append(URI_SEPARATOR)
+.append(tableName);

Review comment:
   Any specific reason for moving the tableName to the end?

##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##
@@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path 
identityUdfLocalPath, Path identityUdfHdfsPa
   /*
* Method used from TestReplicationScenariosExclusiveReplica
*/
-  private void assertExternalFileInfo(List expected, String 
dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List expected, String 
dumplocation,
   WarehouseInstance warehouseInstance)
   throws IOException {
 Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-Path externalTableInfoFile;
-if (isIncremental) {
-  externalTableInfoFile = new Path(hivePath, FILE_NAME);
-} else {
-  externalTableInfoFile = new Path(metadataPath, 
primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-}
-ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, 
externalTableInfoFile);
+Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment:
   Given that FILE_LIST_EXTERNAL now has a fixed location for both incremental 
and bootstrap, do you really need this method 
(TestReplicationScenariosAcrossInstances.assertExternalFileList) any more?

##
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
##
@@ -103,9 +102,9 @@ public void replicationWithoutExternalTables() throws 
Throwable {
 .run("insert into table t2 partition(country='france') values 
('paris')")
 .dump(primaryDbName, dumpWithClause);
 
-// the _external_tables_file info only should be created if external 
tables are to be replicated not otherwise
-assertFalse(primary.miniDFSCluster.getFileSystem()
-.exists(new Path(new Path(tuple.dumpLocation, 
primaryDbName.toLowerCase()), FILE_NAME)));
+// the _file_list_external only should be created if external tables are 
to be replicated not otherwise
+assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation,

Review comment:
   Given that the location of FILE_LIST_EXTERNAL is now fixed for both bootstrap 
and incremental, many methods are no longer required, e.g. 
assertExternalFileList. Can you please check from this perspective and remove 
them with a bit of refactoring?

##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -18,158 +18,176 @@
 
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue cache;
-  private volatile boole

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555496
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 21/Feb/21 20:34
Start Date: 21/Feb/21 20:34
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r579861655



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
-}
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
-}
+Retryable retryable = Retryable.builder()

Review comment:
   Currently it is used only in this add() function, so it is created locally.
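The build-it-where-you-use-it pattern from the diff can be sketched with a minimal fluent builder. This is a hypothetical `Retry` class, not Hive's `Retryable` (the real policy also handles backoff, time limits, etc.):

```java
import java.util.concurrent.Callable;

// Minimal fluent-builder sketch (hypothetical) of the pattern in the diff:
// construct the retry policy locally in the method that needs it, then run a
// Callable under it, retrying only the configured exception type.
class Retry {
    private int maxAttempts = 3;
    private Class<? extends Exception> retryOn = Exception.class;

    static Retry builder() { return new Retry(); }

    Retry withMaxAttempts(int n) { this.maxAttempts = n; return this; }

    Retry withRetryOnException(Class<? extends Exception> c) { this.retryOn = c; return this; }

    <T> T executeCallable(Callable<T> callable) throws Exception {
        Exception last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return callable.call();
            } catch (Exception e) {
                if (!retryOn.isInstance(e)) {
                    throw e;   // non-matching exceptions propagate immediately
                }
                last = e;      // matching exceptions are retried
            }
        }
        throw last;            // retries exhausted
    }
}
```

Building the policy locally keeps the retry configuration next to the one call site that uses it, at the cost of constructing a small object per call.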







Issue Time Tracking
---

Worklog Id: (was: 555496)
Time Spent: 1h 50m  (was: 1h 40m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550602&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550602
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 10/Feb/21 07:16
Start Date: 10/Feb/21 07:16
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573496104



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
-}
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
-}
+Retryable retryable = Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(IOException.class).build();
 try {
-  cache.put(entry);
-} catch (InterruptedException e) {
-  throw new SemanticException(e);
-}
-if (!thresholdHit && cache.size() >= thresholdPoint) {
-  initStoreToFile(cache.size());
+  retryable.executeCallable((Callable) ()-> {
+synchronized (backingFile ) {

Review comment:
   OK, the whole discussion assumed that the write would be a single-threaded 
operation. Agreed, with multiple threads writing it makes sense. However, how 
does retry work in the multi-threaded case, and is the ordering guarantee not 
impacted on retry?







Issue Time Tracking
---

Worklog Id: (was: 550602)
Time Spent: 1h 40m  (was: 1.5h)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550598&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550598
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 10/Feb/21 07:02
Start Date: 10/Feb/21 07:02
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573490313



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
-}
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
-}
+Retryable retryable = Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(IOException.class).build();
 try {
-  cache.put(entry);
-} catch (InterruptedException e) {
-  throw new SemanticException(e);
-}
-if (!thresholdHit && cache.size() >= thresholdPoint) {
-  initStoreToFile(cache.size());
+  retryable.executeCallable((Callable) ()-> {
+synchronized (backingFile ) {

Review comment:
   During a bootstrap dump of managed-table partitions, multiple threads may 
access this writer; their number is governed by the conf 
REPL_PARTITIONS_DUMP_PARALLELISM, so the ordering could be interleaved.
   
   Earlier I was referring to the implementation of BufferedWriter, which makes 
the write operation thread-safe, meaning only one thread writes a value at a 
time. 
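The scenario can be illustrated with a self-contained sketch (hypothetical names throughout; `StringBuilder` stands in for the backing file): several writer threads share one sink, and because each whole entry is appended under a lock, individual lines never interleave, even though the order across threads is unspecified.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the multi-threaded writer scenario above: each whole
// entry is appended under a lock (only one thread writes a value at a time),
// so lines stay intact; only the order across threads is left unspecified.
class SharedEntryWriter {
    private final StringBuilder sink = new StringBuilder(); // stands in for the backing file

    void add(String entry) {
        synchronized (sink) {              // one complete entry per critical section
            sink.append(entry).append('\n');
        }
    }

    List<String> lines() {
        return Arrays.asList(sink.toString().split("\n"));
    }

    // Spawn several writers (like REPL_PARTITIONS_DUMP_PARALLELISM threads)
    // and return every line written once they have all finished.
    static List<String> concurrentDemo(int threads, int perThread) throws InterruptedException {
        SharedEntryWriter w = new SharedEntryWriter();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread th = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    w.add("t" + id + "-e" + i);
                }
            });
            workers.add(th);
            th.start();
        }
        for (Thread th : workers) {
            th.join();                     // join() gives a happens-before for the final read
        }
        return w.lines();
    }
}
```

Every line comes out whole and the total count is exact; what the lock does not give you is any particular ordering between threads, which is the point of the question above.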







Issue Time Tracking
---

Worklog Id: (was: 550598)
Time Spent: 1.5h  (was: 1h 20m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>   

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550590&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550590
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 10/Feb/21 06:32
Start Date: 10/Feb/21 06:32
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573480027



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = 
"file-list-streamer-";
-
-  private LinkedBlockingQueue cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write 
operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", 
backingFile);
-} else {
-  thresholdHit = true;
-}
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, 
LinkedBlockingQueue cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " 
+ backingFile.toString());
-}
+Retryable retryable = Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(IOException.class).build();
 try {
-  cache.put(entry);
-} catch (InterruptedException e) {
-  throw new SemanticException(e);
-}
-if (!thresholdHit && cache.size() >= thresholdPoint) {
-  initStoreToFile(cache.size());
+  retryable.executeCallable((Callable) ()-> {
+synchronized (backingFile ) {

Review comment:
   If it is a single thread writing the entries, ordering wouldn't be 
messed up, no? 







Issue Time Tracking
---

Worklog Id: (was: 550590)
Time Spent: 1h 20m  (was: 1h 10m)

> Moving to file based iteration for copying data
> ---
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
>  Issue Type: Bug
>Reporter: Arko Sharma
>Assignee: Arko Sharma
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>





[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550576&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550576
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 10/Feb/21 05:42
Start Date: 10/Feb/21 05:42
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573463126



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
@@ -22,154 +22,118 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String  FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
   private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private String nextElement = null;
   private HiveConf conf;
   private BufferedReader backingFileReader;
+  private BufferedWriter backingFileWriter;
 
-
-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
 this.backingFile = backingFile;
 this.conf = conf;
-if (cacheSize > 0) {
-  // Cache size must be > 0 for this list to be used for the write operation.
-  this.cache = new LinkedBlockingQueue<>(cacheSize);
-  fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-  thresholdPoint = getThreshold(cacheSize);
-  LOG.debug("File list backed by {} can be used for write operation.", backingFile);
-} else {
-  thresholdHit = true;
-}
   }
 
-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-this.backingFile = backingFile;
-this.fileListStreamer = fileListStreamer;
-this.cache = cache;
-this.conf = conf;
-thresholdPoint = getThreshold(cache.remainingCapacity());
-  }
-
-  /**
-   * Only add operation is safe for concurrent operations.
-   */
   public void add(String entry) throws SemanticException {
-if (thresholdHit && !fileListStreamer.isAlive()) {
-  throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-}
+Retryable retryable = Retryable.builder()
+.withHiveConf(conf)
+.withRetryOnException(IOException.class).build();
 try {
-  cache.put(entry);
-} catch (InterruptedException e) {
-  throw new SemanticException(e);
-}
-if (!thresholdHit && cache.size() >= thresholdPoint) {
-  initStoreToFile(cache.size());
+  retryable.executeCallable((Callable) ()-> {
+synchronized (backingFile ) {

Review comment:
   Writing the entry and writing the newline character are done in separate statements. For two entries e1,e2, we need the order of writing to be e1,newline,e2,newline or e2,newline,e1,newline. This order is guaranteed only if we use a 'synchronized' block containing the two statements. Otherwise, we could have interleavings like e1,e2,newline,newline, which would corrupt the file.
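The interleaving argument above (each entry must be immediately followed by its own newline, even when several threads call add()) can be demonstrated with a minimal, self-contained sketch. This is an illustration only: it uses plain JDK classes, and AtomicLineWriter is an invented name, not the FileList/Retryable code from the patch.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;

public class AtomicLineWriter {
    private final StringWriter sink = new StringWriter();
    private final BufferedWriter writer = new BufferedWriter(sink);

    // Entry and newline are written inside one synchronized block, so another
    // thread cannot interleave between them (no e1,e2,newline,newline).
    public void add(String entry) throws IOException {
        synchronized (writer) {
            writer.write(entry);
            writer.newLine();
        }
    }

    public List<String> lines() throws IOException {
        synchronized (writer) {
            writer.flush();
        }
        List<String> out = new ArrayList<>();
        for (String line : sink.toString().split(System.lineSeparator(), -1)) {
            if (!line.isEmpty()) {
                out.add(line);
            }
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        AtomicLineWriter w = new AtomicLineWriter();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            final int id = i;
            Thread t = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    try {
                        w.add("entry-" + id + "-" + j);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        List<String> lines = w.lines();
        // With the synchronized block, every line is one complete entry.
        if (lines.size() != 400) {
            throw new AssertionError("expected 400 lines, got " + lines.size());
        }
        for (String line : lines) {
            if (!line.startsWith("entry-")) {
                throw new AssertionError("torn line: " + line);
            }
        }
        System.out.println("all " + lines.size() + " lines intact");
    }
}
```

Dropping the synchronized block makes torn lines possible (though not guaranteed on any given run), which is exactly the e1,e2,newline,newline failure mode described in the comment.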







Issue Time Tracking
---

Worklog Id: (was: 550576)
Time Spent: 1h 10m  (was: 1h)


[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550573&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550573
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 10/Feb/21 05:20
Start Date: 10/Feb/21 05:20
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573456193



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
+  retryable.executeCallable((Callable) ()-> {
+synchronized (backingFile ) {

Review comment:
   Why do we need synchronization if it is just one thread writing? How does synchronizing it guarantee the atomicity (either both or none) that you are expecting?







Issue Time Tracking
---

Worklog Id: (was: 550573)
Time Spent: 1h  (was: 50m)

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550567&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550567
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 10/Feb/21 05:04
Start Date: 10/Feb/21 05:04
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573451500



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
+  retryable.executeCallable((Callable) ()-> {
+synchronized (backingFile ) {

Review comment:
   Only one thread can write at a time, but synchronisation is needed since we have two elements to write per entry: one is the entry and the other is the newline. These two elements need to be written atomically.







Issue Time Tracking
---

Worklog Id: (was: 550567)
Time Spent: 50m  (was: 40m)


[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550566&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550566
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 10/Feb/21 05:02
Start Date: 10/Feb/21 05:02
Worklog Time Spent: 10m 
  Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573450950



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
+  retryable.executeCallable((Callable) ()-> {
+synchronized (backingFile ) {
+  if (backingFileWriter == null) {
+backingFileWriter = initWriter();
+  }
+  backingFileWriter.write(entry);
+  backingFileWriter.newLine();
+}
+LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+return null;
+  });
+} catch (Exception e) {
+  throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage(),
+  String.valueOf(ErrorMsg.getErrorMsg(e).getErrorCode())));
 }
   }
 
+  BufferedWriter initWriter() throws IOException {
+FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+return new BufferedWriter(new OutputStreamWriter(fs.create(backingFile)));
+  }
+
   @Override
   public boolean hasNext() {
-if (!thresholdHit) {
-  return (cache != null && !cache.isEmpty());
-}
 if (nextElement != null) {
   return true;
+} else {
+  try {
+nextElement = readNextLine();
+return (nextElement != null);
+  } catch (IOException e) {
+throw new UncheckedIOException(e);
+

[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data

2021-02-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550473&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550473
 ]

ASF GitHub Bot logged work on HIVE-24718:
-

Author: ASF GitHub Bot
Created on: 09/Feb/21 23:05
Start Date: 09/Feb/21 23:05
Worklog Time Spent: 10m 
  Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573292875



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##
+Retryable retryable = Retryable.builder()

Review comment:
   Can we create this just one per FileList?
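One way to act on this suggestion is to build the Retryable once, in the FileList constructor, and reuse it on every add(). The sketch below is hypothetical: SimpleRetryable is a cut-down stand-in exposing only the executeCallable call seen in the diff (the real org.apache.hadoop.hive.ql.exec.util.Retryable is configured from a HiveConf via its builder), and FileListSketch writes to an in-memory buffer instead of HDFS.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Cut-down stand-in for Hive's Retryable: retry the task on IOException
// up to a fixed number of attempts, then rethrow the last failure.
class SimpleRetryable {
    private final int maxAttempts;

    SimpleRetryable(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    <T> T executeCallable(Callable<T> task) throws Exception {
        IOException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (IOException e) {
                last = e; // retry only on IOException, as in the patch
            }
        }
        throw last;
    }
}

// The reviewer's point: the retry policy does not change between add()
// calls, so construct it once per list instead of once per call.
class FileListSketch {
    private final SimpleRetryable retryable = new SimpleRetryable(3);
    private final StringBuilder backingFile = new StringBuilder();

    void add(String entry) throws Exception {
        retryable.executeCallable((Callable<Void>) () -> {
            synchronized (backingFile) {
                backingFile.append(entry).append('\n');
            }
            return null;
        });
    }

    String contents() {
        synchronized (backingFile) {
            return backingFile.toString();
        }
    }
}

public class RetryableOncePerList {
    public static void main(String[] args) throws Exception {
        FileListSketch list = new FileListSketch();
        list.add("a");
        list.add("b");
        if (!"a\nb\n".equals(list.contents())) {
            throw new AssertionError(list.contents());
        }
        System.out.println("ok");
    }
}
```

Besides avoiding repeated builder allocations, a per-instance field also keeps the retry policy identical for every entry written to the same backing file.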
