This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6617116  HIVE-24327: AtlasServer entity may not be present during 
first Atlas metadata dump  (Pravin Kumar Sinha, reviewed by Aasha Medhi)
6617116 is described below

commit 66171166e79a059c58e8d778d259cf0a9528cee2
Author: Anishek Agarwal <anis...@gmail.com>
AuthorDate: Thu Nov 12 09:44:34 2020 +0530

    HIVE-24327: AtlasServer entity may not be present during first Atlas 
metadata dump  (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../ql/exec/repl/atlas/AtlasRestClientImpl.java    | 14 +++-
 .../hive/ql/exec/repl/TestAtlasDumpTask.java       | 78 +++++++++++++++++++---
 2 files changed, 81 insertions(+), 11 deletions(-)

diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
index ed7485d..7397749 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
@@ -133,7 +133,19 @@ public class AtlasRestClientImpl extends 
RetryingClientTimeBased implements Atla
       .withHiveConf(conf)
       .withRetryOnException(AtlasServiceException.class).build();
     try {
-      return retryable.executeCallable(() -> clientV2.getServer(endpoint));
+      return retryable.executeCallable((Callable<AtlasServer>) () -> {
+        try {
+          return clientV2.getServer(endpoint);
+        } catch (AtlasServiceException e) {
+          int statusCode = e.getStatus() != null ? 
e.getStatus().getStatusCode() : -1;
+          if (NOT_FOUND.getStatusCode() == statusCode) {
+            // Atlas server entity is initialized on first import/export o/p.
+            LOG.info("Atlas server entity is not found");
+            return null;
+          }
+          throw e;
+        }
+      });
     } catch (Exception e) {
       throw new 
SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
     }
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
index 935dc99..5f79db0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasBaseClient;
 import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
@@ -32,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplState;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,7 +48,8 @@ import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.mockito.ArgumentMatchers.any;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.Response;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -96,8 +99,9 @@ public class TestAtlasDumpTask {
     Logger logger = Mockito.mock(Logger.class);
     Whitebox.setInternalState(ReplState.class, logger);
     Mockito.doReturn(0L).when(atlasDumpTaskSpy)
-      .dumpAtlasMetaData(any(AtlasRequestBuilder.class), 
any(AtlasReplInfo.class));
-    
Mockito.doNothing().when(atlasDumpTaskSpy).createDumpMetadata(any(AtlasReplInfo.class),
 any(Long.class));
+      .dumpAtlasMetaData(Mockito.any(AtlasRequestBuilder.class), 
Mockito.any(AtlasReplInfo.class));
+    
Mockito.doNothing().when(atlasDumpTaskSpy).createDumpMetadata(Mockito.any(AtlasReplInfo.class),
+                                                                  
Mockito.any(Long.class));
     int status = atlasDumpTaskSpy.execute();
     Assert.assertEquals(0, status);
     ArgumentCaptor<String> replStateCaptor = 
ArgumentCaptor.forClass(String.class);
@@ -130,7 +134,7 @@ public class TestAtlasDumpTask {
     AtlasExportRequest exportRequest = mock(AtlasExportRequest.class);
     String exportResponseData = "dumpExportContent";
     InputStream exportedMetadataIS = new 
ByteArrayInputStream(exportResponseData.getBytes(StandardCharsets.UTF_8));
-    
when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenReturn(exportedMetadataIS);
+    
when(atlasClientV2.exportData(Mockito.any(AtlasExportRequest.class))).thenReturn(exportedMetadataIS);
     when(exportRequest.toString()).thenReturn("dummyExportRequest");
     when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, 
TimeUnit.SECONDS)).thenReturn(60L);
     when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, 
TimeUnit.SECONDS)).thenReturn(1L);
@@ -152,12 +156,9 @@ public class TestAtlasDumpTask {
     AtlasExportRequest exportRequest = mock(AtlasExportRequest.class);
     AtlasServiceException atlasServiceException = 
mock(AtlasServiceException.class);
     when(atlasServiceException.getMessage()).thenReturn("import or export is 
in progress");
-    
when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenThrow(atlasServiceException);
+    
when(atlasClientV2.exportData(Mockito.any(AtlasExportRequest.class))).thenThrow(atlasServiceException);
     when(exportRequest.toString()).thenReturn("dummyExportRequest");
-    when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, 
TimeUnit.SECONDS)).thenReturn(60L);
-    when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, 
TimeUnit.SECONDS)).thenReturn(10L);
-    
when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, 
TimeUnit.SECONDS)).thenReturn(20L);
-    
when(conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT)).thenReturn(2.0f);
+    setupConfForRetry();
     AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf);
     AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient;
     InputStream inputStream = null;
