http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java deleted file mode 100644 index 902047d..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.aggregator; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; -import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; -import org.junit.Test; - -public class TestPerNodeAggregatorServer { - private ApplicationAttemptId appAttemptId; - - public TestPerNodeAggregatorServer() { - ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - } - - @Test - public void testAddApplication() throws Exception { - PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication(); - // aggregator should have a single app - assertTrue(aggregator.hasApplication( - appAttemptId.getApplicationId().toString())); - aggregator.close(); - } - - @Test - public void testAddApplicationNonAMContainer() throws Exception { - PerNodeAggregatorServer aggregator = createAggregator(); - - ContainerId containerId = getContainerId(2L); // not an AM - ContainerInitializationContext context = - mock(ContainerInitializationContext.class); - when(context.getContainerId()).thenReturn(containerId); - aggregator.initializeContainer(context); - // aggregator should not have that app - assertFalse(aggregator.hasApplication( - appAttemptId.getApplicationId().toString())); - } - - @Test - public void testRemoveApplication() throws Exception { - PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication(); - // aggregator should have a single app - String appIdStr = appAttemptId.getApplicationId().toString(); - assertTrue(aggregator.hasApplication(appIdStr)); - - ContainerId containerId = getAMContainerId(); - ContainerTerminationContext context = - mock(ContainerTerminationContext.class); - when(context.getContainerId()).thenReturn(containerId); - aggregator.stopContainer(context); - // aggregator should not have that app - assertFalse(aggregator.hasApplication(appIdStr)); - aggregator.close(); - } - - @Test - public void testRemoveApplicationNonAMContainer() throws Exception { - PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication(); - // aggregator should have a single app - String appIdStr = appAttemptId.getApplicationId().toString(); - assertTrue(aggregator.hasApplication(appIdStr)); - - ContainerId containerId = getContainerId(2L); // not an AM - ContainerTerminationContext context = - mock(ContainerTerminationContext.class); - when(context.getContainerId()).thenReturn(containerId); - aggregator.stopContainer(context); - // aggregator should still have that app - assertTrue(aggregator.hasApplication(appIdStr)); - aggregator.close(); - } - - @Test(timeout = 60000) - public void testLaunch() throws Exception { - ExitUtil.disableSystemExit(); - PerNodeAggregatorServer server = null; - try { - server = - PerNodeAggregatorServer.launchServer(new String[0]); - } catch (ExitUtil.ExitException e) { - assertEquals(0, e.status); - ExitUtil.resetFirstExitException(); - fail(); - } finally { - if (server != null) { - server.stop(); - } - } - } - - private PerNodeAggregatorServer createAggregatorAndAddApplication() { - PerNodeAggregatorServer aggregator = createAggregator(); - // create an AM container - ContainerId containerId = getAMContainerId(); - ContainerInitializationContext context = - mock(ContainerInitializationContext.class); - when(context.getContainerId()).thenReturn(containerId); - aggregator.initializeContainer(context); - return aggregator; - } - - private PerNodeAggregatorServer createAggregator() { - AppLevelServiceManager serviceManager = spy(new AppLevelServiceManager()); - doReturn(new Configuration()).when(serviceManager).getConfig(); - PerNodeAggregatorServer aggregator = - spy(new PerNodeAggregatorServer(serviceManager)); - return aggregator; - } - - private ContainerId getAMContainerId() { - return getContainerId(1L); - } - - private ContainerId getContainerId(long id) { - return ContainerId.newContainerId(appAttemptId, id); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java new file mode 100644 index 0000000..1c89ead --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.junit.Test; + +public class TestPerNodeTimelineAggregatorsAuxService { + private ApplicationAttemptId appAttemptId; + + public TestPerNodeTimelineAggregatorsAuxService() { + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + } + + @Test + public void testAddApplication() throws Exception { + PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication(); + // auxService should have a single app + assertTrue(auxService.hasApplication( + appAttemptId.getApplicationId().toString())); + auxService.close(); + } + + @Test + public void testAddApplicationNonAMContainer() throws Exception { + PerNodeTimelineAggregatorsAuxService auxService = createAggregator(); + + ContainerId containerId = getContainerId(2L); // not an AM + ContainerInitializationContext context = + mock(ContainerInitializationContext.class); + when(context.getContainerId()).thenReturn(containerId); + auxService.initializeContainer(context); + // auxService should not have that app + assertFalse(auxService.hasApplication( + appAttemptId.getApplicationId().toString())); + } + + @Test + public void testRemoveApplication() throws Exception { + PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication(); + // auxService should have a single app + String appIdStr = appAttemptId.getApplicationId().toString(); + assertTrue(auxService.hasApplication(appIdStr)); + + ContainerId containerId = getAMContainerId(); + ContainerTerminationContext context = + mock(ContainerTerminationContext.class); + when(context.getContainerId()).thenReturn(containerId); + auxService.stopContainer(context); + // auxService should not have that app + assertFalse(auxService.hasApplication(appIdStr)); + auxService.close(); + } + + @Test + public void testRemoveApplicationNonAMContainer() throws Exception { + PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication(); + // auxService should have a single app + String appIdStr = appAttemptId.getApplicationId().toString(); + assertTrue(auxService.hasApplication(appIdStr)); + + ContainerId containerId = getContainerId(2L); // not an AM + ContainerTerminationContext context = + mock(ContainerTerminationContext.class); + when(context.getContainerId()).thenReturn(containerId); + auxService.stopContainer(context); + // auxService should still have that app + assertTrue(auxService.hasApplication(appIdStr)); + auxService.close(); + } + + @Test(timeout = 60000) + public void testLaunch() throws Exception { + ExitUtil.disableSystemExit(); + PerNodeTimelineAggregatorsAuxService auxService = null; + try { + auxService = + PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]); + } catch (ExitUtil.ExitException e) { + assertEquals(0, e.status); + ExitUtil.resetFirstExitException(); + fail(); + } finally { + if (auxService != null) { + auxService.stop(); + } + } + } + + private PerNodeTimelineAggregatorsAuxService createAggregatorAndAddApplication() { + PerNodeTimelineAggregatorsAuxService auxService = createAggregator(); + // create an AM container + ContainerId containerId = getAMContainerId(); + ContainerInitializationContext context = + mock(ContainerInitializationContext.class); + when(context.getContainerId()).thenReturn(containerId); + auxService.initializeContainer(context); + return auxService; + } + + private PerNodeTimelineAggregatorsAuxService createAggregator() { + TimelineAggregatorsCollection + aggregatorsCollection = spy(new TimelineAggregatorsCollection()); + doReturn(new Configuration()).when(aggregatorsCollection).getConfig(); + PerNodeTimelineAggregatorsAuxService auxService = + spy(new PerNodeTimelineAggregatorsAuxService(aggregatorsCollection)); + return auxService; + } + + private ContainerId getAMContainerId() { + return getContainerId(1L); + } + + private ContainerId getContainerId(long id) { + return ContainerId.newContainerId(appAttemptId, id); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java new file mode 100644 index 0000000..821e455 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +public class TestTimelineAggregator { + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java new file mode 100644 index 0000000..cec1d71 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregator; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +public class TestTimelineAggregatorsCollection { + + @Test(timeout=60000) + public void testMultithreadedAdd() throws Exception { + final TimelineAggregatorsCollection aggregatorCollection = + spy(new TimelineAggregatorsCollection()); + doReturn(new Configuration()).when(aggregatorCollection).getConfig(); + + final int NUM_APPS = 5; + List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(); + for (int i = 0; i < NUM_APPS; i++) { + final String appId = String.valueOf(i); + Callable<Boolean> task = new Callable<Boolean>() { + public Boolean call() { + AppLevelTimelineAggregator aggregator = + new AppLevelTimelineAggregator(appId); + return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator); + } + }; + tasks.add(task); + } + ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); + try { + List<Future<Boolean>> futures = executor.invokeAll(tasks); + for (Future<Boolean> future: futures) { + assertTrue(future.get()); + } + } finally { + executor.shutdownNow(); + } + // check the keys + for (int i = 0; i < NUM_APPS; i++) { + assertTrue(aggregatorCollection.containsKey(String.valueOf(i))); + } + } + + @Test + public void testMultithreadedAddAndRemove() throws Exception { + final TimelineAggregatorsCollection aggregatorCollection = + spy(new TimelineAggregatorsCollection()); + doReturn(new Configuration()).when(aggregatorCollection).getConfig(); + + final int NUM_APPS = 5; + List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(); + for (int i = 0; i < NUM_APPS; i++) { + final String appId = String.valueOf(i); + Callable<Boolean> task = new Callable<Boolean>() { + public Boolean call() { + AppLevelTimelineAggregator aggregator = + new AppLevelTimelineAggregator(appId); + boolean successPut = + (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator); + return successPut && aggregatorCollection.remove(appId); + } + }; + tasks.add(task); + } + ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); + try { + List<Future<Boolean>> futures = executor.invokeAll(tasks); + for (Future<Boolean> future: futures) { + assertTrue(future.get()); + } + } finally { + executor.shutdownNow(); + } + // check the keys + for (int i = 0; i < NUM_APPS; i++) { + assertFalse(aggregatorCollection.containsKey(String.valueOf(i))); + } + } +}