http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java new file mode 100644 index 0000000..cca960b --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; + +/** + * IGFS Hadoop file system IPC shmem self test in DUAL_ASYNC mode. + */ +public class IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. 
+ */ + public IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest() { + super(DUAL_ASYNC, false); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java new file mode 100644 index 0000000..73db4f8 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; + +/** + * IGFS Hadoop file system IPC shmem self test in DUAL_SYNC mode. + */ +public class IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. 
+ */ + public IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest() { + super(DUAL_SYNC, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java new file mode 100644 index 0000000..48a4694 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; + +/** + * IGFS Hadoop file system IPC shmem self test in PRIMARY mode. 
+ */ +public class IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest() { + super(PRIMARY, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java new file mode 100644 index 0000000..ab9c357 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PROXY; + +/** + * IGFS Hadoop file system IPC shmem self test in SECONDARY mode. + */ +public class IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest() { + super(PROXY, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java new file mode 100644 index 0000000..5154642 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; + +/** + * IGFS Hadoop file system IPC shmem self test in DUAL_ASYNC mode. + */ +public class IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest() { + super(DUAL_ASYNC, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java new file mode 100644 index 0000000..d88a38b --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; + +/** + * IGFS Hadoop file system IPC shmem self test in DUAL_SYNC mode. + */ +public class IgniteHadoopFileSystemShmemExternalDualSyncSelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemShmemExternalDualSyncSelfTest() { + super(DUAL_SYNC, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java new file mode 100644 index 0000000..7b41b22 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; + +/** + * IGFS Hadoop file system IPC shmem self test in PRIMARY mode. + */ +public class IgniteHadoopFileSystemShmemExternalPrimarySelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. + */ + public IgniteHadoopFileSystemShmemExternalPrimarySelfTest() { + super(PRIMARY, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java new file mode 100644 index 0000000..e54b020 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import static org.apache.ignite.igfs.IgfsMode.PROXY; + +/** + * IGFS Hadoop file system IPC shmem self test in SECONDARY mode. + */ +public class IgniteHadoopFileSystemShmemExternalSecondarySelfTest + extends IgniteHadoopFileSystemShmemAbstractSelfTest { + /** + * Constructor. 
+ */ + public IgniteHadoopFileSystemShmemExternalSecondarySelfTest() { + super(PROXY, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java new file mode 100644 index 0000000..b4e63d1 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopExecutorService; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.LongAdder8; + +/** + * + */ +public class HadoopExecutorServiceTest extends GridCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testExecutesAll() throws Exception { + final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); + + for (int i = 0; i < 5; i++) { + final int loops = 5000; + int threads = 17; + + final LongAdder8 sum = new LongAdder8(); + + multithreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < loops; i++) { + exec.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + sum.increment(); + + return null; + } + }); + } + + return null; + } + }, threads); + + while (exec.active() != 0) { + X.println("__ active: " + exec.active()); + + Thread.sleep(200); + } + + assertEquals(threads * loops, sum.sum()); + + X.println("_ ok"); + } + + assertTrue(exec.shutdown(0)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testShutdown() throws Exception { + for (int i = 0; i < 5; i++) { + final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); + + final LongAdder8 sum = new LongAdder8(); + + final AtomicBoolean finish = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!finish.get()) { + exec.submit(new Callable<Void>() { + @Override public Void call() throws Exception { + sum.increment(); + + return null; + } + }); + } + + return null; + } + }, 19); + + Thread.sleep(200); + + assertTrue(exec.shutdown(50)); + + long res = sum.sum(); + + assertTrue(res > 0); + + finish.set(true); + + fut.get(); + + assertEquals(res, sum.sum()); // Nothing was executed after shutdown. + + X.println("_ ok"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/BasicUserNameMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/BasicUserNameMapperSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/BasicUserNameMapperSelfTest.java new file mode 100644 index 0000000..43924ed --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/BasicUserNameMapperSelfTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.util; + +import org.apache.ignite.hadoop.util.BasicUserNameMapper; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +import java.util.HashMap; +import java.util.Map; + +/** + * Test for basic user name mapper. + */ +public class BasicUserNameMapperSelfTest extends GridCommonAbstractTest { + /** + * Test null mappings. + * + * @throws Exception If failed. + */ + public void testNullMappings() throws Exception { + checkNullOrEmptyMappings(null); + } + + /** + * Test empty mappings. + * + * @throws Exception If failed. + */ + public void testEmptyMappings() throws Exception { + checkNullOrEmptyMappings(new HashMap<String, String>()); + } + + /** + * Check null or empty mappings. + * + * @param map Mappings. + * @throws Exception If failed. 
+ */ + private void checkNullOrEmptyMappings(@Nullable Map<String, String> map) throws Exception { + BasicUserNameMapper mapper = create(map, false, null); + + assertNull(mapper.map(null)); + assertEquals("1", mapper.map("1")); + assertEquals("2", mapper.map("2")); + + mapper = create(map, true, null); + + assertNull(mapper.map(null)); + assertNull(mapper.map("1")); + assertNull(mapper.map("2")); + + mapper = create(map, false, "A"); + + assertNull(mapper.map(null)); + assertEquals("1", mapper.map("1")); + assertEquals("2", mapper.map("2")); + + mapper = create(map, true, "A"); + + assertEquals("A", mapper.map(null)); + assertEquals("A", mapper.map("1")); + assertEquals("A", mapper.map("2")); + } + + /** + * Test regular mappings. + * + * @throws Exception If failed. + */ + public void testMappings() throws Exception { + Map<String, String> map = new HashMap<>(); + + map.put("1", "101"); + + BasicUserNameMapper mapper = create(map, false, null); + + assertNull(mapper.map(null)); + assertEquals("101", mapper.map("1")); + assertEquals("2", mapper.map("2")); + + mapper = create(map, true, null); + + assertNull(mapper.map(null)); + assertEquals("101", mapper.map("1")); + assertNull(mapper.map("2")); + + mapper = create(map, false, "A"); + + assertNull(mapper.map(null)); + assertEquals("101", mapper.map("1")); + assertEquals("2", mapper.map("2")); + + mapper = create(map, true, "A"); + + assertEquals("A", mapper.map(null)); + assertEquals("101", mapper.map("1")); + assertEquals("A", mapper.map("2")); + } + + /** + * Create mapper. + * + * @param dictionary Dictionary. + * @param useDfltUsrName Whether to use default user name. + * @param dfltUsrName Default user name. + * @return Mapper. 
+ */ + private BasicUserNameMapper create(@Nullable Map<String, String> dictionary, boolean useDfltUsrName, + @Nullable String dfltUsrName) { + BasicUserNameMapper mapper = new BasicUserNameMapper(); + + mapper.setMappings(dictionary); + mapper.setUseDefaultUserName(useDfltUsrName); + mapper.setDefaultUserName(dfltUsrName); + + return mapper; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/ChainedUserNameMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/ChainedUserNameMapperSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/ChainedUserNameMapperSelfTest.java new file mode 100644 index 0000000..a9d295f --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/ChainedUserNameMapperSelfTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.util; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.util.BasicUserNameMapper; +import org.apache.ignite.hadoop.util.ChainedUserNameMapper; +import org.apache.ignite.hadoop.util.KerberosUserNameMapper; +import org.apache.ignite.hadoop.util.UserNameMapper; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.util.Collections; +import java.util.concurrent.Callable; + +/** + * Tests for chained user name mapper. + */ +public class ChainedUserNameMapperSelfTest extends GridCommonAbstractTest { + /** Test instance. */ + private static final String INSTANCE = "test_instance"; + + /** Test realm. */ + private static final String REALM = "test_realm"; + + /** + * Test case when mappers are null. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testNullMappers() throws Exception { + GridTestUtils.assertThrows(null, new Callable<Void>() { + @Override public Void call() throws Exception { + create((UserNameMapper[])null); + + return null; + } + }, IgniteException.class, null); + } + + /** + * Test case when one of mappers is null. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testNullMapperElement() throws Exception { + GridTestUtils.assertThrows(null, new Callable<Void>() { + @Override public Void call() throws Exception { + create(new BasicUserNameMapper(), null); + + return null; + } + }, IgniteException.class, null); + } + + /** + * Test actual chaining logic. + * + * @throws Exception If failed. 
+ */ + public void testChaining() throws Exception { + BasicUserNameMapper mapper1 = new BasicUserNameMapper(); + + mapper1.setMappings(Collections.singletonMap("1", "101")); + + KerberosUserNameMapper mapper2 = new KerberosUserNameMapper(); + + mapper2.setInstance(INSTANCE); + mapper2.setRealm(REALM); + + ChainedUserNameMapper mapper = create(mapper1, mapper2); + + assertEquals("101" + "/" + INSTANCE + "@" + REALM, mapper.map("1")); + assertEquals("2" + "/" + INSTANCE + "@" + REALM, mapper.map("2")); + assertEquals(IgfsUtils.fixUserName(null) + "/" + INSTANCE + "@" + REALM, mapper.map(null)); + } + + /** + * Create chained mapper. + * + * @param mappers Child mappers. + * @return Chained mapper. + */ + private ChainedUserNameMapper create(UserNameMapper... mappers) { + ChainedUserNameMapper mapper = new ChainedUserNameMapper(); + + mapper.setMappers(mappers); + + mapper.start(); + + return mapper; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/KerberosUserNameMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/KerberosUserNameMapperSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/KerberosUserNameMapperSelfTest.java new file mode 100644 index 0000000..bd76b51 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/KerberosUserNameMapperSelfTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.util; + +import org.apache.ignite.hadoop.util.KerberosUserNameMapper; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +/** + * Tests for Kerberos name mapper. + */ +public class KerberosUserNameMapperSelfTest extends GridCommonAbstractTest { + /** Test instance. */ + private static final String INSTANCE = "test_instance"; + + /** Test realm. */ + private static final String REALM = "test_realm"; + + /** + * Test mapper without instance and realm components. + * + * @throws Exception If failed. + */ + public void testMapper() throws Exception { + KerberosUserNameMapper mapper = create(null, null); + + assertEquals(IgfsUtils.fixUserName(null), mapper.map(null)); + assertEquals("test", mapper.map("test")); + } + + /** + * Test mapper with instance component. + * + * @throws Exception If failed. + */ + public void testMapperInstance() throws Exception { + KerberosUserNameMapper mapper = create(INSTANCE, null); + + assertEquals(IgfsUtils.fixUserName(null) + "/" + INSTANCE, mapper.map(null)); + assertEquals("test" + "/" + INSTANCE, mapper.map("test")); + } + + /** + * Test mapper with realm. + * + * @throws Exception If failed. 
+ */ + public void testMapperRealm() throws Exception { + KerberosUserNameMapper mapper = create(null, REALM); + + assertEquals(IgfsUtils.fixUserName(null) + "@" + REALM, mapper.map(null)); + assertEquals("test" + "@" + REALM, mapper.map("test")); + } + + /** + * Test mapper with instance and realm components. + * + * @throws Exception If failed. + */ + public void testMapperInstanceAndRealm() throws Exception { + KerberosUserNameMapper mapper = create(INSTANCE, REALM); + + assertEquals(IgfsUtils.fixUserName(null) + "/" + INSTANCE + "@" + REALM, mapper.map(null)); + assertEquals("test" + "/" + INSTANCE + "@" + REALM, mapper.map("test")); + } + + /** + * Create mapper. + * + * @param instance Instance. + * @param realm Realm. + * @return Mapper. + */ + private KerberosUserNameMapper create(@Nullable String instance, @Nullable String realm) { + KerberosUserNameMapper mapper = new KerberosUserNameMapper(); + + mapper.setInstance(instance); + mapper.setRealm(realm); + + mapper.start(); + + return mapper; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index 45c178a..07a3ecc 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -32,7 +32,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; import 
org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopWritableSerialization; +import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopWritableSerialization; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java index f70ef2f..15d3b13 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java @@ -103,7 +103,7 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest { HadoopMultimap m = new HadoopSkipList(job, mem); - HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); + HadoopMultimap.Adder a = m.startAdding(taskCtx); Multimap<Integer, Integer> mm = ArrayListMultimap.create(); Multimap<Integer, Integer> vis = ArrayListMultimap.create(); @@ -180,7 +180,7 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest { final GridDataInput dataInput = new GridUnsafeDataInput(); - m.visit(false, new HadoopConcurrentHashMultimap.Visitor() { + m.visit(false, new HadoopMultimap.Visitor() { /** */ IntWritable key = new 
IntWritable(); http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java index dd571af..3fea0ae 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop.shuffle.streams; import java.io.IOException; import java.util.Arrays; + import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java deleted file mode 100644 index 7dd045a..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jsr166.LongAdder8; - -/** - * - */ -public class HadoopExecutorServiceTest extends GridCommonAbstractTest { - /** - * @throws Exception If failed. 
- */ - public void testExecutesAll() throws Exception { - final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); - - for (int i = 0; i < 5; i++) { - final int loops = 5000; - int threads = 17; - - final LongAdder8 sum = new LongAdder8(); - - multithreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - for (int i = 0; i < loops; i++) { - exec.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - sum.increment(); - - return null; - } - }); - } - - return null; - } - }, threads); - - while (exec.active() != 0) { - X.println("__ active: " + exec.active()); - - Thread.sleep(200); - } - - assertEquals(threads * loops, sum.sum()); - - X.println("_ ok"); - } - - assertTrue(exec.shutdown(0)); - } - - /** - * @throws Exception If failed. - */ - public void testShutdown() throws Exception { - for (int i = 0; i < 5; i++) { - final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); - - final LongAdder8 sum = new LongAdder8(); - - final AtomicBoolean finish = new AtomicBoolean(); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!finish.get()) { - exec.submit(new Callable<Void>() { - @Override public Void call() throws Exception { - sum.increment(); - - return null; - } - }); - } - - return null; - } - }, 19); - - Thread.sleep(200); - - assertTrue(exec.shutdown(50)); - - long res = sum.sum(); - - assertTrue(res > 0); - - finish.set(true); - - fut.get(); - - assertEquals(res, sum.sum()); // Nothing was executed after shutdown. 
- - X.println("_ ok"); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java index ec33836..2385668 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java @@ -37,12 +37,12 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.igfs.IgfsOutputStream; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.HadoopAbstractSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.marshaller.jdk.JdkMarshaller; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo; /** * Job tracker self test. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 603fd5b..773bec5 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -22,57 +22,56 @@ import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.client.hadoop.HadoopClientProtocolEmbeddedSelfTest; -import org.apache.ignite.client.hadoop.HadoopClientProtocolSelfTest; -import org.apache.ignite.hadoop.cache.HadoopTxConfigCacheTest; -import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest; -import org.apache.ignite.hadoop.util.BasicUserNameMapperSelfTest; -import org.apache.ignite.hadoop.util.ChainedUserNameMapperSelfTest; -import org.apache.ignite.hadoop.util.KerberosUserNameMapperSelfTest; -import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest; -import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest; -import org.apache.ignite.igfs.HadoopFIleSystemFactorySelfTest; -import org.apache.ignite.igfs.HadoopIgfs20FileSystemLoopbackPrimarySelfTest; -import org.apache.ignite.igfs.HadoopIgfsDualAsyncSelfTest; -import org.apache.ignite.igfs.HadoopIgfsDualSyncSelfTest; -import org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest; -import org.apache.ignite.igfs.IgfsEventsTestSuite; -import org.apache.ignite.igfs.IgniteHadoopFileSystemClientSelfTest; 
-import org.apache.ignite.igfs.IgniteHadoopFileSystemHandshakeSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemLoggerSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemLoggerStateSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopClassLoaderTest; -import org.apache.ignite.internal.processors.hadoop.HadoopCommandLineTest; -import org.apache.ignite.internal.processors.hadoop.HadoopDefaultMapReducePlannerSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopFileSystemsTest; -import org.apache.ignite.internal.processors.hadoop.HadoopGroupingTest; -import org.apache.ignite.internal.processors.hadoop.HadoopJobTrackerSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceEmbeddedSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceErrorResilienceTest; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceTest; -import org.apache.ignite.internal.processors.hadoop.HadoopNoHadoopMapReduceTest; -import org.apache.ignite.internal.processors.hadoop.HadoopSerializationWrapperSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopSnappyFullMapReduceTest; -import 
org.apache.ignite.internal.processors.hadoop.HadoopSnappyTest; -import org.apache.ignite.internal.processors.hadoop.HadoopSortingTest; -import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapperSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskExecutionSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopTasksV1Test; -import org.apache.ignite.internal.processors.hadoop.HadoopTasksV2Test; -import org.apache.ignite.internal.processors.hadoop.HadoopUserLibsSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopV2JobSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopValidationSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopWeightedMapReducePlannerTest; -import org.apache.ignite.internal.processors.hadoop.HadoopWeightedPlannerMapReduceTest; +import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolEmbeddedSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopTxConfigCacheTest; +import org.apache.ignite.internal.processors.hadoop.impl.fs.KerberosHadoopFileSystemFactorySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.util.BasicUserNameMapperSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.util.ChainedUserNameMapperSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.util.KerberosUserNameMapperSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.Hadoop1OverIgfsDualAsyncTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.Hadoop1OverIgfsDualSyncTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopFIleSystemFactorySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfs20FileSystemLoopbackPrimarySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsDualAsyncSelfTest; 
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsDualSyncSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopSecondaryFileSystemConfigurationTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgfsEventsTestSuite; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemClientSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemHandshakeSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoggerSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoggerStateSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopCommandLineTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopDefaultMapReducePlannerSelfTest; +import 
org.apache.ignite.internal.processors.hadoop.impl.HadoopFileSystemsTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopGroupingTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopJobTrackerSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopMapReduceEmbeddedSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopMapReduceErrorResilienceTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopMapReduceTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopNoHadoopMapReduceTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopSerializationWrapperSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopSnappyFullMapReduceTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopSnappyTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopSortingTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopSplitWrapperSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskExecutionSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopTasksV1Test; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopTasksV2Test; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUserLibsSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopV2JobSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopValidationSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopWeightedMapReducePlannerTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopWeightedPlannerMapReduceTest; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest; @@ -125,8 +124,6 
@@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(HadoopClassLoaderTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfs20FileSystemLoopbackPrimarySelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfsDualSyncSelfTest.class.getName()))); http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite2.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite2.java new file mode 100644 index 0000000..1a32cba --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite2.java @@ -0,0 +1,116 @@ +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolSelfTest; +import org.apache.ignite.internal.util.typedef.F; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Separate class loader for better class handling. + */ +// TODO: Remove or adopt. +public class IgniteHadoopTestSuite2 extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. 
+ */ + public static TestSuite suite() throws Exception { + final ClassLoader ldr = new TestClassLoader(); + + TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite"); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopClientProtocolSelfTest.class.getName()))); + + return suite; + } + + /** + * Class loader for tests. + */ + private static class TestClassLoader extends URLClassLoader { + /** Parent class loader. */ + private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)TestClassLoader.class.getClassLoader(); + + /** */ + private static final Collection<URL> APP_JARS = F.asList(APP_CLS_LDR.getURLs()); + + /** All participating URLs. */ + private static final URL[] URLS; + + static { + try { + List<URL> allJars = new ArrayList<>(); + + allJars.addAll(APP_JARS); + + // TODO: Remove + //allJars.addAll(HadoopClasspathUtils.classpathForClassLoader()); + + List<URL> res = new ArrayList<>(); + + for (URL url : allJars) { + String urlStr = url.toString(); + + if (urlStr.contains("modules/hadoop/") || + urlStr.contains("modules/hadoop-impl/") || + urlStr.contains("org/apache/hadoop")) { + res.add(url); + + System.out.println(url.toString()); + } + } + + URLS = res.toArray(new URL[res.size()]); + } + catch (Exception e) { + throw new IgniteException("Failed to initialize classloader JARs.", e); + } + } + + /** + * Constructor. + * + * @throws Exception If failed. 
+ */ + public TestClassLoader() throws Exception { + super(URLS, APP_CLS_LDR); + } + + /** {@inheritDoc} */ + @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + try { + synchronized (getClassLoadingLock(name)) { + // First, check if the class has already been loaded + Class c = findLoadedClass(name); + + if (c == null) { + long t1 = System.nanoTime(); + + c = findClass(name); + + // this is the defining class loader; record the stats + sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1); + sun.misc.PerfCounter.getFindClasses().increment(); + } + + if (resolve) + resolveClass(c); + + return c; + } + } + catch (NoClassDefFoundError | ClassNotFoundException e) { + // TODO: Remove + //System.out.println("Not founded, delegated: " + name); + } + + return super.loadClass(name, resolve); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java index 4ed1d65..0d4bf3e 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java @@ -18,17 +18,17 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; -import org.apache.ignite.igfs.HadoopIgfs20FileSystemShmemPrimarySelfTest; -import org.apache.ignite.igfs.IgfsEventsTestSuite; -import org.apache.ignite.igfs.IgniteHadoopFileSystemIpcCacheSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest; -import 
org.apache.ignite.igfs.IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemExternalDualSyncSelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemExternalPrimarySelfTest; -import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemExternalSecondarySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfs20FileSystemShmemPrimarySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgfsEventsTestSuite; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemIpcCacheSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemShmemExternalDualSyncSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemShmemExternalPrimarySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemShmemExternalSecondarySelfTest; import org.apache.ignite.internal.processors.igfs.IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest; import static org.apache.ignite.testsuites.IgniteHadoopTestSuite.downloadHadoop; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index d11944e..f1c1b16 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -43,15 +43,16 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter /** {@inheritDoc} */ @Override public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException { - delegate().write(job, cntrs); + delegate(job).write(job, cntrs); } /** * Get delegate creating it if needed. * + * @param job Job. * @return Delegate. 
*/ - private HadoopFileSystemCounterWriterDelegate delegate() { + private HadoopFileSystemCounterWriterDelegate delegate(HadoopJob job) { HadoopFileSystemCounterWriterDelegate delegate0 = delegate; if (delegate0 == null) { @@ -59,7 +60,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter delegate0 = delegate; if (delegate0 == null) { - delegate0 = HadoopDelegateUtils.counterWriterDelegate(this); + delegate0 = HadoopDelegateUtils.counterWriterDelegate(job.getClass().getClassLoader(), this); delegate = delegate0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index 5fb078a..c9d08c5 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -24,9 +24,13 @@ import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsUserContext; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; +import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware; import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate; +import org.apache.ignite.internal.processors.igfs.IgfsKernalContextAware; import 
org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lifecycle.LifecycleAware; @@ -42,16 +46,19 @@ import java.util.concurrent.Callable; * <p> * Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}. */ -public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, LifecycleAware, - HadoopPayloadAware { +public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, IgfsKernalContextAware, + LifecycleAware, HadoopPayloadAware { /** The default user name. It is used if no user context is set. */ private String dfltUsrName; /** Factory. */ private HadoopFileSystemFactory factory; + /** Kernal context. */ + private volatile GridKernalContext ctx; + /** Target. */ - volatile private HadoopIgfsSecondaryFileSystemDelegate target; + private volatile HadoopIgfsSecondaryFileSystemDelegate target; /** * Default constructor for Spring. 
@@ -238,10 +245,24 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys } /** {@inheritDoc} */ + @Override public void setKernalContext(GridKernalContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ @Override public void start() throws IgniteException { - target = HadoopDelegateUtils.secondaryFileSystemDelegate(this); + HadoopClassLoader ldr = ctx.hadoopHelper().commonClassLoader(); + + ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(ldr); + + try { + target = HadoopDelegateUtils.secondaryFileSystemDelegate(ldr, this); - target.start(); + target.start(); + } + finally { + HadoopCommonUtils.restoreContextClassLoader(oldLdr); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java new file mode 100644 index 0000000..23eaa18 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Arrays; + +/** + * Hadoop attributes. + */ +public class HadoopAttributes implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Attribute name. */ + public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop"; + + /** Map-reduce planner class name. */ + private String plannerCls; + + /** External executor flag. */ + private boolean extExec; + + /** Maximum parallel tasks. */ + private int maxParallelTasks; + + /** Maximum task queue size. */ + private int maxTaskQueueSize; + + /** Library names. */ + @GridToStringExclude + private String[] libNames; + + /** Number of cores. */ + private int cores; + + /** + * Get attributes for node (if any). + * + * @param node Node. + * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node. + */ + @Nullable public static HadoopAttributes forNode(ClusterNode node) { + return node.attribute(NAME); + } + + /** + * {@link Externalizable} support. + */ + public HadoopAttributes() { + // No-op. 
+ } + + /** + * Constructor. + * + * @param cfg Configuration. + */ + public HadoopAttributes(HadoopConfiguration cfg) { + assert cfg != null; + assert cfg.getMapReducePlanner() != null; + + plannerCls = cfg.getMapReducePlanner().getClass().getName(); + + // TODO: IGNITE-404: Get from configuration when fixed. + extExec = false; + + maxParallelTasks = cfg.getMaxParallelTasks(); + maxTaskQueueSize = cfg.getMaxTaskQueueSize(); + libNames = cfg.getNativeLibraryNames(); + + // Cores count already passed in other attributes, we add it here for convenience. + cores = Runtime.getRuntime().availableProcessors(); + } + + /** + * @return Map reduce planner class name. + */ + public String plannerClassName() { + return plannerCls; + } + + /** + * @return External execution flag. + */ + public boolean externalExecution() { + return extExec; + } + + /** + * @return Maximum parallel tasks. + */ + public int maxParallelTasks() { + return maxParallelTasks; + } + + /** + * @return Maximum task queue size. + */ + public int maxTaskQueueSize() { + return maxTaskQueueSize; + } + + + /** + * @return Native library names. + */ + public String[] nativeLibraryNames() { + return libNames; + } + + /** + * @return Number of cores on machine. 
+ */ + public int cores() { + return cores; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(plannerCls); + out.writeBoolean(extExec); + out.writeInt(maxParallelTasks); + out.writeInt(maxTaskQueueSize); + out.writeObject(libNames); + out.writeInt(cores); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + plannerCls = (String)in.readObject(); + extExec = in.readBoolean(); + maxParallelTasks = in.readInt(); + maxTaskQueueSize = in.readInt(); + libNames = (String[])in.readObject(); + cores = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopAttributes.class, this, "libNames", Arrays.toString(libNames)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java index 83f94ce..37af147 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.hadoop; +import org.jetbrains.annotations.Nullable; + import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -26,6 +28,21 @@ import java.util.TreeSet; * Common Hadoop utility methods which do not depend on Hadoop API. */ public class HadoopCommonUtils { + /** Job class name. 
*/ + public static final String JOB_CLS_NAME = "org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job"; + + /** Property to store timestamp of new job id request. */ + public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs"; + + /** Property to store timestamp of response of new job id request. */ + public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs"; + + /** Property to store timestamp of job submission. */ + public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs"; + + /** Property to set custom writer of job statistics. */ + public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer"; + /** * Sort input splits by length. * @@ -52,6 +69,33 @@ public class HadoopCommonUtils { } /** + * Set context class loader. + * + * @param newLdr New class loader. + * @return Old class loader. + */ + @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) { + ClassLoader oldLdr = Thread.currentThread().getContextClassLoader(); + + if (newLdr != oldLdr) + Thread.currentThread().setContextClassLoader(newLdr); + + return oldLdr; + } + + /** + * Restore context class loader. + * + * @param oldLdr Original class loader. + */ + public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) { + ClassLoader newLdr = Thread.currentThread().getContextClassLoader(); + + if (newLdr != oldLdr) + Thread.currentThread().setContextClassLoader(oldLdr); + } + + /** * Split wrapper for sorting. 
*/ private static class SplitSortWrapper implements Comparable<SplitSortWrapper> { http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java new file mode 100644 index 0000000..aeda5c0 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; + +/** + * Abstract class for all hadoop components. + */ +public abstract class HadoopComponent { + /** Hadoop context. */ + protected HadoopContext ctx; + + /** Logger. */ + protected IgniteLogger log; + + /** + * @param ctx Hadoop context. 
+ */ + public void start(HadoopContext ctx) throws IgniteCheckedException { + this.ctx = ctx; + + log = ctx.kernalContext().log(getClass()); + } + + /** + * Stops manager. + */ + public void stop(boolean cancel) { + // No-op. + } + + /** + * Callback invoked when all grid components are started. + */ + public void onKernalStart() throws IgniteCheckedException { + // No-op. + } + + /** + * Callback invoked before all grid components are stopped. + */ + public void onKernalStop(boolean cancel) { + // No-op. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java new file mode 100644 index 0000000..4326ad2 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata; +import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; +import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskExecutorAdapter; +import org.apache.ignite.internal.util.typedef.internal.CU; + +/** + * Hadoop accelerator context. + */ +public class HadoopContext { + /** Kernal context. */ + private GridKernalContext ctx; + + /** Hadoop configuration. */ + private HadoopConfiguration cfg; + + /** Job tracker. */ + private HadoopJobTracker jobTracker; + + /** External task executor. */ + private HadoopTaskExecutorAdapter taskExecutor; + + /** */ + private HadoopShuffle shuffle; + + /** Managers list. */ + private List<HadoopComponent> components = new ArrayList<>(); + + /** + * @param ctx Kernal context. + */ + public HadoopContext( + GridKernalContext ctx, + HadoopConfiguration cfg, + HadoopJobTracker jobTracker, + HadoopTaskExecutorAdapter taskExecutor, + HadoopShuffle shuffle + ) { + this.ctx = ctx; + this.cfg = cfg; + + this.jobTracker = add(jobTracker); + this.taskExecutor = add(taskExecutor); + this.shuffle = add(shuffle); + } + + /** + * Gets list of managers. + * + * @return List of managers. + */ + public List<HadoopComponent> components() { + return components; + } + + /** + * Gets kernal context. + * + * @return Grid kernal context instance. + */ + public GridKernalContext kernalContext() { + return ctx; + } + + /** + * Gets Hadoop configuration. + * + * @return Hadoop configuration. 
+ */ + public HadoopConfiguration configuration() { + return cfg; + } + + /** + * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}. + * + * @return Local node ID. + */ + public UUID localNodeId() { + return ctx.localNodeId(); + } + + /** + * Gets local node order. + * + * @return Local node order. + */ + public long localNodeOrder() { + assert ctx.discovery() != null; + + return ctx.discovery().localNode().order(); + } + + /** + * @return Hadoop-enabled nodes. + */ + public Collection<ClusterNode> nodes() { + return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersionEx()); + } + + /** + * @return {@code True} if local node has the minimal order among Hadoop-enabled nodes. + */ + public boolean jobUpdateLeader() { + long minOrder = Long.MAX_VALUE; + ClusterNode minOrderNode = null; + + for (ClusterNode node : nodes()) { + if (node.order() < minOrder) { + minOrder = node.order(); + minOrderNode = node; + } + } + + assert minOrderNode != null; + + return localNodeId().equals(minOrderNode.id()); + } + + /** + * @param meta Job metadata. + * @return {@code true} if local node is participating in job execution. + */ + public boolean isParticipating(HadoopJobMetadata meta) { + UUID locNodeId = localNodeId(); + + if (locNodeId.equals(meta.submitNodeId())) + return true; + + HadoopMapReducePlan plan = meta.mapReducePlan(); + + return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader(); + } + + /** + * @return Job tracker instance. + */ + public HadoopJobTracker jobTracker() { + return jobTracker; + } + + /** + * @return Task executor. + */ + public HadoopTaskExecutorAdapter taskExecutor() { + return taskExecutor; + } + + /** + * @return Shuffle. + */ + public HadoopShuffle shuffle() { + return shuffle; + } + + /** + * @return Map-reduce planner. + */ + public HadoopMapReducePlanner planner() { + return cfg.getMapReducePlanner(); + } + + /** + * Adds component. + * + * @param c Component to add. + * @return Added manager. 
+ */ + private <C extends HadoopComponent> C add(C c) { + components.add(c); + + return c; + } +} \ No newline at end of file