http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java new file mode 100644 index 0000000..17c2ff5 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor; +import org.apache.ignite.internal.util.nio.GridNioFuture; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Grid client for NIO server. + */ +public class HadoopTcpNioCommunicationClient extends HadoopAbstractCommunicationClient { + /** Socket. */ + private final GridNioSession ses; + + /** + * Constructor for test purposes only. + */ + public HadoopTcpNioCommunicationClient() { + ses = null; + } + + /** + * @param ses Session. + */ + public HadoopTcpNioCommunicationClient(GridNioSession ses) { + assert ses != null; + + this.ses = ses; + } + + /** {@inheritDoc} */ + @Override public boolean close() { + boolean res = super.close(); + + if (res) + ses.close(); + + return res; + } + + /** {@inheritDoc} */ + @Override public void forceClose() { + super.forceClose(); + + ses.close(); + } + + /** {@inheritDoc} */ + @Override public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) + throws IgniteCheckedException { + if (closed()) + throw new IgniteCheckedException("Client was closed: " + this); + + GridNioFuture<?> fut = ses.send(msg); + + if (fut.isDone()) + fut.get(); + } + + /** {@inheritDoc} */ + @Override public long getIdleTime() { + long now = U.currentTimeMillis(); + + // Session can be used for receiving and sending. 
+ return Math.min(Math.min(now - ses.lastReceiveTime(), now - ses.lastSendScheduleTime()), + now - ses.lastSendTime()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopTcpNioCommunicationClient.class, this, super.toString()); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java new file mode 100644 index 0000000..750b314 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1CleanupTask.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import java.io.IOException; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; + +/** + * Hadoop cleanup task implementation for v1 API. + */ +public class HadoopV1CleanupTask extends HadoopV1Task { + /** Abort flag. */ + private final boolean abort; + + /** + * @param taskInfo Task info. + * @param abort Abort flag. + */ + public HadoopV1CleanupTask(HadoopTaskInfo taskInfo, boolean abort) { + super(taskInfo); + + this.abort = abort; + } + + /** {@inheritDoc} */ + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobContext jobCtx = ctx.jobContext(); + + try { + OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter(); + + if (abort) + committer.abortJob(jobCtx, JobStatus.State.FAILED); + else + committer.commitJob(jobCtx); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java new file mode 100644 index 0000000..c623eab --- /dev/null +++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Counter.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter; + +import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString; + +/** + * Hadoop counter implementation for v1 API. + */ +public class HadoopV1Counter extends Counters.Counter { + /** Delegate. */ + private final HadoopLongCounter cntr; + + /** + * Creates new instance. + * + * @param cntr Delegate counter. + */ + public HadoopV1Counter(HadoopLongCounter cntr) { + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public void setDisplayName(String displayName) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String getName() { + return cntr.name(); + } + + /** {@inheritDoc} */ + @Override public String getDisplayName() { + return getName(); + } + + /** {@inheritDoc} */ + @Override public long getValue() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public void setValue(long val) { + cntr.value(val); + } + + /** {@inheritDoc} */ + @Override public void increment(long incr) { + cntr.increment(incr); + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public String makeEscapedCompactString() { + return toEscapedCompactString(new HadoopV2Counter(cntr)); + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean contentEquals(Counters.Counter cntr) { + return getUnderlyingCounter().equals(cntr.getUnderlyingCounter()); + } + + /** {@inheritDoc} */ + @Override public long getCounter() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public Counter getUnderlyingCounter() { + return this; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java new file mode 100644 index 0000000..b42b20d --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java @@ -0,0 
+1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; + +/** + * Hadoop map task implementation for v1 API. 
+ */ +public class HadoopV1MapTask extends HadoopV1Task { + /** */ + private static final String[] EMPTY_HOSTS = new String[0]; + + /** + * Constructor. + * + * @param taskInfo + */ + public HadoopV1MapTask(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopJob job = taskCtx.job(); + + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobConf jobConf = ctx.jobConf(); + + InputFormat inFormat = jobConf.getInputFormat(); + + HadoopInputSplit split = info().inputSplit(); + + InputSplit nativeSplit; + + if (split instanceof HadoopFileBlock) { + HadoopFileBlock block = (HadoopFileBlock)split; + + nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS); + } + else + nativeSplit = (InputSplit)ctx.getNativeSplit(split); + + assert nativeSplit != null; + + Reporter reporter = new HadoopV1Reporter(taskCtx); + + HadoopV1OutputCollector collector = null; + + try { + collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(), + fileName(), ctx.attemptId()); + + RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter); + + Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); + + Object key = reader.createKey(); + Object val = reader.createValue(); + + assert mapper != null; + + try { + try { + while (reader.next(key, val)) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Map task cancelled."); + + mapper.map(key, val, collector, reporter); + } + } + finally { + mapper.close(); + } + } + finally { + collector.closeWriter(); + } + + collector.commit(); + } + catch (Exception e) { + if (collector != null) + collector.abort(); + + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.v1;

import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.jetbrains.annotations.Nullable;

/**
 * Hadoop output collector. Records are written either directly through the job's output format
 * (direct write) or to the Ignite task output.
 */
public class HadoopV1OutputCollector implements OutputCollector {
    /** Job configuration. */
    private final JobConf jobConf;

    /** Task context. */
    private final HadoopTaskContext taskCtx;

    /** Optional direct writer; {@code null} unless direct write is enabled. */
    private final RecordWriter writer;

    /** Task attempt. */
    private final TaskAttemptID attempt;

    /**
     * @param jobConf Job configuration.
     * @param taskCtx Task context.
     * @param directWrite Direct write flag: if {@code true}, records go through the job's output format.
     * @param fileName File name handed to the output format's record writer.
     * @param attempt Task attempt ID.
     * @throws IOException In case of IO exception.
     */
    HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
        @Nullable String fileName, TaskAttemptID attempt) throws IOException {
        this.jobConf = jobConf;
        this.taskCtx = taskCtx;
        this.attempt = attempt;

        if (directWrite) {
            // Publish the attempt ID in the job configuration before the record writer is created.
            jobConf.set("mapreduce.task.attempt.id", attempt.toString());

            OutputFormat outFormat = jobConf.getOutputFormat();

            writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
        }
        else
            writer = null;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override public void collect(Object key, Object val) throws IOException {
        if (writer != null)
            writer.write(key, val);
        else {
            try {
                taskCtx.output().write(key, val);
            }
            catch (IgniteCheckedException e) {
                throw new IOException(e);
            }
        }
    }

    /**
     * Close writer. No-op when direct write is disabled.
     *
     * @throws IOException In case of IO exception.
     */
    public void closeWriter() throws IOException {
        if (writer != null)
            writer.close(Reporter.NULL);
    }

    /**
     * Setup task. No-op when direct write is disabled.
     *
     * @throws IOException If failed.
     */
    public void setup() throws IOException {
        if (writer != null)
            jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
    }

    /**
     * Commit task if the output committer reports that a commit is needed. No-op when direct
     * write is disabled.
     *
     * @throws IOException If failed.
     */
    public void commit() throws IOException {
        if (writer != null) {
            OutputCommitter outputCommitter = jobConf.getOutputCommitter();

            // Named distinctly so it does not shadow the taskCtx field.
            TaskAttemptContext attemptCtx = new TaskAttemptContextImpl(jobConf, attempt);

            if (outputCommitter.needsTaskCommit(attemptCtx))
                outputCommitter.commitTask(attemptCtx);
        }
    }

    /**
     * Abort task. Best-effort: an {@link IOException} from the output committer is deliberately
     * ignored. No-op when direct write is disabled.
     */
    public void abort() {
        try {
            if (writer != null)
                jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
        }
        catch (IOException ignore) {
            // No-op: abort is best-effort.
        }
    }
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.v1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;

/**
 * Hadoop partitioner adapter for v1 API.
 */
public class HadoopV1Partitioner implements HadoopPartitioner {
    /** Partitioner instance; assigned once in the constructor. */
    private final Partitioner<Object, Object> part;

    /**
     * @param cls Hadoop partitioner class.
     * @param conf Job configuration.
     */
    @SuppressWarnings("unchecked")
    public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
        // Unchecked: the partitioner's key/value types are unknown here; keys and values are passed through untyped.
        part = (Partitioner<Object, Object>)ReflectionUtils.newInstance(cls, conf);
    }

    /** {@inheritDoc} */
    @Override public int partition(Object key, Object val, int parts) {
        return part.getPartition(key, val, parts);
    }
}
+ */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; + +/** + * Hadoop reduce task implementation for v1 API. + */ +public class HadoopV1ReduceTask extends HadoopV1Task { + /** {@code True} if reduce, {@code false} if combine. */ + private final boolean reduce; + + /** + * Constructor. + * + * @param taskInfo Task info. + * @param reduce {@code True} if reduce, {@code false} if combine. 
+ */ + public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) { + super(taskInfo); + + this.reduce = reduce; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopJob job = taskCtx.job(); + + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobConf jobConf = ctx.jobConf(); + + HadoopTaskInput input = taskCtx.input(); + + HadoopV1OutputCollector collector = null; + + try { + collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId()); + + Reducer reducer; + if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(), + jobConf); + else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(), + jobConf); + + assert reducer != null; + + try { + try { + while (input.next()) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Reduce task cancelled."); + + reducer.reduce(input.key(), input.values(), collector, Reporter.NULL); + } + } + finally { + reducer.close(); + } + } + finally { + collector.closeWriter(); + } + + collector.commit(); + } + catch (Exception e) { + if (collector != null) + collector.abort(); + + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java new file mode 100644 index 0000000..5a63aab --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Reporter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.Reporter; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; + +/** + * Hadoop reporter implementation for v1 API. + */ +public class HadoopV1Reporter implements Reporter { + /** Context. */ + private final HadoopTaskContext ctx; + + /** + * Creates new instance. + * + * @param ctx Context. 
+ */ + public HadoopV1Reporter(HadoopTaskContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public void setStatus(String status) { + // TODO + } + + /** {@inheritDoc} */ + @Override public Counters.Counter getCounter(Enum<?> name) { + return getCounter(name.getDeclaringClass().getName(), name.name()); + } + + /** {@inheritDoc} */ + @Override public Counters.Counter getCounter(String grp, String name) { + return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class)); + } + + /** {@inheritDoc} */ + @Override public void incrCounter(Enum<?> key, long amount) { + getCounter(key).increment(amount); + } + + /** {@inheritDoc} */ + @Override public void incrCounter(String grp, String cntr, long amount) { + getCounter(grp, cntr).increment(amount); + } + + /** {@inheritDoc} */ + @Override public InputSplit getInputSplit() throws UnsupportedOperationException { + throw new UnsupportedOperationException("reporter has no input"); // TODO + } + + /** {@inheritDoc} */ + @Override public float getProgress() { + return 0.5f; // TODO + } + + /** {@inheritDoc} */ + @Override public void progress() { + // TODO + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java new file mode 100644 index 0000000..d2f6823 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1SetupTask.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import java.io.IOException; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; + +/** + * Hadoop setup task implementation for v1 API. + */ +public class HadoopV1SetupTask extends HadoopV1Task { + /** + * Constructor. + * + * @param taskInfo Task info. 
+ */ + public HadoopV1SetupTask(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + try { + ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf()); + + OutputCommitter committer = ctx.jobConf().getOutputCommitter(); + + if (committer != null) + committer.setupJob(ctx.jobContext()); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java new file mode 100644 index 0000000..8c730e7 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop API v1 splitter. + */ +public class HadoopV1Splitter { + /** */ + private static final String[] EMPTY_HOSTS = {}; + + /** + * @param jobConf Job configuration. + * @return Collection of mapped splits. + * @throws IgniteCheckedException If mapping failed. + */ + public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException { + try { + InputFormat<?, ?> format = jobConf.getInputFormat(); + + assert format != null; + + InputSplit[] splits = format.getSplits(jobConf, 0); + + Collection<HadoopInputSplit> res = new ArrayList<>(splits.length); + + for (int i = 0; i < splits.length; i++) { + InputSplit nativeSplit = splits[i]; + + if (nativeSplit instanceof FileSplit) { + FileSplit s = (FileSplit)nativeSplit; + + res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); + } + else + res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations())); + } + + return res; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * @param clsName Input split class name. + * @param in Input stream. 
+ * @param hosts Optional hosts. + * @return File block or {@code null} if it is not a {@link FileSplit} instance. + * @throws IgniteCheckedException If failed. + */ + @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in, + @Nullable String[] hosts) throws IgniteCheckedException { + if (!FileSplit.class.getName().equals(clsName)) + return null; + + FileSplit split = U.newInstance(FileSplit.class); + + try { + split.readFields(in); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + if (hosts == null) + hosts = EMPTY_HOSTS; + + return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java new file mode 100644 index 0000000..f695874 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import java.io.IOException; +import java.text.NumberFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.ignite.internal.processors.hadoop.HadoopTask; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; +import org.jetbrains.annotations.Nullable; + +/** + * Extended Hadoop v1 task. + */ +public abstract class HadoopV1Task extends HadoopTask { + /** Indicates that this task is to be cancelled. */ + private volatile boolean cancelled; + + /** + * Constructor. + * + * @param taskInfo Task info. + */ + protected HadoopV1Task(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** + * Gets file name for that task result. + * + * @return File name. + */ + public String fileName() { + NumberFormat numFormat = NumberFormat.getInstance(); + + numFormat.setMinimumIntegerDigits(5); + numFormat.setGroupingUsed(false); + + return "part-" + numFormat.format(info().taskNumber()); + } + + /** + * + * @param jobConf Job configuration. + * @param taskCtx Task context. + * @param directWrite Direct write flag. + * @param fileName File name. + * @param attempt Attempt of task. + * @return Collector. + * @throws IOException In case of IO exception. 
+ */ + protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx, + boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException { + HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite, + fileName, attempt) { + /** {@inheritDoc} */ + @Override public void collect(Object key, Object val) throws IOException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + super.collect(key, val); + } + }; + + collector.setup(); + + return collector; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + cancelled = true; + } + + /** Returns true if task is cancelled. */ + public boolean isCancelled() { + return cancelled; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java new file mode 100644 index 0000000..9632525 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopDaemon.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.util.Collection; +import java.util.LinkedList; + +/** + * Replacement for Hadoop {@code org.apache.hadoop.util.Daemon} class. + */ +@SuppressWarnings("UnusedDeclaration") +public class HadoopDaemon extends Thread { + /** Lock object used for synchronization. */ + private static final Object lock = new Object(); + + /** Collection to hold the threads to be stopped. */ + private static Collection<HadoopDaemon> daemons = new LinkedList<>(); + + { + setDaemon(true); // always a daemon + } + + /** Runnable of this thread, may be this. */ + final Runnable runnable; + + /** + * Construct a daemon thread. + */ + public HadoopDaemon() { + super(); + + runnable = this; + + enqueueIfNeeded(); + } + + /** + * Construct a daemon thread. + */ + public HadoopDaemon(Runnable runnable) { + super(runnable); + + this.runnable = runnable; + + this.setName(runnable.toString()); + + enqueueIfNeeded(); + } + + /** + * Construct a daemon thread to be part of a specified thread group. + */ + public HadoopDaemon(ThreadGroup grp, Runnable runnable) { + super(grp, runnable); + + this.runnable = runnable; + + this.setName(runnable.toString()); + + enqueueIfNeeded(); + } + + /** + * Getter for the runnable. May return this. + * + * @return the runnable + */ + public Runnable getRunnable() { + return runnable; + } + + /** + * if the runnable is a Hadoop org.apache.hadoop.hdfs.PeerCache Runnable. + * + * @param r the runnable. + * @return true if it is. 
+ */ + private static boolean isPeerCacheRunnable(Runnable r) { + String name = r.getClass().getName(); + + return name.startsWith("org.apache.hadoop.hdfs.PeerCache"); + } + + /** + * Enqueue this thread if it should be stopped upon the task end. + */ + private void enqueueIfNeeded() { + synchronized (lock) { + if (daemons == null) + throw new RuntimeException("Failed to create HadoopDaemon (its registry is already cleared): " + + "[classLoader=" + getClass().getClassLoader() + ']'); + + if (runnable.getClass().getClassLoader() == getClass().getClassLoader() && isPeerCacheRunnable(runnable)) + daemons.add(this); + } + } + + /** + * Stops all the registered threads. + */ + public static void dequeueAndStopAll() { + synchronized (lock) { + if (daemons != null) { + for (HadoopDaemon daemon : daemons) + daemon.interrupt(); + + daemons = null; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopExternalSplit.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopExternalSplit.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopExternalSplit.java new file mode 100644 index 0000000..c7e8a0a --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopExternalSplit.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; + +/** + * Split serialized in external file. + */ +public class HadoopExternalSplit extends HadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long off; + + /** + * For {@link Externalizable}. + */ + public HadoopExternalSplit() { + // No-op. + } + + /** + * @param hosts Hosts. + * @param off Offset of this split in external file. + */ + public HadoopExternalSplit(String[] hosts, long off) { + assert off >= 0 : off; + assert hosts != null; + + this.hosts = hosts; + this.off = off; + } + + /** + * @return Offset of this input split in external file. 
+ */ + public long offset() { + return off; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(off); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + off = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopExternalSplit that = (HadoopExternalSplit) o; + + return off == that.off; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(off ^ (off >>> 32)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java new file mode 100644 index 0000000..844e7f8 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSerializationWrapper.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; +import org.jetbrains.annotations.Nullable; + +/** + * The wrapper around external serializer. + */ +public class HadoopSerializationWrapper<T> implements HadoopSerialization { + /** External serializer - writer. */ + private final Serializer<T> serializer; + + /** External serializer - reader. */ + private final Deserializer<T> deserializer; + + /** Data output for current write operation. */ + private OutputStream currOut; + + /** Data input for current read operation. */ + private InputStream currIn; + + /** Wrapper around current output to provide OutputStream interface. */ + private final OutputStream outStream = new OutputStream() { + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + currOut.write(b); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] b, int off, int len) throws IOException { + currOut.write(b, off, len); + } + }; + + /** Wrapper around current input to provide InputStream interface. 
*/ + private final InputStream inStream = new InputStream() { + /** {@inheritDoc} */ + @Override public int read() throws IOException { + return currIn.read(); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] b, int off, int len) throws IOException { + return currIn.read(b, off, len); + } + }; + + /** + * @param serialization External serializer to wrap. + * @param cls The class to serialize. + */ + public HadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException { + assert cls != null; + + serializer = serialization.getSerializer(cls); + deserializer = serialization.getDeserializer(cls); + + try { + serializer.open(outStream); + deserializer.open(inStream); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException { + assert out != null; + assert obj != null; + + try { + currOut = (OutputStream)out; + + serializer.serialize((T)obj); + + currOut = null; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException { + assert in != null; + + try { + currIn = (InputStream)in; + + T res = deserializer.deserialize((T) obj); + + currIn = null; + + return res; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + try { + serializer.close(); + deserializer.close(); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopShutdownHookManager.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopShutdownHookManager.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopShutdownHookManager.java new file mode 100644 index 0000000..8bd71e0 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopShutdownHookManager.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Fake manager for shutdown hooks. + */ +public class HadoopShutdownHookManager { + /** */ + private static final HadoopShutdownHookManager MGR = new HadoopShutdownHookManager(); + + /** + * Return <code>ShutdownHookManager</code> singleton. + * + * @return <code>ShutdownHookManager</code> singleton. 
+ */ + public static HadoopShutdownHookManager get() { + return MGR; + } + + /** */ + private Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>()); + + /** */ + private AtomicBoolean shutdownInProgress = new AtomicBoolean(false); + + /** + * Singleton. + */ + private HadoopShutdownHookManager() { + // No-op. + } + + /** + * Adds a shutdownHook with a priority, the higher the priority + * the earlier will run. ShutdownHooks with same priority run + * in a non-deterministic order. + * + * @param shutdownHook shutdownHook <code>Runnable</code> + * @param priority priority of the shutdownHook. + */ + public void addShutdownHook(Runnable shutdownHook, int priority) { + if (shutdownHook == null) + throw new IllegalArgumentException("shutdownHook cannot be NULL"); + + hooks.add(shutdownHook); + } + + /** + * Removes a shutdownHook. + * + * @param shutdownHook shutdownHook to remove. + * @return TRUE if the shutdownHook was registered and removed, + * FALSE otherwise. + */ + public boolean removeShutdownHook(Runnable shutdownHook) { + return hooks.remove(shutdownHook); + } + + /** + * Indicates if a shutdownHook is registered or not. + * + * @param shutdownHook shutdownHook to check if registered. + * @return TRUE/FALSE depending if the shutdownHook is is registered. + */ + public boolean hasShutdownHook(Runnable shutdownHook) { + return hooks.contains(shutdownHook); + } + + /** + * Indicates if shutdown is in progress or not. + * + * @return TRUE if the shutdown is in progress, otherwise FALSE. 
+ */ + public boolean isShutdownInProgress() { + return shutdownInProgress.get(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSplitWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSplitWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSplitWrapper.java new file mode 100644 index 0000000..df77adb --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopSplitWrapper.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * The wrapper for native hadoop input splits. + * + * Warning!! 
This class must not depend on any Hadoop classes directly or indirectly. + */ +public class HadoopSplitWrapper extends HadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** Native hadoop input split. */ + private byte[] bytes; + + /** */ + private String clsName; + + /** Internal ID */ + private int id; + + /** + * Creates new split wrapper. + */ + public HadoopSplitWrapper() { + // No-op. + } + + /** + * Creates new split wrapper. + * + * @param id Split ID. + * @param clsName Class name. + * @param bytes Serialized class. + * @param hosts Hosts where split is located. + */ + public HadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) { + assert hosts != null; + assert clsName != null; + assert bytes != null; + + this.hosts = hosts; + this.id = id; + + this.clsName = clsName; + this.bytes = bytes; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(id); + + out.writeUTF(clsName); + U.writeByteArray(out, bytes); + } + + /** + * @return Class name. + */ + public String className() { + return clsName; + } + + /** + * @return Class bytes. 
+ */ + public byte[] bytes() { + return bytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = in.readInt(); + + clsName = in.readUTF(); + bytes = U.readByteArray(in); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopSplitWrapper that = (HadoopSplitWrapper)o; + + return id == that.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2CleanupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2CleanupTask.java new file mode 100644 index 0000000..abb904c --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2CleanupTask.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.IOException; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; + +/** + * Hadoop cleanup task (commits or aborts job). + */ +public class HadoopV2CleanupTask extends HadoopV2Task { + /** Abort flag. */ + private final boolean abort; + + /** + * @param taskInfo Task info. + * @param abort Abort flag. 
+ */ + public HadoopV2CleanupTask(HadoopTaskInfo taskInfo, boolean abort) { + super(taskInfo); + + this.abort = abort; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { + JobContextImpl jobCtx = taskCtx.jobContext(); + + try { + OutputFormat outputFormat = getOutputFormat(jobCtx); + + OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext()); + + if (committer != null) { + if (abort) + committer.abortJob(jobCtx, JobStatus.State.FAILED); + else + committer.commitJob(jobCtx); + } + } + catch (ClassNotFoundException | IOException e) { + throw new IgniteCheckedException(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java new file mode 100644 index 0000000..bc9a3ec --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.ReduceContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; + +/** + * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks. + */ +public class HadoopV2Context extends JobContextImpl implements MapContext, ReduceContext { + /** Input reader to overriding of HadoopTaskContext input. 
*/ + private RecordReader reader; + + /** Output writer to overriding of HadoopTaskContext output. */ + private RecordWriter writer; + + /** Output is provided by executor environment. */ + private final HadoopTaskOutput output; + + /** Input is provided by executor environment. */ + private final HadoopTaskInput input; + + /** Unique identifier for a task attempt. */ + private final TaskAttemptID taskAttemptID; + + /** Indicates that this task is to be cancelled. */ + private volatile boolean cancelled; + + /** Input split. */ + private InputSplit inputSplit; + + /** */ + private final HadoopTaskContext ctx; + + /** */ + private String status; + + /** + * @param ctx Context for IO operations. + */ + public HadoopV2Context(HadoopV2TaskContext ctx) { + super(ctx.jobConf(), ctx.jobContext().getJobID()); + + taskAttemptID = ctx.attemptId(); + + conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString()); + conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString()); + + output = ctx.output(); + input = ctx.input(); + + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public InputSplit getInputSplit() { + if (inputSplit == null) { + HadoopInputSplit split = ctx.taskInfo().inputSplit(); + + if (split == null) + return null; + + if (split instanceof HadoopFileBlock) { + HadoopFileBlock fileBlock = (HadoopFileBlock)split; + + inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null); + } + else + { + try { + inputSplit = (InputSplit) ((HadoopV2TaskContext)ctx).getNativeSplit(split); + } catch (IgniteCheckedException e) { + throw new IllegalStateException(e); + } + } + } + + return inputSplit; + } + + /** {@inheritDoc} */ + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + return reader.nextKeyValue(); + } + + /** {@inheritDoc} */ + @Override public Object getCurrentKey() throws IOException, 
InterruptedException { + if (reader != null) + return reader.getCurrentKey(); + + return input.key(); + } + + /** {@inheritDoc} */ + @Override public Object getCurrentValue() throws IOException, InterruptedException { + return reader.getCurrentValue(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void write(Object key, Object val) throws IOException, InterruptedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + if (writer != null) + writer.write(key, val); + else { + try { + output.write(key, val); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + } + + /** {@inheritDoc} */ + @Override public OutputCommitter getOutputCommitter() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public TaskAttemptID getTaskAttemptID() { + return taskAttemptID; + } + + /** {@inheritDoc} */ + @Override public void setStatus(String msg) { + status = msg; + } + + /** {@inheritDoc} */ + @Override public String getStatus() { + return status; + } + + /** {@inheritDoc} */ + @Override public float getProgress() { + return 0.5f; // TODO + } + + /** {@inheritDoc} */ + @Override public Counter getCounter(Enum<?> cntrName) { + return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name()); + } + + /** {@inheritDoc} */ + @Override public Counter getCounter(String grpName, String cntrName) { + return new HadoopV2Counter(ctx.counter(grpName, cntrName, HadoopLongCounter.class)); + } + + /** {@inheritDoc} */ + @Override public void progress() { + // No-op. + } + + /** + * Overrides default input data reader. + * + * @param reader New reader. 
+ */ + public void reader(RecordReader reader) { + this.reader = reader; + } + + /** {@inheritDoc} */ + @Override public boolean nextKey() throws IOException, InterruptedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + return input.next(); + } + + /** {@inheritDoc} */ + @Override public Iterable getValues() throws IOException, InterruptedException { + return new Iterable() { + @Override public Iterator iterator() { + return input.values(); + } + }; + } + + /** + * @return Overridden output data writer. + */ + public RecordWriter writer() { + return writer; + } + + /** + * Overrides default output data writer. + * + * @param writer New writer. + */ + public void writer(RecordWriter writer) { + this.writer = writer; + } + + /** + * Cancels the task by stop the IO. + */ + public void cancel() { + cancelled = true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Counter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Counter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Counter.java new file mode 100644 index 0000000..cad9e64 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Counter.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; + +/** + * Adapter from own counter implementation into Hadoop API Counter od version 2.0. + */ +public class HadoopV2Counter implements Counter { + /** Delegate. */ + private final HadoopLongCounter cntr; + + /** + * Creates new instance with given delegate. + * + * @param cntr Internal counter. + */ + public HadoopV2Counter(HadoopLongCounter cntr) { + assert cntr != null : "counter must be non-null"; + + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public void setDisplayName(String displayName) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String getName() { + return cntr.name(); + } + + /** {@inheritDoc} */ + @Override public String getDisplayName() { + return getName(); + } + + /** {@inheritDoc} */ + @Override public long getValue() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public void setValue(long val) { + cntr.value(val); + } + + /** {@inheritDoc} */ + @Override public void increment(long incr) { + cntr.increment(incr); + } + + /** {@inheritDoc} */ + @Override public Counter getUnderlyingCounter() { + return this; + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } +} \ No newline at end of file