http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java deleted file mode 100644 index 39b7c51..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java +++ /dev/null @@ -1,435 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import java.io.DataInput; -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentLinkedQueue; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream; -import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream; -import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer; -import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_OFFHEAP_PAGE_SIZE; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; - -/** - * Base class for all multimaps. - */ -public abstract class HadoopMultimapBase implements HadoopMultimap { - /** */ - protected final GridUnsafeMemory mem; - - /** */ - protected final int pageSize; - - /** */ - private final Collection<Page> allPages = new ConcurrentLinkedQueue<>(); - - /** - * @param jobInfo Job info. - * @param mem Memory. - */ - protected HadoopMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) { - assert jobInfo != null; - assert mem != null; - - this.mem = mem; - - pageSize = get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024); - } - - /** - * @param page Page. - */ - private void deallocate(Page page) { - assert page != null; - - mem.release(page.ptr, page.size); - } - - /** - * @param valPtr Value page pointer. - * @param nextValPtr Next value page pointer. - */ - protected void nextValue(long valPtr, long nextValPtr) { - mem.writeLong(valPtr, nextValPtr); - } - - /** - * @param valPtr Value page pointer. - * @return Next value page pointer. - */ - protected long nextValue(long valPtr) { - return mem.readLong(valPtr); - } - - /** - * @param valPtr Value page pointer. - * @param size Size. - */ - protected void valueSize(long valPtr, int size) { - mem.writeInt(valPtr + 8, size); - } - - /** - * @param valPtr Value page pointer. - * @return Value size. - */ - protected int valueSize(long valPtr) { - return mem.readInt(valPtr + 8); - } - - /** {@inheritDoc} */ - @Override public void close() { - for (Page page : allPages) - deallocate(page); - } - - /** - * Reader for key and value. - */ - protected class ReaderBase implements AutoCloseable { - /** */ - private Object tmp; - - /** */ - private final HadoopSerialization ser; - - /** */ - private final HadoopDataInStream in = new HadoopDataInStream(mem); - - /** - * @param ser Serialization. - */ - protected ReaderBase(HadoopSerialization ser) { - assert ser != null; - - this.ser = ser; - } - - /** - * @param valPtr Value page pointer. - * @return Value. - */ - public Object readValue(long valPtr) { - assert valPtr > 0 : valPtr; - - try { - return read(valPtr + 12, valueSize(valPtr)); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** - * Resets temporary object to the given one. - * - * @param tmp Temporary object for reuse. - */ - public void resetReusedObject(Object tmp) { - this.tmp = tmp; - } - - /** - * @param ptr Pointer. - * @param size Object size. - * @return Object. - */ - protected Object read(long ptr, long size) throws IgniteCheckedException { - in.buffer().set(ptr, size); - - tmp = ser.read(in, tmp); - - return tmp; - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - ser.close(); - } - } - - /** - * Base class for adders. - */ - protected abstract class AdderBase implements Adder { - /** */ - protected final HadoopSerialization keySer; - - /** */ - protected final HadoopSerialization valSer; - - /** */ - private final HadoopDataOutStream out; - - /** */ - private long writeStart; - - /** Current page. */ - private Page curPage; - - /** - * @param ctx Task context. - * @throws IgniteCheckedException If failed. - */ - protected AdderBase(HadoopTaskContext ctx) throws IgniteCheckedException { - valSer = ctx.valueSerialization(); - keySer = ctx.keySerialization(); - - out = new HadoopDataOutStream(mem) { - @Override public long move(long size) { - long ptr = super.move(size); - - if (ptr == 0) // Was not able to move - not enough free space. - ptr = allocateNextPage(size); - - assert ptr != 0; - - return ptr; - } - }; - } - - /** - * @param requestedSize Requested size. - * @return Next write pointer. - */ - private long allocateNextPage(long requestedSize) { - int writtenSize = writtenSize(); - - long newPageSize = nextPageSize(writtenSize + requestedSize); - long newPagePtr = mem.allocate(newPageSize); - - HadoopOffheapBuffer b = out.buffer(); - - b.set(newPagePtr, newPageSize); - - if (writtenSize != 0) { - mem.copyMemory(writeStart, newPagePtr, writtenSize); - - b.move(writtenSize); - } - - writeStart = newPagePtr; - - // At this point old page is not needed, so we release it. - Page oldPage = curPage; - - curPage = new Page(newPagePtr, newPageSize); - - if (oldPage != null) - allPages.add(oldPage); - - return b.move(requestedSize); - } - - /** - * Get next page size. - * - * @param required Required amount of data. - * @return Next page size. - */ - private long nextPageSize(long required) { - long pages = (required / pageSize) + 1; - - long pagesPow2 = nextPowerOfTwo(pages); - - return pagesPow2 * pageSize; - } - - /** - * Get next power of two which greater or equal to the given number. Naive implementation. - * - * @param val Number - * @return Nearest pow2. - */ - private long nextPowerOfTwo(long val) { - long res = 1; - - while (res < val) - res = res << 1; - - if (res < 0) - throw new IllegalArgumentException("Value is too big to find positive pow2: " + val); - - return res; - } - - /** - * @return Fixed pointer. - */ - private long fixAlignment() { - HadoopOffheapBuffer b = out.buffer(); - - long ptr = b.pointer(); - - if ((ptr & 7L) != 0) { // Address is not aligned by octet. - ptr = (ptr + 8L) & ~7L; - - b.pointer(ptr); - } - - return ptr; - } - - /** - * @param off Offset. - * @param o Object. - * @return Page pointer. - * @throws IgniteCheckedException If failed. - */ - protected long write(int off, Object o, HadoopSerialization ser) throws IgniteCheckedException { - writeStart = fixAlignment(); - - if (off != 0) - out.move(off); - - ser.write(out, o); - - return writeStart; - } - - /** - * @param size Size. - * @return Pointer. - */ - protected long allocate(int size) { - writeStart = fixAlignment(); - - out.move(size); - - return writeStart; - } - - /** - * Rewinds local allocation pointer to the given pointer if possible. - * - * @param ptr Pointer. - */ - protected void localDeallocate(long ptr) { - HadoopOffheapBuffer b = out.buffer(); - - if (b.isInside(ptr)) - b.pointer(ptr); - else - b.reset(); - } - - /** - * @return Written size. - */ - protected int writtenSize() { - return (int)(out.buffer().pointer() - writeStart); - } - - /** {@inheritDoc} */ - @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - if (curPage != null) - allPages.add(curPage); - - keySer.close(); - valSer.close(); - } - } - - /** - * Iterator over values. - */ - protected class ValueIterator implements Iterator<Object> { - /** */ - private long valPtr; - - /** */ - private final ReaderBase valReader; - - /** - * @param valPtr Value page pointer. - * @param valReader Value reader. - */ - protected ValueIterator(long valPtr, ReaderBase valReader) { - this.valPtr = valPtr; - this.valReader = valReader; - } - - /** - * @param valPtr Head value pointer. - */ - public void head(long valPtr) { - this.valPtr = valPtr; - } - - /** {@inheritDoc} */ - @Override public boolean hasNext() { - return valPtr != 0; - } - - /** {@inheritDoc} */ - @Override public Object next() { - if (!hasNext()) - throw new NoSuchElementException(); - - Object res = valReader.readValue(valPtr); - - valPtr = nextValue(valPtr); - - return res; - } - - /** {@inheritDoc} */ - @Override public void remove() { - throw new UnsupportedOperationException(); - } - } - - /** - * Page. - */ - private static class Page { - /** Pointer. */ - private final long ptr; - - /** Size. */ - private final long size; - - /** - * Constructor. - * - * @param ptr Pointer. - * @param size Size. - */ - public Page(long ptr, long size) { - this.ptr = ptr; - this.size = size; - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java deleted file mode 100644 index 7db88bc..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java +++ /dev/null @@ -1,733 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import java.io.DataInput; -import java.util.Comparator; -import java.util.Iterator; -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; -import org.apache.ignite.internal.util.GridLongList; -import org.apache.ignite.internal.util.GridRandom; -import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.jetbrains.annotations.Nullable; - -/** - * Skip list. - */ -public class HadoopSkipList extends HadoopMultimapBase { - /** */ - private static final int HEADS_SIZE = 24 + 33 * 8; // Offset + max level is from 0 to 32 inclusive. - - /** Top level. */ - private final AtomicInteger topLevel = new AtomicInteger(-1); - - /** Heads for all the lists. */ - private final long heads; - - /** */ - private final AtomicBoolean visitGuard = new AtomicBoolean(); - - /** - * @param jobInfo Job info. - * @param mem Memory. - */ - public HadoopSkipList(HadoopJobInfo jobInfo, GridUnsafeMemory mem) { - super(jobInfo, mem); - - heads = mem.allocate(HEADS_SIZE, true); - } - - /** {@inheritDoc} */ - @Override public void close() { - super.close(); - - mem.release(heads, HEADS_SIZE); - } - - /** {@inheritDoc} */ - @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException { - if (!visitGuard.compareAndSet(false, true)) - return false; - - for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) { - long valPtr = value(meta); - - long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta); - - if (valPtr != lastVisited) { - long k = key(meta); - - v.onKey(k + 4, keySize(k)); - - lastVisitedValue(meta, valPtr); // Set it to the first value in chain. - - do { - v.onValue(valPtr + 12, valueSize(valPtr)); - - valPtr = nextValue(valPtr); - } - while (valPtr != lastVisited); - } - } - - visitGuard.lazySet(false); - - return true; - } - - /** {@inheritDoc} */ - @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException { - return new AdderImpl(ctx); - } - - /** {@inheritDoc} */ - @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { - Input in = new Input(taskCtx); - - Comparator<Object> grpCmp = taskCtx.groupComparator(); - - if (grpCmp != null) - return new GroupedInput(grpCmp, in); - - return in; - } - - /** - * @param meta Meta pointer. - * @return Key pointer. - */ - private long key(long meta) { - return mem.readLong(meta); - } - - /** - * @param meta Meta pointer. - * @param key Key pointer. - */ - private void key(long meta, long key) { - mem.writeLong(meta, key); - } - - /** - * @param meta Meta pointer. - * @return Value pointer. - */ - private long value(long meta) { - return mem.readLongVolatile(meta + 8); - } - - /** - * @param meta Meta pointer. - * @param valPtr Value pointer. - */ - private void value(long meta, long valPtr) { - mem.writeLongVolatile(meta + 8, valPtr); - } - - /** - * @param meta Meta pointer. - * @param oldValPtr Old first value pointer. - * @param newValPtr New first value pointer. - * @return {@code true} If operation succeeded. - */ - private boolean casValue(long meta, long oldValPtr, long newValPtr) { - return mem.casLong(meta + 8, oldValPtr, newValPtr); - } - - /** - * @param meta Meta pointer. - * @return Last visited value pointer. - */ - private long lastVisitedValue(long meta) { - return mem.readLong(meta + 16); - } - - /** - * @param meta Meta pointer. - * @param valPtr Last visited value pointer. - */ - private void lastVisitedValue(long meta, long valPtr) { - mem.writeLong(meta + 16, valPtr); - } - - /** - * @param meta Meta pointer. - * @param level Level. - * @return Next meta pointer. - */ - private long nextMeta(long meta, int level) { - assert meta > 0 : meta; - - return mem.readLongVolatile(meta + 24 + 8 * level); - } - - /** - * @param meta Meta pointer. - * @param level Level. - * @param oldNext Old next meta pointer. - * @param newNext New next meta pointer. - * @return {@code true} If operation succeeded. - */ - private boolean casNextMeta(long meta, int level, long oldNext, long newNext) { - assert meta > 0 : meta; - - return mem.casLong(meta + 24 + 8 * level, oldNext, newNext); - } - - /** - * @param meta Meta pointer. - * @param level Level. - * @param nextMeta Next meta. - */ - private void nextMeta(long meta, int level, long nextMeta) { - assert meta != 0; - - mem.writeLong(meta + 24 + 8 * level, nextMeta); - } - - /** - * @param keyPtr Key pointer. - * @return Key size. - */ - private int keySize(long keyPtr) { - return mem.readInt(keyPtr); - } - - /** - * @param keyPtr Key pointer. - * @param keySize Key size. - */ - private void keySize(long keyPtr, int keySize) { - mem.writeInt(keyPtr, keySize); - } - - /** - * @param rnd Random. - * @return Next level. - */ - public static int randomLevel(Random rnd) { - int x = rnd.nextInt(); - - int level = 0; - - while ((x & 1) != 0) { // Count sequential 1 bits. - level++; - - x >>>= 1; - } - - return level; - } - - /** - * Reader. - */ - private class Reader extends ReaderBase { - /** - * @param ser Serialization. - */ - protected Reader(HadoopSerialization ser) { - super(ser); - } - - /** - * @param meta Meta pointer. - * @return Key. - */ - public Object readKey(long meta) { - assert meta > 0 : meta; - - long k = key(meta); - - try { - return read(k + 4, keySize(k)); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - } - - /** - * Adder. - */ - private class AdderImpl extends AdderBase { - /** */ - private final Comparator<Object> cmp; - - /** */ - private final Random rnd = new GridRandom(); - - /** */ - private final GridLongList stack = new GridLongList(16); - - /** */ - private final Reader keyReader; - - /** - * @param ctx Task context. - * @throws IgniteCheckedException If failed. - */ - protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException { - super(ctx); - - keyReader = new Reader(keySer); - - cmp = ctx.sortComparator(); - } - - /** {@inheritDoc} */ - @Override public void write(Object key, Object val) throws IgniteCheckedException { - A.notNull(val, "val"); - - add(key, val); - } - - /** {@inheritDoc} */ - @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException { - KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse; - - k.tmpKey = keySer.read(in, k.tmpKey); - - k.meta = add(k.tmpKey, null); - - return k; - } - - /** - * @param key Key. - * @param val Value. - * @param level Level. - * @return Meta pointer. - */ - private long createMeta(long key, long val, int level) { - int size = 32 + 8 * level; - - long meta = allocate(size); - - key(meta, key); - value(meta, val); - lastVisitedValue(meta, 0L); - - for (int i = 32; i < size; i += 8) // Fill with 0. - mem.writeLong(meta + i, 0L); - - return meta; - } - - /** - * @param key Key. - * @return Pointer. - * @throws IgniteCheckedException If failed. - */ - private long writeKey(Object key) throws IgniteCheckedException { - long keyPtr = write(4, key, keySer); - int keySize = writtenSize() - 4; - - keySize(keyPtr, keySize); - - return keyPtr; - } - - /** - * @param prevMeta Previous meta. - * @param meta Next meta. - */ - private void stackPush(long prevMeta, long meta) { - stack.add(prevMeta); - stack.add(meta); - } - - /** - * Drops last remembered frame from the stack. - */ - private void stackPop() { - stack.pop(2); - } - - /** - * @param key Key. - * @param val Value. - * @return Meta pointer. - * @throws IgniteCheckedException If failed. - */ - private long add(Object key, @Nullable Object val) throws IgniteCheckedException { - assert key != null; - - stack.clear(); - - long valPtr = 0; - - if (val != null) { // Write value. - valPtr = write(12, val, valSer); - int valSize = writtenSize() - 12; - - nextValue(valPtr, 0); - valueSize(valPtr, valSize); - } - - long keyPtr = 0; - long newMeta = 0; - int newMetaLevel = -1; - - long prevMeta = heads; - int level = topLevel.get(); - long meta = level < 0 ? 0 : nextMeta(heads, level); - - for (;;) { - if (level < 0) { // We did not find our key, trying to add new meta. - if (keyPtr == 0) { // Write key and create meta only once. - keyPtr = writeKey(key); - - newMetaLevel = randomLevel(rnd); - newMeta = createMeta(keyPtr, valPtr, newMetaLevel); - } - - nextMeta(newMeta, 0, meta); // Set next to new meta before publishing. - - if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully. - laceUp(key, newMeta, newMetaLevel); - - return newMeta; - } - else { // Add failed, need to check out what was added by another thread. - meta = nextMeta(prevMeta, level = 0); - - stackPop(); - } - } - - int cmpRes = cmp(key, meta); - - if (cmpRes == 0) { // Key found. - if (newMeta != 0) // Deallocate if we've allocated something. - localDeallocate(keyPtr); - - if (valPtr == 0) // Only key needs to be added. - return meta; - - for (;;) { // Add value for the key found. - long nextVal = value(meta); - - nextValue(valPtr, nextVal); - - if (casValue(meta, nextVal, valPtr)) - return meta; - } - } - - assert cmpRes != 0; - - if (cmpRes > 0) { // Go right. - prevMeta = meta; - meta = nextMeta(meta, level); - - if (meta != 0) // If nothing to the right then go down. - continue; - } - - while (--level >= 0) { // Go down. - stackPush(prevMeta, meta); // Remember the path. - - long nextMeta = nextMeta(prevMeta, level); - - if (nextMeta != meta) { // If the meta is the same as on upper level go deeper. - meta = nextMeta; - - assert meta != 0; - - break; - } - } - } - } - - /** - * @param key Key. - * @param meta Meta pointer. - * @return Comparison result. - */ - @SuppressWarnings("unchecked") - private int cmp(Object key, long meta) { - assert meta != 0; - - return cmp.compare(key, keyReader.readKey(meta)); - } - - /** - * Adds appropriate index links between metas. - * - * @param newMeta Just added meta. - * @param newMetaLevel New level. - */ - private void laceUp(Object key, long newMeta, int newMetaLevel) { - for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up. - long prevMeta = heads; - long meta = 0; - - if (!stack.isEmpty()) { // Get the path back. - meta = stack.remove(); - prevMeta = stack.remove(); - } - - for (;;) { - nextMeta(newMeta, level, meta); - - if (casNextMeta(prevMeta, level, meta, newMeta)) - break; - - long oldMeta = meta; - - meta = nextMeta(prevMeta, level); // Reread meta. - - for (;;) { - int cmpRes = cmp(key, meta); - - if (cmpRes > 0) { // Go right. - prevMeta = meta; - meta = nextMeta(prevMeta, level); - - if (meta != oldMeta) // Old meta already known to be greater than ours or is 0. - continue; - } - - assert cmpRes != 0; // Two different metas with equal keys must be impossible. - - break; // Retry cas. - } - } - } - - if (!stack.isEmpty()) - return; // Our level already lower than top. - - for (;;) { // Raise top level. - int top = topLevel.get(); - - if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel)) - break; - } - } - - /** - * Key. - */ - private class KeyImpl implements Key { - /** */ - private long meta; - - /** */ - private Object tmpKey; - - /** - * @return Meta pointer for the key. - */ - public long address() { - return meta; - } - - /** - * @param val Value. - */ - @Override public void add(Value val) { - int size = val.size(); - - long valPtr = allocate(size + 12); - - val.copyTo(valPtr + 12); - - valueSize(valPtr, size); - - long nextVal; - - do { - nextVal = value(meta); - - nextValue(valPtr, nextVal); - } - while(!casValue(meta, nextVal, valPtr)); - } - } - } - - /** - * Task input. - */ - private class Input implements HadoopTaskInput { - /** */ - private long metaPtr = heads; - - /** */ - private final Reader keyReader; - - /** */ - private final Reader valReader; - - /** - * @param taskCtx Task context. - * @throws IgniteCheckedException If failed. - */ - private Input(HadoopTaskContext taskCtx) throws IgniteCheckedException { - keyReader = new Reader(taskCtx.keySerialization()); - valReader = new Reader(taskCtx.valueSerialization()); - } - - /** {@inheritDoc} */ - @Override public boolean next() { - metaPtr = nextMeta(metaPtr, 0); - - return metaPtr != 0; - } - - /** {@inheritDoc} */ - @Override public Object key() { - return keyReader.readKey(metaPtr); - } - - /** {@inheritDoc} */ - @Override public Iterator<?> values() { - return new ValueIterator(value(metaPtr), valReader); - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - keyReader.close(); - valReader.close(); - } - } - - /** - * Grouped input using grouping comparator. - */ - private class GroupedInput implements HadoopTaskInput { - /** */ - private final Comparator<Object> grpCmp; - - /** */ - private final Input in; - - /** */ - private Object prevKey; - - /** */ - private Object nextKey; - - /** */ - private final GridLongList vals = new GridLongList(); - - /** - * @param grpCmp Grouping comparator. - * @param in Input. - */ - private GroupedInput(Comparator<Object> grpCmp, Input in) { - this.grpCmp = grpCmp; - this.in = in; - } - - /** {@inheritDoc} */ - @Override public boolean next() { - if (prevKey == null) { // First call. - if (!in.next()) - return false; - - prevKey = in.key(); - - assert prevKey != null; - - in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison. - - vals.add(value(in.metaPtr)); - } - else { - if (in.metaPtr == 0) // We reached the end of the input. - return false; - - vals.clear(); - - vals.add(value(in.metaPtr)); - - in.keyReader.resetReusedObject(prevKey); // Switch key instances. - - prevKey = nextKey; - } - - while (in.next()) { // Fill with head value pointers with equal keys. - if (grpCmp.compare(prevKey, nextKey = in.key()) == 0) - vals.add(value(in.metaPtr)); - else - break; - } - - assert !vals.isEmpty(); - - return true; - } - - /** {@inheritDoc} */ - @Override public Object key() { - return prevKey; - } - - /** {@inheritDoc} */ - @Override public Iterator<?> values() { - assert !vals.isEmpty(); - - final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader); - - return new Iterator<Object>() { - /** */ - private int idx; - - @Override public boolean hasNext() { - if (!valIter.hasNext()) { - if (++idx == vals.size()) - return false; - - valIter.head(vals.get(idx)); - - assert valIter.hasNext(); - } - - return true; - } - - @Override public Object next() { - return valIter.next(); - } - - @Override public void remove() { - valIter.remove(); - } - }; - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - in.close(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java deleted file mode 100644 index 3b5fa15..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -import java.io.DataInput; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; - -/** - * Data input stream. - */ -public class HadoopDataInStream extends InputStream implements DataInput { - /** */ - private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0); - - /** */ - private final GridUnsafeMemory mem; - - /** - * @param mem Memory. - */ - public HadoopDataInStream(GridUnsafeMemory mem) { - assert mem != null; - - this.mem = mem; - } - - /** - * @return Buffer. - */ - public HadoopOffheapBuffer buffer() { - return buf; - } - - /** - * @param size Size. - * @return Old pointer. - */ - protected long move(long size) throws IOException { - long ptr = buf.move(size); - - assert ptr != 0; - - return ptr; - } - - /** {@inheritDoc} */ - @Override public int read() throws IOException { - return readUnsignedByte(); - } - - /** {@inheritDoc} */ - @Override public int read(byte[] b, int off, int len) throws IOException { - readFully(b, off, len); - - return len; - } - - /** {@inheritDoc} */ - @Override public long skip(long n) throws IOException { - move(n); - - return n; - } - - /** {@inheritDoc} */ - @Override public void readFully(byte[] b) throws IOException { - readFully(b, 0, b.length); - } - - /** {@inheritDoc} */ - @Override public void readFully(byte[] b, int off, int len) throws IOException { - mem.readBytes(move(len), b, off, len); - } - - /** {@inheritDoc} */ - @Override public int skipBytes(int n) throws IOException { - move(n); - - return n; - } - - /** {@inheritDoc} */ - @Override public boolean readBoolean() throws IOException { - byte res = readByte(); - - if (res == 1) - return true; - - assert res == 0 : res; - - return false; - } - - /** {@inheritDoc} */ - @Override public byte readByte() throws IOException { - return mem.readByte(move(1)); - } - - /** {@inheritDoc} */ - @Override public int readUnsignedByte() throws IOException { - return readByte() & 0xff; - } - - /** {@inheritDoc} */ - @Override public short readShort() throws IOException { - return mem.readShort(move(2)); - } - - /** {@inheritDoc} */ - @Override public int readUnsignedShort() throws IOException { - return readShort() & 0xffff; - } - - /** {@inheritDoc} */ - @Override public char readChar() throws IOException { - return (char)readShort(); - } - - /** {@inheritDoc} */ - @Override public int readInt() throws IOException { - return mem.readInt(move(4)); - } - - /** {@inheritDoc} */ - @Override public long readLong() throws IOException { - return mem.readLong(move(8)); - } - - /** {@inheritDoc} */ - @Override public float readFloat() throws IOException { - return mem.readFloat(move(4)); - } - - /** {@inheritDoc} */ - @Override public double readDouble() throws IOException { - return mem.readDouble(move(8)); - } - - /** {@inheritDoc} */ - @Override public String readLine() throws IOException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public String readUTF() throws IOException { - byte[] bytes = new byte[readInt()]; - - if (bytes.length != 0) - readFully(bytes); - - return new String(bytes, StandardCharsets.UTF_8); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java deleted file mode 100644 index f7b1a73..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -import java.io.DataOutput; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; - -/** - * Data output stream. - */ -public class HadoopDataOutStream extends OutputStream implements DataOutput { - /** */ - private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0); - - /** */ - private final GridUnsafeMemory mem; - - /** - * @param mem Memory. - */ - public HadoopDataOutStream(GridUnsafeMemory mem) { - this.mem = mem; - } - - /** - * @return Buffer. - */ - public HadoopOffheapBuffer buffer() { - return buf; - } - - /** - * @param size Size. - * @return Old pointer or {@code 0} if move was impossible. - */ - public long move(long size) { - return buf.move(size); - } - - /** {@inheritDoc} */ - @Override public void write(int b) { - writeByte(b); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b) { - write(b, 0, b.length); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b, int off, int len) { - GridUnsafe.copyMemory(b, GridUnsafe.BYTE_ARR_OFF + off, null, move(len), len); - } - - /** {@inheritDoc} */ - @Override public void writeBoolean(boolean v) { - writeByte(v ? 1 : 0); - } - - /** {@inheritDoc} */ - @Override public void writeByte(int v) { - mem.writeByte(move(1), (byte)v); - } - - /** {@inheritDoc} */ - @Override public void writeShort(int v) { - mem.writeShort(move(2), (short)v); - } - - /** {@inheritDoc} */ - @Override public void writeChar(int v) { - writeShort(v); - } - - /** {@inheritDoc} */ - @Override public void writeInt(int v) { - mem.writeInt(move(4), v); - } - - /** {@inheritDoc} */ - @Override public void writeLong(long v) { - mem.writeLong(move(8), v); - } - - /** {@inheritDoc} */ - @Override public void writeFloat(float v) { - mem.writeFloat(move(4), v); - } - - /** {@inheritDoc} */ - @Override public void writeDouble(double v) { - mem.writeDouble(move(8), v); - } - - /** {@inheritDoc} */ - @Override public void writeBytes(String s) { - writeUTF(s); - } - - /** {@inheritDoc} */ - @Override public void writeChars(String s) { - writeUTF(s); - } - - /** {@inheritDoc} */ - @Override public void writeUTF(String s) { - byte[] b = s.getBytes(StandardCharsets.UTF_8); - - writeInt(b.length); - write(b); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java deleted file mode 100644 index acc9be6..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -/** - * Offheap buffer. - */ -public class HadoopOffheapBuffer { - /** Buffer begin address. */ - private long bufPtr; - - /** The first address we do not own. */ - private long bufEnd; - - /** Current read or write pointer. */ - private long posPtr; - - /** - * @param bufPtr Pointer to buffer begin. - * @param bufSize Size of the buffer. - */ - public HadoopOffheapBuffer(long bufPtr, long bufSize) { - set(bufPtr, bufSize); - } - - /** - * @param bufPtr Pointer to buffer begin. - * @param bufSize Size of the buffer. - */ - public void set(long bufPtr, long bufSize) { - this.bufPtr = bufPtr; - - posPtr = bufPtr; - bufEnd = bufPtr + bufSize; - } - - /** - * @return Pointer to internal buffer begin. - */ - public long begin() { - return bufPtr; - } - - /** - * @return Buffer capacity. - */ - public long capacity() { - return bufEnd - bufPtr; - } - - /** - * @return Remaining capacity. - */ - public long remaining() { - return bufEnd - posPtr; - } - - /** - * @return Absolute pointer to the current position inside of the buffer. - */ - public long pointer() { - return posPtr; - } - - /** - * @param ptr Absolute pointer to the current position inside of the buffer. - */ - public void pointer(long ptr) { - assert ptr >= bufPtr : bufPtr + " <= " + ptr; - assert ptr <= bufEnd : bufEnd + " <= " + bufPtr; - - posPtr = ptr; - } - - /** - * @param size Size move on. - * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer. - */ - public long move(long size) { - assert size > 0 : size; - - long oldPos = posPtr; - long newPos = oldPos + size; - - if (newPos > bufEnd) - return 0; - - posPtr = newPos; - - return oldPos; - } - - /** - * @param ptr Pointer. - * @return {@code true} If the given pointer is inside of this buffer. - */ - public boolean isInside(long ptr) { - return ptr >= bufPtr && ptr <= bufEnd; - } - - /** - * Resets position to the beginning of buffer. - */ - public void reset() { - posPtr = bufPtr; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java deleted file mode 100644 index 5ede18e..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput; -import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata; -import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; -import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.typedef.internal.U; - - -/** - * Task executor. - */ -public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { - /** Job tracker. */ - private HadoopJobTracker jobTracker; - - /** */ - private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>(); - - /** Executor service to run tasks. */ - private HadoopExecutorService exec; - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - jobTracker = ctx.jobTracker(); - - exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(), - ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize()); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - if (exec != null) { - exec.shutdown(3000); - - if (cancel) { - for (HadoopJobId jobId : jobs.keySet()) - cancelTasks(jobId); - } - } - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) { - if (exec != null && !exec.shutdown(30000)) - U.warn(log, "Failed to finish running tasks in 30 sec."); - } - - /** {@inheritDoc} */ - @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() + - ", tasksCnt=" + tasks.size() + ']'); - - Collection<HadoopRunnableTask> executedTasks = jobs.get(job.id()); - - if (executedTasks == null) { - executedTasks = new GridConcurrentHashSet<>(); - - Collection<HadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks); - - assert extractedCol == null; - } - - final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks; - - for (final HadoopTaskInfo info : tasks) { - assert info != null; - - HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info, - ctx.localNodeId()) { - @Override protected void onTaskFinished(HadoopTaskStatus status) { - if (log.isDebugEnabled()) - log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " + - "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']'); - - finalExecutedTasks.remove(this); - - jobTracker.onTaskFinished(info, status); - } - - @Override protected HadoopTaskInput createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException { - return ctx.shuffle().input(taskCtx); - } - - @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException { - return ctx.shuffle().output(taskCtx); - } - }; - - executedTasks.add(task); - - exec.submit(task); - } - } - - /** - * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks - * for this job ID. - * <p> - * It is guaranteed that this method will not be called concurrently with - * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via - * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called. - * - * @param jobId Job ID to cancel. - */ - @Override public void cancelTasks(HadoopJobId jobId) { - Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId); - - if (executedTasks != null) { - for (HadoopRunnableTask task : executedTasks) - task.cancel(); - } - } - - /** {@inheritDoc} */ - @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException { - if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) { - Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId()); - - assert executedTasks == null || executedTasks.isEmpty(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java deleted file mode 100644 index 993ecc9..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - - -import java.util.Collection; -import java.util.concurrent.Callable; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.internal.util.worker.GridWorkerListener; -import org.apache.ignite.internal.util.worker.GridWorkerListenerAdapter; -import org.apache.ignite.thread.IgniteThread; -import org.jsr166.ConcurrentHashMap8; - -import static java.util.Collections.newSetFromMap; - -/** - * Executor service without thread pooling. - */ -public class HadoopExecutorService { - /** */ - private final LinkedBlockingQueue<Callable<?>> queue; - - /** */ - private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>()); - - /** */ - private final AtomicInteger active = new AtomicInteger(); - - /** */ - private final int maxTasks; - - /** */ - private final String gridName; - - /** */ - private final IgniteLogger log; - - /** */ - private volatile boolean shutdown; - - /** */ - private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() { - @Override public void onStopped(GridWorker w) { - workers.remove(w); - - if (shutdown) { - active.decrementAndGet(); - - return; - } - - Callable<?> task = queue.poll(); - - if (task != null) - startThread(task); - else { - active.decrementAndGet(); - - if (!queue.isEmpty()) - startFromQueue(); - } - } - }; - - /** - * @param log Logger. - * @param gridName Grid name. - * @param maxTasks Max number of tasks. - * @param maxQueue Max queue length. - */ - public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) { - assert maxTasks > 0 : maxTasks; - assert maxQueue > 0 : maxQueue; - - this.maxTasks = maxTasks; - this.queue = new LinkedBlockingQueue<>(maxQueue); - this.gridName = gridName; - this.log = log.getLogger(HadoopExecutorService.class); - } - - /** - * @return Number of active workers. - */ - public int active() { - return workers.size(); - } - - /** - * Submit task. - * - * @param task Task. - */ - public void submit(Callable<?> task) { - while (queue.isEmpty()) { - int active0 = active.get(); - - if (active0 == maxTasks) - break; - - if (active.compareAndSet(active0, active0 + 1)) { - startThread(task); - - return; // Started in new thread bypassing queue. - } - } - - try { - while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) { - if (shutdown) - return; // Rejected due to shutdown. - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - return; - } - - startFromQueue(); - } - - /** - * Attempts to start task from queue. - */ - private void startFromQueue() { - do { - int active0 = active.get(); - - if (active0 == maxTasks) - break; - - if (active.compareAndSet(active0, active0 + 1)) { - Callable<?> task = queue.poll(); - - if (task == null) { - int res = active.decrementAndGet(); - - assert res >= 0 : res; - - break; - } - - startThread(task); - } - } - while (!queue.isEmpty()); - } - - /** - * @param task Task. - */ - private void startThread(final Callable<?> task) { - String workerName; - - if (task instanceof HadoopRunnableTask) { - final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo(); - - workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt(); - } - else - workerName = task.toString(); - - GridWorker w = new GridWorker(gridName, workerName, log, lsnr) { - @Override protected void body() { - try { - task.call(); - } - catch (Exception e) { - log.error("Failed to execute task: " + task, e); - } - } - }; - - workers.add(w); - - if (shutdown) - w.cancel(); - - new IgniteThread(w).start(); - } - - /** - * Shuts down this executor service. - * - * @param awaitTimeMillis Time in milliseconds to wait for tasks completion. - * @return {@code true} If all tasks completed. - */ - public boolean shutdown(long awaitTimeMillis) { - shutdown = true; - - for (GridWorker w : workers) - w.cancel(); - - while (awaitTimeMillis > 0 && !workers.isEmpty()) { - try { - Thread.sleep(100); - - awaitTimeMillis -= 100; - } - catch (InterruptedException e) { - break; - } - } - - return workers.isEmpty(); - } - - /** - * @return {@code true} If method {@linkplain #shutdown(long)} was already called. - */ - public boolean isShutdown() { - return shutdown; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java deleted file mode 100644 index a57efe6..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ /dev/null @@ -1,293 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import java.util.UUID; -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; -import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMultimap; -import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap; -import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList; -import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.COMBINER_HASHMAP_SIZE; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_COMBINER_NO_SORTING; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMBINE; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP; - -/** - * Runnable task. - */ -public abstract class HadoopRunnableTask implements Callable<Void> { - /** */ - private final GridUnsafeMemory mem; - - /** */ - private final IgniteLogger log; - - /** */ - private final HadoopJob job; - - /** Task to run. */ - private final HadoopTaskInfo info; - - /** Submit time. */ - private final long submitTs = U.currentTimeMillis(); - - /** Execution start timestamp. */ - private long execStartTs; - - /** Execution end timestamp. */ - private long execEndTs; - - /** */ - private HadoopMultimap combinerInput; - - /** */ - private volatile HadoopTaskContext ctx; - - /** Set if task is to cancelling. */ - private volatile boolean cancelled; - - /** Node id. */ - private UUID nodeId; - - /** - * @param log Log. - * @param job Job. - * @param mem Memory. - * @param info Task info. - * @param nodeId Node id. - */ - protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info, - UUID nodeId) { - this.nodeId = nodeId; - this.log = log.getLogger(HadoopRunnableTask.class); - this.job = job; - this.mem = mem; - this.info = info; - } - - /** - * @return Wait time. - */ - public long waitTime() { - return execStartTs - submitTs; - } - - /** - * @return Execution time. - */ - public long executionTime() { - return execEndTs - execStartTs; - } - - /** {@inheritDoc} */ - @Override public Void call() throws IgniteCheckedException { - ctx = job.getTaskContext(info); - - return ctx.runAsJobOwner(new Callable<Void>() { - @Override public Void call() throws Exception { - call0(); - - return null; - } - }); - } - - /** - * Implements actual task running. - * @throws IgniteCheckedException - */ - void call0() throws IgniteCheckedException { - execStartTs = U.currentTimeMillis(); - - Throwable err = null; - - HadoopTaskState state = HadoopTaskState.COMPLETED; - - HadoopPerformanceCounter perfCntr = null; - - try { - perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); - - perfCntr.onTaskSubmit(info, submitTs); - perfCntr.onTaskPrepare(info, execStartTs); - - ctx.prepareTaskEnvironment(); - - runTask(perfCntr); - - if (info.type() == MAP && job.info().hasCombiner()) { - ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null)); - - try { - runTask(perfCntr); - } - finally { - ctx.taskInfo(info); - } - } - } - catch (HadoopTaskCancelledException ignored) { - state = HadoopTaskState.CANCELED; - } - catch (Throwable e) { - state = HadoopTaskState.FAILED; - err = e; - - U.error(log, "Task execution failed.", e); - - if (e instanceof Error) - throw e; - } - finally { - execEndTs = U.currentTimeMillis(); - - if (perfCntr != null) - perfCntr.onTaskFinish(info, execEndTs); - - onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters())); - - if (combinerInput != null) - combinerInput.close(); - - if (ctx != null) - ctx.cleanupTaskEnvironment(); - } - } - - /** - * @param perfCntr Performance counter. - * @throws IgniteCheckedException If failed. - */ - private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - try (HadoopTaskOutput out = createOutputInternal(ctx); - HadoopTaskInput in = createInputInternal(ctx)) { - - ctx.input(in); - ctx.output(out); - - perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis()); - - ctx.run(); - } - } - - /** - * Cancel the executed task. - */ - public void cancel() { - cancelled = true; - - if (ctx != null) - ctx.cancel(); - } - - /** - * @param status Task status. - */ - protected abstract void onTaskFinished(HadoopTaskStatus status); - - /** - * @param ctx Task context. - * @return Task input. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException { - switch (ctx.taskInfo().type()) { - case SETUP: - case MAP: - case COMMIT: - case ABORT: - return null; - - case COMBINE: - assert combinerInput != null; - - return combinerInput.input(ctx); - - default: - return createInput(ctx); - } - } - - /** - * @param ctx Task context. - * @return Input. - * @throws IgniteCheckedException If failed. - */ - protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException; - - /** - * @param ctx Task info. - * @return Output. - * @throws IgniteCheckedException If failed. - */ - protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException; - - /** - * @param ctx Task info. - * @return Task output. - * @throws IgniteCheckedException If failed. - */ - private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException { - switch (ctx.taskInfo().type()) { - case SETUP: - case REDUCE: - case COMMIT: - case ABORT: - return null; - - case MAP: - if (job.info().hasCombiner()) { - assert combinerInput == null; - - combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ? - new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)): - new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree - - return combinerInput.startAdding(ctx); - } - - default: - return createOutput(ctx); - } - } - - /** - * @return Task info. - */ - public HadoopTaskInfo taskInfo() { - return info; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java deleted file mode 100644 index f13c76a..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import java.util.Collection; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopComponent; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata; - -/** - * Common superclass for task executor. - */ -public abstract class HadoopTaskExecutorAdapter extends HadoopComponent { - /** - * Runs tasks. - * - * @param job Job. - * @param tasks Tasks. - * @throws IgniteCheckedException If failed. - */ - public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException; - - /** - * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks - * for this job ID. - * <p> - * It is guaranteed that this method will not be called concurrently with - * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via - * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called. - * - * @param jobId Job ID to cancel. - */ - public abstract void cancelTasks(HadoopJobId jobId) throws IgniteCheckedException; - - /** - * On job state change callback; - * - * @param meta Job metadata. - */ - public abstract void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java deleted file mode 100644 index b22d291..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -/** -* State of the task. -*/ -public enum HadoopTaskState { - /** Running task. */ - RUNNING, - - /** Completed task. */ - COMPLETED, - - /** Failed task. */ - FAILED, - - /** Canceled task. */ - CANCELED, - - /** Process crashed. */ - CRASHED -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java deleted file mode 100644 index fa09ff7..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; - -/** - * Task status. - */ -public class HadoopTaskStatus implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private HadoopTaskState state; - - /** */ - private Throwable failCause; - - /** */ - private HadoopCounters cntrs; - - /** - * Default constructor required by {@link Externalizable}. - */ - public HadoopTaskStatus() { - // No-op. - } - - /** - * Creates new instance. - * - * @param state Task state. - * @param failCause Failure cause (if any). - */ - public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause) { - this(state, failCause, null); - } - - /** - * Creates new instance. - * - * @param state Task state. - * @param failCause Failure cause (if any). - * @param cntrs Task counters. - */ - public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause, - @Nullable HadoopCounters cntrs) { - assert state != null; - - this.state = state; - this.failCause = failCause; - this.cntrs = cntrs; - } - - /** - * @return State. - */ - public HadoopTaskState state() { - return state; - } - - /** - * @return Fail cause. - */ - @Nullable public Throwable failCause() { - return failCause; - } - - /** - * @return Counters. - */ - @Nullable public HadoopCounters counters() { - return cntrs; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopTaskStatus.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(state); - out.writeObject(failCause); - out.writeObject(cntrs); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - state = (HadoopTaskState)in.readObject(); - failCause = (Throwable)in.readObject(); - cntrs = (HadoopCounters)in.readObject(); - } -} \ No newline at end of file