http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java
new file mode 100644
index 0000000..ee1c88f
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
+import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
+import org.apache.ignite.internal.processors.igfs.IgfsMock;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestNode;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Tests {@link IgniteHadoopMapReducePlanner} on a mocked 3-node topology (checks use {@code assert}: run with -ea).
+ */
+public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTest {
+    /** ID of test node 1 (host {@link #HOST_1}). */
+    private static final UUID ID_1 = new UUID(0, 1);
+
+    /** ID of test node 2 (host {@link #HOST_2}). */
+    private static final UUID ID_2 = new UUID(0, 2);
+
+    /** ID of test node 3 (host {@link #HOST_3}). */
+    private static final UUID ID_3 = new UUID(0, 3);
+
+    /** Host name of test node 1. */
+    private static final String HOST_1 = "host1";
+
+    /** Host name of test node 2. */
+    private static final String HOST_2 = "host2";
+
+    /** Host name of test node 3. */
+    private static final String HOST_3 = "host3";
+
+    /** Host name that matches no node of the test topology. */
+    private static final String INVALID_HOST_1 = "invalid_host1";
+
+    /** Host name that matches no node of the test topology. */
+    private static final String INVALID_HOST_2 = "invalid_host2";
+
+    /** Host name that matches no node of the test topology. */
+    private static final String INVALID_HOST_3 = "invalid_host3";
+
+    /** Mocked IGFS serving {@link #PROXY_MAP} and {@link #BLOCK_MAP}. */
+    private static final IgniteFileSystem IGFS = new MockIgfs();
+
+    /** Mocked Ignite instance backed by {@link #IGFS}; injected into {@link #PLANNER}. */
+    private static final IgfsIgniteMock GRID = new IgfsIgniteMock(null, IGFS);
+
+    /** Planner under test. */
+    private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner();
+
+    /** Block affinity returned by the mocked IGFS, keyed by path/start/length; cleared before each test. */
+    private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>();
+
+    /** Flags returned by the mocked {@code isProxy()}; cleared before each test and never filled here. */
+    private static final Map<URI, Boolean> PROXY_MAP = new HashMap<>();
+
+    /** Plan built by the last {@link #plan} call on this thread; read by the {@code ensure*} helpers. */
+    private static final ThreadLocal<HadoopMapReducePlan> PLAN = new ThreadLocal<>();
+
+    /**
+     * Injects the mocked Ignite instance {@link #GRID} into the planner's {@code ignite} field.
+     */
+    static {
+        GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "ignite", GRID);
+    }
+
+    /** Injects the test logger into the planner and clears the mocked IGFS state. */
+    @Override protected void beforeTest() throws Exception {
+        GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "log", log());
+
+        BLOCK_MAP.clear();
+        PROXY_MAP.clear();
+    }
+
+    /** IGFS splits with a single affinity node each: mappers must follow the IGFS block affinity.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testIgfsOneBlockPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2);
+        HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3);
+
+        mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1));
+        mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2));
+        mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3));
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 2);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /** Non-IGFS splits with a single host each: mappers must follow the split host names.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3);
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 2);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /** IGFS splits with two affinity nodes each: mapper placement and reducer balance.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3);
+
+        mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2));
+        mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2));
+        mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3));
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /** Non-IGFS splits with two hosts each: mapper placement and reducer balance.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3);
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /** IGFS splits of two blocks each: a split must go to the node that holds both of its blocks.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3);
+
+        mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3));
+        mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3));
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split2);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_1);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+    }
+
+    /** Non-IGFS splits whose hosts match no node: splits may go anywhere, reducers stay balanced.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testNonIgfsOrphans() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3);
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2);
+
+        plan(1, split1, split2, split3);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) ||
+            ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) ||
+            ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1);
+
+        plan(3, split1, split2, split3);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
+     * Builds a plan for the splits on a fixed three-node topology (ID_1..ID_3 on HOST_1..HOST_3).
+     *
+     * @param reducers Reducers count.
+     * @param splits Splits.
+     * @return Plan, also stored in {@link #PLAN} for the {@code ensure*} helpers.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static HadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException {
+        assert reducers > 0;
+        assert splits != null && splits.length > 0;
+
+        Collection<HadoopInputSplit> splitList = new ArrayList<>(splits.length);
+
+        Collections.addAll(splitList, splits);
+
+        Collection<ClusterNode> top = new ArrayList<>();
+
+        GridTestNode node1 = new GridTestNode(ID_1);
+        GridTestNode node2 = new GridTestNode(ID_2);
+        GridTestNode node3 = new GridTestNode(ID_3);
+
+        node1.setHostName(HOST_1);
+        node2.setHostName(HOST_2);
+        node3.setHostName(HOST_3);
+
+        top.add(node1);
+        top.add(node2);
+        top.add(node3);
+
+        HadoopMapReducePlan plan = PLANNER.preparePlan(new HadoopPlannerMockJob(splitList, reducers), top, null);
+
+        PLAN.set(plan);
+
+        return plan;
+    }
+
+    /**
+     * Checks that the node's mappers in the last plan equal the given splits (compared with {@code F.eq}).
+     *
+     * @param nodeId Node ID.
+     * @param expSplits Expected splits.
+     * @return {@code True} if this assumption is valid.
+     */
+    private static boolean ensureMappers(UUID nodeId, HadoopInputSplit... expSplits) {
+        Collection<HadoopInputSplit> expSplitsCol = new ArrayList<>();
+
+        Collections.addAll(expSplitsCol, expSplits);
+
+        Collection<HadoopInputSplit> splits = PLAN.get().mappers(nodeId);
+
+        return F.eq(expSplitsCol, splits);
+    }
+
+    /**
+     * Checks that the node gets exactly {@code reducers} reducers in the last plan.
+     *
+     * @param nodeId Node ID.
+     * @param reducers Expected number of reducers.
+     * @return {@code True} if this assumption is valid.
+     */
+    private static boolean ensureReducers(UUID nodeId, int reducers) {
+        int[] reducersArr = PLAN.get().reducers(nodeId);
+
+        return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers);
+    }
+
+    /**
+     * Checks that the last plan puts neither mappers nor reducers on the node.
+     *
+     * @param nodeId Node ID.
+     * @return {@code True} if this assumption is valid.
+     */
+    private static boolean ensureEmpty(UUID nodeId) {
+        return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId));
+    }
+
+    /**
+     * Creates a split; the URI is {@code igfs://igfs@<file>} if {@code igfs} is set, else {@code hdfs://<file>}.
+     *
+     * @param igfs Whether to build an IGFS URI.
+     * @param file File path, appended to the scheme prefix.
+     * @param start Start.
+     * @param len Length.
+     * @param hosts Hosts of the split.
+     * @return Split.
+     */
+    private static HadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) {
+        URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file);
+
+        return new HadoopFileBlock(hosts, uri, start, len);
+    }
+
+    /**
+     * Creates an IGFS block location hosted on new test nodes with the given IDs.
+     *
+     * @param start Start.
+     * @param len Length.
+     * @param nodeIds Node IDs.
+     * @return Block location.
+     */
+    private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) {
+        assert nodeIds != null && nodeIds.length > 0;
+
+        Collection<ClusterNode> nodes = new ArrayList<>(nodeIds.length);
+
+        for (UUID id : nodeIds)
+            nodes.add(new GridTestNode(id));
+
+        return new IgfsBlockLocationImpl(start, len, nodes);
+    }
+
+    /**
+     * Registers the affinity that the mocked IGFS returns for the given file range.
+     *
+     * @param file File URI.
+     * @param start Start.
+     * @param len Length.
+     * @param locations Block locations to return.
+     */
+    private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) {
+        assert locations != null && locations.length > 0;
+
+        IgfsPath path = new IgfsPath(file);
+
+        Block block = new Block(path, start, len);
+
+        Collection<IgfsBlockLocation> locationsList = new ArrayList<>();
+
+        Collections.addAll(locationsList, locations);
+
+        BLOCK_MAP.put(block, locationsList);
+    }
+
+    /**
+     * Key of {@link #BLOCK_MAP}: IGFS path plus range start and length.
+     */
+    private static class Block {
+        /** IGFS path. */
+        private final IgfsPath path;
+
+        /** Range start. */
+        private final long start;
+
+        /** Range length. */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param path Path.
+         * @param start Start.
+         * @param len Length.
+         */
+        private Block(IgfsPath path, long start, long len) {
+            this.path = path;
+            this.start = start;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("RedundantIfStatement")
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Block)) return false;
+
+            Block block = (Block) o;
+
+            if (len != block.len)
+                return false;
+
+            if (start != block.start)
+                return false;
+
+            if (!path.equals(block.path))
+                return false;
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = path.hashCode();
+
+            res = 31 * res + (int) (start ^ (start >>> 32));
+            res = 31 * res + (int) (len ^ (len >>> 32));
+
+            return res;
+        }
+    }
+
+    /**
+     * IGFS mock: proxy flags come from PROXY_MAP, affinity from BLOCK_MAP, and every path exists.
+     */
+    private static class MockIgfs extends IgfsMock {
+        /**
+         * Constructor; passes {@code "igfs"} to {@link IgfsMock}.
+         */
+        public MockIgfs() {
+            super("igfs");
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isProxy(URI path) {
+            return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
+            return BLOCK_MAP.get(new Block(path, start, len));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean exists(IgfsPath path) {
+            return true;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopErrorSimulator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopErrorSimulator.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopErrorSimulator.java
new file mode 100644
index 0000000..b89dcc1
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopErrorSimulator.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Error simulator for the Hadoop map-reduce tests.
+ * <p>
+ * Each {@code onXxx()} callback is invoked on the map, combine or reduce stage it is named after. The shared
+ * {@link #noopInstance} ignores every callback; simulators built by {@link #create(Kind, int)} raise an error
+ * on the stages selected by a bit mask. {@link #instance()} returns the currently installed simulator.
+ */
+public class HadoopErrorSimulator {
+    /** Shared simulator that never raises anything. */
+    public static final HadoopErrorSimulator noopInstance = new HadoopErrorSimulator();
+
+    /** Currently installed simulator, returned by {@link #instance()}; starts as {@link #noopInstance}. */
+    private static final AtomicReference<HadoopErrorSimulator> CURRENT = new AtomicReference<>(noopInstance);
+
+    /**
+     * Builds a simulator of the requested kind.
+     *
+     * @param kind Kind of error to raise.
+     * @param bits Bit mask of the failing stages (layout described on
+     *      {@link RuntimeExceptionBitHadoopErrorSimulator}); ignored for {@link Kind#Noop}.
+     * @return {@link #noopInstance} for {@link Kind#Noop}, otherwise a new simulator.
+     * @throws IllegalStateException If the kind is not handled.
+     */
+    public static HadoopErrorSimulator create(Kind kind, int bits) {
+        final HadoopErrorSimulator sim;
+
+        switch (kind) {
+            case Runtime:
+                sim = new RuntimeExceptionBitHadoopErrorSimulator(bits);
+                break;
+
+            case IOException:
+                sim = new IOExceptionBitHadoopErrorSimulator(bits);
+                break;
+
+            case Error:
+                sim = new ErrorBitHadoopErrorSimulator(bits);
+                break;
+
+            case Noop:
+                sim = noopInstance;
+                break;
+
+            default:
+                throw new IllegalStateException("Unknown kind: " + kind);
+        }
+
+        return sim;
+    }
+
+    /**
+     * @return Currently installed simulator (initially {@link #noopInstance}).
+     */
+    public static HadoopErrorSimulator instance() {
+        return CURRENT.get();
+    }
+
+    /**
+     * Installs {@code update}, but only if {@code expect} is the currently installed simulator.
+     *
+     * @param expect Simulator expected to be installed (compared by reference).
+     * @param update Simulator to install.
+     * @return {@code True} if {@code update} was installed.
+     */
+    public static boolean setInstance(HadoopErrorSimulator expect, HadoopErrorSimulator update) {
+        return CURRENT.compareAndSet(expect, update);
+    }
+
+    /**
+     * Private: only the simulators nested in this class can extend it.
+     */
+    private HadoopErrorSimulator() {
+        // No-op.
+    }
+
+    /** Map stage: configuration. No-op here. */
+    public void onMapConfigure() {
+        // No-op.
+    }
+
+    /** Map stage: setup. No-op here. */
+    public void onMapSetup() throws IOException, InterruptedException {
+        // No-op.
+    }
+
+    /** Map stage: processing. No-op here. */
+    public void onMap() throws IOException {
+        // No-op.
+    }
+
+    /** Map stage: cleanup. No-op here. */
+    public void onMapCleanup() throws IOException, InterruptedException {
+        // No-op.
+    }
+
+    /** Map stage: close. No-op here; none of the nested simulators override it. */
+    public void onMapClose() throws IOException {
+        // No-op.
+    }
+
+    /** Combine stage: configuration; like setConf(), it declares no IOException. No-op here. */
+    public void onCombineConfigure() {
+        // No-op.
+    }
+
+    /** Combine stage: setup. No-op here. */
+    public void onCombineSetup() throws IOException, InterruptedException {
+        // No-op.
+    }
+
+    /** Combine stage: processing. No-op here. */
+    public void onCombine() throws IOException {
+        // No-op.
+    }
+
+    /** Combine stage: cleanup. No-op here. */
+    public void onCombineCleanup() throws IOException, InterruptedException {
+        // No-op.
+    }
+
+    /** Reduce stage: configuration. No-op here. */
+    public void onReduceConfigure() {
+        // No-op.
+    }
+
+    /** Reduce stage: setup. No-op here. */
+    public void onReduceSetup() throws IOException, InterruptedException {
+        // No-op.
+    }
+
+    /** Reduce stage: processing. No-op here. */
+    public void onReduce() throws IOException {
+        // No-op.
+    }
+
+    /** Reduce stage: cleanup. No-op here. */
+    public void onReduceCleanup() throws IOException, InterruptedException {
+        // No-op.
+    }
+
+    /**
+     * Kind of error a simulator raises.
+     */
+    public enum Kind {
+        /** Nothing is raised. */
+        Noop,
+
+        /** {@link RuntimeException}. */
+        Runtime,
+
+        /** {@link java.io.IOException}. */
+        IOException,
+
+        /** {@link java.lang.Error}. */
+        Error
+    }
+
+    /**
+     * Simulator that throws a {@link RuntimeException} on every stage whose bit is set.
+     * <p>
+     * Stage bits: map configure, setup, map and cleanup are {@code 1 << 0} to {@code 1 << 3}; combine configure,
+     * setup, combine and cleanup are {@code 1 << 4} to {@code 1 << 7}; reduce configure, setup, reduce and cleanup
+     * are {@code 1 << 8} to {@code 1 << 11}. {@link #onMapClose()} never fails. On the configure stages a checked
+     * {@link IOException} raised by {@link #simulateError()} is silently dropped, because those callbacks cannot
+     * declare it.
+     */
+    public static class RuntimeExceptionBitHadoopErrorSimulator extends HadoopErrorSimulator {
+        /** Bit mask of the stages that must fail. */
+        private final int bits;
+
+        /**
+         * @param b Stage bits.
+         */
+        protected RuntimeExceptionBitHadoopErrorSimulator(int b) {
+            bits = b;
+        }
+
+        /**
+         * Raises the simulated error; subclasses override it to raise a different type.
+         *
+         * @throws IOException Not thrown here; declared so that subclasses may throw it.
+         */
+        protected void simulateError() throws IOException {
+            throw new RuntimeException("An error simulated by " + getClass().getSimpleName());
+        }
+
+        /**
+         * Calls {@link #simulateError()} if the given stage bit is set.
+         *
+         * @param stage Stage bit.
+         * @throws IOException If {@link #simulateError()} throws it.
+         */
+        private void failOn(int stage) throws IOException {
+            if ((bits & stage) != 0)
+                simulateError();
+        }
+
+        /**
+         * Same as {@link #failOn(int)}, but drops a checked {@link IOException}; used by configure callbacks.
+         *
+         * @param stage Stage bit.
+         */
+        private void failOnUnchecked(int stage) {
+            try {
+                failOn(stage);
+            }
+            catch (IOException ignored) {
+                // Configure callbacks cannot declare IOException, so it is deliberately dropped.
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapConfigure() {
+            failOnUnchecked(1 << 0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapSetup() throws IOException, InterruptedException {
+            failOn(1 << 1);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMap() throws IOException {
+            failOn(1 << 2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapCleanup() throws IOException, InterruptedException {
+            failOn(1 << 3);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineConfigure() {
+            failOnUnchecked(1 << 4);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineSetup() throws IOException, InterruptedException {
+            failOn(1 << 5);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombine() throws IOException {
+            failOn(1 << 6);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineCleanup() throws IOException, InterruptedException {
+            failOn(1 << 7);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceConfigure() {
+            failOnUnchecked(1 << 8);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceSetup() throws IOException, InterruptedException {
+            failOn(1 << 9);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduce() throws IOException {
+            failOn(1 << 10);
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceCleanup() throws IOException, InterruptedException {
+            failOn(1 << 11);
+        }
+    }
+
+    /**
+     * Simulator that throws a {@link java.lang.Error} on every stage whose bit is set.
+     */
+    public static class ErrorBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator {
+        /**
+         * @param bits Stage bits (see {@link RuntimeExceptionBitHadoopErrorSimulator}).
+         */
+        public ErrorBitHadoopErrorSimulator(int bits) {
+            super(bits);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void simulateError() {
+            String msg = "An error simulated by " + getClass().getSimpleName();
+
+            throw new Error(msg);
+        }
+    }
+
+    /**
+     * Simulator that throws an {@link IOException} on every stage whose bit is set, except the configure
+     * stages, where the exception is dropped.
+     */
+    public static class IOExceptionBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator {
+        /**
+         * @param bits Stage bits (see {@link RuntimeExceptionBitHadoopErrorSimulator}).
+         */
+        public IOExceptionBitHadoopErrorSimulator(int bits) {
+            super(bits);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void simulateError() throws IOException {
+            String msg = "An IOException simulated by " + getClass().getSimpleName();
+
+            throw new IOException(msg);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
new file mode 100644
index 0000000..1794a95
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Test file systems for the working directory multi-threading support. + */ +public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { + /** the number of threads */ + private static final int THREAD_COUNT = 3; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + + /** + * Test the file system with specified URI for the multi-thread working directory support. + * + * @param uri Base URI of the file system (scheme and authority). + * @throws Exception If fails. 
+ */ + private void testFileSystem(final URI uri) throws Exception { + final Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, + new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString()); + + final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT); + final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT); + final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT); + final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT); + + final Path[] newUserInitWorkDir = new Path[THREAD_COUNT]; + final Path[] newWorkDir = new Path[THREAD_COUNT]; + final Path[] newAbsWorkDir = new Path[THREAD_COUNT]; + final Path[] newInstanceWorkDir = new Path[THREAD_COUNT]; + + final AtomicInteger threadNum = new AtomicInteger(0); + + GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + int curThreadNum = threadNum.getAndIncrement(); + + if ("file".equals(uri.getScheme())) + FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum)); + + changeUserPhase.countDown(); + changeUserPhase.await(); + + newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); + + FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum)); + + changeDirPhase.countDown(); + changeDirPhase.await(); + + newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); + + FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum)); + + changeAbsDirPhase.countDown(); + changeAbsDirPhase.await(); + + newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); + + newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory(); + + finishPhase.countDown(); + } + catch (InterruptedException | IOException e) { + error("Failed to execute test thread.", e); + + fail(); + } + } + }, 
THREAD_COUNT, "filesystems-test"); + + finishPhase.await(); + + for (int i = 0; i < THREAD_COUNT; i ++) { + cfg.set(MRJobConfig.USER_NAME, "user" + i); + + Path workDir = new Path(new Path(uri), "user/user" + i); + + cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString()); + + assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory()); + + assertEquals(workDir, newUserInitWorkDir[i]); + + assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]); + + assertEquals(new Path("/folder" + i), newAbsWorkDir[i]); + + assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]); + } + + System.out.println(System.getProperty("user.dir")); + } + + /** + * Test LocalFS multi-thread working directory. + * + * @throws Exception If fails. + */ + public void testLocal() throws Exception { + testFileSystem(URI.create("file:///")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java new file mode 100644 index 0000000..d2e3418 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.GridRandom; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.S; + +import static 
org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo; + +/** + * Grouping test. + */ +public class HadoopGroupingTest extends HadoopAbstractSelfTest { + /** */ + private static final String PATH_OUTPUT = "/test-out"; + + /** */ + private static final GridConcurrentHashSet<UUID> vals = HadoopSharedMap.map(HadoopGroupingTest.class) + .put("vals", new GridConcurrentHashSet<UUID>()); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + protected boolean igfsEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + // TODO: IGNITE-404: Uncomment when fixed. + //cfg.setExternalExecution(false); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testGroupingReducer() throws Exception { + doTestGrouping(false); + } + + /** + * @throws Exception If failed. + */ + public void testGroupingCombiner() throws Exception { + doTestGrouping(true); + } + + /** + * @param combiner With combiner. + * @throws Exception If failed. 
+ */ + public void doTestGrouping(boolean combiner) throws Exception { + vals.clear(); + + Job job = Job.getInstance(); + + job.setInputFormatClass(InFormat.class); + job.setOutputFormatClass(OutFormat.class); + + job.setOutputKeyClass(YearTemperature.class); + job.setOutputValueClass(Text.class); + + job.setMapperClass(Mapper.class); + + if (combiner) { + job.setCombinerClass(MyReducer.class); + job.setNumReduceTasks(0); + job.setCombinerKeyGroupingComparatorClass(YearComparator.class); + } + else { + job.setReducerClass(MyReducer.class); + job.setNumReduceTasks(4); + job.setGroupingComparatorClass(YearComparator.class); + } + + grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2), + createJobInfo(job.getConfiguration())).get(30000); + + assertTrue(vals.isEmpty()); + } + + public static class MyReducer extends Reducer<YearTemperature, Text, Text, Object> { + /** */ + int lastYear; + + @Override protected void reduce(YearTemperature key, Iterable<Text> vals0, Context context) + throws IOException, InterruptedException { + X.println("___ : " + context.getTaskAttemptID() + " --> " + key); + + Set<UUID> ids = new HashSet<>(); + + for (Text val : vals0) + assertTrue(ids.add(UUID.fromString(val.toString()))); + + for (Text val : vals0) + assertTrue(ids.remove(UUID.fromString(val.toString()))); + + assertTrue(ids.isEmpty()); + + assertTrue(key.year > lastYear); + + lastYear = key.year; + + for (Text val : vals0) + assertTrue(vals.remove(UUID.fromString(val.toString()))); + } + } + + public static class YearComparator implements RawComparator<YearTemperature> { // Grouping comparator. 
+ /** {@inheritDoc} */ + @Override public int compare(YearTemperature o1, YearTemperature o2) { + return Integer.compare(o1.year, o2.year); + } + + /** {@inheritDoc} */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + throw new IllegalStateException(); + } + } + + public static class YearTemperature implements WritableComparable<YearTemperature>, Cloneable { + /** */ + private int year; + + /** */ + private int temperature; + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + out.writeInt(year); + out.writeInt(temperature); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + year = in.readInt(); + temperature = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { // To be partitioned by year. + return year; + } + + /** {@inheritDoc} */ + @Override public int compareTo(YearTemperature o) { + int res = Integer.compare(year, o.year); + + if (res != 0) + return res; + + // Sort comparator by year and temperature, to find max for year. 
+ return Integer.compare(o.temperature, temperature); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(YearTemperature.class, this); + } + } + + public static class InFormat extends InputFormat<YearTemperature, Text> { + /** {@inheritDoc} */ + @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { + ArrayList<InputSplit> list = new ArrayList<>(); + + for (int i = 0; i < 10; i++) + list.add(new HadoopSortingTest.FakeSplit(20)); + + return list; + } + + /** {@inheritDoc} */ + @Override public RecordReader<YearTemperature, Text> createRecordReader(final InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return new RecordReader<YearTemperature, Text>() { + /** */ + int cnt; + + /** */ + Random rnd = new GridRandom(); + + /** */ + YearTemperature key = new YearTemperature(); + + /** */ + Text val = new Text(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) { + // No-op. + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + return cnt++ < split.getLength(); + } + + @Override public YearTemperature getCurrentKey() { + key.year = 1990 + rnd.nextInt(10); + key.temperature = 10 + rnd.nextInt(20); + + return key; + } + + @Override public Text getCurrentValue() { + UUID id = UUID.randomUUID(); + + assertTrue(vals.add(id)); + + val.set(id.toString()); + + return val; + } + + @Override public float getProgress() { + return 0; + } + + @Override public void close() { + // No-op. + } + }; + } + } + + /** + * + */ + public static class OutFormat extends OutputFormat { + /** {@inheritDoc} */ + @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java new file mode 100644 index 0000000..fbb54ed --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.hadoop.Hadoop; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo; + +/** + * Job tracker self test. + */ +public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { + /** */ + private static final String PATH_OUTPUT = "/test-out"; + + /** Test block count parameter name. */ + private static final int BLOCK_CNT = 10; + + /** */ + private static HadoopSharedMap m = HadoopSharedMap.map(HadoopJobTrackerSelfTest.class); + + /** Map task execution count. */ + private static final AtomicInteger mapExecCnt = m.put("mapExecCnt", new AtomicInteger()); + + /** Reduce task execution count. 
*/ + private static final AtomicInteger reduceExecCnt = m.put("reduceExecCnt", new AtomicInteger()); + + /** Reduce task execution count. */ + private static final AtomicInteger combineExecCnt = m.put("combineExecCnt", new AtomicInteger()); + + /** */ + private static final Map<String, CountDownLatch> latch = m.put("latch", new HashMap<String, CountDownLatch>()); + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + latch.put("mapAwaitLatch", new CountDownLatch(1)); + latch.put("reduceAwaitLatch", new CountDownLatch(1)); + latch.put("combineAwaitLatch", new CountDownLatch(1)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + mapExecCnt.set(0); + combineExecCnt.set(0); + reduceExecCnt.set(0); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setMapReducePlanner(new HadoopTestRoundRobinMrPlanner()); + + // TODO: IGNITE-404: Uncomment when fixed. + //cfg.setExternalExecution(false); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testSimpleTaskSubmit() throws Exception { + try { + UUID globalId = UUID.randomUUID(); + + Job job = Job.getInstance(); + setupFileSystems(job.getConfiguration()); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + job.setInputFormatClass(InFormat.class); + + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1")); + + HadoopJobId jobId = new HadoopJobId(globalId, 1); + + grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + checkStatus(jobId, false); + + info("Releasing map latch."); + + latch.get("mapAwaitLatch").countDown(); + + checkStatus(jobId, false); + + info("Releasing reduce latch."); + + latch.get("reduceAwaitLatch").countDown(); + + checkStatus(jobId, true); + + assertEquals(10, mapExecCnt.get()); + assertEquals(0, combineExecCnt.get()); + assertEquals(1, reduceExecCnt.get()); + } + finally { + // Safety. + latch.get("mapAwaitLatch").countDown(); + latch.get("combineAwaitLatch").countDown(); + latch.get("reduceAwaitLatch").countDown(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTaskWithCombinerPerMap() throws Exception { + try { + UUID globalId = UUID.randomUUID(); + + Job job = Job.getInstance(); + setupFileSystems(job.getConfiguration()); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + job.setCombinerClass(TestCombiner.class); + job.setInputFormatClass(InFormat.class); + + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2")); + + HadoopJobId jobId = new HadoopJobId(globalId, 1); + + grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + checkStatus(jobId, false); + + info("Releasing map latch."); + + latch.get("mapAwaitLatch").countDown(); + + checkStatus(jobId, false); + + // All maps are completed. We have a combiner, so no reducers should be executed + // before combiner latch is released. 
+ + U.sleep(50); + + assertEquals(0, reduceExecCnt.get()); + + info("Releasing combiner latch."); + + latch.get("combineAwaitLatch").countDown(); + + checkStatus(jobId, false); + + info("Releasing reduce latch."); + + latch.get("reduceAwaitLatch").countDown(); + + checkStatus(jobId, true); + + assertEquals(10, mapExecCnt.get()); + assertEquals(10, combineExecCnt.get()); + assertEquals(1, reduceExecCnt.get()); + } + finally { + // Safety. + latch.get("mapAwaitLatch").countDown(); + latch.get("combineAwaitLatch").countDown(); + latch.get("reduceAwaitLatch").countDown(); + } + } + + /** + * Checks job execution status. + * + * @param jobId Job ID. + * @param complete Completion status. + * @throws Exception If failed. + */ + private void checkStatus(HadoopJobId jobId, boolean complete) throws Exception { + for (int i = 0; i < gridCount(); i++) { + IgniteKernal kernal = (IgniteKernal)grid(i); + + Hadoop hadoop = kernal.hadoop(); + + HadoopJobStatus stat = hadoop.status(jobId); + + assert stat != null; + + IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId); + + if (!complete) + assertFalse(fut.isDone()); + else { + info("Waiting for status future completion on node [idx=" + i + ", nodeId=" + + kernal.getLocalNodeId() + ']'); + + fut.get(); + } + } + } + + /** + * Test input format + */ + public static class InFormat extends InputFormat { + + @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException { + List<InputSplit> res = new ArrayList<>(BLOCK_CNT); + + for (int i = 0; i < BLOCK_CNT; i++) + try { + res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"})); + } + catch (URISyntaxException e) { + throw new IOException(e); + } + + return res; + } + + @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException { + return new RecordReader() { + @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { 
+ } + + @Override public boolean nextKeyValue() { + return false; + } + + @Override public Object getCurrentKey() { + return null; + } + + @Override public Object getCurrentValue() { + return null; + } + + @Override public float getProgress() { + return 0; + } + + @Override public void close() { + + } + }; + } + } + + /** + * Test mapper. + */ + private static class TestMapper extends Mapper { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("mapAwaitLatch").await(); + + mapExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } + + /** + * Test reducer. + */ + private static class TestReducer extends Reducer { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("reduceAwaitLatch").await(); + + reduceExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } + + /** + * Test combiner. 
+ */ + private static class TestCombiner extends Reducer { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("combineAwaitLatch").await(); + + combineExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java new file mode 100644 index 0000000..1ce45cd --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1; +import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2; + +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo; + +/** + * Tests map-reduce execution with embedded mode. + */ +public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { + /** */ + private static Map<String, Boolean> flags = HadoopSharedMap.map(HadoopMapReduceEmbeddedSelfTest.class) + .put("flags", new HashMap<String, Boolean>()); + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + // TODO: IGNITE-404: Uncomment when fixed. + //cfg.setExternalExecution(false); + + return cfg; + } + + /** + * Tests whole job execution with all phases in old and new versions of API with definition of custom + * Serialization, Partitioner and IO formats. + * @throws Exception If fails. 
+ */ + public void testMultiReducerWholeMapReduceExecution() throws Exception { + IgfsPath inDir = new IgfsPath(PATH_INPUT); + + igfs.mkdirs(inDir); + + IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); + + generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000, + "key6", 18000 ); + + for (int i = 0; i < 2; i++) { + boolean useNewAPI = i == 1; + + igfs.delete(new IgfsPath(PATH_OUTPUT), true); + + flags.put("serializationWasConfigured", false); + flags.put("partitionerWasConfigured", false); + flags.put("inputFormatWasConfigured", false); + flags.put("outputFormatWasConfigured", false); + + JobConf jobConf = new JobConf(); + + jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); + + //To split into about 6-7 items for v2 + jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); + + //For v1 + jobConf.setInt("fs.local.block.size", 65000); + + // File system coordinates. 
+ setupFileSystems(jobConf); + + HadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI); + + if (!useNewAPI) { + jobConf.setPartitionerClass(CustomV1Partitioner.class); + jobConf.setInputFormat(CustomV1InputFormat.class); + jobConf.setOutputFormat(CustomV1OutputFormat.class); + } + + Job job = Job.getInstance(jobConf); + + HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false); + + if (useNewAPI) { + job.setPartitionerClass(CustomV2Partitioner.class); + job.setInputFormatClass(CustomV2InputFormat.class); + job.setOutputFormatClass(CustomV2OutputFormat.class); + } + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); + + job.setNumReduceTasks(3); + + job.setJarByClass(HadoopWordCount2.class); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + fut.get(); + + assertTrue("Serialization was configured (new API is " + useNewAPI + ")", + flags.get("serializationWasConfigured")); + + assertTrue("Partitioner was configured (new API is = " + useNewAPI + ")", + flags.get("partitionerWasConfigured")); + + assertTrue("Input format was configured (new API is = " + useNewAPI + ")", + flags.get("inputFormatWasConfigured")); + + assertTrue("Output format was configured (new API is = " + useNewAPI + ")", + flags.get("outputFormatWasConfigured")); + + assertEquals("Use new API = " + useNewAPI, + "key3\t15000\n" + + "key6\t18000\n", + readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000") + ); + + assertEquals("Use new API = " + useNewAPI, + "key1\t10000\n" + + "key4\t7000\n", + readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? 
"part-r-" : "part-") + "00001") + ); + + assertEquals("Use new API = " + useNewAPI, + "key2\t20000\n" + + "key5\t12000\n", + readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002") + ); + + } + } + + /** + * Custom serialization class that inherits behaviour of native {@link WritableSerialization}. + */ + protected static class CustomSerialization extends WritableSerialization { + @Override public void setConf(Configuration conf) { + super.setConf(conf); + + flags.put("serializationWasConfigured", true); + } + } + + /** + * Custom implementation of Partitioner in v1 API. + */ + private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner { + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + flags.put("partitionerWasConfigured", true); + } + } + + /** + * Custom implementation of Partitioner in v2 API. + */ + private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner + implements Configurable { + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + flags.put("partitionerWasConfigured", true); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } + } + + /** + * Custom implementation of InputFormat in v2 API. + */ + private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable { + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + flags.put("inputFormatWasConfigured", true); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } + } + + /** + * Custom implementation of OutputFormat in v2 API. 
+ */ + private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable { + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + flags.put("outputFormatWasConfigured", true); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } + } + + /** + * Custom implementation of InputFormat in v1 API. + */ + private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat { + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + super.configure(job); + + flags.put("inputFormatWasConfigured", true); + } + } + + /** + * Custom implementation of OutputFormat in v1 API. + */ + private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable { + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + flags.put("outputFormatWasConfigured", true); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java new file mode 100644 index 0000000..afd6f26 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+/**
+ * Test of error resiliency after an error in a map-reduce job execution.
+ * Combinations tested:
+ * { new API, old API }
+ *   x { unchecked exception, checked exception, error }
+ *   x { phase where the error happens }.
+ */
+public class HadoopMapReduceErrorResilienceTest extends HadoopAbstractMapReduceTest {
+    /**
+     * Tests recovery after a runtime exception, using the new API for mapper, combiner and reducer.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_Runtime() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Runtime);
+    }
+
+    /**
+     * Tests recovery after an IO exception, using the new API for mapper, combiner and reducer.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_IOException() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.IOException);
+    }
+
+    /**
+     * Tests recovery after an error, using the new API for mapper, combiner and reducer.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_Error() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Error);
+    }
+
+    /**
+     * Tests recovery after a runtime exception, using the old API for mapper, combiner and reducer.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_Runtime() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Runtime);
+    }
+
+    /**
+     * Tests recovery after an IO exception, using the old API for mapper, combiner and reducer.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_IOException() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.IOException);
+    }
+
+    /**
+     * Tests recovery after an error, using the old API for mapper, combiner and reducer.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_Error() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Error);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000L;
+    }
+
+    /**
+     * Tests correct work after an error. For every stage bit the job is run once with an error
+     * injected at that stage, then once more with the no-op simulator, and that second run must succeed.
+     *
+     * @param useNewBits API selection mask. A cleared bit selects the new API, a set bit the old one:
+     *      value {@code 1} is the mapper, {@code 2} the combiner and {@code 4} the reducer
+     *      (so {@code 0} means "all new" and {@code 7} means "all old").
+     * @param simulatorKind Kind of error to inject.
+     * @throws Exception On error.
+     */
+    private void doTestRecoveryAfterAnError(int useNewBits, HadoopErrorSimulator.Kind simulatorKind) throws Exception {
+        try {
+            IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+            igfs.mkdirs(inDir);
+
+            IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+            generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
+
+            boolean useNewMapper = (useNewBits & 1) == 0;
+            boolean useNewCombiner = (useNewBits & 2) == 0;
+            boolean useNewReducer = (useNewBits & 4) == 0;
+
+            // NOTE(review): presumably HadoopErrorSimulator defines 12 stage bits (1 << 0 .. 1 << 11) - confirm.
+            for (int i = 0; i < 12; i++) {
+                int bits = 1 << i;
+
+                System.out.println("############################ Simulator kind = " + simulatorKind
+                    + ", Stage bits = " + bits);
+
+                HadoopErrorSimulator sim = HadoopErrorSimulator.create(simulatorKind, bits);
+
+                doTestWithErrorSimulator(sim, inFile, useNewMapper, useNewCombiner, useNewReducer);
+            }
+        }
+        catch (Throwable t) {
+            // Attach the original throwable as the cause so its stack trace lands in the test report
+            // instead of only on stderr (fail(String) cannot carry a cause).
+            throw new AssertionError("Unexpected throwable: " + t, t);
+        }
+    }
+
+    /**
+     * Performs test with given error simulator: installs it, runs the job (a failure is tolerated),
+     * restores the no-op simulator and runs the job again, this time expecting success.
+     *
+     * @param sim The simulator.
+     * @param inFile Input file.
+     * @param useNewMapper Whether to use the new mapper API.
+     * @param useNewCombiner Whether to use the new combiner API.
+     * @param useNewReducer Whether to use the new reducer API.
+     * @throws Exception If failed.
+     */
+    private void doTestWithErrorSimulator(HadoopErrorSimulator sim, IgfsPath inFile, boolean useNewMapper,
+        boolean useNewCombiner, boolean useNewReducer) throws Exception {
+        // Set real simulating error simulator:
+        assertTrue(HadoopErrorSimulator.setInstance(HadoopErrorSimulator.noopInstance, sim));
+
+        try {
+            // Expect failure there:
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+        }
+        catch (Throwable t) { // This may be an Error.
+            // Expected:
+            System.out.println(t.toString()); // Ignore, continue the test.
+        }
+
+        // Set no-op error simulator:
+        assertTrue(HadoopErrorSimulator.setInstance(sim, HadoopErrorSimulator.noopInstance));
+
+        // Expect success there:
+        doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
new file mode 100644
index 0000000..feccb59
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+/**
+ * Runs the whole map-reduce cycle through the job tracker for several combinations
+ * of the old (v1) and new (v2) Hadoop APIs.
+ */
+public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
+    /**
+     * Generates the word-count input file once, then executes the job for every API
+     * combination returned by {@link #getApiModes()}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWholeMapReduceExecution() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+        generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
+
+        for (boolean[] mode : getApiModes()) {
+            // Triplet layout: [0] new mapper, [1] new combiner, [2] new reducer.
+            assert mode.length == 3;
+
+            doTest(inFile, mode[0], mode[1], mode[2]);
+        }
+    }
+
+    /**
+     * Lists the API combinations to run. Each entry is a { newMapper, newCombiner, newReducer }
+     * triplet where {@code true} selects the new API for that stage and {@code false} the old one.
+     *
+     * @return API combinations to test.
+     */
+    protected boolean[][] getApiModes() {
+        return new boolean[][] {
+            { false, false, false }, // Old API everywhere.
+            { false, false, true },  // New reducer only.
+            { false, true, false },  // New combiner only.
+            { true, false, false },  // New mapper only.
+            { true, true, true },    // New API everywhere.
+        };
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
new file mode 100644
index 0000000..3bb8735
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Test attempt to execute a map-reduce task while no Hadoop processor available.
+ */
+public class HadoopNoHadoopMapReduceTest extends HadoopMapReduceTest {
+    /**
+     * Builds the parent test configuration, then removes the Hadoop configuration
+     * and enables peer class loading.
+     *
+     * @param gridName Grid name.
+     * @return Node configuration.
+     * @throws Exception If failed.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Without a Hadoop configuration the inherited map-reduce run is expected to be rejected.
+        cfg.setHadoopConfiguration(null);
+        cfg.setPeerClassLoadingEnabled(true);
+
+        return cfg;
+    }
+
+    /**
+     * Runs the inherited whole-cycle test and requires it to end with {@link IllegalStateException}.
+     * Completing normally fails this test; any other throwable propagates unchanged.
+     *
+     * @throws Exception If failed.
+     */
+    @Override public void testWholeMapReduceExecution() throws Exception {
+        boolean rejected = false;
+
+        try {
+            super.testWholeMapReduceExecution();
+        }
+        catch (IllegalStateException ignored) {
+            rejected = true;
+        }
+
+        if (!rejected)
+            fail("IllegalStateException expected.");
+    }
+}