[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=568132&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-568132 ]

ASF GitHub Bot logged work on HIVE-24718:
Author: ASF GitHub Bot
Created on: 18/Mar/21 07:05
Start Date: 18/Mar/21 07:05
Worklog Time Spent: 10m

Work Description: pkumarsinha merged pull request #1936:
URL: https://github.com/apache/hive/pull/1936

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------
Worklog Id: (was: 568132)
Time Spent: 6h 40m (was: 6.5h)

> Moving to file based iteration for copying data
> -----------------------------------------------
>
> Key: HIVE-24718
> URL: https://issues.apache.org/jira/browse/HIVE-24718
> Project: Hive
> Issue Type: Bug
> Reporter: Arko Sharma
> Assignee: Arko Sharma
> Priority: Major
> Labels: pull-request-available
> Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, HIVE-24718.04.patch, HIVE-24718.05.patch, HIVE-24718.06.patch
>
> Time Spent: 6h 40m
> Remaining Estimate: 0h

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=566051&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-566051 ]

ASF GitHub Bot logged work on HIVE-24718:
Author: ASF GitHub Bot
Created on: 15/Mar/21 07:39
Start Date: 15/Mar/21 07:39
Worklog Time Spent: 10m

Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r594106033

## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
## @@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa
   /*
    * Method used from TestReplicationScenariosExclusiveReplica
    */
-  private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List<String> expected, String dumplocation,
                                       WarehouseInstance warehouseInstance) throws IOException {
     Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-    Path externalTableInfoFile;
-    if (isIncremental) {
-      externalTableInfoFile = new Path(hivePath, FILE_NAME);
-    } else {
-      externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-    }
-    ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile);
+    Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment: Done
Issue Time Tracking
-------------------
Worklog Id: (was: 566051)
Time Spent: 6.5h (was: 6h 20m)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=566050&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-566050 ]

ASF GitHub Bot logged work on HIVE-24718:
Author: ASF GitHub Bot
Created on: 15/Mar/21 07:39
Start Date: 15/Mar/21 07:39
Worklog Time Spent: 10m

Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r594105835

## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
## @@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
         .verifyResult(inc2Tuple.lastReplicationId);
   }

-  private void assertFalseExternalFileList(Path externalTableFileList)
-      throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)

Review comment: Done

Issue Time Tracking
-------------------
Worklog Id: (was: 566050)
Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=566049&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-566049 ]

ASF GitHub Bot logged work on HIVE-24718:
Author: ASF GitHub Bot
Created on: 15/Mar/21 07:38
Start Date: 15/Mar/21 07:38
Worklog Time Spent: 10m

Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r594105742

## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
## @@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
         .verifyResult(inc2Tuple.lastReplicationId);
   }

-  private void assertFalseExternalFileList(Path externalTableFileList)
-      throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)
+      throws IOException {

Review comment: getFileSystem() call throws IOException.

Issue Time Tracking
-------------------
Worklog Id: (was: 566049)
Time Spent: 6h 10m (was: 6h)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=564628&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-564628 ]

ASF GitHub Bot logged work on HIVE-24718:
Author: ASF GitHub Bot
Created on: 11/Mar/21 14:25
Start Date: 11/Mar/21 14:25
Worklog Time Spent: 10m

Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r592395172

## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
## @@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
         .verifyResult(inc2Tuple.lastReplicationId);
   }

-  private void assertFalseExternalFileList(Path externalTableFileList)
-      throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)

Review comment: Can you please move this method to ReplicationTestUtils itself? We have duplicate code for this method in two classes.

## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
## @@ -639,9 +629,11 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
         .verifyResult(inc2Tuple.lastReplicationId);
   }

-  private void assertFalseExternalFileList(Path externalTableFileList)
-      throws IOException {
+  private void assertFalseExternalFileList(String dumpLocation)
+      throws IOException {

Review comment: I think it doesn't throw IOException.

## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
## @@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa
   /*
    * Method used from TestReplicationScenariosExclusiveReplica
    */
-  private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental,
+  private void assertExternalFileList(List<String> expected, String dumplocation,
                                       WarehouseInstance warehouseInstance) throws IOException {
     Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
-    Path externalTableInfoFile;
-    if (isIncremental) {
-      externalTableInfoFile = new Path(hivePath, FILE_NAME);
-    } else {
-      externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME);
-    }
-    ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile);
+    Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL);

Review comment: This is still not addressed. The same method (and code), assertExternalFileList, is defined in three classes, and essentially they aren't doing more than the path formation. As discussed, can we not use ReplicationTestUtils.assertExternalFileList directly by modifying the signature a bit?

Issue Time Tracking
-------------------
Worklog Id: (was: 564628)
Time Spent: 6h (was: 5h 50m)
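[Editor's illustration] The consolidation the reviewer asks for above amounts to forming the external-table file-list path in one shared place instead of in three test classes. A minimal sketch of such a helper, using java.nio in place of Hadoop's Path, with the class name and the "hive" / "_file_list_external" constant values assumed for illustration (they stand in for ReplUtils.REPL_HIVE_BASE_DIR and EximUtil.FILE_LIST_EXTERNAL, which this sketch does not reproduce exactly):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical stand-in for a consolidated ReplicationTestUtils helper. */
class ReplDumpPaths {
  // Assumed values mirroring ReplUtils.REPL_HIVE_BASE_DIR and EximUtil.FILE_LIST_EXTERNAL.
  static final String REPL_HIVE_BASE_DIR = "hive";
  static final String FILE_LIST_EXTERNAL = "_file_list_external";

  /**
   * Builds the external-table file list path from a dump location, so that
   * each test class no longer duplicates the same path formation.
   */
  static Path externalTblFileList(String dumpLocation) {
    return Paths.get(dumpLocation, REPL_HIVE_BASE_DIR, FILE_LIST_EXTERNAL);
  }
}
```

With a helper like this, the per-class assertExternalFileList wrappers reduce to a single call that passes the dump location through, which is the signature change the review suggests.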
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=563490&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-563490 ]

ASF GitHub Bot logged work on HIVE-24718:
Author: ASF GitHub Bot
Created on: 10/Mar/21 04:33
Start Date: 10/Mar/21 04:33
Worklog Time Spent: 10m

Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r591024045

## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
## @@ -18,158 +18,225 @@
 package org.apache.hadoop.hive.ql.exec.repl.util;

-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;

-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
 public class FileList implements AutoCloseable, Iterator<String> {
   private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
-  private static int fileListStreamerID = 0;
-  private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
-
-  private LinkedBlockingQueue<String> cache;
-  private volatile boolean thresholdHit = false;
-  private int thresholdPoint;
-  private float thresholdFactor = 0.9f;
-  private Path backingFile;
-  private FileListStreamer fileListStreamer;
-  private String nextElement;
-  private boolean noMoreElement;
+  private final Path backingFile;
+  private String nextElement = null;
+  private String lastReadElement = null;
   private HiveConf conf;
+  private volatile boolean abortOperation = false;
+  private volatile boolean retryMode;
   private BufferedReader backingFileReader;
+  private volatile FSDataOutputStream backingFileWriter;

-  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+  public FileList(Path backingFile, HiveConf conf) {
     this.backingFile = backingFile;
     this.conf = conf;
-    if (cacheSize > 0) {
-      // Cache size must be > 0 for this list to be used for the write operation.
-      this.cache = new LinkedBlockingQueue<>(cacheSize);
-      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
-      thresholdPoint = getThreshold(cacheSize);
-      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    this.retryMode = false;
+  }
+
+  public void add(String entry) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) {
+      writeWithRetry(entry);
     } else {
-      thresholdHit = true;
+      writeEntry(entry);
     }
   }

-  @VisibleForTesting
-  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
-    this.backingFile = backingFile;
-    this.fileListStreamer = fileListStreamer;
-    this.cache = cache;
-    this.conf = conf;
-    thresholdPoint = getThreshold(cache.remainingCapacity());
+  private synchronized void writeEntry(String entry) throws IOException {
+    //retry only during creating the file, no retry during writes
+    if (backingFileWriter == null) {
+      try {
+        Retryable retryable = buildRetryable();
+        retryable.executeCallable((Callable<Void>) () -> {
+          if (this.abortOperation) {
+            return null;
+          }
+          backingFileWriter = getWriterCreateMode();
+          return null;
+        });
+      } catch (Exception e) {
+        this.abortOperation = true;
+        throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
+      }
+    }
+    if (this.abortOperation) {
+      return;
+    }
+    try {
+      backingFileWriter.writeBytes(getEntryWithNewline(entry));
+      LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+    } catch (IOException e) {
+      this.abortOperation = true;
+      LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e);
+      throw e;
+    }
   }

-  /**
-   * Only add operation is safe for concurrent operations.
-   */
-  public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+  priv
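[Editor's illustration] The writeEntry logic under review retries only while creating the backing file and, once any failure is observed, poisons the list so later writes become no-ops. A minimal self-contained sketch of that pattern, using java.nio in place of Hadoop's FileSystem and a plain bounded loop in place of Hive's Retryable (both substitutions are assumptions made for illustration, not the actual Hive code):

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Simplified file-backed list writer: retry only on create, abort after any write failure. */
class FileListSketch implements AutoCloseable {
  private final Path backingFile;
  private final int maxCreateAttempts;
  private volatile BufferedWriter writer;
  private volatile boolean abortOperation = false;

  FileListSketch(Path backingFile, int maxCreateAttempts) {
    this.backingFile = backingFile;
    this.maxCreateAttempts = maxCreateAttempts;
  }

  synchronized void add(String entry) throws IOException {
    if (abortOperation) {
      return;                              // a previous failure poisons the list
    }
    if (writer == null) {
      writer = createWithRetry();          // retry only while opening the backing file
    }
    try {
      writer.write(entry);
      writer.newLine();                    // one entry per line, as in FileList
    } catch (IOException e) {
      abortOperation = true;               // no retry during writes
      throw e;
    }
  }

  private BufferedWriter createWithRetry() throws IOException {
    IOException last = null;
    for (int i = 0; i < maxCreateAttempts; i++) {
      try {
        return Files.newBufferedWriter(backingFile);
      } catch (IOException e) {
        last = e;                          // keep trying until attempts are exhausted
      }
    }
    abortOperation = true;
    throw new IOException("retry exhausted creating " + backingFile, last);
  }

  @Override
  public void close() throws IOException {
    if (writer != null) {
      writer.close();
    }
  }
}
```

The design choice mirrored here is that file creation is the step most likely to hit transient filesystem errors, so it is worth retrying, while a failure mid-write leaves the file in an unknown state, so the safe response is to abort the whole list rather than retry.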
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559852&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559852 ]

ASF GitHub Bot logged work on HIVE-24718:
Author: ASF GitHub Bot
Created on: 02/Mar/21 08:02
Start Date: 02/Mar/21 08:02
Worklog Time Spent: 10m

Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r585302812

## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
## @@ -1836,6 +1837,64 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
         .verifyResults(new String[]{"2", "3"});
   }

+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters have no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table acid_table (key int, value int) partitioned by (load_date date) " +
+            "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+        .run("create table table1 (i String)")
+        .run("insert into table1 values (1)")
+        .run("insert into table1 values (2)")
+        .dump(primaryDbName, clause);
+    assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation,

Review comment: nit: If you just pass dumpLocation to the method and do the path creation inside the method, this would look clean. Anyway, the method assertFalseExternalFileList isn't doing much, so alternatively you can do the fs.exist() check right there and get rid of the method.

## File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
## @@ -653,6 +649,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         new TimeValidator(TimeUnit.HOURS),
         "Total allowed retry duration in hours inclusive of all retries. Once this is exhausted, " +
         "the policy instance will be marked as failed and will need manual intervention to restart."),
+    REPL_COPY_ITERATOR_RETRY("hive.repl.copy.iterator.retry", true,

Review comment: REPL_COPY_FILE_LIST_ITERATOR_RETRY ?

## File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
## @@ -18,147 +18,266 @@
 package org.apache.hadoop.hive.ql.exec.repl.util;

+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.LoggerFactory;

-import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;

-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-
 /**
  * Tests the File List implementation.
  */
-@RunWith(PowerMockRunner.class)
+@RunWith(MockitoJUnitRunner.class)
 @PrepareForTest({LoggerFactory.class})
 public class TestFileList {

-  @Mock
-  private BufferedWriter bufferedWriter;
-
-
-  @Test
-  public void testNoStreaming() throws Exception {
-    Object tuple[] = setupAndGetTuple(100, false);
-    FileList fileList = (FileList) tuple[0];
-    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
-    fileList.add("Entry1");
-    fileList.add("Entry2");
-    assertFalse(isStreamingToFile(fileListStreamer));
-  }
+  HiveConf conf = new HiveConf();
+  private FSDataOutputStream outStream;
+  private FSDataOutputStream testFileStream;
+  final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+      File.separator + TestFileList.class.getCanonicalName() + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+  private Exception testException = new IOException("test");

   @Test
-  public void testAlwaysStreaming() throws Exception {
-    Object tuple[] = setupAndGetTuple(100, true);
-    Fil
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559727&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559727 ]

ASF GitHub Bot logged work on HIVE-24718:
Author: ASF GitHub Bot
Created on: 02/Mar/21 03:27
Start Date: 02/Mar/21 03:27
Worklog Time Spent: 10m

Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r585220794

## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
## @@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
         .verifyResults(new String[]{"2", "3"});
   }

+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters has no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    primary.run("use " + primaryDbName)
+        .run("create table acid_table (key int, value int) partitioned by (load_date date) " +
+            "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+        .run("create table table1 (i String)")
+        .run("insert into table1 values (1)")
+        .run("insert into table1 values (2)")
+        .dump(primaryDbName, clause);

Review comment: Done.

## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
## @@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
         .verifyResults(new String[]{"2", "3"});
   }

+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {
+    List<String> clause = new ArrayList<>();
+    //NS replacement parameters has no effect when data is also copied to staging
+    clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'");
+    clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'");
+    primary.run("use " + primaryDbName)
+        .run("create table acid_table (key int, value int) partitioned by (load_date date) " +
+            "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+        .run("create table table1 (i String)")
+        .run("insert into table1 values (1)")
+        .run("insert into table1 values (2)")
+        .dump(primaryDbName, clause);
+    replica.load(replicatedDbName, primaryDbName, clause)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] {"acid_table", "table1"})
+        .run("select * from table1")
+        .verifyResults(new String[] {"1", "2"});
+
+    primary.run("use " + primaryDbName)
+        .run("insert into table1 values (3)")
+        .dump(primaryDbName, clause);
+    replica.load(replicatedDbName, primaryDbName, clause)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[]{"acid_table", "table1"})
+        .run("select * from table1")
+        .verifyResults(new String[]{"1", "2", "3"});
+
+    clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'");
+    primary.run("use " + primaryDbName)
+        .run("create external table ext_table1 (id int)")
+        .run("insert into ext_table1 values (3)")
+        .run("insert into ext_table1 values (4)")
+        .run("create external table ext_table2 (key int, value int) partitioned by (load_time timestamp)")
+        .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.123') values(1,2)")
+        .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.124') values(1,3)")
+        .run("show partitions ext_table2")
+        .verifyResults(new String[]{
+            "load_time=2012-02-21 07%3A08%3A09.123",
+            "load_time=2012-02-21 07%3A08%3A09.124"})
+        .dump(primaryDbName, clause);

Review comment: Done.

Issue Time Tracking
-------------------
Worklog Id: (was: 559727)
Time Spent: 5.5h (was: 5h 20m)
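[Editor's illustration] The tests above drive each dump with per-run configuration overrides passed as quoted "'key'='value'" clauses. A small sketch of how such a clause list is assembled; the helper class is hypothetical, "hive.repl.copy.iterator.retry" is the key this patch adds to HiveConf, and the other key name is an assumption mirroring REPL_RUN_DATA_COPY_TASKS_ON_TARGET:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical builder for the "'key'='value'" override clauses used in the repl tests. */
class DumpClauses {
  static String clause(String key, String value) {
    return "'" + key + "'='" + value + "'";
  }

  static List<String> retryDisabled() {
    List<String> clauses = new ArrayList<>();
    // Copy data to staging rather than on the target, and disable file-list
    // iterator retry, mirroring testReplWithRetryDisabledIterators above.
    clauses.add(clause("hive.repl.run.data.copy.tasks.on.target", "false"));
    clauses.add(clause("hive.repl.copy.iterator.retry", "false"));
    return clauses;
  }
}
```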
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559689 ]

ASF GitHub Bot logged work on HIVE-24718:
Author: ASF GitHub Bot
Created on: 02/Mar/21 01:37
Start Date: 02/Mar/21 01:37
Worklog Time Spent: 10m

Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r584584623

## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559293 ]

ASF GitHub Bot logged work on HIVE-24718:
Author: ASF GitHub Bot
Created on: 01/Mar/21 10:07
Start Date: 01/Mar/21 10:07
Worklog Time Spent: 10m

Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r584585551

## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
## @@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable {
         .verifyResults(new String[]{"2", "3"});
   }

+  @Test
+  public void testReplWithRetryDisabledIterators() throws Throwable {

Review comment: No, this is just to have a case with replication working with this conf=false. To confirm retry is not happening, the test "testWriteNoRetry" is added in the TestFileList class.

Issue Time Tracking
-------------------
Worklog Id: (was: 559293)
Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559292&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559292 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 01/Mar/21 10:05 Start Date: 01/Mar/21 10:05 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r584584623 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## @@ -18,158 +18,225 @@ package org.apache.hadoop.hive.ql.exec.repl.util; -import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.StringWriter; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Callable; - -/** - * A file backed list of Strings which is in-memory till the threshold. 
- */ public class FileList implements AutoCloseable, Iterator { private static final Logger LOG = LoggerFactory.getLogger(FileList.class); - private static int fileListStreamerID = 0; - private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; - - private LinkedBlockingQueue cache; - private volatile boolean thresholdHit = false; - private int thresholdPoint; - private float thresholdFactor = 0.9f; - private Path backingFile; - private FileListStreamer fileListStreamer; - private String nextElement; - private boolean noMoreElement; + private final Path backingFile; + private String nextElement = null; + private String lastReadElement = null; private HiveConf conf; + private volatile boolean abortOperation = false; + private volatile boolean retryMode; private BufferedReader backingFileReader; + private volatile FSDataOutputStream backingFileWriter; - - public FileList(Path backingFile, int cacheSize, HiveConf conf) { + public FileList(Path backingFile, HiveConf conf) { this.backingFile = backingFile; this.conf = conf; -if (cacheSize > 0) { - // Cache size must be > 0 for this list to be used for the write operation. 
- this.cache = new LinkedBlockingQueue<>(cacheSize); - fileListStreamer = new FileListStreamer(cache, backingFile, conf); - thresholdPoint = getThreshold(cacheSize); - LOG.debug("File list backed by {} can be used for write operation.", backingFile); +this.retryMode = false; + } + + public void add(String entry) throws IOException { +if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) { + writeWithRetry(entry); } else { - thresholdHit = true; + writeEntry(entry); } } - @VisibleForTesting - FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue cache, HiveConf conf) { -this.backingFile = backingFile; -this.fileListStreamer = fileListStreamer; -this.cache = cache; -this.conf = conf; -thresholdPoint = getThreshold(cache.remainingCapacity()); + private synchronized void writeEntry(String entry) throws IOException { +//retry only during creating the file, no retry during writes +if (backingFileWriter == null) { + try { +Retryable retryable = buildRetryable(); +retryable.executeCallable((Callable) () -> { + if(this.abortOperation) { +return null; + } + backingFileWriter = getWriterCreateMode(); + return null; +}); + } catch (Exception e) { +this.abortOperation = true; +throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage())); + } +} +if(this.abortOperation) { + return; +} +try { + backingFileWriter.writeBytes(getEntryWithNewline(entry)); + LOG.info("Writing entry {} to file list backed by {}", entry, backingFile); +} catch (IOException e) { + this.abortOperation = true; + LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e); + throw e; +} } - /** - * Only add operation is safe for concurrent operations. - */ - public void add(String entry) throws SemanticException { -if (thresholdHit && !fileListStreamer.isAlive()) { - throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString()); + priv
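The diff above shows the new `FileList.writeEntry` pattern: creation of the backing file writer is wrapped in a retry, individual writes are not retried, and the first failure sets a volatile `abortOperation` flag so later `add()` calls become no-ops. A minimal stand-alone sketch of that pattern follows; the class and method names here are illustrative, not Hive's actual `FileList` API.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

// Hypothetical simplification of the pattern in the diff above: the writer is
// created under retry, individual writes fail fast, and a failure poisons the
// list so subsequent add() calls return without writing.
public class FailFastWriter {
    private OutputStream out;                    // lazily created backing stream
    private volatile boolean aborted = false;
    private final Supplier<OutputStream> creator;

    public FailFastWriter(Supplier<OutputStream> creator) {
        this.creator = creator;
    }

    public synchronized void add(String entry) throws IOException {
        if (aborted) {
            return;                              // a previous failure aborts the whole list
        }
        if (out == null) {
            try {
                // retry only the stream creation, as in the diff's writeEntry()
                out = retry(creator::get, 3);
            } catch (Exception e) {
                aborted = true;
                throw new IOException("retry exhausted: " + e.getMessage(), e);
            }
        }
        try {
            out.write((entry + System.lineSeparator()).getBytes());
        } catch (IOException e) {
            aborted = true;                      // no retry for the write itself
            throw e;
        }
    }

    static <T> T retry(Callable<T> task, int attempts) throws Exception {
        Exception last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
            }
        }
        throw last;
    }
}
```

The real patch additionally routes through Hive's `Retryable` builder and gates retry behind `REPL_COPY_ITERATOR_RETRY`; this sketch only shows the create-retry / write-fail-fast split.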
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=559254&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-559254 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 01/Mar/21 08:16 Start Date: 01/Mar/21 08:16 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r583233006 ## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ## @@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable { .verifyResults(new String[]{"2", "3"}); } + @Test + public void testReplWithRetryDisabledIterators() throws Throwable { +List clause = new ArrayList<>(); +//NS replacement parameters has no effect when data is also copied to staging +clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'"); +clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'"); +primary.run("use " + primaryDbName) +.run("create table acid_table (key int, value int) partitioned by (load_date date) " + +"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") +.run("create table table1 (i String)") +.run("insert into table1 values (1)") +.run("insert into table1 values (2)") +.dump(primaryDbName, clause); Review comment: At this point the entries are written to the _file_list and _file_list_external. Please add assertion on file content here. 
## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ## @@ -1836,6 +1835,60 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable { .verifyResults(new String[]{"2", "3"}); } + @Test + public void testReplWithRetryDisabledIterators() throws Throwable { +List clause = new ArrayList<>(); +//NS replacement parameters has no effect when data is also copied to staging +clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'"); +clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'"); +primary.run("use " + primaryDbName) +.run("create table acid_table (key int, value int) partitioned by (load_date date) " + +"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") +.run("create table table1 (i String)") +.run("insert into table1 values (1)") +.run("insert into table1 values (2)") +.dump(primaryDbName, clause); +replica.load(replicatedDbName, primaryDbName, clause) +.run("use " + replicatedDbName) +.run("show tables") +.verifyResults(new String[] {"acid_table", "table1"}) +.run("select * from table1") +.verifyResults(new String[] {"1", "2"}); + +primary.run("use " + primaryDbName) +.run("insert into table1 values (3)") +.dump(primaryDbName, clause); +replica.load(replicatedDbName, primaryDbName, clause) +.run("use " + replicatedDbName) +.run("show tables") +.verifyResults(new String[]{"acid_table", "table1"}) +.run("select * from table1") +.verifyResults(new String[]{"1", "2", "3"}); + +clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'"); +primary.run("use " + primaryDbName) +.run("create external table ext_table1 (id int)") +.run("insert into ext_table1 values (3)") +.run("insert into ext_table1 values (4)") +.run("create external table ext_table2 (key int, value int) partitioned by (load_time timestamp)") +.run("insert into ext_table2 partition(load_time = 
'2012-02-21 07:08:09.123') values(1,2)") +.run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.124') values(1,3)") +.run("show partitions ext_table2") +.verifyResults(new String[]{ +"load_time=2012-02-21 07%3A08%3A09.123", +"load_time=2012-02-21 07%3A08%3A09.124"}) +.dump(primaryDbName, clause); Review comment: At this point the entries are written to the _file_list and _file_list_external. Please add assertion on file content here. ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## @@ -18,158 +18,225 @@ package org.apache.hadoop.hive.ql.exec.repl.util; -import com.google.common.annotations.VisibleForTesting; +import org.apache
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=557666&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557666 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 25/Feb/21 01:47 Start Date: 25/Feb/21 01:47 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r582431522 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## @@ -18,158 +18,176 @@ package org.apache.hadoop.hive.ql.exec.repl.util; -import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.StringWriter; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Callable; - -/** - * A file backed list of Strings which is in-memory till the threshold. 
- */ public class FileList implements AutoCloseable, Iterator { private static final Logger LOG = LoggerFactory.getLogger(FileList.class); - private static int fileListStreamerID = 0; - private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; - - private LinkedBlockingQueue cache; - private volatile boolean thresholdHit = false; - private int thresholdPoint; - private float thresholdFactor = 0.9f; - private Path backingFile; - private FileListStreamer fileListStreamer; - private String nextElement; - private boolean noMoreElement; + private final Path backingFile; + private String nextElement = null; private HiveConf conf; + private volatile boolean retryMode; private BufferedReader backingFileReader; + private volatile FSDataOutputStream backingFileWriter; - - public FileList(Path backingFile, int cacheSize, HiveConf conf) { + public FileList(Path backingFile, HiveConf conf) { this.backingFile = backingFile; this.conf = conf; -if (cacheSize > 0) { - // Cache size must be > 0 for this list to be used for the write operation. - this.cache = new LinkedBlockingQueue<>(cacheSize); - fileListStreamer = new FileListStreamer(cache, backingFile, conf); - thresholdPoint = getThreshold(cacheSize); - LOG.debug("File list backed by {} can be used for write operation.", backingFile); -} else { - thresholdHit = true; +this.retryMode = false; + } + + public void add(String entry) throws IOException { +Retryable retryable = buildRetryable(); +try { + retryable.executeCallable((Callable) ()-> { Review comment: Dump operation fails if any thread encounters an exception. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 557666) Time Spent: 4h 40m (was: 4.5h) > Moving to file based iteration for copying data > --- > > Key: HIVE-24718 > URL: https://issues.apache.org/jira/browse/HIVE-24718 > Project: Hive > Issue Type: Bug >Reporter: Arko Sharma >Assignee: Arko Sharma >Priority: Major > Labels: pull-request-available > Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, > HIVE-24718.04.patch > > Time Spent: 4h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
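The exchange above turns on `Retryable.executeCallable` exhausting its attempts and the resulting exception failing the entire dump. A hedged, self-contained stand-in for that wrapper is sketched below; the class name and builder-free constructor are illustrative, not Hive's actual `Retryable` API.

```java
import java.util.concurrent.Callable;

// Minimal stand-in for the Retryable usage in the diff above: retry a Callable
// on a configured exception type up to a maximum number of attempts, then
// rethrow so the caller (the dump operation) fails.
public class SimpleRetryable {
    private final int maxAttempts;
    private final Class<? extends Exception> retryOn;

    public SimpleRetryable(int maxAttempts, Class<? extends Exception> retryOn) {
        this.maxAttempts = maxAttempts;
        this.retryOn = retryOn;
    }

    public <T> T executeCallable(Callable<T> task) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                if (!retryOn.isInstance(e)) {
                    throw e;                 // only the configured type is retried
                }
                last = e;
            }
        }
        throw last;                          // retries exhausted: caller fails the dump
    }
}
```

In the patch the exhausted exception is wrapped as `ErrorMsg.REPL_RETRY_EXHAUSTED` before being rethrown; that mapping is omitted here.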
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=557663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557663 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 25/Feb/21 01:46 Start Date: 25/Feb/21 01:46 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r582431043 ## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ## @@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa /* * Method used from TestReplicationScenariosExclusiveReplica */ - private void assertExternalFileInfo(List expected, String dumplocation, boolean isIncremental, + private void assertExternalFileList(List expected, String dumplocation, WarehouseInstance warehouseInstance) throws IOException { Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); -Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); -Path externalTableInfoFile; -if (isIncremental) { - externalTableInfoFile = new Path(hivePath, FILE_NAME); -} else { - externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME); -} -ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile); +Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL); Review comment: Removing would cause code-duplication in most places, except in TestReplicationScenariosExternalTablesMetaDataOnly.java . Refactored the code there. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 557663) Time Spent: 4h 20m (was: 4h 10m) > Moving to file based iteration for copying data > --- > > Key: HIVE-24718 > URL: https://issues.apache.org/jira/browse/HIVE-24718 > Project: Hive > Issue Type: Bug >Reporter: Arko Sharma >Assignee: Arko Sharma >Priority: Major > Labels: pull-request-available > Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, > HIVE-24718.04.patch > > Time Spent: 4h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=557665&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557665 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 25/Feb/21 01:46 Start Date: 25/Feb/21 01:46 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r582431197 ## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java ## @@ -103,9 +102,9 @@ public void replicationWithoutExternalTables() throws Throwable { .run("insert into table t2 partition(country='france') values ('paris')") .dump(primaryDbName, dumpWithClause); -// the _external_tables_file info only should be created if external tables are to be replicated not otherwise -assertFalse(primary.miniDFSCluster.getFileSystem() -.exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); +// the _file_list_external only should be created if external tables are to be replicated not otherwise +assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, Review comment: Removing would cause code-duplication in most places, except in TestReplicationScenariosExternalTablesMetaDataOnly.java . Refactored the code there. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 557665) Time Spent: 4.5h (was: 4h 20m) > Moving to file based iteration for copying data > --- > > Key: HIVE-24718 > URL: https://issues.apache.org/jira/browse/HIVE-24718 > Project: Hive > Issue Type: Bug >Reporter: Arko Sharma >Assignee: Arko Sharma >Priority: Major > Labels: pull-request-available > Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, > HIVE-24718.04.patch > > Time Spent: 4.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=557661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-557661 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 25/Feb/21 01:43 Start Date: 25/Feb/21 01:43 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r582430148 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## @@ -18,158 +18,176 @@ package org.apache.hadoop.hive.ql.exec.repl.util; -import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.StringWriter; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Callable; - -/** - * A file backed list of Strings which is in-memory till the threshold. 
- */ public class FileList implements AutoCloseable, Iterator { private static final Logger LOG = LoggerFactory.getLogger(FileList.class); - private static int fileListStreamerID = 0; - private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; - - private LinkedBlockingQueue cache; - private volatile boolean thresholdHit = false; - private int thresholdPoint; - private float thresholdFactor = 0.9f; - private Path backingFile; - private FileListStreamer fileListStreamer; - private String nextElement; - private boolean noMoreElement; + private final Path backingFile; + private String nextElement = null; private HiveConf conf; + private volatile boolean retryMode; private BufferedReader backingFileReader; + private volatile FSDataOutputStream backingFileWriter; - - public FileList(Path backingFile, int cacheSize, HiveConf conf) { + public FileList(Path backingFile, HiveConf conf) { this.backingFile = backingFile; this.conf = conf; -if (cacheSize > 0) { - // Cache size must be > 0 for this list to be used for the write operation. - this.cache = new LinkedBlockingQueue<>(cacheSize); - fileListStreamer = new FileListStreamer(cache, backingFile, conf); - thresholdPoint = getThreshold(cacheSize); - LOG.debug("File list backed by {} can be used for write operation.", backingFile); -} else { - thresholdHit = true; +this.retryMode = false; + } + + public void add(String entry) throws IOException { +Retryable retryable = buildRetryable(); +try { + retryable.executeCallable((Callable) ()-> { +synchronized (backingFile) { + try{ +if (backingFileWriter == null) { + backingFileWriter = initWriter(); +} +backingFileWriter.writeBytes(getEntryWithNewline(entry)); +backingFileWriter.hflush(); Review comment: Currently we are checking for duplicate entries. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 557661) Time Spent: 4h 10m (was: 4h) > Moving to file based iteration for copying data > --- > > Key: HIVE-24718 > URL: https://issues.apache.org/jira/browse/HIVE-24718 > Project: Hive > Issue Type: Bug >Reporter: Arko Sharma >Assignee: Arko Sharma >Priority: Major > Labels: pull-request-available > Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, > HIVE-24718.04.patch > > Time Spent: 4h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
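The review thread above notes that a retried write after a broken ACK could leave duplicate entries in the backing file, and that duplicates are checked for when the list is consumed. An order-preserving read-side dedup can be sketched as below (the helper name is illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Sketch of read-side deduplication for a file-backed entry list: a retried
// flush may have written the same entry twice, so repeats are dropped while
// first-seen order is preserved.
public class EntryDedup {
    public static List<String> dedupe(List<String> rawLines) {
        // LinkedHashSet removes repeats and keeps insertion order
        return new ArrayList<>(new LinkedHashSet<>(rawLines));
    }
}
```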
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555925&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555925 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 18:02 Start Date: 22/Feb/21 18:02 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580465458 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## @@ -18,158 +18,176 @@ package org.apache.hadoop.hive.ql.exec.repl.util; -import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.StringWriter; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Callable; - -/** - * A file backed list of Strings which is in-memory till the threshold. 
- */ public class FileList implements AutoCloseable, Iterator { private static final Logger LOG = LoggerFactory.getLogger(FileList.class); - private static int fileListStreamerID = 0; - private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; - - private LinkedBlockingQueue cache; - private volatile boolean thresholdHit = false; - private int thresholdPoint; - private float thresholdFactor = 0.9f; - private Path backingFile; - private FileListStreamer fileListStreamer; - private String nextElement; - private boolean noMoreElement; + private final Path backingFile; + private String nextElement = null; private HiveConf conf; + private volatile boolean retryMode; private BufferedReader backingFileReader; + private volatile FSDataOutputStream backingFileWriter; - - public FileList(Path backingFile, int cacheSize, HiveConf conf) { + public FileList(Path backingFile, HiveConf conf) { this.backingFile = backingFile; this.conf = conf; -if (cacheSize > 0) { - // Cache size must be > 0 for this list to be used for the write operation. - this.cache = new LinkedBlockingQueue<>(cacheSize); - fileListStreamer = new FileListStreamer(cache, backingFile, conf); - thresholdPoint = getThreshold(cacheSize); - LOG.debug("File list backed by {} can be used for write operation.", backingFile); -} else { - thresholdHit = true; +this.retryMode = false; + } + + public void add(String entry) throws IOException { +Retryable retryable = buildRetryable(); +try { + retryable.executeCallable((Callable) ()-> { +synchronized (backingFile) { + try{ +if (backingFileWriter == null) { + backingFileWriter = initWriter(); +} +backingFileWriter.writeBytes(getEntryWithNewline(entry)); +backingFileWriter.hflush(); Review comment: This patch assumes that each flush operation either writes data completely (including ack) or does not write it at all. 
If this assumption changes, then we might need future investigation to determine the boundaries of each entry that needs to be written to the file. So currently checks for duplicate entries and invalid entries are not being done. Hflush guarantees that after successful return, every new reader would see the flushed data. This guarantee seems enough as we do not have reads during writes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 555925) Time Spent: 4h (was: 3h 50m) > Moving to file based iteration for copying data > --- > > Key: HIVE-24718 > URL: https://issues.apache.org/jira/browse/HIVE-24718 > Project: Hive > Issue Type: Bug >Reporter: Arko Sharma >Assignee: Arko Sharma >Priority: Major > Labels: pull-request-available > Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, > HIVE-24718.04.patch > > Time Spent: 4h > Remaini
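The atomicity assumption stated above — each flushed entry is either fully visible to a new reader or absent — relies on every entry being framed by exactly one trailing line separator, so a recovered file still splits cleanly on newlines. The diff's `getEntryWithNewline` serves that purpose; a small sketch of the normalization (method name illustrative) is:

```java
// Ensure an entry ends with exactly one line separator before it is written,
// so each hflush()ed record is a self-delimiting line in the backing file.
public class EntryFormat {
    public static String withNewline(String entry) {
        String sep = System.lineSeparator();
        return entry.endsWith(sep) ? entry : entry + sep;
    }
}
```

Note that `hflush()` only guarantees visibility to new readers, not durability on disk; since the list is never read while it is being written, visibility is the guarantee this design actually needs.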
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555850&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555850 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 15:05 Start Date: 22/Feb/21 15:05 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580321113 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## @@ -18,158 +18,176 @@ package org.apache.hadoop.hive.ql.exec.repl.util; -import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.StringWriter; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Callable; - -/** - * A file backed list of Strings which is in-memory till the threshold. 
- */ public class FileList implements AutoCloseable, Iterator { private static final Logger LOG = LoggerFactory.getLogger(FileList.class); - private static int fileListStreamerID = 0; - private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; - - private LinkedBlockingQueue cache; - private volatile boolean thresholdHit = false; - private int thresholdPoint; - private float thresholdFactor = 0.9f; - private Path backingFile; - private FileListStreamer fileListStreamer; - private String nextElement; - private boolean noMoreElement; + private final Path backingFile; + private String nextElement = null; private HiveConf conf; + private volatile boolean retryMode; private BufferedReader backingFileReader; + private volatile FSDataOutputStream backingFileWriter; - - public FileList(Path backingFile, int cacheSize, HiveConf conf) { + public FileList(Path backingFile, HiveConf conf) { this.backingFile = backingFile; this.conf = conf; -if (cacheSize > 0) { - // Cache size must be > 0 for this list to be used for the write operation. - this.cache = new LinkedBlockingQueue<>(cacheSize); - fileListStreamer = new FileListStreamer(cache, backingFile, conf); - thresholdPoint = getThreshold(cacheSize); - LOG.debug("File list backed by {} can be used for write operation.", backingFile); -} else { - thresholdHit = true; +this.retryMode = false; + } + + public void add(String entry) throws IOException { +Retryable retryable = buildRetryable(); +try { + retryable.executeCallable((Callable) ()-> { +synchronized (backingFile) { + try{ +if (backingFileWriter == null) { + backingFileWriter = initWriter(); +} +backingFileWriter.writeBytes(getEntryWithNewline(entry)); +backingFileWriter.hflush(); Review comment: On hflush(): a) AFAIK, this doesn't give guarantee that the data is written on the disk. What happens if all the data nodes goes down simultaneously? How does retry handle in that case? 
b) What happens when data is written but while receiving ACK from data node(s), the connection breaks for some reason? Wouldn't we have duplicate entries in that case? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 555850) Time Spent: 3h 50m (was: 3h 40m) > Moving to file based iteration for copying data > --- > > Key: HIVE-24718 > URL: https://issues.apache.org/jira/browse/HIVE-24718 > Project: Hive > Issue Type: Bug >Reporter: Arko Sharma >Assignee: Arko Sharma >Priority: Major > Labels: pull-request-available > Attachments: HIVE-24718.01.patch, HIVE-24718.02.patch, > HIVE-24718.04.patch > > Time Spent: 3h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555763&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555763 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 11:55 Start Date: 22/Feb/21 11:55 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580192079 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## @@ -18,158 +18,176 @@ package org.apache.hadoop.hive.ql.exec.repl.util; -import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.StringWriter; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Callable; - -/** - * A file backed list of Strings which is in-memory till the threshold. 
- */ public class FileList implements AutoCloseable, Iterator { private static final Logger LOG = LoggerFactory.getLogger(FileList.class); - private static int fileListStreamerID = 0; - private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; - - private LinkedBlockingQueue cache; - private volatile boolean thresholdHit = false; - private int thresholdPoint; - private float thresholdFactor = 0.9f; - private Path backingFile; - private FileListStreamer fileListStreamer; - private String nextElement; - private boolean noMoreElement; + private final Path backingFile; + private String nextElement = null; private HiveConf conf; + private volatile boolean retryMode; private BufferedReader backingFileReader; + private volatile FSDataOutputStream backingFileWriter; - - public FileList(Path backingFile, int cacheSize, HiveConf conf) { + public FileList(Path backingFile, HiveConf conf) { this.backingFile = backingFile; this.conf = conf; -if (cacheSize > 0) { - // Cache size must be > 0 for this list to be used for the write operation. 
- this.cache = new LinkedBlockingQueue<>(cacheSize); - fileListStreamer = new FileListStreamer(cache, backingFile, conf); - thresholdPoint = getThreshold(cacheSize); - LOG.debug("File list backed by {} can be used for write operation.", backingFile); -} else { - thresholdHit = true; +this.retryMode = false; + } + + public void add(String entry) throws IOException { +Retryable retryable = buildRetryable(); +try { + retryable.executeCallable((Callable) ()-> { +synchronized (backingFile) { + try{ +if (backingFileWriter == null) { + backingFileWriter = initWriter(); +} +backingFileWriter.writeBytes(getEntryWithNewline(entry)); +backingFileWriter.hflush(); +LOG.info("Writing entry {} to file list backed by {}", entry, backingFile); + } catch (IOException e) { +LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e); +this.retryMode = true; +close(); +throw e; + } +} +return null; + }); +} catch (Exception e) { + throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage())); } } - @VisibleForTesting - FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue cache, HiveConf conf) { -this.backingFile = backingFile; -this.fileListStreamer = fileListStreamer; -this.cache = cache; -this.conf = conf; -thresholdPoint = getThreshold(cache.remainingCapacity()); + Retryable buildRetryable() { +return Retryable.builder() +.withHiveConf(conf) +.withRetryOnException(IOException.class).build(); } - /** - * Only add operation is safe for concurrent operations. - */ - public void add(String entry) throws SemanticException { -if (thresholdHit && !fileListStreamer.isAlive()) { - throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString()); + // Return the entry ensuring it ends with newline. + private String getEntryWithNewline(String entry) { +return new StringWriter() +.append(entry) +.append(System.lineSeparator()) +
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555762&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555762 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 11:53 Start Date: 22/Feb/21 11:53 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580191067 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## (quoted context: import changes for the FileList rewrite: VisibleForTesting and LinkedBlockingQueue removed; FSDataOutputStream, ErrorMsg, Retryable, StringWriter, UncheckedIOException and Callable added)
Review comment (on the retryable.executeCallable block in FileList.add): What I meant is that a failure (the retry-exhausted case) in one thread would mean the entire dump operation has to fail. So the success of each thread matters here, no? Are the other running tasks that are doing add operations interrupted if one of them fails?
Issue Time Tracking: Worklog Id: (was: 555762) Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555728&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555728 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 09:38 Start Date: 22/Feb/21 09:38 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580104313 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## (quoted context: import changes for the FileList rewrite, as in worklog 555762)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555727&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555727 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 09:37 Start Date: 22/Feb/21 09:37 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580049385 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## (quoted context: import changes for the FileList rewrite, as in worklog 555762)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555670&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555670 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 08:12 Start Date: 22/Feb/21 08:12 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580051218 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java ## (quoted change: externalTableCopyTasks now wraps the task-building loop in a retryable block:)

      public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) throws IOException {
        if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
          return Collections.emptyList();
        }
        List<Task<?>> tasks = new ArrayList<>();
        Retryable retryable = Retryable.builder()
            .withHiveConf(conf)
            .withRetryOnException(UncheckedIOException.class).build();
        try {
          retryable.executeCallable((Callable<Void>) () -> {
            try {
              int numEntriesToSkip = tasks == null ? 0 : tasks.size();
              while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
                if (numEntriesToSkip > 0) {
                  // skip tasks added in previous attempts of this retryable block
                  externalTblCopyPathIterator.next();
                  numEntriesToSkip--;
                  continue;
                }
                DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString());
                dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
                Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
                tasks.add(task);
                tracker.addTask(task);

Review comment (on tracker.addTask(task)): Also, they are not added again during retries. This is controlled by the size of the 'tasks' list.

Issue Time Tracking: Worklog Id: (was: 555670) Time Spent: 3h (was: 2h 50m)
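The skip-on-retry pattern discussed in this thread can be sketched in isolation: when the retryable block re-runs, the entry iterator is consumed again from the beginning, and the first tasks.size() entries are skipped so that tasks added by earlier attempts are never duplicated. The names below are illustrative, not Hive's.

```java
import java.util.Iterator;
import java.util.List;

/** Sketch of resuming a task-building loop after a retry. */
class ResumableTaskBuilder {
  // tasks holds whatever earlier attempts already built; its size tells
  // this attempt how many iterator entries were already consumed.
  static List<String> addTasks(Iterator<String> entries, List<String> tasks, int maxTasks) {
    int numEntriesToSkip = tasks.size(); // entries handled by previous attempts
    while (entries.hasNext() && tasks.size() < maxTasks) {
      String entry = entries.next();
      if (numEntriesToSkip > 0) {
        // skip tasks added in previous attempts of this retryable block
        numEntriesToSkip--;
        continue;
      }
      tasks.add("copy:" + entry);
    }
    return tasks;
  }
}
```

This only works because the tasks list lives outside the retryable block, so a failed attempt leaves its partial progress visible to the next one; that is exactly the point made in the "They remain in memory in the same list" reply below.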
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555669&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555669 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 08:11 Start Date: 22/Feb/21 08:11 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580050588 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java ## (quoted context: the externalTableCopyTasks retry loop, as quoted in worklog 555670)
Review comment (on tracker.addTask(task)): They remain in memory in the same list: 'tasks'.

Issue Time Tracking: Worklog Id: (was: 555669) Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555668&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555668 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 08:10 Start Date: 22/Feb/21 08:10 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580050377 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## (quoted context: import changes for the FileList rewrite, as in worklog 555762)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555666&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555666 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 08:08 Start Date: 22/Feb/21 08:08 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580049385 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## (quoted context: import changes for the FileList rewrite, as in worklog 555762)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555663 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 08:06 Start Date: 22/Feb/21 08:06 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580048118 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## (quoted context: import changes for the FileList rewrite, as in worklog 555762)
Review comment (on the retryable.executeCallable block in FileList.add): Yes, other threads are allowed to write. A failure in one thread does not by itself imply failure in the other threads, because after a failure we close the output stream and reopen it for whichever thread attempts the next write.
Issue Time Tracking: Worklog Id: (was: 555663) Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555661 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 08:00 Start Date: 22/Feb/21 08:00 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r580045609 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java ## @@ -75,7 +77,9 @@ public String convertToString() { StringBuilder objInStr = new StringBuilder(); objInStr.append(fullyQualifiedSourcePath) .append(URI_SEPARATOR) -.append(fullyQualifiedTargetPath); +.append(fullyQualifiedTargetPath) +.append(URI_SEPARATOR) +.append(tableName); Review comment: In the earlier version, source and target were the first and second entries, so this change stays consistent with any dumps produced by that version. Issue Time Tracking --- Worklog Id: (was: 555661) Time Spent: 2h 10m (was: 2h)
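Appending tableName after the two existing fields, as discussed above, is what keeps older two-field dumps readable: the first two positions keep their meaning and the third is optional. A hedged sketch of that parsing logic, assuming "#" as URI_SEPARATOR purely for the demo (the real separator and field handling live in DirCopyWork):

```java
// Illustrative parser showing the backward-compatibility argument:
// new entries are source#target#tableName, old entries are source#target.
public class DirCopyWorkFormat {
  private static final String URI_SEPARATOR = "#"; // assumption for the demo

  static String[] parse(String line) {
    String[] fields = line.split(URI_SEPARATOR);
    String source = fields[0];
    String target = fields[1];
    // old dumps have only two fields, so the table name may be absent
    String tableName = fields.length > 2 ? fields[2] : null;
    return new String[] {source, target, tableName};
  }

  public static void main(String[] args) {
    String[] v2 = parse("hdfs://nn1/src#hdfs://nn2/tgt#sales"); // new format
    String[] v1 = parse("hdfs://nn1/src#hdfs://nn2/tgt");       // old format
    System.out.println(v2[2]); // prints sales
    System.out.println(v1[2]); // prints null
  }
}
```

Had tableName been inserted between source and target instead, the second field of an old dump would be misread as a table name, which is the consistency point the comment makes.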
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555623&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555623 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 22/Feb/21 06:34 Start Date: 22/Feb/21 06:34 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r579994523 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java ## @@ -75,7 +77,9 @@ public String convertToString() { StringBuilder objInStr = new StringBuilder(); objInStr.append(fullyQualifiedSourcePath) .append(URI_SEPARATOR) -.append(fullyQualifiedTargetPath); +.append(fullyQualifiedTargetPath) +.append(URI_SEPARATOR) +.append(tableName); Review comment: Any specific reason for moving the tableName to the end? ## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ## @@ -2225,17 +2224,11 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa /* * Method used from TestReplicationScenariosExclusiveReplica */ - private void assertExternalFileInfo(List expected, String dumplocation, boolean isIncremental, + private void assertExternalFileList(List expected, String dumplocation, WarehouseInstance warehouseInstance) throws IOException { Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); -Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); -Path externalTableInfoFile; -if (isIncremental) { - externalTableInfoFile = new Path(hivePath, FILE_NAME); -} else { - externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME); -} -ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile); +Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL); Review comment: Given that there is a fixed 
location for FILE_LIST_EXTERNAL in both incremental and bootstrap now. Do you really need this method any more? TestReplicationScenariosAcrossInstances.assertExternalFileList ? ## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java ## @@ -103,9 +102,9 @@ public void replicationWithoutExternalTables() throws Throwable { .run("insert into table t2 partition(country='france') values ('paris')") .dump(primaryDbName, dumpWithClause); -// the _external_tables_file info only should be created if external tables are to be replicated not otherwise -assertFalse(primary.miniDFSCluster.getFileSystem() -.exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); +// the _file_list_external only should be created if external tables are to be replicated not otherwise +assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, Review comment: Given that location for FILE_LIST_EXTERNAL is fixed for both bootstrap and incremental now. There are many methods not required anymore e.g. assertExternalFileList. Can you please check from this perspective and remove them off by doing a bit of refactoring. 
## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## @@ -18,158 +18,176 @@ package org.apache.hadoop.hive.ql.exec.repl.util; -import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.StringWriter; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Callable; - -/** - * A file backed list of Strings which is in-memory till the threshold. - */ public class FileList implements AutoCloseable, Iterator { private static final Logger LOG = LoggerFactory.getLogger(FileList.class); - private static int fileListStreamerID = 0; - private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; - - private LinkedBlockingQueue cache; - private volatile boole
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=555496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555496 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 21/Feb/21 20:34 Start Date: 21/Feb/21 20:34 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r579861655 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## @@ -22,154 +22,118 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; - -/** - * A file backed list of Strings which is in-memory till the threshold. 
- */ public class FileList implements AutoCloseable, Iterator { private static final Logger LOG = LoggerFactory.getLogger(FileList.class); - private static int fileListStreamerID = 0; - private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; - - private LinkedBlockingQueue cache; - private volatile boolean thresholdHit = false; - private int thresholdPoint; - private float thresholdFactor = 0.9f; private Path backingFile; - private FileListStreamer fileListStreamer; - private String nextElement; - private boolean noMoreElement; + private String nextElement = null; private HiveConf conf; private BufferedReader backingFileReader; + private BufferedWriter backingFileWriter; - - public FileList(Path backingFile, int cacheSize, HiveConf conf) { + public FileList(Path backingFile, HiveConf conf) { this.backingFile = backingFile; this.conf = conf; -if (cacheSize > 0) { - // Cache size must be > 0 for this list to be used for the write operation. - this.cache = new LinkedBlockingQueue<>(cacheSize); - fileListStreamer = new FileListStreamer(cache, backingFile, conf); - thresholdPoint = getThreshold(cacheSize); - LOG.debug("File list backed by {} can be used for write operation.", backingFile); -} else { - thresholdHit = true; -} } - @VisibleForTesting - FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue cache, HiveConf conf) { -this.backingFile = backingFile; -this.fileListStreamer = fileListStreamer; -this.cache = cache; -this.conf = conf; -thresholdPoint = getThreshold(cache.remainingCapacity()); - } - - /** - * Only add operation is safe for concurrent operations. - */ public void add(String entry) throws SemanticException { -if (thresholdHit && !fileListStreamer.isAlive()) { - throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString()); -} +Retryable retryable = Retryable.builder() Review comment: Currently it is used only in this add function, so creating it locally. 
Issue Time Tracking --- Worklog Id: (was: 555496) Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550602&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550602 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 10/Feb/21 07:16 Start Date: 10/Feb/21 07:16 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r573496104 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## Review comment: Ok, the whole discussion was with the point that the write is going to be single threaded op. Agree, with multiple threads writing it makes sense. However, how does retry work in multiple thread case and is this ordering guarantee not impacted in retry case? Issue Time Tracking --- Worklog Id: (was: 550602) Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550598&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550598 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 10/Feb/21 07:02 Start Date: 10/Feb/21 07:02 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r573490313 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## Review comment: During bootstrap dump of managed-table partitions, there could be multiple threads trying to access this writer. This number is governed by the conf REPL_PARTITIONS_DUMP_PARALLELISM. So, the order could be messed up. Earlier I was referring to the implementation of BufferedWriter, which makes write operation thread safe, meaning only one thread writes some value at a time. Issue Time Tracking --- Worklog Id: (was: 550598) Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550590&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550590 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 10/Feb/21 06:32 Start Date: 10/Feb/21 06:32 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r573480027 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## Review comment: If it is a single thread writing the entries, ordering wouldn't be messed up, no? Issue Time Tracking --- Worklog Id: (was: 550590) Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550576&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550576 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 10/Feb/21 05:42 Start Date: 10/Feb/21 05:42 Worklog Time Spent: 10m Work Description: ArkoSharma commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r573463126 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## Review comment: Printing the entry and writing the newline character are being done in separate statements. For two entries e1,e2, we need the order of writing to be e1,newline,e2,newline or e2,newline,e1,newline. This order would be guaranteed only if we use 'synchronized' block containing the two statements. Otherwise, we could have cases like e1,e2,newline,newline, which would cause errors. Issue Time Tracking --- Worklog Id: (was: 550576) Time Spent: 1h 10m (was: 1h)
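The interleaving concern in the comment above can be demonstrated directly: the entry and the newline are two separate writer calls, so one monitor must cover both to keep each (entry, newline) pair atomic. A small self-contained demo follows; the writer, entry names, and thread counts are illustrative, not Hive's code.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;

// Without the lock, two threads could interleave as e1, e2, \n, \n and
// produce torn lines; holding one monitor across both calls prevents that.
public class AtomicLinePairDemo {
  private static final Object lock = new Object();

  static void writeEntry(BufferedWriter w, String entry) throws IOException {
    synchronized (lock) { // both statements under a single monitor
      w.write(entry);
      w.newLine();
    }
  }

  public static void main(String[] args) throws Exception {
    StringWriter sink = new StringWriter();
    BufferedWriter w = new BufferedWriter(sink);
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      final String entry = "entry-" + i;
      threads.add(new Thread(() -> {
        try {
          for (int j = 0; j < 100; j++) {
            writeEntry(w, entry);
          }
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }));
    }
    for (Thread t : threads) t.start();
    for (Thread t : threads) t.join();
    w.flush();
    // every line must be a complete entry; a torn write would fail here
    for (String line : sink.toString().split(System.lineSeparator())) {
      if (!line.startsWith("entry-")) {
        throw new AssertionError("torn line: " + line);
      }
    }
    System.out.println("all lines intact");
  }
}
```

Note that BufferedWriter's internal lock only serializes individual calls; it does not make the write-then-newline pair atomic, which is why the external synchronized block is needed once multiple partition-dump threads share the writer.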
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550573&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550573 ] ASF GitHub Bot logged work on HIVE-24718: - Author: ASF GitHub Bot Created on: 10/Feb/21 05:20 Start Date: 10/Feb/21 05:20 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #1936: URL: https://github.com/apache/hive/pull/1936#discussion_r573456193 ## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java ## Review comment: Why do we need synchronization if it is just one thread writing? How does synchronizing it guarantee the atomicity (either or none) which you are expecting? Issue Time Tracking --- Worklog Id: (was: 550573) Time Spent: 1h (was: 50m)
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550567&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550567 ]

ASF GitHub Bot logged work on HIVE-24718:
-----------------------------------------
                Author: ASF GitHub Bot
            Created on: 10/Feb/21 05:04
            Start Date: 10/Feb/21 05:04
    Worklog Time Spent: 10m

Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573451500

## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
(quotes the same FileList.java hunk shown in the previous worklog)

Review comment:
   Only one thread can write at a time, but synchronization is still needed because there are two elements to write per entry: the entry itself and the newline. These two writes have to happen atomically.

Issue Time Tracking
-------------------
    Worklog Id:     (was: 550567)
    Time Spent: 50m  (was: 40m)
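The atomicity argument in this reply can be exercised directly: with both writes under one lock, concurrent adders can never interleave an entry and its newline. A stand-alone sketch using a StringWriter in place of the HDFS-backed file (all names here are illustrative, not Hive's):

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;

// Two writes per logical entry (the entry, then the newline) must not be
// interleaved by another thread, hence the synchronized block.
public class AtomicAppendDemo {
  private final StringWriter sink = new StringWriter();
  private final BufferedWriter writer = new BufferedWriter(sink);

  public void add(String entry) throws IOException {
    // Both writes happen under the same lock, so no other thread can
    // slip its own entry between this entry and its newline.
    synchronized (writer) {
      writer.write(entry);
      writer.newLine();
    }
  }

  public List<String> lines() throws IOException {
    synchronized (writer) {
      writer.flush();
    }
    return sink.toString().lines().toList();
  }

  public static void main(String[] args) throws Exception {
    AtomicAppendDemo demo = new AtomicAppendDemo();
    Thread t1 = new Thread(() -> {
      try { for (int i = 0; i < 100; i++) demo.add("t1-" + i); } catch (IOException ignored) { }
    });
    Thread t2 = new Thread(() -> {
      try { for (int i = 0; i < 100; i++) demo.add("t2-" + i); } catch (IOException ignored) { }
    });
    t1.start(); t2.start();
    t1.join(); t2.join();
    // 200 complete lines, none interleaved
    System.out.println(demo.lines().size());
  }
}
```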
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550566&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550566 ]

ASF GitHub Bot logged work on HIVE-24718:
-----------------------------------------
                Author: ASF GitHub Bot
            Created on: 10/Feb/21 05:02
            Start Date: 10/Feb/21 05:02
    Worklog Time Spent: 10m

Work Description: ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573450950

## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
(quotes the same imports and class hunk as the previous worklogs, continuing with the new add(), initWriter() and hasNext() implementations)

   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()
+        .withHiveConf(conf)
+        .withRetryOnException(IOException.class).build();
     try {
-      cache.put(entry);
-    } catch (InterruptedException e) {
-      throw new SemanticException(e);
-    }
-    if (!thresholdHit && cache.size() >= thresholdPoint) {
-      initStoreToFile(cache.size());
+      retryable.executeCallable((Callable<Void>) () -> {
+        synchronized (backingFile) {
+          if (backingFileWriter == null) {
+            backingFileWriter = initWriter();
+          }
+          backingFileWriter.write(entry);
+          backingFileWriter.newLine();
+        }
+        LOG.info("Writing entry {} to file list backed by {}", entry, backingFile);
+        return null;
+      });
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage(),
+          String.valueOf(ErrorMsg.getErrorMsg(e).getErrorCode())));
     }
   }

+  BufferedWriter initWriter() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(backingFile)));
+  }
+
   @Override
   public boolean hasNext() {
-    if (!thresholdHit) {
-      return (cache != null && !cache.isEmpty());
-    }
     if (nextElement != null) {
       return true;
+    } else {
+      try {
+        nextElement = readNextLine();
+        return (nextElement != null);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
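The new hasNext() in the hunk above lazily reads one line and caches it in nextElement, so repeated hasNext() calls do not skip entries. A self-contained sketch of that iterator pattern over a plain local file (Hive's version reads through the Hadoop FileSystem and a readNextLine() helper; the names below are illustrative):

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Lazy line iterator: hasNext() pulls the next line from the backing
// file and caches it; next() hands the cached line out and clears it.
public class FileLineIterator implements Iterator<String>, AutoCloseable {
  private final BufferedReader reader;
  private String nextElement; // cached line, null when not yet read

  public FileLineIterator(String path) throws IOException {
    this.reader = new BufferedReader(new FileReader(path));
  }

  @Override
  public boolean hasNext() {
    if (nextElement != null) {
      return true;
    }
    try {
      nextElement = reader.readLine();   // null at end of file
    } catch (IOException e) {
      throw new UncheckedIOException(e); // Iterator methods cannot throw IOException
    }
    return nextElement != null;
  }

  @Override
  public String next() {
    if (!hasNext()) {
      throw new NoSuchElementException("No more lines in backing file");
    }
    String result = nextElement;
    nextElement = null; // force the next hasNext() to read a fresh line
    return result;
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }
}
```

Wrapping the IOException in UncheckedIOException mirrors the patch: the java.util.Iterator contract leaves no room for checked exceptions in hasNext().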
[jira] [Work logged] (HIVE-24718) Moving to file based iteration for copying data
[ https://issues.apache.org/jira/browse/HIVE-24718?focusedWorklogId=550473&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-550473 ]

ASF GitHub Bot logged work on HIVE-24718:
-----------------------------------------
                Author: ASF GitHub Bot
            Created on: 09/Feb/21 23:05
            Start Date: 09/Feb/21 23:05
    Worklog Time Spent: 10m

Work Description: pkumarsinha commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r573292875

## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
(quotes the same FileList.java hunk as the previous worklogs, down to the new Retryable construction in add())

   public void add(String entry) throws SemanticException {
-    if (thresholdHit && !fileListStreamer.isAlive()) {
-      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
-    }
+    Retryable retryable = Retryable.builder()

Review comment:
   Can we create this just one per FileList?
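The reviewer's suggestion, building the retry policy once per FileList instance instead of on every add() call, can be sketched with a toy stand-in for the retry helper (Hive's real Retryable is built from HiveConf with backoff; everything named below is illustrative):

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// One retry policy per writer instance, reused by every add() call,
// instead of rebuilding it on each write.
public class RetryingWriter {
  private final Retryable retryable = new Retryable(3); // built once, reused
  private final StringBuilder sink = new StringBuilder();
  private int failuresToSimulate;

  public RetryingWriter(int failuresToSimulate) {
    this.failuresToSimulate = failuresToSimulate;
  }

  public void add(String entry) throws Exception {
    retryable.run(() -> {
      if (failuresToSimulate > 0) {
        failuresToSimulate--;
        throw new IOException("transient write failure");
      }
      sink.append(entry).append('\n');
      return null;
    });
  }

  public String contents() {
    return sink.toString();
  }

  // Minimal stand-in for org.apache.hadoop.hive.ql.exec.util.Retryable:
  // fixed attempt count, retries only on IOException, no backoff.
  static class Retryable {
    private final int maxAttempts;

    Retryable(int maxAttempts) {
      this.maxAttempts = maxAttempts;
    }

    <T> T run(Callable<T> action) throws Exception {
      IOException last = null;
      for (int attempt = 0; attempt < maxAttempts; attempt++) {
        try {
          return action.call();
        } catch (IOException e) {
          last = e; // retry only on IOException, as the patch configures
        }
      }
      throw last;
    }
  }
}
```

Since the retry policy is immutable configuration, hoisting it to a field avoids one builder allocation per entry on what can be a very hot write path.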
## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
(second quoted occurrence of the same FileList.java hunk in this review; truncated)