IGNITE-4054: Hadoop: added map-reduce plan debug output.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/43ac3f5d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/43ac3f5d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/43ac3f5d Branch: refs/heads/ignite-ssl-hotfix Commit: 43ac3f5d5e8ab664e07f26d99be34f284f7941dd Parents: 80abd1b Author: vozerov-gridgain <voze...@gridgain.com> Authored: Mon Oct 17 11:26:12 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Mon Oct 17 11:26:12 2016 +0300 ---------------------------------------------------------------------- .../hadoop/HadoopMapReducePlanner.java | 1 + .../processors/hadoop/HadoopExternalSplit.java | 8 +++ .../processors/hadoop/HadoopSplitWrapper.java | 9 +++ .../hadoop/jobtracker/HadoopJobTracker.java | 61 ++++++++++++++++++++ 4 files changed, 79 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/43ac3f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java index 185994f..0009c4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java @@ -33,6 +33,7 @@ public interface HadoopMapReducePlanner { * @param top Topology. * @param oldPlan Old plan in case of partial failure. * @return Map reduce plan. + * @throws IgniteCheckedException If an error occurs. */ public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/43ac3f5d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java index bd767b3..a9b4532 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java @@ -17,10 +17,13 @@ package org.apache.ignite.internal.processors.hadoop; +import org.apache.ignite.internal.util.typedef.internal.S; + import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Arrays; /** * Split serialized in external file. @@ -85,4 +88,9 @@ public class HadoopExternalSplit extends HadoopInputSplit { @Override public int hashCode() { return (int)(off ^ (off >>> 32)); } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(HadoopExternalSplit.class, this, "hosts", Arrays.toString(hosts)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/43ac3f5d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java index 511aa5a..fb6d0f3 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java @@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.hadoop; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Arrays; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -33,6 +36,7 @@ public class HadoopSplitWrapper extends HadoopInputSplit { private static final long serialVersionUID = 0L; /** Native hadoop input split. */ + @GridToStringExclude private byte[] bytes; /** */ @@ -116,4 +120,9 @@ public class HadoopSplitWrapper extends HadoopInputSplit { @Override public int hashCode() { return id; } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(HadoopSplitWrapper.class, this, "hosts", Arrays.toString(hosts)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/43ac3f5d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index bffb82b..36782bf 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -75,6 +75,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -313,6 +314,8 @@ public class HadoopJobTracker extends HadoopComponent { HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null); + logPlan(info, mrPlan); + HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info); meta.mapReducePlan(mrPlan); @@ -354,6 +357,64 @@ public class HadoopJobTracker extends HadoopComponent { } /** + * Log map-reduce plan if needed. + * + * @param info Job info. + * @param plan Plan. + */ + @SuppressWarnings("StringConcatenationInsideStringBufferAppend") + private void logPlan(HadoopJobInfo info, HadoopMapReducePlan plan) { + if (log.isDebugEnabled()) { + Map<UUID, IgniteBiTuple<Collection<HadoopInputSplit>, int[]>> map = new HashMap<>(); + + for (UUID nodeId : plan.mapperNodeIds()) + map.put(nodeId, new IgniteBiTuple<Collection<HadoopInputSplit>, int[]>(plan.mappers(nodeId), null)); + + for (UUID nodeId : plan.reducerNodeIds()) { + int[] reducers = plan.reducers(nodeId); + + IgniteBiTuple<Collection<HadoopInputSplit>, int[]> entry = map.get(nodeId); + + if (entry == null) + map.put(nodeId, new IgniteBiTuple<Collection<HadoopInputSplit>, int[]>(null, reducers)); + else + entry.set2(reducers); + } + + StringBuilder details = new StringBuilder("["); + + boolean first = true; + + for (Map.Entry<UUID, IgniteBiTuple<Collection<HadoopInputSplit>, int[]>> entry : map.entrySet()) { + if (first) + first = false; + else + details.append(", "); + + UUID nodeId = entry.getKey(); + + Collection<HadoopInputSplit> mappers = entry.getValue().get1(); + + if (mappers == null) + mappers = Collections.emptyList(); + + int[] reducers = entry.getValue().get2(); + + if (reducers == null) + reducers = new int[0]; + + details.append("[nodeId=" + nodeId + ", mappers=" + mappers.size() + ", reducers=" + reducers.length + + ", mapperDetails=" + mappers + ", reducerDetails=" + Arrays.toString(reducers) + ']'); + } + + details.append(']'); + + log.debug("Prepared map-reduce plan [jobName=" + info.jobName() + ", mappers=" + plan.mappers() + + ", reducers=" + plan.reducers() + ", details=" + details + ']'); + } + } + + /** * Convert Hadoop job metadata to job status. * * @param meta Metadata.