[GitHub] [hive] pkumarsinha commented on a change in pull request #1358: HIVE-23955 : Classification of Error Codes in Replication

2020-08-06 Thread GitBox


pkumarsinha commented on a change in pull request #1358:
URL: https://github.com/apache/hive/pull/1358#discussion_r466578320



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##
@@ -86,107 +87,59 @@ private boolean checkIfPathExist(Path sourcePath, UserGroupInformation proxyUser
     return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> sourcePath.getFileSystem(conf).exists(sourcePath));
   }
 
-  private int handleException(Exception e, Path sourcePath, Path targetPath,
-                              int currentRetry, UserGroupInformation proxyUser) {
-    try {
-      LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e);
-      if (!checkIfPathExist(sourcePath, proxyUser)) {
-        LOG.info("Source path is missing. Ignoring exception.");
-        return 0;
-      }
-    } catch (Exception ex) {
-      LOG.warn("Source path missing check failed. ", ex);
-    }
-    // retry logic only for i/o exception
-    if (!(e instanceof IOException)) {
-      LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e);
-      setException(e);
-      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
-    }
-
-    if (currentRetry <= MAX_COPY_RETRY) {
-      LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e);
-    } else {
-      LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e);
-      setException(e);
-      return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode();
-    }
-    int sleepTime = FileUtils.getSleepTime(currentRetry);
-    LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry));
-    try {
-      Thread.sleep(sleepTime);
-    } catch (InterruptedException timerEx) {
-      LOG.info("Sleep interrupted", timerEx.getMessage());
-    }
-    try {
-      if (proxyUser == null) {
-        proxyUser = Utils.getUGI();
-      }
-      FileSystem.closeAllForUGI(proxyUser);
-    } catch (Exception ex) {
-      LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex);
-    }
-    return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
-  }
-
   @Override
   public int execute() {
     String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(IOException.class).build();
+    try {
+      return retryable.executeCallable(() -> {
+        UserGroupInformation proxyUser = null;
+        Path sourcePath = work.getFullyQualifiedSourcePath();
+        Path targetPath = work.getFullyQualifiedTargetPath();
+        try {
+          if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+            sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri());
+            targetPath = reservedRawPath(work.getFullyQualifiedTargetPath().toUri());
+          }
+          UserGroupInformation ugi = Utils.getUGI();
+          String currentUser = ugi.getShortUserName();
+          if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) {
+            proxyUser = UserGroupInformation.createProxyUser(
+              distCpDoAsUser, UserGroupInformation.getLoginUser());
+          }
 
-    Path sourcePath = work.getFullyQualifiedSourcePath();
-    Path targetPath = work.getFullyQualifiedTargetPath();
-    if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
-      sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri());
-      targetPath = reservedRawPath(work.getFullyQualifiedTargetPath().toUri());
-    }
-    int currentRetry = 0;
-    int error = 0;
-    UserGroupInformation proxyUser = null;
-    while (currentRetry <= MAX_COPY_RETRY) {
-      try {
-        UserGroupInformation ugi = Utils.getUGI();
-        String currentUser = ugi.getShortUserName();
-        if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) {
-          proxyUser = UserGroupInformation.createProxyUser(
-              distCpDoAsUser, UserGroupInformation.getLoginUser());
-        }
-
-        setTargetPathOwner(targetPath, sourcePath, proxyUser);
-
-        // do we create a new conf and only here provide this additional option so that we get away from
-        // differences of data in two location for the same directories ?
-        // basically add distcp.options.delete to hiveconf new object ?
-        FileUtils.distCp(
-            sourcePath.getFileSystem(conf), // source file system
-            Collections.singletonList(sourcePath),  // list of source paths
-            targetPath,
-            false,
-            proxyUser,
-            conf,
-            ShimLoader.getHadoopShims());
-        return 0;
-      } catch (Exception e) {
-        currentRetry++;
-        error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser);
-        if (error == 0) {

[GitHub] [hive] pkumarsinha commented on a change in pull request #1358: HIVE-23955 : Classification of Error Codes in Replication

2020-08-06 Thread GitBox


pkumarsinha commented on a change in pull request #1358:
URL: https://github.com/apache/hive/pull/1358#discussion_r466576634



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##
@@ -196,12 +203,14 @@ private String checkHiveEntityGuid(AtlasRequestBuilder atlasRequestBuilder, Stri
     AtlasObjectId objectId = atlasRequestBuilder.getItemToExport(clusterName, srcDb);
     Set<Map.Entry<String, Object>> entries = objectId.getUniqueAttributes().entrySet();
     if (entries == null || entries.isEmpty()) {
-      throw new SemanticException("Could find entries in objectId for:" + clusterName);
+      throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Could find " +
+        "entries in objectId for:" + clusterName, "atlas"));

Review comment:
   I was referring to the "atlas" part here and in other places like "ranger" and "hive". Should we have one constant defined per service and use that instead? Something like:
   final String ReplUtils.ATLAS_SVC = "atlas";
   ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Could find " +
     "entries in objectId for:" + clusterName, ReplUtils.ATLAS_SVC);






[GitHub] [hive] pkumarsinha commented on a change in pull request #1358: HIVE-23955 : Classification of Error Codes in Replication

2020-08-06 Thread GitBox


pkumarsinha commented on a change in pull request #1358:
URL: https://github.com/apache/hive/pull/1358#discussion_r466574279



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##
@@ -132,31 +130,40 @@ private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedU
 
   private long lastStoredTimeStamp() throws SemanticException {
     Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
-    BufferedReader br = null;
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(IOException.class)
+      .withFailOnException(FileNotFoundException.class).build();
     try {
-      FileSystem fs = prevMetadataPath.getFileSystem(conf);
-      br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
-      String line = br.readLine();
-      if (line == null) {
-        throw new SemanticException("Could not read lastStoredTimeStamp from atlas metadata file");
-      }
-      String[] lineContents = line.split("\t", 5);
-      return Long.parseLong(lineContents[1]);
-    } catch (Exception ex) {
-      throw new SemanticException(ex);
-    } finally {
-      if (br != null) {
+      return retryable.executeCallable(() -> {
+        BufferedReader br = null;
         try {
-          br.close();
-        } catch (IOException e) {
-          throw new SemanticException(e);
+          FileSystem fs = prevMetadataPath.getFileSystem(conf);
+          br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
+          String line = br.readLine();
+          if (line == null) {
+            throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+              .format("Could not read lastStoredTimeStamp from atlas metadata file", "atlas"));
+          }
+          String[] lineContents = line.split("\t", 5);
+          return Long.parseLong(lineContents[1]);
+        } finally {
+          if (br != null) {
+            try {
+              br.close();
+            } catch (IOException e) {
+              //Do nothing
+            }
+          }
         }
-      }
+      });
+    } catch (Exception e) {
+      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);

Review comment:
   Yes, here we are catching it as that. I was referring to line 144:
   if (line == null) {
     throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
       .format("Could not read lastStoredTimeStamp from atlas metadata file", "atlas"));
   }
   where we are already throwing a SemanticException, which we then catch here. Can't we just rethrow the same e in that case?
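   A rough sketch of that rethrow, assuming Retryable#executeCallable propagates the callable's exception unchanged (readTimestampLine is a hypothetical helper standing in for the file-reading logic above):

   try {
     return retryable.executeCallable(() -> readTimestampLine(prevMetadataPath));
   } catch (SemanticException e) {
     // Already classified (REPL_INVALID_CONFIG_FOR_SERVICE); rethrow as-is
     // rather than re-wrapping it as REPL_RETRY_EXHAUSTED.
     throw e;
   } catch (Exception e) {
     // Only genuine retry exhaustion gets the REPL_RETRY_EXHAUSTED classification.
     throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
   }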







[GitHub] [hive] pkumarsinha commented on a change in pull request #1358: HIVE-23955 : Classification of Error Codes in Replication

2020-08-06 Thread GitBox


pkumarsinha commented on a change in pull request #1358:
URL: https://github.com/apache/hive/pull/1358#discussion_r466420110



##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##
@@ -132,31 +130,40 @@ private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedU
 
   private long lastStoredTimeStamp() throws SemanticException {
     Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
-    BufferedReader br = null;
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(IOException.class)
+      .withFailOnException(FileNotFoundException.class).build();
     try {
-      FileSystem fs = prevMetadataPath.getFileSystem(conf);
-      br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
-      String line = br.readLine();
-      if (line == null) {
-        throw new SemanticException("Could not read lastStoredTimeStamp from atlas metadata file");
-      }
-      String[] lineContents = line.split("\t", 5);
-      return Long.parseLong(lineContents[1]);
-    } catch (Exception ex) {
-      throw new SemanticException(ex);
-    } finally {
-      if (br != null) {
+      return retryable.executeCallable(() -> {
+        BufferedReader br = null;
         try {
-          br.close();
-        } catch (IOException e) {
-          throw new SemanticException(e);
+          FileSystem fs = prevMetadataPath.getFileSystem(conf);
+          br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
+          String line = br.readLine();
+          if (line == null) {
+            throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE

Review comment:
   lastStoredTimeStamp is maintained by Hive itself. Should we have a better error-message category for this than REPL_INVALID_CONFIG_FOR_SERVICE? See the sketch below for one possible shape.
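   For illustration only, one hypothetical shape such a category could take (the enum name, error code, and message below are invented, not part of the PR):

   // A category for unreadable Hive-maintained replication metadata,
   // distinct from misconfiguration of an external service:
   REPL_CORRUPT_METADATA_FILE(20040, "Could not read {0} from the {1} dump metadata file.", true),

   // The null-line check would then become:
   if (line == null) {
     throw new SemanticException(
         ErrorMsg.REPL_CORRUPT_METADATA_FILE.format("lastStoredTimeStamp", "atlas"));
   }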

##
File path: common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
##
@@ -505,18 +505,9 @@
       " queue: {1}. Please fix and try again.", true),
   SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."),
 
-  //if the error message is changed for REPL_EVENTS_MISSING_IN_METASTORE, then need modification in getNextNotification
-  //method in HiveMetaStoreClient
-  REPL_EVENTS_MISSING_IN_METASTORE(20016, "Notification events are missing in the meta store."),
-  REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID(20017, "Load path {0} not valid as target database is bootstrapped " +
-      "from some other path : {1}."),
-  REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH(20018, "File is missing from both source and cm path."),
-  REPL_LOAD_PATH_NOT_FOUND(20019, "Load path does not exist."),
-  REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(20020,
-      "Source of replication (repl.source.for) is not set in the database properties."),
-  REPL_INVALID_DB_OR_TABLE_PATTERN(20021,
-      "Invalid pattern for the DB or table name in the replication policy. "
-          + "It should be a valid regex enclosed within single or double quotes."),
+  REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH(20016, "File is missing from both source and cm path."),
+  REPL_EXTERNAL_SERVICE_CONNECTION_ERROR(20017, "Failed to connect to {0} service. Error code {1}.",
+    true),

Review comment:
   nit: This can be accommodated on a single line.
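   That is, verbatim from the hunk above, just rejoined:

   REPL_EXTERNAL_SERVICE_CONNECTION_ERROR(20017, "Failed to connect to {0} service. Error code {1}.", true),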

##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##
@@ -42,11 +43,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Serializable;
+import java.io.*;

Review comment:
   Should we revert this?
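   Reverting presumably means restoring the explicit imports; based on the removed lines, plus java.io.FileNotFoundException now used by withFailOnException(FileNotFoundException.class), that would be roughly:

   import java.io.BufferedReader;
   import java.io.FileNotFoundException; // assumption: required by the new Retryable usage
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.InputStreamReader;
   import java.io.Serializable;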

##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
##
@@ -125,17 +127,15 @@ private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest request
     return new AtlasImportResult(request, "", "", "", 0L);
   }
 
-  public AtlasServer getServer(String endpoint) throws SemanticException {
+  public AtlasServer getServer(String endpoint, HiveConf conf) throws SemanticException {
+    Retryable retryable = Retryable.builder()
+      .withHiveConf(conf)
+      .withRetryOnException(Exception.class).build();

Review comment:
   Should we not retry on just AtlasServiceException, and ultimately catch only that exception, since that is what getServer is declared to throw?
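   A minimal sketch of that narrowing (clientV2 stands in for the wrapped Atlas client field, and Retryable#executeCallable is assumed to declare throws Exception, which is why the outer catch stays broad; only AtlasServiceException is actually retried):

   public AtlasServer getServer(String endpoint, HiveConf conf) throws SemanticException {
     // Retry only on the exception the Atlas client is declared to throw,
     // instead of the blanket Exception.class used in the hunk above.
     Retryable retryable = Retryable.builder()
       .withHiveConf(conf)
       .withRetryOnException(AtlasServiceException.class).build();
     try {
       return retryable.executeCallable(() -> clientV2.getServer(endpoint));
     } catch (Exception e) {
       // Retries exhausted, or a non-retryable failure: classify as an external-service error.
       throw new SemanticException(
           ErrorMsg.REPL_EXTERNAL_SERVICE_CONNECTION_ERROR.format("atlas", e.getMessage()), e);
     }
   }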

##
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##
@@ -43,15 +44,15 @@
  */
 public class DirCopyTask extends Task<DirCopyWork> implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
-  private static final int MAX_COPY_RETRY = 5;
 
   private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException {
     FileSystem targetFs =