http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java deleted file mode 100644 index 17c2ff5..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor; -import org.apache.ignite.internal.util.nio.GridNioFuture; -import org.apache.ignite.internal.util.nio.GridNioSession; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Grid client for NIO server. - */ -public class HadoopTcpNioCommunicationClient extends HadoopAbstractCommunicationClient { - /** Socket. */ - private final GridNioSession ses; - - /** - * Constructor for test purposes only. - */ - public HadoopTcpNioCommunicationClient() { - ses = null; - } - - /** - * @param ses Session. - */ - public HadoopTcpNioCommunicationClient(GridNioSession ses) { - assert ses != null; - - this.ses = ses; - } - - /** {@inheritDoc} */ - @Override public boolean close() { - boolean res = super.close(); - - if (res) - ses.close(); - - return res; - } - - /** {@inheritDoc} */ - @Override public void forceClose() { - super.forceClose(); - - ses.close(); - } - - /** {@inheritDoc} */ - @Override public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) - throws IgniteCheckedException { - if (closed()) - throw new IgniteCheckedException("Client was closed: " + this); - - GridNioFuture<?> fut = ses.send(msg); - - if (fut.isDone()) - fut.get(); - } - - /** {@inheritDoc} */ - @Override public long getIdleTime() { - long now = U.currentTimeMillis(); - - // Session can be used for receiving and sending. - return Math.min(Math.min(now - ses.lastReceiveTime(), now - ses.lastSendScheduleTime()), - now - ses.lastSendTime()); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopTcpNioCommunicationClient.class, this, super.toString()); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java deleted file mode 100644 index 750b314..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import java.io.IOException; -import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.OutputCommitter; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; - -/** - * Hadoop cleanup task implementation for v1 API. - */ -public class HadoopV1CleanupTask extends HadoopV1Task { - /** Abort flag. */ - private final boolean abort; - - /** - * @param taskInfo Task info. - * @param abort Abort flag. - */ - public HadoopV1CleanupTask(HadoopTaskInfo taskInfo, boolean abort) { - super(taskInfo); - - this.abort = abort; - } - - /** {@inheritDoc} */ - @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { - HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; - - JobContext jobCtx = ctx.jobContext(); - - try { - OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter(); - - if (abort) - committer.abortJob(jobCtx, JobStatus.State.FAILED); - else - committer.commitJob(jobCtx); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java deleted file mode 100644 index c623eab..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter; - -import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString; - -/** - * Hadoop counter implementation for v1 API. - */ -public class HadoopV1Counter extends Counters.Counter { - /** Delegate. */ - private final HadoopLongCounter cntr; - - /** - * Creates new instance. - * - * @param cntr Delegate counter. - */ - public HadoopV1Counter(HadoopLongCounter cntr) { - this.cntr = cntr; - } - - /** {@inheritDoc} */ - @Override public void setDisplayName(String displayName) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public String getName() { - return cntr.name(); - } - - /** {@inheritDoc} */ - @Override public String getDisplayName() { - return getName(); - } - - /** {@inheritDoc} */ - @Override public long getValue() { - return cntr.value(); - } - - /** {@inheritDoc} */ - @Override public void setValue(long val) { - cntr.value(val); - } - - /** {@inheritDoc} */ - @Override public void increment(long incr) { - cntr.increment(incr); - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public String makeEscapedCompactString() { - return toEscapedCompactString(new HadoopV2Counter(cntr)); - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public boolean contentEquals(Counters.Counter cntr) { - return getUnderlyingCounter().equals(cntr.getUnderlyingCounter()); - } - - /** {@inheritDoc} */ - @Override public long getCounter() { - return cntr.value(); - } - - /** {@inheritDoc} */ - @Override public Counter getUnderlyingCounter() { - return this; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java deleted file mode 100644 index fb2266a..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; - -/** - * Hadoop map task implementation for v1 API. - */ -public class HadoopV1MapTask extends HadoopV1Task { - /** */ - private static final String[] EMPTY_HOSTS = new String[0]; - - /** - * Constructor. - * - * @param taskInfo - */ - public HadoopV1MapTask(HadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { - HadoopJob job = taskCtx.job(); - - HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; - - JobConf jobConf = ctx.jobConf(); - - InputFormat inFormat = jobConf.getInputFormat(); - - HadoopInputSplit split = info().inputSplit(); - - InputSplit nativeSplit; - - if (split instanceof HadoopFileBlock) { - HadoopFileBlock block = (HadoopFileBlock)split; - - nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS); - } - else - nativeSplit = (InputSplit)ctx.getNativeSplit(split); - - assert nativeSplit != null; - - Reporter reporter = new HadoopV1Reporter(taskCtx); - - HadoopV1OutputCollector collector = null; - - try { - collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(), - fileName(), ctx.attemptId()); - - RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter); - - Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); - - Object key = reader.createKey(); - Object val = reader.createValue(); - - assert mapper != null; - - try { - try { - while (reader.next(key, val)) { - if (isCancelled()) - throw new HadoopTaskCancelledException("Map task cancelled."); - - mapper.map(key, val, collector, reporter); - } - } - finally { - mapper.close(); - } - } - finally { - collector.closeWriter(); - } - - collector.commit(); - } - catch (Exception e) { - if (collector != null) - collector.abort(); - - throw new IgniteCheckedException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java deleted file mode 100644 index 37f81a6..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import java.io.IOException; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.OutputCommitter; -import org.apache.hadoop.mapred.OutputFormat; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TaskAttemptContext; -import org.apache.hadoop.mapred.TaskAttemptContextImpl; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.jetbrains.annotations.Nullable; - -/** - * Hadoop output collector. - */ -public class HadoopV1OutputCollector implements OutputCollector { - /** Job configuration. */ - private final JobConf jobConf; - - /** Task context. */ - private final HadoopTaskContext taskCtx; - - /** Optional direct writer. */ - private final RecordWriter writer; - - /** Task attempt. */ - private final TaskAttemptID attempt; - - /** - * @param jobConf Job configuration. - * @param taskCtx Task context. - * @param directWrite Direct write flag. - * @param fileName File name. - * @throws IOException In case of IO exception. - */ - HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite, - @Nullable String fileName, TaskAttemptID attempt) throws IOException { - this.jobConf = jobConf; - this.taskCtx = taskCtx; - this.attempt = attempt; - - if (directWrite) { - jobConf.set("mapreduce.task.attempt.id", attempt.toString()); - - OutputFormat outFormat = jobConf.getOutputFormat(); - - writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL); - } - else - writer = null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void collect(Object key, Object val) throws IOException { - if (writer != null) - writer.write(key, val); - else { - try { - taskCtx.output().write(key, val); - } - catch (IgniteCheckedException e) { - throw new IOException(e); - } - } - } - - /** - * Close writer. - * - * @throws IOException In case of IO exception. - */ - public void closeWriter() throws IOException { - if (writer != null) - writer.close(Reporter.NULL); - } - - /** - * Setup task. - * - * @throws IOException If failed. - */ - public void setup() throws IOException { - if (writer != null) - jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt)); - } - - /** - * Commit task. - * - * @throws IOException In failed. - */ - public void commit() throws IOException { - if (writer != null) { - OutputCommitter outputCommitter = jobConf.getOutputCommitter(); - - TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt); - - if (outputCommitter.needsTaskCommit(taskCtx)) - outputCommitter.commitTask(taskCtx); - } - } - - /** - * Abort task. - */ - public void abort() { - try { - if (writer != null) - jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt)); - } - catch (IOException ignore) { - // No-op. - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java deleted file mode 100644 index 0ab1bba..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.Partitioner; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; - -/** - * Hadoop partitioner adapter for v1 API. - */ -public class HadoopV1Partitioner implements HadoopPartitioner { - /** Partitioner instance. */ - private Partitioner<Object, Object> part; - - /** - * @param cls Hadoop partitioner class. - * @param conf Job configuration. - */ - public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) { - part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf); - } - - /** {@inheritDoc} */ - @Override public int partition(Object key, Object val, int parts) { - return part.getPartition(key, val, parts); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java deleted file mode 100644 index e656695..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; - -/** - * Hadoop reduce task implementation for v1 API. - */ -public class HadoopV1ReduceTask extends HadoopV1Task { - /** {@code True} if reduce, {@code false} if combine. */ - private final boolean reduce; - - /** - * Constructor. - * - * @param taskInfo Task info. - * @param reduce {@code True} if reduce, {@code false} if combine. - */ - public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) { - super(taskInfo); - - this.reduce = reduce; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { - HadoopJob job = taskCtx.job(); - - HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; - - JobConf jobConf = ctx.jobConf(); - - HadoopTaskInput input = taskCtx.input(); - - HadoopV1OutputCollector collector = null; - - try { - collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId()); - - Reducer reducer; - if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(), - jobConf); - else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(), - jobConf); - - assert reducer != null; - - try { - try { - while (input.next()) { - if (isCancelled()) - throw new HadoopTaskCancelledException("Reduce task cancelled."); - - reducer.reduce(input.key(), input.values(), collector, Reporter.NULL); - } - } - finally { - reducer.close(); - } - } - finally { - collector.closeWriter(); - } - - collector.commit(); - } - catch (Exception e) { - if (collector != null) - collector.abort(); - - throw new IgniteCheckedException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java deleted file mode 100644 index 5a63aab..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.Reporter; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; - -/** - * Hadoop reporter implementation for v1 API. - */ -public class HadoopV1Reporter implements Reporter { - /** Context. */ - private final HadoopTaskContext ctx; - - /** - * Creates new instance. - * - * @param ctx Context. - */ - public HadoopV1Reporter(HadoopTaskContext ctx) { - this.ctx = ctx; - } - - /** {@inheritDoc} */ - @Override public void setStatus(String status) { - // TODO - } - - /** {@inheritDoc} */ - @Override public Counters.Counter getCounter(Enum<?> name) { - return getCounter(name.getDeclaringClass().getName(), name.name()); - } - - /** {@inheritDoc} */ - @Override public Counters.Counter getCounter(String grp, String name) { - return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class)); - } - - /** {@inheritDoc} */ - @Override public void incrCounter(Enum<?> key, long amount) { - getCounter(key).increment(amount); - } - - /** {@inheritDoc} */ - @Override public void incrCounter(String grp, String cntr, long amount) { - getCounter(grp, cntr).increment(amount); - } - - /** {@inheritDoc} */ - @Override public InputSplit getInputSplit() throws UnsupportedOperationException { - throw new UnsupportedOperationException("reporter has no input"); // TODO - } - - /** {@inheritDoc} */ - @Override public float getProgress() { - return 0.5f; // TODO - } - - /** {@inheritDoc} */ - @Override public void progress() { - // TODO - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java deleted file mode 100644 index d2f6823..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import java.io.IOException; -import org.apache.hadoop.mapred.OutputCommitter; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; - -/** - * Hadoop setup task implementation for v1 API. - */ -public class HadoopV1SetupTask extends HadoopV1Task { - /** - * Constructor. - * - * @param taskInfo Task info. - */ - public HadoopV1SetupTask(HadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** {@inheritDoc} */ - @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { - HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; - - try { - ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf()); - - OutputCommitter committer = ctx.jobConf().getOutputCommitter(); - - if (committer != null) - committer.setupJob(ctx.jobContext()); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java deleted file mode 100644 index 203def4..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -/** - * Hadoop API v1 splitter. - */ -public class HadoopV1Splitter { - /** */ - private static final String[] EMPTY_HOSTS = {}; - - /** - * @param jobConf Job configuration. - * @return Collection of mapped splits. - * @throws IgniteCheckedException If mapping failed. - */ - public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException { - try { - InputFormat<?, ?> format = jobConf.getInputFormat(); - - assert format != null; - - InputSplit[] splits = format.getSplits(jobConf, 0); - - Collection<HadoopInputSplit> res = new ArrayList<>(splits.length); - - for (int i = 0; i < splits.length; i++) { - InputSplit nativeSplit = splits[i]; - - if (nativeSplit instanceof FileSplit) { - FileSplit s = (FileSplit)nativeSplit; - - res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); - } - else - res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations())); - } - - return res; - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** - * @param clsName Input split class name. - * @param in Input stream. - * @param hosts Optional hosts. - * @return File block or {@code null} if it is not a {@link FileSplit} instance. - * @throws IgniteCheckedException If failed. - */ - @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in, - @Nullable String[] hosts) throws IgniteCheckedException { - if (!FileSplit.class.getName().equals(clsName)) - return null; - - FileSplit split = U.newInstance(FileSplit.class); - - try { - split.readFields(in); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - if (hosts == null) - hosts = EMPTY_HOSTS; - - return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java deleted file mode 100644 index a89323c..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import java.io.IOException; -import java.text.NumberFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.ignite.internal.processors.hadoop.HadoopTask; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; -import org.jetbrains.annotations.Nullable; - -/** - * Extended Hadoop v1 task. - */ -public abstract class HadoopV1Task extends HadoopTask { - /** Indicates that this task is to be cancelled. */ - private volatile boolean cancelled; - - /** - * Constructor. - * - * @param taskInfo Task info. - */ - protected HadoopV1Task(HadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** - * Gets file name for that task result. - * - * @return File name. - */ - public String fileName() { - NumberFormat numFormat = NumberFormat.getInstance(); - - numFormat.setMinimumIntegerDigits(5); - numFormat.setGroupingUsed(false); - - return "part-" + numFormat.format(info().taskNumber()); - } - - /** - * - * @param jobConf Job configuration. - * @param taskCtx Task context. - * @param directWrite Direct write flag. - * @param fileName File name. - * @param attempt Attempt of task. - * @return Collector. - * @throws IOException In case of IO exception. - */ - protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx, - boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException { - HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite, - fileName, attempt) { - /** {@inheritDoc} */ - @Override public void collect(Object key, Object val) throws IOException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - super.collect(key, val); - } - }; - - collector.setup(); - - return collector; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - cancelled = true; - } - - /** Returns true if task is cancelled. */ - public boolean isCancelled() { - return cancelled; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java deleted file mode 100644 index 9632525..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.util.Collection; -import java.util.LinkedList; - -/** - * Replacement for Hadoop {@code org.apache.hadoop.util.Daemon} class. - */ -@SuppressWarnings("UnusedDeclaration") -public class HadoopDaemon extends Thread { - /** Lock object used for synchronization. */ - private static final Object lock = new Object(); - - /** Collection to hold the threads to be stopped. */ - private static Collection<HadoopDaemon> daemons = new LinkedList<>(); - - { - setDaemon(true); // always a daemon - } - - /** Runnable of this thread, may be this. */ - final Runnable runnable; - - /** - * Construct a daemon thread. - */ - public HadoopDaemon() { - super(); - - runnable = this; - - enqueueIfNeeded(); - } - - /** - * Construct a daemon thread. - */ - public HadoopDaemon(Runnable runnable) { - super(runnable); - - this.runnable = runnable; - - this.setName(runnable.toString()); - - enqueueIfNeeded(); - } - - /** - * Construct a daemon thread to be part of a specified thread group. - */ - public HadoopDaemon(ThreadGroup grp, Runnable runnable) { - super(grp, runnable); - - this.runnable = runnable; - - this.setName(runnable.toString()); - - enqueueIfNeeded(); - } - - /** - * Getter for the runnable. May return this. - * - * @return the runnable - */ - public Runnable getRunnable() { - return runnable; - } - - /** - * if the runnable is a Hadoop org.apache.hadoop.hdfs.PeerCache Runnable. - * - * @param r the runnable. - * @return true if it is. - */ - private static boolean isPeerCacheRunnable(Runnable r) { - String name = r.getClass().getName(); - - return name.startsWith("org.apache.hadoop.hdfs.PeerCache"); - } - - /** - * Enqueue this thread if it should be stopped upon the task end. - */ - private void enqueueIfNeeded() { - synchronized (lock) { - if (daemons == null) - throw new RuntimeException("Failed to create HadoopDaemon (its registry is already cleared): " + - "[classLoader=" + getClass().getClassLoader() + ']'); - - if (runnable.getClass().getClassLoader() == getClass().getClassLoader() && isPeerCacheRunnable(runnable)) - daemons.add(this); - } - } - - /** - * Stops all the registered threads. - */ - public static void dequeueAndStopAll() { - synchronized (lock) { - if (daemons != null) { - for (HadoopDaemon daemon : daemons) - daemon.interrupt(); - - daemons = null; - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java deleted file mode 100644 index c7e8a0a..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; - -/** - * Split serialized in external file. - */ -public class HadoopExternalSplit extends HadoopInputSplit { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private long off; - - /** - * For {@link Externalizable}. - */ - public HadoopExternalSplit() { - // No-op. - } - - /** - * @param hosts Hosts. - * @param off Offset of this split in external file. - */ - public HadoopExternalSplit(String[] hosts, long off) { - assert off >= 0 : off; - assert hosts != null; - - this.hosts = hosts; - this.off = off; - } - - /** - * @return Offset of this input split in external file. - */ - public long offset() { - return off; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(off); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - off = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - HadoopExternalSplit that = (HadoopExternalSplit) o; - - return off == that.off; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return (int)(off ^ (off >>> 32)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java deleted file mode 100644 index 844e7f8..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serialization; -import org.apache.hadoop.io.serializer.Serializer; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; -import org.jetbrains.annotations.Nullable; - -/** - * The wrapper around external serializer. - */ -public class HadoopSerializationWrapper<T> implements HadoopSerialization { - /** External serializer - writer. */ - private final Serializer<T> serializer; - - /** External serializer - reader. */ - private final Deserializer<T> deserializer; - - /** Data output for current write operation. */ - private OutputStream currOut; - - /** Data input for current read operation. */ - private InputStream currIn; - - /** Wrapper around current output to provide OutputStream interface. */ - private final OutputStream outStream = new OutputStream() { - /** {@inheritDoc} */ - @Override public void write(int b) throws IOException { - currOut.write(b); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b, int off, int len) throws IOException { - currOut.write(b, off, len); - } - }; - - /** Wrapper around current input to provide InputStream interface. */ - private final InputStream inStream = new InputStream() { - /** {@inheritDoc} */ - @Override public int read() throws IOException { - return currIn.read(); - } - - /** {@inheritDoc} */ - @Override public int read(byte[] b, int off, int len) throws IOException { - return currIn.read(b, off, len); - } - }; - - /** - * @param serialization External serializer to wrap. - * @param cls The class to serialize. - */ - public HadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException { - assert cls != null; - - serializer = serialization.getSerializer(cls); - deserializer = serialization.getDeserializer(cls); - - try { - serializer.open(outStream); - deserializer.open(inStream); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException { - assert out != null; - assert obj != null; - - try { - currOut = (OutputStream)out; - - serializer.serialize((T)obj); - - currOut = null; - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException { - assert in != null; - - try { - currIn = (InputStream)in; - - T res = deserializer.deserialize((T) obj); - - currIn = null; - - return res; - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - try { - serializer.close(); - deserializer.close(); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java deleted file mode 100644 index 8bd71e0..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Fake manager for shutdown hooks. - */ -public class HadoopShutdownHookManager { - /** */ - private static final HadoopShutdownHookManager MGR = new HadoopShutdownHookManager(); - - /** - * Return <code>ShutdownHookManager</code> singleton. - * - * @return <code>ShutdownHookManager</code> singleton. - */ - public static HadoopShutdownHookManager get() { - return MGR; - } - - /** */ - private Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>()); - - /** */ - private AtomicBoolean shutdownInProgress = new AtomicBoolean(false); - - /** - * Singleton. - */ - private HadoopShutdownHookManager() { - // No-op. - } - - /** - * Adds a shutdownHook with a priority, the higher the priority - * the earlier will run. ShutdownHooks with same priority run - * in a non-deterministic order. - * - * @param shutdownHook shutdownHook <code>Runnable</code> - * @param priority priority of the shutdownHook. - */ - public void addShutdownHook(Runnable shutdownHook, int priority) { - if (shutdownHook == null) - throw new IllegalArgumentException("shutdownHook cannot be NULL"); - - hooks.add(shutdownHook); - } - - /** - * Removes a shutdownHook. - * - * @param shutdownHook shutdownHook to remove. - * @return TRUE if the shutdownHook was registered and removed, - * FALSE otherwise. - */ - public boolean removeShutdownHook(Runnable shutdownHook) { - return hooks.remove(shutdownHook); - } - - /** - * Indicates if a shutdownHook is registered or not. - * - * @param shutdownHook shutdownHook to check if registered. - * @return TRUE/FALSE depending if the shutdownHook is is registered. - */ - public boolean hasShutdownHook(Runnable shutdownHook) { - return hooks.contains(shutdownHook); - } - - /** - * Indicates if shutdown is in progress or not. - * - * @return TRUE if the shutdown is in progress, otherwise FALSE. - */ - public boolean isShutdownInProgress() { - return shutdownInProgress.get(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java deleted file mode 100644 index df77adb..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * The wrapper for native hadoop input splits. - * - * Warning!! This class must not depend on any Hadoop classes directly or indirectly. - */ -public class HadoopSplitWrapper extends HadoopInputSplit { - /** */ - private static final long serialVersionUID = 0L; - - /** Native hadoop input split. */ - private byte[] bytes; - - /** */ - private String clsName; - - /** Internal ID */ - private int id; - - /** - * Creates new split wrapper. - */ - public HadoopSplitWrapper() { - // No-op. - } - - /** - * Creates new split wrapper. - * - * @param id Split ID. - * @param clsName Class name. - * @param bytes Serialized class. - * @param hosts Hosts where split is located. - */ - public HadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) { - assert hosts != null; - assert clsName != null; - assert bytes != null; - - this.hosts = hosts; - this.id = id; - - this.clsName = clsName; - this.bytes = bytes; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(id); - - out.writeUTF(clsName); - U.writeByteArray(out, bytes); - } - - /** - * @return Class name. - */ - public String className() { - return clsName; - } - - /** - * @return Class bytes. - */ - public byte[] bytes() { - return bytes; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = in.readInt(); - - clsName = in.readUTF(); - bytes = U.readByteArray(in); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - HadoopSplitWrapper that = (HadoopSplitWrapper)o; - - return id == that.id; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java deleted file mode 100644 index abb904c..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.io.IOException; -import org.apache.hadoop.mapred.JobContextImpl; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; - -/** - * Hadoop cleanup task (commits or aborts job). - */ -public class HadoopV2CleanupTask extends HadoopV2Task { - /** Abort flag. */ - private final boolean abort; - - /** - * @param taskInfo Task info. - * @param abort Abort flag. - */ - public HadoopV2CleanupTask(HadoopTaskInfo taskInfo, boolean abort) { - super(taskInfo); - - this.abort = abort; - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { - JobContextImpl jobCtx = taskCtx.jobContext(); - - try { - OutputFormat outputFormat = getOutputFormat(jobCtx); - - OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext()); - - if (committer != null) { - if (abort) - committer.abortJob(jobCtx, JobStatus.State.FAILED); - else - committer.commitJob(jobCtx); - } - } - catch (ClassNotFoundException | IOException e) { - throw new IgniteCheckedException(e); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java deleted file mode 100644 index 2ff2945..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.io.IOException; -import java.util.Iterator; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.MapContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.ReduceContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.hadoop.mapreduce.task.JobContextImpl; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; - -/** - * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks. - */ -public class HadoopV2Context extends JobContextImpl implements MapContext, ReduceContext { - /** Input reader to overriding of HadoopTaskContext input. */ - private RecordReader reader; - - /** Output writer to overriding of HadoopTaskContext output. */ - private RecordWriter writer; - - /** Output is provided by executor environment. */ - private final HadoopTaskOutput output; - - /** Input is provided by executor environment. */ - private final HadoopTaskInput input; - - /** Unique identifier for a task attempt. */ - private final TaskAttemptID taskAttemptID; - - /** Indicates that this task is to be cancelled. */ - private volatile boolean cancelled; - - /** Input split. */ - private InputSplit inputSplit; - - /** */ - private final HadoopTaskContext ctx; - - /** */ - private String status; - - /** - * @param ctx Context for IO operations. - */ - public HadoopV2Context(HadoopV2TaskContext ctx) { - super(ctx.jobConf(), ctx.jobContext().getJobID()); - - taskAttemptID = ctx.attemptId(); - - conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString()); - conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString()); - - output = ctx.output(); - input = ctx.input(); - - this.ctx = ctx; - } - - /** {@inheritDoc} */ - @Override public InputSplit getInputSplit() { - if (inputSplit == null) { - HadoopInputSplit split = ctx.taskInfo().inputSplit(); - - if (split == null) - return null; - - if (split instanceof HadoopFileBlock) { - HadoopFileBlock fileBlock = (HadoopFileBlock)split; - - inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null); - } - else - { - try { - inputSplit = (InputSplit) ((HadoopV2TaskContext)ctx).getNativeSplit(split); - } catch (IgniteCheckedException e) { - throw new IllegalStateException(e); - } - } - } - - return inputSplit; - } - - /** {@inheritDoc} */ - @Override public boolean nextKeyValue() throws IOException, InterruptedException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - return reader.nextKeyValue(); - } - - /** {@inheritDoc} */ - @Override public Object getCurrentKey() throws IOException, InterruptedException { - if (reader != null) - return reader.getCurrentKey(); - - return input.key(); - } - - /** {@inheritDoc} */ - @Override public Object getCurrentValue() throws IOException, InterruptedException { - return reader.getCurrentValue(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void write(Object key, Object val) throws IOException, InterruptedException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - if (writer != null) - writer.write(key, val); - else { - try { - output.write(key, val); - } - catch (IgniteCheckedException e) { - throw new IOException(e); - } - } - } - - /** {@inheritDoc} */ - @Override public OutputCommitter getOutputCommitter() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public TaskAttemptID getTaskAttemptID() { - return taskAttemptID; - } - - /** {@inheritDoc} */ - @Override public void setStatus(String msg) { - status = msg; - } - - /** {@inheritDoc} */ - @Override public String getStatus() { - return status; - } - - /** {@inheritDoc} */ - @Override public float getProgress() { - return 0.5f; // TODO - } - - /** {@inheritDoc} */ - @Override public Counter getCounter(Enum<?> cntrName) { - return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name()); - } - - /** {@inheritDoc} */ - @Override public Counter getCounter(String grpName, String cntrName) { - return new HadoopV2Counter(ctx.counter(grpName, cntrName, HadoopLongCounter.class)); - } - - /** {@inheritDoc} */ - @Override public void progress() { - // No-op. - } - - /** - * Overrides default input data reader. - * - * @param reader New reader. - */ - public void reader(RecordReader reader) { - this.reader = reader; - } - - /** {@inheritDoc} */ - @Override public boolean nextKey() throws IOException, InterruptedException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - return input.next(); - } - - /** {@inheritDoc} */ - @Override public Iterable getValues() throws IOException, InterruptedException { - return new Iterable() { - @Override public Iterator iterator() { - return input.values(); - } - }; - } - - /** - * @return Overridden output data writer. - */ - public RecordWriter writer() { - return writer; - } - - /** - * Overrides default output data writer. - * - * @param writer New writer. - */ - public void writer(RecordWriter writer) { - this.writer = writer; - } - - /** - * Cancels the task by stop the IO. - */ - public void cancel() { - cancelled = true; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java deleted file mode 100644 index cad9e64..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; - -/** - * Adapter from own counter implementation into Hadoop API Counter od version 2.0. - */ -public class HadoopV2Counter implements Counter { - /** Delegate. */ - private final HadoopLongCounter cntr; - - /** - * Creates new instance with given delegate. - * - * @param cntr Internal counter. - */ - public HadoopV2Counter(HadoopLongCounter cntr) { - assert cntr != null : "counter must be non-null"; - - this.cntr = cntr; - } - - /** {@inheritDoc} */ - @Override public void setDisplayName(String displayName) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public String getName() { - return cntr.name(); - } - - /** {@inheritDoc} */ - @Override public String getDisplayName() { - return getName(); - } - - /** {@inheritDoc} */ - @Override public long getValue() { - return cntr.value(); - } - - /** {@inheritDoc} */ - @Override public void setValue(long val) { - cntr.value(val); - } - - /** {@inheritDoc} */ - @Override public void increment(long incr) { - cntr.increment(incr); - } - - /** {@inheritDoc} */ - @Override public Counter getUnderlyingCounter() { - return this; - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } -} \ No newline at end of file