http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimapBase.java new file mode 100644 index 0000000..39b7c51 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimapBase.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import java.io.DataInput; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream; +import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream; +import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer; +import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_OFFHEAP_PAGE_SIZE; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; + +/** + * Base class for all multimaps. + */ +public abstract class HadoopMultimapBase implements HadoopMultimap { + /** */ + protected final GridUnsafeMemory mem; + + /** */ + protected final int pageSize; + + /** */ + private final Collection<Page> allPages = new ConcurrentLinkedQueue<>(); + + /** + * @param jobInfo Job info. + * @param mem Memory. + */ + protected HadoopMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) { + assert jobInfo != null; + assert mem != null; + + this.mem = mem; + + pageSize = get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024); + } + + /** + * @param page Page. + */ + private void deallocate(Page page) { + assert page != null; + + mem.release(page.ptr, page.size); + } + + /** + * @param valPtr Value page pointer. + * @param nextValPtr Next value page pointer. 
+ */ + protected void nextValue(long valPtr, long nextValPtr) { + mem.writeLong(valPtr, nextValPtr); + } + + /** + * @param valPtr Value page pointer. + * @return Next value page pointer. + */ + protected long nextValue(long valPtr) { + return mem.readLong(valPtr); + } + + /** + * @param valPtr Value page pointer. + * @param size Size. + */ + protected void valueSize(long valPtr, int size) { + mem.writeInt(valPtr + 8, size); + } + + /** + * @param valPtr Value page pointer. + * @return Value size. + */ + protected int valueSize(long valPtr) { + return mem.readInt(valPtr + 8); + } + + /** {@inheritDoc} */ + @Override public void close() { + for (Page page : allPages) + deallocate(page); + } + + /** + * Reader for key and value. + */ + protected class ReaderBase implements AutoCloseable { + /** */ + private Object tmp; + + /** */ + private final HadoopSerialization ser; + + /** */ + private final HadoopDataInStream in = new HadoopDataInStream(mem); + + /** + * @param ser Serialization. + */ + protected ReaderBase(HadoopSerialization ser) { + assert ser != null; + + this.ser = ser; + } + + /** + * @param valPtr Value page pointer. + * @return Value. + */ + public Object readValue(long valPtr) { + assert valPtr > 0 : valPtr; + + try { + return read(valPtr + 12, valueSize(valPtr)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * Resets temporary object to the given one. + * + * @param tmp Temporary object for reuse. + */ + public void resetReusedObject(Object tmp) { + this.tmp = tmp; + } + + /** + * @param ptr Pointer. + * @param size Object size. + * @return Object. + */ + protected Object read(long ptr, long size) throws IgniteCheckedException { + in.buffer().set(ptr, size); + + tmp = ser.read(in, tmp); + + return tmp; + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + ser.close(); + } + } + + /** + * Base class for adders. 
+ */ + protected abstract class AdderBase implements Adder { + /** */ + protected final HadoopSerialization keySer; + + /** */ + protected final HadoopSerialization valSer; + + /** */ + private final HadoopDataOutStream out; + + /** */ + private long writeStart; + + /** Current page. */ + private Page curPage; + + /** + * @param ctx Task context. + * @throws IgniteCheckedException If failed. + */ + protected AdderBase(HadoopTaskContext ctx) throws IgniteCheckedException { + valSer = ctx.valueSerialization(); + keySer = ctx.keySerialization(); + + out = new HadoopDataOutStream(mem) { + @Override public long move(long size) { + long ptr = super.move(size); + + if (ptr == 0) // Was not able to move - not enough free space. + ptr = allocateNextPage(size); + + assert ptr != 0; + + return ptr; + } + }; + } + + /** + * @param requestedSize Requested size. + * @return Next write pointer. + */ + private long allocateNextPage(long requestedSize) { + int writtenSize = writtenSize(); + + long newPageSize = nextPageSize(writtenSize + requestedSize); + long newPagePtr = mem.allocate(newPageSize); + + HadoopOffheapBuffer b = out.buffer(); + + b.set(newPagePtr, newPageSize); + + if (writtenSize != 0) { + mem.copyMemory(writeStart, newPagePtr, writtenSize); + + b.move(writtenSize); + } + + writeStart = newPagePtr; + + // At this point old page is not needed, so we release it. + Page oldPage = curPage; + + curPage = new Page(newPagePtr, newPageSize); + + if (oldPage != null) + allPages.add(oldPage); + + return b.move(requestedSize); + } + + /** + * Get next page size. + * + * @param required Required amount of data. + * @return Next page size. + */ + private long nextPageSize(long required) { + long pages = (required / pageSize) + 1; + + long pagesPow2 = nextPowerOfTwo(pages); + + return pagesPow2 * pageSize; + } + + /** + * Get next power of two which greater or equal to the given number. Naive implementation. + * + * @param val Number + * @return Nearest pow2. 
+ */ + private long nextPowerOfTwo(long val) { + long res = 1; + + while (res < val) + res = res << 1; + + if (res < 0) + throw new IllegalArgumentException("Value is too big to find positive pow2: " + val); + + return res; + } + + /** + * @return Fixed pointer. + */ + private long fixAlignment() { + HadoopOffheapBuffer b = out.buffer(); + + long ptr = b.pointer(); + + if ((ptr & 7L) != 0) { // Address is not aligned by octet. + ptr = (ptr + 8L) & ~7L; + + b.pointer(ptr); + } + + return ptr; + } + + /** + * @param off Offset. + * @param o Object. + * @return Page pointer. + * @throws IgniteCheckedException If failed. + */ + protected long write(int off, Object o, HadoopSerialization ser) throws IgniteCheckedException { + writeStart = fixAlignment(); + + if (off != 0) + out.move(off); + + ser.write(out, o); + + return writeStart; + } + + /** + * @param size Size. + * @return Pointer. + */ + protected long allocate(int size) { + writeStart = fixAlignment(); + + out.move(size); + + return writeStart; + } + + /** + * Rewinds local allocation pointer to the given pointer if possible. + * + * @param ptr Pointer. + */ + protected void localDeallocate(long ptr) { + HadoopOffheapBuffer b = out.buffer(); + + if (b.isInside(ptr)) + b.pointer(ptr); + else + b.reset(); + } + + /** + * @return Written size. + */ + protected int writtenSize() { + return (int)(out.buffer().pointer() - writeStart); + } + + /** {@inheritDoc} */ + @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + if (curPage != null) + allPages.add(curPage); + + keySer.close(); + valSer.close(); + } + } + + /** + * Iterator over values. + */ + protected class ValueIterator implements Iterator<Object> { + /** */ + private long valPtr; + + /** */ + private final ReaderBase valReader; + + /** + * @param valPtr Value page pointer. 
+ * @param valReader Value reader. + */ + protected ValueIterator(long valPtr, ReaderBase valReader) { + this.valPtr = valPtr; + this.valReader = valReader; + } + + /** + * @param valPtr Head value pointer. + */ + public void head(long valPtr) { + this.valPtr = valPtr; + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return valPtr != 0; + } + + /** {@inheritDoc} */ + @Override public Object next() { + if (!hasNext()) + throw new NoSuchElementException(); + + Object res = valReader.readValue(valPtr); + + valPtr = nextValue(valPtr); + + return res; + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException(); + } + } + + /** + * Page. + */ + private static class Page { + /** Pointer. */ + private final long ptr; + + /** Size. */ + private final long size; + + /** + * Constructor. + * + * @param ptr Pointer. + * @param size Size. + */ + public Page(long ptr, long size) { + this.ptr = ptr; + this.size = size; + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipList.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipList.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipList.java new file mode 100644 index 0000000..7db88bc --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipList.java @@ -0,0 +1,733 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import java.io.DataInput; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.GridRandom; +import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.jetbrains.annotations.Nullable; + +/** + * Skip list. + */ +public class HadoopSkipList extends HadoopMultimapBase { + /** */ + private static final int HEADS_SIZE = 24 + 33 * 8; // Offset + max level is from 0 to 32 inclusive. + + /** Top level. */ + private final AtomicInteger topLevel = new AtomicInteger(-1); + + /** Heads for all the lists. */ + private final long heads; + + /** */ + private final AtomicBoolean visitGuard = new AtomicBoolean(); + + /** + * @param jobInfo Job info. + * @param mem Memory. 
+ */ + public HadoopSkipList(HadoopJobInfo jobInfo, GridUnsafeMemory mem) { + super(jobInfo, mem); + + heads = mem.allocate(HEADS_SIZE, true); + } + + /** {@inheritDoc} */ + @Override public void close() { + super.close(); + + mem.release(heads, HEADS_SIZE); + } + + /** {@inheritDoc} */ + @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException { + if (!visitGuard.compareAndSet(false, true)) + return false; + + for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) { + long valPtr = value(meta); + + long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta); + + if (valPtr != lastVisited) { + long k = key(meta); + + v.onKey(k + 4, keySize(k)); + + lastVisitedValue(meta, valPtr); // Set it to the first value in chain. + + do { + v.onValue(valPtr + 12, valueSize(valPtr)); + + valPtr = nextValue(valPtr); + } + while (valPtr != lastVisited); + } + } + + visitGuard.lazySet(false); + + return true; + } + + /** {@inheritDoc} */ + @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException { + return new AdderImpl(ctx); + } + + /** {@inheritDoc} */ + @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { + Input in = new Input(taskCtx); + + Comparator<Object> grpCmp = taskCtx.groupComparator(); + + if (grpCmp != null) + return new GroupedInput(grpCmp, in); + + return in; + } + + /** + * @param meta Meta pointer. + * @return Key pointer. + */ + private long key(long meta) { + return mem.readLong(meta); + } + + /** + * @param meta Meta pointer. + * @param key Key pointer. + */ + private void key(long meta, long key) { + mem.writeLong(meta, key); + } + + /** + * @param meta Meta pointer. + * @return Value pointer. + */ + private long value(long meta) { + return mem.readLongVolatile(meta + 8); + } + + /** + * @param meta Meta pointer. + * @param valPtr Value pointer. 
+ */ + private void value(long meta, long valPtr) { + mem.writeLongVolatile(meta + 8, valPtr); + } + + /** + * @param meta Meta pointer. + * @param oldValPtr Old first value pointer. + * @param newValPtr New first value pointer. + * @return {@code true} If operation succeeded. + */ + private boolean casValue(long meta, long oldValPtr, long newValPtr) { + return mem.casLong(meta + 8, oldValPtr, newValPtr); + } + + /** + * @param meta Meta pointer. + * @return Last visited value pointer. + */ + private long lastVisitedValue(long meta) { + return mem.readLong(meta + 16); + } + + /** + * @param meta Meta pointer. + * @param valPtr Last visited value pointer. + */ + private void lastVisitedValue(long meta, long valPtr) { + mem.writeLong(meta + 16, valPtr); + } + + /** + * @param meta Meta pointer. + * @param level Level. + * @return Next meta pointer. + */ + private long nextMeta(long meta, int level) { + assert meta > 0 : meta; + + return mem.readLongVolatile(meta + 24 + 8 * level); + } + + /** + * @param meta Meta pointer. + * @param level Level. + * @param oldNext Old next meta pointer. + * @param newNext New next meta pointer. + * @return {@code true} If operation succeeded. + */ + private boolean casNextMeta(long meta, int level, long oldNext, long newNext) { + assert meta > 0 : meta; + + return mem.casLong(meta + 24 + 8 * level, oldNext, newNext); + } + + /** + * @param meta Meta pointer. + * @param level Level. + * @param nextMeta Next meta. + */ + private void nextMeta(long meta, int level, long nextMeta) { + assert meta != 0; + + mem.writeLong(meta + 24 + 8 * level, nextMeta); + } + + /** + * @param keyPtr Key pointer. + * @return Key size. + */ + private int keySize(long keyPtr) { + return mem.readInt(keyPtr); + } + + /** + * @param keyPtr Key pointer. + * @param keySize Key size. + */ + private void keySize(long keyPtr, int keySize) { + mem.writeInt(keyPtr, keySize); + } + + /** + * @param rnd Random. + * @return Next level. 
+ */ + public static int randomLevel(Random rnd) { + int x = rnd.nextInt(); + + int level = 0; + + while ((x & 1) != 0) { // Count sequential 1 bits. + level++; + + x >>>= 1; + } + + return level; + } + + /** + * Reader. + */ + private class Reader extends ReaderBase { + /** + * @param ser Serialization. + */ + protected Reader(HadoopSerialization ser) { + super(ser); + } + + /** + * @param meta Meta pointer. + * @return Key. + */ + public Object readKey(long meta) { + assert meta > 0 : meta; + + long k = key(meta); + + try { + return read(k + 4, keySize(k)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + + /** + * Adder. + */ + private class AdderImpl extends AdderBase { + /** */ + private final Comparator<Object> cmp; + + /** */ + private final Random rnd = new GridRandom(); + + /** */ + private final GridLongList stack = new GridLongList(16); + + /** */ + private final Reader keyReader; + + /** + * @param ctx Task context. + * @throws IgniteCheckedException If failed. + */ + protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException { + super(ctx); + + keyReader = new Reader(keySer); + + cmp = ctx.sortComparator(); + } + + /** {@inheritDoc} */ + @Override public void write(Object key, Object val) throws IgniteCheckedException { + A.notNull(val, "val"); + + add(key, val); + } + + /** {@inheritDoc} */ + @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException { + KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse; + + k.tmpKey = keySer.read(in, k.tmpKey); + + k.meta = add(k.tmpKey, null); + + return k; + } + + /** + * @param key Key. + * @param val Value. + * @param level Level. + * @return Meta pointer. + */ + private long createMeta(long key, long val, int level) { + int size = 32 + 8 * level; + + long meta = allocate(size); + + key(meta, key); + value(meta, val); + lastVisitedValue(meta, 0L); + + for (int i = 32; i < size; i += 8) // Fill with 0. 
+ mem.writeLong(meta + i, 0L); + + return meta; + } + + /** + * @param key Key. + * @return Pointer. + * @throws IgniteCheckedException If failed. + */ + private long writeKey(Object key) throws IgniteCheckedException { + long keyPtr = write(4, key, keySer); + int keySize = writtenSize() - 4; + + keySize(keyPtr, keySize); + + return keyPtr; + } + + /** + * @param prevMeta Previous meta. + * @param meta Next meta. + */ + private void stackPush(long prevMeta, long meta) { + stack.add(prevMeta); + stack.add(meta); + } + + /** + * Drops last remembered frame from the stack. + */ + private void stackPop() { + stack.pop(2); + } + + /** + * @param key Key. + * @param val Value. + * @return Meta pointer. + * @throws IgniteCheckedException If failed. + */ + private long add(Object key, @Nullable Object val) throws IgniteCheckedException { + assert key != null; + + stack.clear(); + + long valPtr = 0; + + if (val != null) { // Write value. + valPtr = write(12, val, valSer); + int valSize = writtenSize() - 12; + + nextValue(valPtr, 0); + valueSize(valPtr, valSize); + } + + long keyPtr = 0; + long newMeta = 0; + int newMetaLevel = -1; + + long prevMeta = heads; + int level = topLevel.get(); + long meta = level < 0 ? 0 : nextMeta(heads, level); + + for (;;) { + if (level < 0) { // We did not find our key, trying to add new meta. + if (keyPtr == 0) { // Write key and create meta only once. + keyPtr = writeKey(key); + + newMetaLevel = randomLevel(rnd); + newMeta = createMeta(keyPtr, valPtr, newMetaLevel); + } + + nextMeta(newMeta, 0, meta); // Set next to new meta before publishing. + + if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully. + laceUp(key, newMeta, newMetaLevel); + + return newMeta; + } + else { // Add failed, need to check out what was added by another thread. + meta = nextMeta(prevMeta, level = 0); + + stackPop(); + } + } + + int cmpRes = cmp(key, meta); + + if (cmpRes == 0) { // Key found. 
+ if (newMeta != 0) // Deallocate if we've allocated something. + localDeallocate(keyPtr); + + if (valPtr == 0) // Only key needs to be added. + return meta; + + for (;;) { // Add value for the key found. + long nextVal = value(meta); + + nextValue(valPtr, nextVal); + + if (casValue(meta, nextVal, valPtr)) + return meta; + } + } + + assert cmpRes != 0; + + if (cmpRes > 0) { // Go right. + prevMeta = meta; + meta = nextMeta(meta, level); + + if (meta != 0) // If nothing to the right then go down. + continue; + } + + while (--level >= 0) { // Go down. + stackPush(prevMeta, meta); // Remember the path. + + long nextMeta = nextMeta(prevMeta, level); + + if (nextMeta != meta) { // If the meta is the same as on upper level go deeper. + meta = nextMeta; + + assert meta != 0; + + break; + } + } + } + } + + /** + * @param key Key. + * @param meta Meta pointer. + * @return Comparison result. + */ + @SuppressWarnings("unchecked") + private int cmp(Object key, long meta) { + assert meta != 0; + + return cmp.compare(key, keyReader.readKey(meta)); + } + + /** + * Adds appropriate index links between metas. + * + * @param newMeta Just added meta. + * @param newMetaLevel New level. + */ + private void laceUp(Object key, long newMeta, int newMetaLevel) { + for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up. + long prevMeta = heads; + long meta = 0; + + if (!stack.isEmpty()) { // Get the path back. + meta = stack.remove(); + prevMeta = stack.remove(); + } + + for (;;) { + nextMeta(newMeta, level, meta); + + if (casNextMeta(prevMeta, level, meta, newMeta)) + break; + + long oldMeta = meta; + + meta = nextMeta(prevMeta, level); // Reread meta. + + for (;;) { + int cmpRes = cmp(key, meta); + + if (cmpRes > 0) { // Go right. + prevMeta = meta; + meta = nextMeta(prevMeta, level); + + if (meta != oldMeta) // Old meta already known to be greater than ours or is 0. + continue; + } + + assert cmpRes != 0; // Two different metas with equal keys must be impossible. 
+ + break; // Retry cas. + } + } + } + + if (!stack.isEmpty()) + return; // Our level already lower than top. + + for (;;) { // Raise top level. + int top = topLevel.get(); + + if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel)) + break; + } + } + + /** + * Key. + */ + private class KeyImpl implements Key { + /** */ + private long meta; + + /** */ + private Object tmpKey; + + /** + * @return Meta pointer for the key. + */ + public long address() { + return meta; + } + + /** + * @param val Value. + */ + @Override public void add(Value val) { + int size = val.size(); + + long valPtr = allocate(size + 12); + + val.copyTo(valPtr + 12); + + valueSize(valPtr, size); + + long nextVal; + + do { + nextVal = value(meta); + + nextValue(valPtr, nextVal); + } + while(!casValue(meta, nextVal, valPtr)); + } + } + } + + /** + * Task input. + */ + private class Input implements HadoopTaskInput { + /** */ + private long metaPtr = heads; + + /** */ + private final Reader keyReader; + + /** */ + private final Reader valReader; + + /** + * @param taskCtx Task context. + * @throws IgniteCheckedException If failed. + */ + private Input(HadoopTaskContext taskCtx) throws IgniteCheckedException { + keyReader = new Reader(taskCtx.keySerialization()); + valReader = new Reader(taskCtx.valueSerialization()); + } + + /** {@inheritDoc} */ + @Override public boolean next() { + metaPtr = nextMeta(metaPtr, 0); + + return metaPtr != 0; + } + + /** {@inheritDoc} */ + @Override public Object key() { + return keyReader.readKey(metaPtr); + } + + /** {@inheritDoc} */ + @Override public Iterator<?> values() { + return new ValueIterator(value(metaPtr), valReader); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + keyReader.close(); + valReader.close(); + } + } + + /** + * Grouped input using grouping comparator. 
+ */ + private class GroupedInput implements HadoopTaskInput { + /** */ + private final Comparator<Object> grpCmp; + + /** */ + private final Input in; + + /** */ + private Object prevKey; + + /** */ + private Object nextKey; + + /** */ + private final GridLongList vals = new GridLongList(); + + /** + * @param grpCmp Grouping comparator. + * @param in Input. + */ + private GroupedInput(Comparator<Object> grpCmp, Input in) { + this.grpCmp = grpCmp; + this.in = in; + } + + /** {@inheritDoc} */ + @Override public boolean next() { + if (prevKey == null) { // First call. + if (!in.next()) + return false; + + prevKey = in.key(); + + assert prevKey != null; + + in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison. + + vals.add(value(in.metaPtr)); + } + else { + if (in.metaPtr == 0) // We reached the end of the input. + return false; + + vals.clear(); + + vals.add(value(in.metaPtr)); + + in.keyReader.resetReusedObject(prevKey); // Switch key instances. + + prevKey = nextKey; + } + + while (in.next()) { // Fill with head value pointers with equal keys. 
+ if (grpCmp.compare(prevKey, nextKey = in.key()) == 0) + vals.add(value(in.metaPtr)); + else + break; + } + + assert !vals.isEmpty(); + + return true; + } + + /** {@inheritDoc} */ + @Override public Object key() { + return prevKey; + } + + /** {@inheritDoc} */ + @Override public Iterator<?> values() { + assert !vals.isEmpty(); + + final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader); + + return new Iterator<Object>() { + /** */ + private int idx; + + @Override public boolean hasNext() { + if (!valIter.hasNext()) { + if (++idx == vals.size()) + return false; + + valIter.head(vals.get(idx)); + + assert valIter.hasNext(); + } + + return true; + } + + @Override public Object next() { + return valIter.next(); + } + + @Override public void remove() { + valIter.remove(); + } + }; + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + in.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataInStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataInStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataInStream.java new file mode 100644 index 0000000..3b5fa15 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataInStream.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.streams; + +import java.io.DataInput; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; + +/** + * Data input stream. + */ +public class HadoopDataInStream extends InputStream implements DataInput { + /** */ + private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0); + + /** */ + private final GridUnsafeMemory mem; + + /** + * @param mem Memory. + */ + public HadoopDataInStream(GridUnsafeMemory mem) { + assert mem != null; + + this.mem = mem; + } + + /** + * @return Buffer. + */ + public HadoopOffheapBuffer buffer() { + return buf; + } + + /** + * @param size Size. + * @return Old pointer. 
+ */ + protected long move(long size) throws IOException { + long ptr = buf.move(size); + + assert ptr != 0; + + return ptr; + } + + /** {@inheritDoc} */ + @Override public int read() throws IOException { + return readUnsignedByte(); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] b, int off, int len) throws IOException { + readFully(b, off, len); + + return len; + } + + /** {@inheritDoc} */ + @Override public long skip(long n) throws IOException { + move(n); + + return n; + } + + /** {@inheritDoc} */ + @Override public void readFully(byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override public void readFully(byte[] b, int off, int len) throws IOException { + mem.readBytes(move(len), b, off, len); + } + + /** {@inheritDoc} */ + @Override public int skipBytes(int n) throws IOException { + move(n); + + return n; + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() throws IOException { + byte res = readByte(); + + if (res == 1) + return true; + + assert res == 0 : res; + + return false; + } + + /** {@inheritDoc} */ + @Override public byte readByte() throws IOException { + return mem.readByte(move(1)); + } + + /** {@inheritDoc} */ + @Override public int readUnsignedByte() throws IOException { + return readByte() & 0xff; + } + + /** {@inheritDoc} */ + @Override public short readShort() throws IOException { + return mem.readShort(move(2)); + } + + /** {@inheritDoc} */ + @Override public int readUnsignedShort() throws IOException { + return readShort() & 0xffff; + } + + /** {@inheritDoc} */ + @Override public char readChar() throws IOException { + return (char)readShort(); + } + + /** {@inheritDoc} */ + @Override public int readInt() throws IOException { + return mem.readInt(move(4)); + } + + /** {@inheritDoc} */ + @Override public long readLong() throws IOException { + return mem.readLong(move(8)); + } + + /** {@inheritDoc} */ + @Override public float readFloat() throws IOException { + return 
mem.readFloat(move(4)); + } + + /** {@inheritDoc} */ + @Override public double readDouble() throws IOException { + return mem.readDouble(move(8)); + } + + /** {@inheritDoc} */ + @Override public String readLine() throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String readUTF() throws IOException { + byte[] bytes = new byte[readInt()]; + + if (bytes.length != 0) + readFully(bytes); + + return new String(bytes, StandardCharsets.UTF_8); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataOutStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataOutStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataOutStream.java new file mode 100644 index 0000000..f7b1a73 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataOutStream.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+
+import java.io.DataOutput;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+
+/**
+ * Data output stream that writes values at the current position of a {@link HadoopOffheapBuffer},
+ * advancing it through {@link #move(long)}.
+ * <p>
+ * Strings written through {@link #writeUTF(String)}, {@link #writeBytes(String)} and {@link #writeChars(String)}
+ * all use a 4-byte length followed by standard UTF-8 bytes, which is not the {@link DataOutput} format.
+ */
+public class HadoopDataOutStream extends OutputStream implements DataOutput {
+    /** Buffer whose position is advanced by every write. */
+    private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
+
+    /** Memory used to store primitive values. */
+    private final GridUnsafeMemory mem;
+
+    /**
+     * @param mem Memory.
+     */
+    public HadoopDataOutStream(GridUnsafeMemory mem) {
+        this.mem = mem;
+    }
+
+    /**
+     * @return Buffer.
+     */
+    public HadoopOffheapBuffer buffer() {
+        return buf;
+    }
+
+    /**
+     * Reserves {@code size} bytes at the current buffer position.
+     * <p>
+     * NOTE(review): the write methods below use the returned address unconditionally, so a {@code 0} result
+     * would be written through; presumably subclasses override this method to supply more space - confirm.
+     *
+     * @param size Size.
+     * @return Old pointer or {@code 0} if move was impossible.
+     */
+    public long move(long size) {
+        return buf.move(size);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(int b) {
+        writeByte(b);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] b) {
+        write(b, 0, b.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] b, int off, int len) {
+        // Validate the range up front: GridUnsafe.copyMemory() copies raw memory and presumably does not
+        // bounds-check the source array.
+        if (off < 0 || len < 0 || len > b.length - off)
+            throw new IndexOutOfBoundsException("Invalid range [off=" + off + ", len=" + len +
+                ", arrLen=" + b.length + ']');
+
+        // Nothing to copy (e.g. writeUTF("")); move(0) would trip HadoopOffheapBuffer's size > 0 assertion.
+        if (len == 0)
+            return;
+
+        GridUnsafe.copyMemory(b, GridUnsafe.BYTE_ARR_OFF + off, null, move(len), len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBoolean(boolean v) {
+        writeByte(v ? 1 : 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByte(int v) {
+        mem.writeByte(move(1), (byte)v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShort(int v) {
+        mem.writeShort(move(2), (short)v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChar(int v) {
+        writeShort(v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int v) {
+        mem.writeInt(move(4), v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLong(long v) {
+        mem.writeLong(move(8), v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloat(float v) {
+        mem.writeFloat(move(4), v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDouble(double v) {
+        mem.writeDouble(move(8), v);
+    }
+
+    /**
+     * Writes {@code s} exactly like {@link #writeUTF(String)}.
+     * <p>
+     * NOTE: this deviates from {@link DataOutput#writeBytes(String)}, which writes only the low byte of
+     * each char and no length.
+     */
+    @Override public void writeBytes(String s) {
+        writeUTF(s);
+    }
+
+    /**
+     * Writes {@code s} exactly like {@link #writeUTF(String)}.
+     * <p>
+     * NOTE: this deviates from {@link DataOutput#writeChars(String)}, which writes two bytes per char
+     * and no length.
+     */
+    @Override public void writeChars(String s) {
+        writeUTF(s);
+    }
+
+    /**
+     * Writes a 4-byte length followed by the standard UTF-8 encoding of {@code s}.
+     * <p>
+     * NOTE: this is not the {@link DataOutput#writeUTF(String)} format (2-byte length, modified UTF-8).
+     */
+    @Override public void writeUTF(String s) {
+        byte[] b = s.getBytes(StandardCharsets.UTF_8);
+
+        writeInt(b.length);
+        write(b);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopOffheapBuffer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopOffheapBuffer.java
new file mode 100644
index 0000000..acc9be6
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopOffheapBuffer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+
+/**
+ * Offheap buffer: tracks an address range {@code [begin, end)} and a current read/write position inside it.
+ * This class only does pointer arithmetic; it neither allocates nor frees the memory it describes.
+ */
+public class HadoopOffheapBuffer {
+    /** Buffer begin address. */
+    private long bufPtr;
+
+    /** The first address we do not own. */
+    private long bufEnd;
+
+    /** Current read or write pointer. */
+    private long posPtr;
+
+    /**
+     * @param bufPtr Pointer to buffer begin.
+     * @param bufSize Size of the buffer.
+     */
+    public HadoopOffheapBuffer(long bufPtr, long bufSize) {
+        set(bufPtr, bufSize);
+    }
+
+    /**
+     * Re-targets this buffer to a new address range and resets the position to its beginning.
+     *
+     * @param bufPtr Pointer to buffer begin.
+     * @param bufSize Size of the buffer.
+     */
+    public void set(long bufPtr, long bufSize) {
+        this.bufPtr = bufPtr;
+
+        posPtr = bufPtr;
+        bufEnd = bufPtr + bufSize;
+    }
+
+    /**
+     * @return Pointer to internal buffer begin.
+     */
+    public long begin() {
+        return bufPtr;
+    }
+
+    /**
+     * @return Buffer capacity.
+     */
+    public long capacity() {
+        return bufEnd - bufPtr;
+    }
+
+    /**
+     * @return Remaining capacity.
+     */
+    public long remaining() {
+        return bufEnd - posPtr;
+    }
+
+    /**
+     * @return Absolute pointer to the current position inside of the buffer.
+     */
+    public long pointer() {
+        return posPtr;
+    }
+
+    /**
+     * @param ptr Absolute pointer to the current position inside of the buffer; must be within
+     *      {@code [begin, end]}.
+     */
+    public void pointer(long ptr) {
+        // Both messages print the violated relation with the actual values.
+        assert ptr >= bufPtr : bufPtr + " <= " + ptr;
+        assert ptr <= bufEnd : ptr + " <= " + bufEnd;
+
+        posPtr = ptr;
+    }
+
+    /**
+     * Advances the position by {@code size} bytes if they fit before the end of the buffer.
+     *
+     * @param size Size move on; must be positive.
+     * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer
+     *      (the position is left unchanged in that case).
+     */
+    public long move(long size) {
+        assert size > 0 : size;
+
+        long oldPos = posPtr;
+        long newPos = oldPos + size;
+
+        if (newPos > bufEnd)
+            return 0;
+
+        posPtr = newPos;
+
+        return oldPos;
+    }
+
+    /**
+     * @param ptr Pointer.
+     * @return {@code true} If the given pointer is inside of this buffer. The end address itself counts
+     *      as inside, i.e. the checked range is {@code [begin, end]}.
+     */
+    public boolean isInside(long ptr) {
+        return ptr >= bufPtr && ptr <= bufEnd;
+    }
+
+    /**
+     * Resets position to the beginning of buffer.
+     */
+    public void reset() {
+        posPtr = bufPtr;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java
new file mode 100644
index 0000000..5ede18e
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Task executor that runs tasks locally, in threads started by a {@link HadoopExecutorService}.
+ */
+public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
+    /** Job tracker. */
+    private HadoopJobTracker jobTracker;
+
+    /** Tasks submitted for each job and not yet finished. */
+    private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
+
+    /** Executor service to run tasks. */
+    private HadoopExecutorService exec;
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        jobTracker = ctx.jobTracker();
+
+        exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(),
+            ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        if (exec != null) {
+            exec.shutdown(3000);
+
+            if (cancel) {
+                for (HadoopJobId jobId : jobs.keySet())
+                    cancelTasks(jobId);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        if (exec != null && !exec.shutdown(30000))
+            U.warn(log, "Failed to finish running tasks in 30 sec.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
+                ", tasksCnt=" + tasks.size() + ']');
+
+        final Collection<HadoopRunnableTask> executedTasks = tasksOf(job.id());
+
+        for (final HadoopTaskInfo info : tasks) {
+            assert info != null;
+
+            HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
+                ctx.localNodeId()) {
+                @Override protected void onTaskFinished(HadoopTaskStatus status) {
+                    if (log.isDebugEnabled())
+                        log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
+                            "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
+
+                    executedTasks.remove(this);
+
+                    jobTracker.onTaskFinished(info, status);
+                }
+
+                @Override protected HadoopTaskInput createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+                    return ctx.shuffle().input(taskCtx);
+                }
+
+                @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+                    return ctx.shuffle().output(taskCtx);
+                }
+            };
+
+            executedTasks.add(task);
+
+            exec.submit(task);
+        }
+    }
+
+    /**
+     * Gets the set of tracked tasks of the given job, registering an empty set on first use.
+     * Registration uses {@code putIfAbsent}, so if another thread registers the same job concurrently both
+     * callers share one set instead of one silently replacing the other.
+     *
+     * @param jobId Job ID.
+     * @return Tracked tasks of the job.
+     */
+    private Collection<HadoopRunnableTask> tasksOf(HadoopJobId jobId) {
+        Collection<HadoopRunnableTask> tasks = jobs.get(jobId);
+
+        if (tasks == null) {
+            Collection<HadoopRunnableTask> newTasks = new GridConcurrentHashSet<>();
+
+            Collection<HadoopRunnableTask> old = jobs.putIfAbsent(jobId, newTasks);
+
+            tasks = old != null ? old : newTasks;
+        }
+
+        return tasks;
+    }
+
+    /**
+     * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
+     * for this job ID.
+     * <p>
+     * It is guaranteed that this method will not be called concurrently with
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
+     *
+     * @param jobId Job ID to cancel.
+     */
+    @Override public void cancelTasks(HadoopJobId jobId) {
+        Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId);
+
+        if (executedTasks != null) {
+            for (HadoopRunnableTask task : executedTasks)
+                task.cancel();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException {
+        if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
+            Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
+
+            assert executedTasks == null || executedTasks.isEmpty();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java
new file mode 100644
index
0000000..993ecc9
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.internal.util.worker.GridWorkerListener;
+import org.apache.ignite.internal.util.worker.GridWorkerListenerAdapter;
+import org.apache.ignite.thread.IgniteThread;
+import org.jsr166.ConcurrentHashMap8;
+
+import static java.util.Collections.newSetFromMap;
+
+/**
+ * Executor service without thread pooling: every task runs in a newly started thread, with at most
+ * {@code maxTasks} tasks running at once and further tasks waiting in a bounded queue.
+ */
+public class HadoopExecutorService {
+    /** Tasks waiting for a free slot. */
+    private final LinkedBlockingQueue<Callable<?>> queue;
+
+    /** Currently running workers. */
+    private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>());
+
+    /** Number of taken task slots; changed only via CAS/decrement and never raised above {@code maxTasks}. */
+    private final AtomicInteger active = new AtomicInteger();
+
+    /** Maximum number of concurrently running tasks. */
+    private final int maxTasks;
+
+    /** Grid name used for worker threads. */
+    private final String gridName;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Set once {@link #shutdown(long)} is called. */
+    private volatile boolean shutdown;
+
+    /** On worker stop: frees the slot, or reuses it for the next queued task unless shutting down. */
+    private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() {
+        @Override public void onStopped(GridWorker w) {
+            workers.remove(w);
+
+            if (shutdown) {
+                active.decrementAndGet();
+
+                return;
+            }
+
+            Callable<?> task = queue.poll();
+
+            if (task != null)
+                startThread(task);
+            else {
+                active.decrementAndGet();
+
+                if (!queue.isEmpty())
+                    startFromQueue();
+            }
+        }
+    };
+
+    /**
+     * @param log Logger.
+     * @param gridName Grid name.
+     * @param maxTasks Max number of tasks.
+     * @param maxQueue Max queue length.
+     */
+    public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) {
+        assert maxTasks > 0 : maxTasks;
+        assert maxQueue > 0 : maxQueue;
+
+        this.maxTasks = maxTasks;
+        this.queue = new LinkedBlockingQueue<>(maxQueue);
+        this.gridName = gridName;
+        this.log = log.getLogger(HadoopExecutorService.class);
+    }
+
+    /**
+     * @return Number of active workers.
+     */
+    public int active() {
+        return workers.size();
+    }
+
+    /**
+     * Submits a task. It is started in a new thread right away if the queue is empty and a slot is free;
+     * otherwise it is queued, blocking while the queue is full. The task is dropped if the executor is shut
+     * down while waiting for queue space, or if the calling thread is interrupted (its interrupt status is
+     * then restored).
+     *
+     * @param task Task.
+     */
+    public void submit(Callable<?> task) {
+        while (queue.isEmpty()) {
+            int active0 = active.get();
+
+            if (active0 == maxTasks)
+                break;
+
+            if (active.compareAndSet(active0, active0 + 1)) {
+                startThread(task);
+
+                return; // Started in new thread bypassing queue.
+            }
+        }
+
+        try {
+            while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) {
+                if (shutdown)
+                    return; // Rejected due to shutdown.
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            return;
+        }
+
+        startFromQueue();
+    }
+
+    /**
+     * Attempts to start task from queue.
+     */
+    private void startFromQueue() {
+        do {
+            int active0 = active.get();
+
+            if (active0 == maxTasks)
+                break;
+
+            if (active.compareAndSet(active0, active0 + 1)) {
+                Callable<?> task = queue.poll();
+
+                if (task == null) {
+                    // Queue was drained concurrently: give the slot back.
+                    int res = active.decrementAndGet();
+
+                    assert res >= 0 : res;
+
+                    break;
+                }
+
+                startThread(task);
+            }
+        }
+        while (!queue.isEmpty());
+    }
+
+    /**
+     * Starts a new worker thread running the given task. The caller must already hold a slot in {@link #active}.
+     *
+     * @param task Task.
+     */
+    private void startThread(final Callable<?> task) {
+        String workerName;
+
+        if (task instanceof HadoopRunnableTask) {
+            final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo();
+
+            workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt();
+        }
+        else
+            workerName = task.toString();
+
+        GridWorker w = new GridWorker(gridName, workerName, log, lsnr) {
+            @Override protected void body() {
+                try {
+                    task.call();
+                }
+                catch (Exception e) {
+                    log.error("Failed to execute task: " + task, e);
+                }
+            }
+        };
+
+        workers.add(w);
+
+        if (shutdown)
+            w.cancel();
+
+        new IgniteThread(w).start();
+    }
+
+    /**
+     * Shuts down this executor service: cancels all workers and waits for them to stop.
+     * If the calling thread is interrupted while waiting, the wait ends early and the thread's
+     * interrupt status is restored.
+     *
+     * @param awaitTimeMillis Time in milliseconds to wait for tasks completion.
+     * @return {@code true} If all tasks completed.
+     */
+    public boolean shutdown(long awaitTimeMillis) {
+        shutdown = true;
+
+        for (GridWorker w : workers)
+            w.cancel();
+
+        while (awaitTimeMillis > 0 && !workers.isEmpty()) {
+            try {
+                Thread.sleep(100);
+
+                awaitTimeMillis -= 100;
+            }
+            catch (InterruptedException e) {
+                // Stop waiting, but preserve the interrupt so the caller can still observe it.
+                Thread.currentThread().interrupt();
+
+                break;
+            }
+        }
+
+        return workers.isEmpty();
+    }
+
+    /**
+     * @return {@code true} If method {@linkplain #shutdown(long)} was already called.
+     */
+    public boolean isShutdown() {
+        return shutdown;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java
new file mode 100644
index 0000000..9e38061
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.COMBINER_HASHMAP_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_COMBINER_NO_SORTING;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMBINE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
+
+/**
+ * Runnable task: runs a single Hadoop task (followed by the combiner for map tasks of jobs that have one)
+ * and reports the outcome through {@link #onTaskFinished(HadoopTaskStatus)}.
+ */
+public abstract class HadoopRunnableTask implements Callable<Void> {
+    /** Memory from which the combiner input multimap is created. */
+    private final GridUnsafeMemory mem;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final HadoopJob job;
+
+    /** Task to run. */
+    private final HadoopTaskInfo info;
+
+    /** Submit time. */
+    private final long submitTs = U.currentTimeMillis();
+
+    /** Execution start timestamp. */
+    private long execStartTs;
+
+    /** Execution end timestamp. */
+    private long execEndTs;
+
+    /** Combiner input filled by the map phase; created in createOutputInternal(), closed in call0(). */
+    private HadoopMultimap combinerInput;
+
+    /** Task context; set in {@link #call()}. */
+    private volatile HadoopTaskContext ctx;
+
+    /** Set if task is to cancelling. */
+    private volatile boolean cancelled;
+
+    /** Node id. */
+    private final UUID nodeId;
+
+    /**
+     * @param log Log.
+     * @param job Job.
+     * @param mem Memory.
+     * @param info Task info.
+     * @param nodeId Node id.
+     */
+    protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info,
+        UUID nodeId) {
+        this.nodeId = nodeId;
+        this.log = log.getLogger(HadoopRunnableTask.class);
+        this.job = job;
+        this.mem = mem;
+        this.info = info;
+    }
+
+    /**
+     * @return Wait time.
+     */
+    public long waitTime() {
+        return execStartTs - submitTs;
+    }
+
+    /**
+     * @return Execution time.
+     */
+    public long executionTime() {
+        return execEndTs - execStartTs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Void call() throws IgniteCheckedException {
+        ctx = job.getTaskContext(info);
+
+        return ctx.runAsJobOwner(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                call0();
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Implements actual task running: prepares the task environment, runs the task (then the combiner for
+     * map tasks of jobs that have one), reports the final status via {@link #onTaskFinished(HadoopTaskStatus)}
+     * and finally releases the combiner input and the task environment.
+     *
+     * @throws IgniteCheckedException If releasing task resources fails.
+     */
+    void call0() throws IgniteCheckedException {
+        execStartTs = U.currentTimeMillis();
+
+        Throwable err = null;
+
+        HadoopTaskState state = HadoopTaskState.COMPLETED;
+
+        HadoopPerformanceCounter perfCntr = null;
+
+        try {
+            perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
+
+            perfCntr.onTaskSubmit(info, submitTs);
+            perfCntr.onTaskPrepare(info, execStartTs);
+
+            ctx.prepareTaskEnvironment();
+
+            runTask(perfCntr);
+
+            if (info.type() == MAP && job.info().hasCombiner()) {
+                // Run the combiner in the same thread, then restore the original task info.
+                ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
+
+                try {
+                    runTask(perfCntr);
+                }
+                finally {
+                    ctx.taskInfo(info);
+                }
+            }
+        }
+        catch (HadoopTaskCancelledException ignored) {
+            state = HadoopTaskState.CANCELED;
+        }
+        catch (Throwable e) {
+            state = HadoopTaskState.FAILED;
+            err = e;
+
+            U.error(log, "Task execution failed.", e);
+
+            if (e instanceof Error)
+                throw e;
+        }
+        finally {
+            execEndTs = U.currentTimeMillis();
+
+            try {
+                if (perfCntr != null)
+                    perfCntr.onTaskFinish(info, execEndTs);
+
+                onTaskFinished(new HadoopTaskStatus(state, err, ctx == null ? null : ctx.counters()));
+            }
+            finally {
+                // Release resources even if status reporting throws; otherwise the combiner input
+                // (allocated from mem) and the task environment would be leaked.
+                releaseResources();
+            }
+        }
+    }
+
+    /**
+     * Closes the combiner input (if one was created) and cleans up the task environment (if the task
+     * context was obtained). The environment cleanup runs even if closing the combiner input throws.
+     *
+     * @throws IgniteCheckedException If cleanup fails.
+     */
+    private void releaseResources() throws IgniteCheckedException {
+        try {
+            if (combinerInput != null)
+                combinerInput.close();
+        }
+        finally {
+            if (ctx != null)
+                ctx.cleanupTaskEnvironment();
+        }
+    }
+
+    /**
+     * Runs the task currently set in the context with freshly created input and output, closing both afterwards.
+     *
+     * @param perfCntr Performance counter.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+
+        try (HadoopTaskOutput out = createOutputInternal(ctx);
+            HadoopTaskInput in = createInputInternal(ctx)) {
+
+            ctx.input(in);
+            ctx.output(out);
+
+            perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis());
+
+            ctx.run();
+        }
+    }
+
+    /**
+     * Cancel the executed task.
+     */
+    public void cancel() {
+        cancelled = true;
+
+        if (ctx != null)
+            ctx.cancel();
+    }
+
+    /**
+     * @param status Task status.
+     */
+    protected abstract void onTaskFinished(HadoopTaskStatus status);
+
+    /**
+     * @param ctx Task context.
+     * @return Task input, or {@code null} for task types that read no shuffle input.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
+        switch (ctx.taskInfo().type()) {
+            case SETUP:
+            case MAP:
+            case COMMIT:
+            case ABORT:
+                return null;
+
+            case COMBINE:
+                assert combinerInput != null;
+
+                return combinerInput.input(ctx);
+
+            default:
+                return createInput(ctx);
+        }
+    }
+
+    /**
+     * @param ctx Task context.
+     * @return Input.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException;
+
+    /**
+     * @param ctx Task info.
+     * @return Output.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException;
+
+    /**
+     * @param ctx Task info.
+     * @return Task output, or {@code null} for task types that produce none.
+     * @throws IgniteCheckedException If failed.
+     */
+    private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
+        switch (ctx.taskInfo().type()) {
+            case SETUP:
+            case REDUCE:
+            case COMMIT:
+            case ABORT:
+                return null;
+
+            case MAP:
+                if (job.info().hasCombiner()) {
+                    assert combinerInput == null;
+
+                    combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ?
+                        new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)):
+                        new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree
+
+                    return combinerInput.startAdding(ctx);
+                }
+
+                // Intentional fall through: a map task without a combiner writes to the regular output.
+
+            default:
+                return createOutput(ctx);
+        }
+    }
+
+    /**
+     * @return Task info.
+     */
+    public HadoopTaskInfo taskInfo() {
+        return info;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java
new file mode 100644
index 0000000..529182b
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopComponent;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
+
+/**
+ * Common superclass for task executors: runs a job's tasks, cancels them, and is notified of job state changes.
+ */
+public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
+    /**
+     * Runs the given tasks.
+     *
+     * @param job Job the tasks are run for.
+     * @param tasks Tasks to run.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException;
+
+    /**
+     * Cancels all currently running tasks and any scheduled execution of tasks for the given job ID.
+     * <p>It is guaranteed that this method will not be called concurrently with
+     * {@link #run(HadoopJob, Collection)} method. No more job submissions will be performed via
+     * {@link #run(HadoopJob, Collection)} method for the given job ID after this method is called.
+     *
+     * @param jobId Job ID to cancel.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract void cancelTasks(HadoopJobId jobId) throws IgniteCheckedException;
+
+    /**
+     * Callback invoked when the state of a job changes.
+     *
+     * @param meta Job metadata.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java
new file mode 100644
index 0000000..b22d291
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+/**
+ * State of a Hadoop task.
+ */
+public enum HadoopTaskState {
+    /** Task is running. */
+    RUNNING,
+
+    /** Task has completed. */
+    COMPLETED,
+
+    /** Task has failed. */
+    FAILED,
+
+    /** Task has been canceled. */
+    CANCELED,
+
+    /** Process crashed. */
+    CRASHED
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java
new file mode 100644
index 0000000..fa09ff7
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Status of a task: its state, plus an optional failure cause and optional counters.
+ */
+public class HadoopTaskStatus implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Task state; the value constructors assert it is not {@code null}. */
+    private HadoopTaskState state;
+
+    /** Failure cause, or {@code null} if none. */
+    private Throwable failCause;
+
+    /** Task counters, or {@code null} if not provided. */
+    private HadoopCounters cntrs;
+
+    /**
+     * Default constructor required by {@link Externalizable}; fields are populated by {@link #readExternal}.
+     */
+    public HadoopTaskStatus() {
+        // No-op.
+    }
+
+    /**
+     * Creates new instance without counters.
+     *
+     * @param state Task state, must not be {@code null}.
+     * @param failCause Failure cause, or {@code null} if none.
+     */
+    public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause) {
+        this(state, failCause, null);
+    }
+
+    /**
+     * Creates new instance.
+     *
+     * @param state Task state, must not be {@code null} (checked by assertion only).
+     * @param failCause Failure cause, or {@code null} if none.
+     * @param cntrs Task counters, or {@code null} if none.
+     */
+    public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause,
+        @Nullable HadoopCounters cntrs) {
+        assert state != null;
+
+        this.state = state;
+        this.failCause = failCause;
+        this.cntrs = cntrs;
+    }
+
+    /**
+     * @return Task state.
+     */
+    public HadoopTaskState state() {
+        return state;
+    }
+
+    /**
+     * @return Failure cause, or {@code null} if none.
+     */
+    @Nullable public Throwable failCause() {
+        return failCause;
+    }
+
+    /**
+     * @return Task counters, or {@code null} if none.
+     */
+    @Nullable public HadoopCounters counters() {
+        return cntrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopTaskStatus.class, this);
+    }
+
+    /** {@inheritDoc} Writes state, fail cause and counters; the order must match {@link #readExternal}. */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(state);
+        out.writeObject(failCause);
+        out.writeObject(cntrs);
+    }
+
+    /** {@inheritDoc} Reads fields in the same order {@link #writeExternal} writes them. */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        state = (HadoopTaskState)in.readObject();
+        failCause = (Throwable)in.readObject();
+        cntrs = (HadoopCounters)in.readObject();
+    }
+}
\ No newline at end of file