@@ -175,4 +176,61 @@ public class TestAtlasDumpTask {
     }
     Assert.assertTrue(inputStream == null);
   }
+
+  @Test
+  public void testAtlasServerEntity() throws AtlasServiceException, 
SemanticException {
+    AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class);
+    AtlasServer atlasServer = mock(AtlasServer.class);
+    when(atlasClientV2.getServer(Mockito.anyString())).thenReturn(atlasServer);
+    AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf);
+    AtlasServer atlasServerRet = atlasClient.getServer("src", conf);
+    Assert.assertTrue(atlasServer == atlasServerRet);
+  }
+
+  @Test
+  public void testAtlasServerEntityNotFound() throws AtlasServiceException, 
SemanticException {
+    setupConfForRetry();
+    AtlasServiceException atlasServiceException = 
getAtlasServiceException(ClientResponse.Status.NOT_FOUND);
+    AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class);
+    
when(atlasClientV2.getServer(Mockito.anyString())).thenThrow(atlasServiceException);
+    AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf);
+    AtlasServer atlasServerRet = atlasClient.getServer("src", conf);
+    Assert.assertNull(atlasServerRet);
+    ArgumentCaptor<String> getServerReqCaptor = 
ArgumentCaptor.forClass(String.class);
+    Mockito.verify(atlasClientV2, 
Mockito.times(1)).getServer(getServerReqCaptor.capture());
+  }
+
+  @Test
+  public void testAtlasServerEntityRetryExhausted() throws 
AtlasServiceException {
+    setupConfForRetry();
+    AtlasServiceException atlasServiceException = 
getAtlasServiceException(ClientResponse.Status.BAD_REQUEST);
+    AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class);
+    
when(atlasClientV2.getServer(Mockito.anyString())).thenThrow(atlasServiceException);
+    AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf);
+    try {
+      atlasClient.getServer("src", conf);
+      Assert.fail("Should have thrown SemanticException.");
+    } catch (SemanticException ex) {
+      Assert.assertTrue(ex.getMessage().contains("Retry exhausted for 
retryable error code"));
+      Assert.assertTrue(atlasServiceException == ex.getCause());
+    }
+    ArgumentCaptor<String> getServerReqCaptor = 
ArgumentCaptor.forClass(String.class);
+    Mockito.verify(atlasClientV2, 
Mockito.times(4)).getServer(getServerReqCaptor.capture());
+  }
+
+  private void setupConfForRetry() {
+    when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, 
TimeUnit.SECONDS)).thenReturn(60L);
+    when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, 
TimeUnit.SECONDS)).thenReturn(10L);
+    
when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, 
TimeUnit.SECONDS)).thenReturn(20L);
+    
when(conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT)).thenReturn(2.0f);
+  }
+
+  public AtlasServiceException getAtlasServiceException(ClientResponse.Status 
status) {
+    AtlasBaseClient.API api = new AtlasBaseClient.API("/api/atlas/admin", 
HttpMethod.POST,
+            Response.Status.fromStatusCode(status.getStatusCode()));
+    ClientResponse response = Mockito.mock(ClientResponse.class);
+    when(response.getStatus()).thenReturn(status.getStatusCode());
+    AtlasServiceException atlasServiceException = new 
AtlasServiceException(api, response);
+    return atlasServiceException;
+  }
 }

Reply via email to