This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 6617116 HIVE-24327: AtlasServer entity may not be present during first Atlas metadata dump (Pravin Kumar Sinha, reviewed by Aasha Medhi) 6617116 is described below commit 66171166e79a059c58e8d778d259cf0a9528cee2 Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Thu Nov 12 09:44:34 2020 +0530 HIVE-24327: AtlasServer entity may not be present during first Atlas metadata dump (Pravin Kumar Sinha, reviewed by Aasha Medhi) --- .../ql/exec/repl/atlas/AtlasRestClientImpl.java | 14 +++- .../hive/ql/exec/repl/TestAtlasDumpTask.java | 78 +++++++++++++++++++--- 2 files changed, 81 insertions(+), 11 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java index ed7485d..7397749 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java @@ -133,7 +133,19 @@ public class AtlasRestClientImpl extends RetryingClientTimeBased implements Atla .withHiveConf(conf) .withRetryOnException(AtlasServiceException.class).build(); try { - return retryable.executeCallable(() -> clientV2.getServer(endpoint)); + return retryable.executeCallable((Callable<AtlasServer>) () -> { + try { + return clientV2.getServer(endpoint); + } catch (AtlasServiceException e) { + int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1; + if (NOT_FOUND.getStatusCode() == statusCode) { + // Atlas server entity is initialized on first import/export o/p. + LOG.info("Atlas server entity is not found"); + return null; + } + throw e; + } + }); } catch (Exception e) { throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java index 935dc99..5f79db0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hive.ql.exec.repl; +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasBaseClient; import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasServer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; @@ -32,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplState; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Assert; import org.junit.Test; @@ -46,7 +48,8 @@ import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.mockito.ArgumentMatchers.any; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.Response; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -96,8 +99,9 @@ public class TestAtlasDumpTask { Logger logger = Mockito.mock(Logger.class); Whitebox.setInternalState(ReplState.class, logger); Mockito.doReturn(0L).when(atlasDumpTaskSpy) - .dumpAtlasMetaData(any(AtlasRequestBuilder.class), any(AtlasReplInfo.class)); - Mockito.doNothing().when(atlasDumpTaskSpy).createDumpMetadata(any(AtlasReplInfo.class), any(Long.class)); + .dumpAtlasMetaData(Mockito.any(AtlasRequestBuilder.class), Mockito.any(AtlasReplInfo.class)); + Mockito.doNothing().when(atlasDumpTaskSpy).createDumpMetadata(Mockito.any(AtlasReplInfo.class), + Mockito.any(Long.class)); int status = atlasDumpTaskSpy.execute(); Assert.assertEquals(0, status); ArgumentCaptor<String> replStateCaptor = ArgumentCaptor.forClass(String.class); @@ -130,7 +134,7 @@ public class TestAtlasDumpTask { AtlasExportRequest exportRequest = mock(AtlasExportRequest.class); String exportResponseData = "dumpExportContent"; InputStream exportedMetadataIS = new ByteArrayInputStream(exportResponseData.getBytes(StandardCharsets.UTF_8)); - when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenReturn(exportedMetadataIS); + when(atlasClientV2.exportData(Mockito.any(AtlasExportRequest.class))).thenReturn(exportedMetadataIS); when(exportRequest.toString()).thenReturn("dummyExportRequest"); when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L); when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(1L); @@ -152,12 +156,9 @@ public class TestAtlasDumpTask { AtlasExportRequest exportRequest = mock(AtlasExportRequest.class); AtlasServiceException atlasServiceException = mock(AtlasServiceException.class); when(atlasServiceException.getMessage()).thenReturn("import or export is in progress"); - when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenThrow(atlasServiceException); + when(atlasClientV2.exportData(Mockito.any(AtlasExportRequest.class))).thenThrow(atlasServiceException); when(exportRequest.toString()).thenReturn("dummyExportRequest"); - when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L); - when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(10L); - when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS)).thenReturn(20L); - when(conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT)).thenReturn(2.0f); + setupConfForRetry(); AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf); AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient; InputStream inputStream = null; @@ -175,4 +176,61 @@ public class TestAtlasDumpTask { } Assert.assertTrue(inputStream == null); } + + @Test + public void testAtlasServerEntity() throws AtlasServiceException, SemanticException { + AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class); + AtlasServer atlasServer = mock(AtlasServer.class); + when(atlasClientV2.getServer(Mockito.anyString())).thenReturn(atlasServer); + AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf); + AtlasServer atlasServerRet = atlasClient.getServer("src", conf); + Assert.assertTrue(atlasServer == atlasServerRet); + } + + @Test + public void testAtlasServerEntityNotFound() throws AtlasServiceException, SemanticException { + setupConfForRetry(); + AtlasServiceException atlasServiceException = getAtlasServiceException(ClientResponse.Status.NOT_FOUND); + AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class); + when(atlasClientV2.getServer(Mockito.anyString())).thenThrow(atlasServiceException); + AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf); + AtlasServer atlasServerRet = atlasClient.getServer("src", conf); + Assert.assertNull(atlasServerRet); + ArgumentCaptor<String> getServerReqCaptor = ArgumentCaptor.forClass(String.class); + Mockito.verify(atlasClientV2, Mockito.times(1)).getServer(getServerReqCaptor.capture()); + } + + @Test + public void testAtlasServerEntityRetryExhausted() throws AtlasServiceException { + setupConfForRetry(); + AtlasServiceException atlasServiceException = getAtlasServiceException(ClientResponse.Status.BAD_REQUEST); + AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class); + when(atlasClientV2.getServer(Mockito.anyString())).thenThrow(atlasServiceException); + AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf); + try { + atlasClient.getServer("src", conf); + Assert.fail("Should have thrown SemanticException."); + } catch (SemanticException ex) { + Assert.assertTrue(ex.getMessage().contains("Retry exhausted for retryable error code")); + Assert.assertTrue(atlasServiceException == ex.getCause()); + } + ArgumentCaptor<String> getServerReqCaptor = ArgumentCaptor.forClass(String.class); + Mockito.verify(atlasClientV2, Mockito.times(4)).getServer(getServerReqCaptor.capture()); + } + + private void setupConfForRetry() { + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L); + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(10L); + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS)).thenReturn(20L); + when(conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT)).thenReturn(2.0f); + } + + public AtlasServiceException getAtlasServiceException(ClientResponse.Status status) { + AtlasBaseClient.API api = new AtlasBaseClient.API("/api/atlas/admin", HttpMethod.POST, + Response.Status.fromStatusCode(status.getStatusCode())); + ClientResponse response = Mockito.mock(ClientResponse.class); + when(response.getStatus()).thenReturn(status.getStatusCode()); + AtlasServiceException atlasServiceException = new AtlasServiceException(api, response); + return atlasServiceException; + } }