(cross-posted on scalding-dev@, cascading-user@, and user@tez)
Hi,
Chris K Wensel wrote not so long ago:
"will also add that one user is having some success with Scalding on
Cascading 3.0 and Tez"
I'm that guy. It's been a fun ride, and the news is that there /are/
results. While the unboxing experience isn't yet totally pleasant, these
results are now very promising.
The really good part is that apart from build.sbt, we needed *no changes
to application code* to run with the local, hadoop (1.x API on a 2.6.0
cluster), hadoop2-mr1, and hadoop2-tez back-ends.
Numbers:
* Full dataset: about 116M lines in 6 distinct CSV inputs
o *hadoop: about 18 hours* (busy pretty much all the time, maxing
out the LAN, disk bandwidth, or CPU depending on the phase)
o *tez: about 8 hours* (with no LAN saturation, few disk saturation
periods, and apparent room for improvement in CPU/task
allocation; I'm confident a couple of hours could be shaved off).
* Reduced dataset (integration testing dataset): about 2.3M lines in 6
distinct inputs
o *hadoop: 112 minutes*
o *tez: 6.25 minutes*
* In common:
o the job is a cascade made of 20 Flows, which compile into about
420 Cascading steps (Hadoop) or 20 DAGs (Tez)
o about 10K lines of Scala code
In the small-dataset experiment, hadoop suffers a lot from the zillions
of step setup ceremonies it has to perform with YARN, whereas Tez apps
are higher-level and tend to stay up much longer from the
ResourceManager's point of view.
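For a sense of scale, the ratios behind these numbers can be worked out directly. This is only a minimal sketch: the class and method names are made up for illustration, and the full-dataset durations are the approximate figures quoted above, so those ratios are rough.

```java
// Hypothetical helper, not part of Scalding, Cascading, or Tez.
final class Speedup {
    // Ratio of baseline duration to candidate duration (same unit for both).
    static double ratio(double baselineMinutes, double candidateMinutes) {
        return baselineMinutes / candidateMinutes;
    }

    public static void main(String[] args) {
        // Full dataset: about 18 hours on hadoop vs. about 8 hours on tez.
        System.out.println("full dataset speedup:    " + ratio(18 * 60, 8 * 60));
        // Reduced dataset: 112 minutes on hadoop vs. 6.25 minutes on tez.
        System.out.println("reduced dataset speedup: " + ratio(112, 6.25));
        // About 420 Cascading steps on hadoop vs. 20 DAGs on tez.
        System.out.println("steps per DAG (average): " + ratio(420, 20));
    }
}
```

On these figures Tez comes out roughly 2.25x faster on the full dataset and about 17.9x faster on the reduced one, with each DAG standing in for about 21 MapReduce steps on average.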
Results appear identical so far (still busy comparing and ensuring we've
covered all code paths, which we haven't yet, but this looks really good).
I am grateful to everyone who had the patience to sift through the huge
haystacks of logs and graphs I sent, and for the time spent writing
patches in the dark for me to test.
Chris, I have no idea how much time I tied you up on this, but wow, thanks!
-- Cyrille
------------------------------------------------------------------------
How to replicate this:
* We are using a cluster of 7 i7-3770K
<http://ark.intel.com/products/65523/Intel-Core-i7-3770K-Processor-8M-Cache-up-to-3_90-GHz>
ex-desktop machines, running Debian Jessie + Apache Hadoop 2.6.0
under Ansible. Each machine has 1 system disk, and 4*2TiB of HDFS
spindles, + 120GiB of SSD for flush-happy components (Zookeeper, NN,
RM etc.). Except for the ATS, all components are in HA mode under
Zookeeper, Keepalived and/or HAProxy as appropriate.
o Up to 80% of this cluster is dedicated to the "test" work queue.
o 16 GiB RAM per node (except one which is at 24GiB); we could now
justify going to the max at 32GiB.
o We noticed that when multiple apps are running, a single Tez app
will tend to "gobble up" all available slots even if multiple
RUNNING MR tasks are cohabiting. Not a big deal, and fairly
obvious given that TezChildren stay up until preempted or jobless.
* Scalding 0.13.1 patched with
https://github.com/twitter/scalding/pull/1220
o This replaces --hdfs with --hadoop, --hadoop2-mr1 and
--hadoop2-tez which delegate to the appropriate Cascading
back-end (--hdfs becomes an alias for --hadoop)
* Cascading 3.0.0-wip-97
* TEZ rebuilt from the branch-0.6 branch (post-0.6.0) as of commit
282e63af187f59700191579d2328cae3b8d2fa9c + a few other patches
(attached here)
o had to patch guava up to version 16.0.1 -- TEZ-2164 would
help a lot
o TEZ-2256: reduced logging noise by using a boolean rather than
an exception to signal end of buffer in UnorderedPartitionedKVWriter
o TEZ-2237: two patches from Siddharth Seth that helped unstick
some of the more complex DAGs
* a few extra settings in the job config:
    "tez.task.resource.memory.mb" -> (1024 + 512).toString, // default 1024
    "tez.container.max.java.heap.fraction" -> "0.7", // default 0.8
    "tez.queue.name" -> params.jobArgs.getOrElse("queue", "default"),
    "cascading.flow.runtime.gather.partitions.num" ->
      params.jobArgs.getOrElse("tez-partitions", "4"),
    "tez.lib.uris" ->
      params.jobArgs.getOrElse("tez.lib.uris", "hdfs://cluster/apps/tez-0.6.0/tez-0.6.0.tar.gz"),
    "tez.history.logging.service.class" -> "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService",
    "tez.allow.disabled.timeline-domains" -> "true"
* A couple of things in the app's build.sbt (sorry for my clumsy use of SBT):
    scalaVersion := "2.11.6"

    val hadoopVersion = "2.6.0"

    // can be "hadoop", "hadoop2-mr1" or "hadoop2-tez"
    val cascadingFabric = sys.props.getOrElse("CASCADING_FABRIC", "hadoop2-tez")

    val cascadingVersion = "3.0.0-wip-97"
    if (cascadingVersion.endsWith("-dev")) {
      libraryDependencies ++= Seq(
        "org.jgrapht" % "jgrapht-core" % "0.9.1",
        "org.jgrapht" % "jgrapht-ext" % "0.9.1",
        "riffle" % "riffle" % "1.0.0-wip-7",
        "org.codehaus.janino" % "janino" % "2.7.6"
      )
    } else {
      libraryDependencies ++= Seq()
    }

    val scaldingVersion = {
      if (cascadingFabric == "hadoop") {
        "0.13.1"
      } else {
        "0.13.1-cch-ffc2" // this one is the version as patched with PR1220
      }
    }
    if (cascadingFabric == "hadoop2-tez") {
      libraryDependencies ++= Seq(
        "javax.xml.bind" % "jaxb-api" % "2.2.2" exclude("javax.xml.stream", "stax-api"),
        "com.sun.jersey" % "jersey-server" % "1.9" exclude("asm", "asm"),
        "org.sonatype.sisu.inject" % "cglib" % "2.2.1-v20090111" exclude("asm", "asm")
      )
    } else {
      libraryDependencies ++= Seq()
    }

    if (cascadingFabric == "hadoop2-tez") {
      dependencyOverrides += "com.google.guava" % "guava" % "16.0.1"
      /* as of 0.6.0, Tez depends on a very old version (11.0.2) of guava,
         which has an incompatibly different API to Stopwatch() than more
         recent guavas.
         Version 14.0 of guava introduced the breaking change. */
      libraryDependencies ++= Seq(
        "org.apache.tez" % "tez-api" % "0.6.0-SNAPSHOT",
        "org.apache.tez" % "tez-mapreduce" % "0.6.0-SNAPSHOT", // FIXME: not sure this is needed
        "com.google.guava" % "guava" % "16.0.1"
      )
    } else {
      libraryDependencies ++= Seq()
    }
    libraryDependencies ++= Seq(
      "com.twitter" %% "scalding-core" % scaldingVersion
        exclude("cascading", "cascading-core")
        exclude("cascading", "cascading-hadoop")
        exclude("cascading", "cascading-local"),
      "com.twitter" %% "scalding-args" % scaldingVersion
        exclude("cascading", "cascading-core")
        exclude("cascading", "cascading-hadoop")
        exclude("cascading", "cascading-local"),
      "com.twitter" %% "scalding-date" % scaldingVersion
        exclude("cascading", "cascading-core")
        exclude("cascading", "cascading-hadoop")
        exclude("cascading", "cascading-local"),
      "com.twitter" %% "scalding-commons" % scaldingVersion
        exclude("cascading", "cascading-core")
        exclude("cascading", "cascading-hadoop")
        exclude("cascading", "cascading-local")
        exclude("com.hadoop.gplcompression", "hadoop-lzo")
        // hadoop-lzo also pulled in by elephantbird
    )

    libraryDependencies ++= Seq(
      "org.apache.thrift" % "libthrift" % "0.9.1",
      "cascading" % "cascading-core" % cascadingVersion,
      "cascading" % ("cascading-" + cascadingFabric) % cascadingVersion,
      "cascading" % "cascading-local" % cascadingVersion
    )
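To summarize how the pieces above select a back-end: the command-line switch picks a fabric, and the fabric name picks the Cascading artifact. The sketch below restates that mapping in plain Java purely as an illustration. FabricSelector and its methods are hypothetical names, the "group:artifact:version" string is just a display format, and this is not how Scalding or PR 1220 actually implement the switches.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not code from Scalding or Cascading.
final class FabricSelector {
    // The three fabric names accepted by CASCADING_FABRIC in the build above.
    static final List<String> FABRICS =
            Arrays.asList("hadoop", "hadoop2-mr1", "hadoop2-tez");

    // Map a command-line switch to a fabric; --hdfs is an alias for --hadoop.
    static String fabricForFlag(String flag) {
        switch (flag) {
            case "--hdfs":
            case "--hadoop":
                return "hadoop";
            case "--hadoop2-mr1":
                return "hadoop2-mr1";
            case "--hadoop2-tez":
                return "hadoop2-tez";
            default:
                throw new IllegalArgumentException("unknown mode flag: " + flag);
        }
    }

    // Build the artifact coordinates the same way the build does:
    // "cascading-" + fabric, within the "cascading" group.
    static String cascadingArtifact(String fabric, String version) {
        if (!FABRICS.contains(fabric)) {
            throw new IllegalArgumentException("unknown fabric: " + fabric);
        }
        return "cascading:cascading-" + fabric + ":" + version;
    }

    public static void main(String[] args) {
        String fabric = fabricForFlag("--hadoop2-tez");
        System.out.println(cascadingArtifact(fabric, "3.0.0-wip-97"));
    }
}
```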
The tez.container.max.java.heap.fraction override was important to
ensure that the heap plus native memory of Tez children didn't exceed
the allocated container size. As the
Application/Scalding/Scala/Cascading/Tez/JVM stack probably takes a
little more native memory than a more basic app/Tez/JVM stack, the
default fraction left too little native headroom and YARN kept yanking
Tez children at times. As has been pointed out, running the cluster with
yarn.nodemanager.pmem-check-enabled=false removes the need for that.
An item in one DAG was creating Out-Of-Memory trouble in Tez's
DefaultSorter when running with the default task memory of 1024MiB
(716MiB for the heap); increasing it slightly was a cowardly but
effective workaround.
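The memory arithmetic behind these two settings can be sketched as follows. The assumption here (mine, not checked against the Tez source) is that the heap is simply the container size multiplied by the heap fraction and truncated to whole MiB, with whatever remains available as native headroom. HeapBudget is a hypothetical name.

```java
// Hypothetical helper; assumes heap = container size * fraction, truncated.
final class HeapBudget {
    static int heapMiB(int containerMiB, double heapFraction) {
        return (int) (containerMiB * heapFraction);
    }

    // What is left in the container for native (non-heap) memory.
    static int nativeHeadroomMiB(int containerMiB, double heapFraction) {
        return containerMiB - heapMiB(containerMiB, heapFraction);
    }

    public static void main(String[] args) {
        int[] containers = {1024, 1024, 1024 + 512};
        double[] fractions = {0.8, 0.7, 0.7};
        for (int i = 0; i < containers.length; i++) {
            System.out.println("container=" + containers[i] + "MiB"
                    + " fraction=" + fractions[i]
                    + " heap=" + heapMiB(containers[i], fractions[i]) + "MiB"
                    + " native=" + nativeHeadroomMiB(containers[i], fractions[i]) + "MiB");
        }
    }
}
```

Under that assumption, a 1024MiB container gets 819MiB of heap at the default 0.8 and 716MiB at 0.7, while the 1536MiB container used here gets 1075MiB of heap and leaves 461MiB for native memory.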
>From 3ba2f60d78e047000db9ce3d5876396db4a4e85b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?=
<[email protected]>
Date: Mon, 30 Mar 2015 10:46:13 +0200
Subject: [PATCH 1/5] Switch to guava's newer API for Stopwatch (14.0+)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Cyrille Chépélov (TP12) <[email protected]>
---
.../java/org/apache/tez/common/TezUtilsInternal.java | 5 +++--
.../tez/mapreduce/common/MRInputAMSplitGenerator.java | 7 ++++---
.../tez/mapreduce/common/MRInputSplitDistributor.java | 3 ++-
.../runtime/library/common/shuffle/HttpConnection.java | 8 ++++----
.../common/writers/UnorderedPartitionedKVWriter.java | 16 +++++++++++++++-
.../org/apache/tez/mapreduce/examples/RPCLoadGen.java | 5 +++--
6 files changed, 31 insertions(+), 13 deletions(-)
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 9761b15..8666981 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -26,6 +26,7 @@ import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.DataFormatException;
@@ -92,7 +93,7 @@ public class TezUtilsInternal {
if (LOG.isDebugEnabled()) {
sw.stop();
LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
- + ", CompressTime: " + sw.elapsedMillis());
+ + ", CompressTime: " + sw.elapsed(TimeUnit.MILLISECONDS));
}
return compressed;
}
@@ -106,7 +107,7 @@ public class TezUtilsInternal {
if (LOG.isDebugEnabled()) {
sw.stop();
LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
- + ", UncompressTimeTaken: " + sw.elapsedMillis());
+ + ", UncompressTimeTaken: " + sw.elapsed(TimeUnit.MILLISECONDS));
}
return uncompressed;
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index 884054b..bdd646c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -19,6 +19,7 @@
package org.apache.tez.mapreduce.common;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
@@ -78,7 +79,7 @@ public class MRInputAMSplitGenerator extends InputInitializer {
if (LOG.isDebugEnabled()) {
sw.stop();
LOG.debug("Time to parse MRInput payload into prot: "
- + sw.elapsedMillis());
+ + sw.elapsed(TimeUnit.MILLISECONDS));
}
if (LOG.isDebugEnabled()) {
sw.reset().start();
@@ -92,7 +93,7 @@ public class MRInputAMSplitGenerator extends InputInitializer {
LOG.info("Emitting serialized splits: " + sendSerializedEvents);
if (LOG.isDebugEnabled()) {
sw.stop();
- LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis());
+ LOG.debug("Time converting ByteString to configuration: " + sw.elapsed(TimeUnit.MILLISECONDS));
}
if (LOG.isDebugEnabled()) {
@@ -125,7 +126,7 @@ public class MRInputAMSplitGenerator extends InputInitializer {
}
if (LOG.isDebugEnabled()) {
sw.stop();
- LOG.debug("Time to create splits to mem: " + sw.elapsedMillis());
+ LOG.debug("Time to create splits to mem: " + sw.elapsed(TimeUnit.MILLISECONDS));
}
List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index 1307687..e2f39b6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.common;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -78,7 +79,7 @@ public class MRInputSplitDistributor extends InputInitializer {
if (LOG.isDebugEnabled()) {
sw.stop();
LOG.debug("Time to parse MRInput payload into prot: "
- + sw.elapsedMillis());
+ + sw.elapsed(TimeUnit.MILLISECONDS));
}
Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
.getConfigurationBytes());
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
index 4732a5a..69ad824 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
@@ -205,7 +205,7 @@ public class HttpConnection {
}
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to connect to " + url.toString() +
- " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms; connectionFailures="+ connectionFailures);
+ " " + stopWatch.elapsed(TimeUnit.MILLISECONDS) + " ms; connectionFailures="+ connectionFailures);
}
return true;
}
@@ -238,7 +238,7 @@ public class HttpConnection {
// verify that replyHash is HMac of encHash
SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
LOG.info("for url=" + url +
- " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
+ " sent hash and receievd reply " + stopWatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
/**
@@ -257,7 +257,7 @@ public class HttpConnection {
}
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to getInputStream (connect) " + url +
- " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
+ " " + stopWatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
return input;
}
@@ -298,7 +298,7 @@ public class HttpConnection {
}
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to cleanup connection to " + url +
- " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
+ " " + stopWatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 1ba00a0..d8fd417 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -208,19 +208,29 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
int metaStart = currentBuffer.nextPosition;
currentBuffer.availableSize -= (META_SIZE + metaSkip);
currentBuffer.nextPosition += META_SIZE;
+
+ LOG.info(String.format("currentBuffer nextPosition=%d metastart=%d availableSize=%d key=%s value=%s",
+ currentBuffer.nextPosition, metaStart, currentBuffer.availableSize,
+ (key != null) ? key.toString() : "(null)",
+ (value != null) ? value.toString() : "(null)"));
+
try {
keySerializer.serialize(key);
} catch (BufferTooSmallException e) {
if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk.
// Key too large for any buffer. Write entire record to disk.
+ LOG.info("BTSE caught, attempting writeLargeRecord (K)");
currentBuffer.reset();
writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
+ LOG.info("BTSE caught, completed writeLargeRecord (K)");
return;
} else { // Exceeded length on current buffer.
+ LOG.info("BTSE caught, attempting regular write in next buffer (K)");
// Try resetting the buffer to the next one, if this was not the start of a buffer,
// and begin spilling the current buffer to disk if it has any records.
setupNextBuffer();
write(key, value, partition);
+ LOG.info("BTSE caught, completed regular write in next buffer (K)");
return;
}
}
@@ -231,13 +241,17 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
// Value too large for current buffer, or K-V too large for entire buffer.
if (metaStart == 0) {
// Key + Value too large for a single buffer.
+ LOG.info("BTSE caught, attempting writeLargeRecord (K+V)");
currentBuffer.reset();
writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
+ LOG.info("BTSE caught, completed writeLargeRecord (K+V)");
return;
} else { // Exceeded length on current buffer.
+ LOG.info("BTSE caught, attempting regular write in next buffer (K+V)");
// Try writing key+value to a new buffer - will fall back to disk if that fails.
setupNextBuffer();
write(key, value, partition);
+ LOG.info("BTSE caught, completed regular write in next buffer (K+V)");
return;
}
}
@@ -790,4 +804,4 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
return shufflePort;
}
-}
\ No newline at end of file
+}
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
index 67cf328..ced278f 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.commons.logging.Log;
@@ -204,7 +205,7 @@ public class RPCLoadGen extends TezExampleBase {
} else {
throw new IllegalArgumentException("Unknown execution mode: [" + modeByte + "]");
}
- LOG.info("TimeTakenToAccessPayload=" + sw.stop().elapsedMillis());
+ LOG.info("TimeTakenToAccessPayload=" + sw.stop().elapsed(TimeUnit.MILLISECONDS));
LOG.info("Sleeping for: " + sleepTime);
Thread.sleep(sleepTime);
}
@@ -214,4 +215,4 @@ public class RPCLoadGen extends TezExampleBase {
int res = ToolRunner.run(new Configuration(), new RPCLoadGen(), args);
System.exit(res);
}
-}
\ No newline at end of file
+}
--
2.1.4
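Patch 1/5 above replaces the older Guava calls elapsedMillis() and elapsedTime(unit) with elapsed(TimeUnit). For debug timings of this kind, one possible way to avoid depending on a particular Guava version at all is to time with the JDK only. The sketch below is an assumption-laden illustration, not something taken from Tez: JdkStopwatch is a hypothetical class that mimics only the start/stop/elapsed subset used in the patch.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only stand-in for the small Stopwatch subset used above.
final class JdkStopwatch {
    private long startNanos;
    private long elapsedNanos;
    private boolean running;

    JdkStopwatch start() {
        if (running) {
            throw new IllegalStateException("already running");
        }
        running = true;
        startNanos = System.nanoTime();
        return this;
    }

    JdkStopwatch stop() {
        if (!running) {
            throw new IllegalStateException("not running");
        }
        elapsedNanos += System.nanoTime() - startNanos;
        running = false;
        return this;
    }

    // Elapsed time so far, converted (and truncated) to the requested unit.
    long elapsed(TimeUnit unit) {
        long nanos = running
                ? elapsedNanos + (System.nanoTime() - startNanos)
                : elapsedNanos;
        return unit.convert(nanos, TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        JdkStopwatch sw = new JdkStopwatch().start();
        Thread.sleep(20);
        sw.stop();
        System.out.println("elapsed ms: " + sw.elapsed(TimeUnit.MILLISECONDS));
    }
}
```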
>From 31f336f47f91d58b87644066921663660f0a2e61 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?=
<[email protected]>
Date: Wed, 1 Apr 2015 13:22:50 +0200
Subject: [PATCH 2/5] (DO NOT COMMIT) bump guava to 0.16
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 6bf93a6..0831e66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -589,7 +589,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>11.0.2</version>
+ <version>16.0.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
--
2.1.4
>From 30dca340764645b3738118a57dbc6f58797a6b91 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?=
<[email protected]>
Date: Wed, 1 Apr 2015 17:24:29 +0200
Subject: [PATCH 3/5] WIP TEZ-2256: a first attempt
---
.../writers/UnorderedPartitionedKVWriter.java | 41 ++++++++--------------
1 file changed, 15 insertions(+), 26 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index d8fd417..1203a5e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -199,7 +199,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
// Wrap to 4 byte (Int) boundary for metaData
int mod = currentBuffer.nextPosition % INT_SIZE;
int metaSkip = mod == 0 ? 0 : (INT_SIZE - mod);
- if (currentBuffer.availableSize < (META_SIZE + metaSkip)) {
+ if ((currentBuffer.availableSize < (META_SIZE + metaSkip)) || (currentBuffer.full)) {
// Move over to the next buffer.
metaSkip = 0;
setupNextBuffer();
@@ -208,50 +208,39 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
int metaStart = currentBuffer.nextPosition;
currentBuffer.availableSize -= (META_SIZE + metaSkip);
currentBuffer.nextPosition += META_SIZE;
+
+ keySerializer.serialize(key);
- LOG.info(String.format("currentBuffer nextPosition=%d metastart=%d availableSize=%d key=%s value=%s",
- currentBuffer.nextPosition, metaStart, currentBuffer.availableSize,
- (key != null) ? key.toString() : "(null)",
- (value != null) ? value.toString() : "(null)"));
-
- try {
- keySerializer.serialize(key);
- } catch (BufferTooSmallException e) {
+ if (currentBuffer.full) {
if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk.
// Key too large for any buffer. Write entire record to disk.
- LOG.info("BTSE caught, attempting writeLargeRecord (K)");
currentBuffer.reset();
writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
- LOG.info("BTSE caught, completed writeLargeRecord (K)");
return;
} else { // Exceeded length on current buffer.
- LOG.info("BTSE caught, attempting regular write in next buffer (K)");
// Try resetting the buffer to the next one, if this was not the start of a buffer,
// and begin spilling the current buffer to disk if it has any records.
setupNextBuffer();
write(key, value, partition);
- LOG.info("BTSE caught, completed regular write in next buffer (K)");
return;
}
}
+
+
int valStart = currentBuffer.nextPosition;
- try {
- valSerializer.serialize(value);
- } catch (BufferTooSmallException e) {
+ valSerializer.serialize(value);
+
+ if (currentBuffer.full) {
// Value too large for current buffer, or K-V too large for entire buffer.
if (metaStart == 0) {
// Key + Value too large for a single buffer.
- LOG.info("BTSE caught, attempting writeLargeRecord (K+V)");
currentBuffer.reset();
writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
- LOG.info("BTSE caught, completed writeLargeRecord (K+V)");
return;
} else { // Exceeded length on current buffer.
- LOG.info("BTSE caught, attempting regular write in next buffer (K+V)");
// Try writing key+value to a new buffer - will fall back to disk if that fails.
setupNextBuffer();
write(key, value, partition);
- LOG.info("BTSE caught, completed regular write in next buffer (K+V)");
return;
}
}
@@ -661,8 +650,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
}
public void write(byte[] b, int off, int len) throws IOException {
- if (len > currentBuffer.availableSize) {
- throw new BufferTooSmallException();
+ if (currentBuffer.full) {
+ /* no longer do anything until reset */
+ } else if (len > currentBuffer.availableSize) {
+ currentBuffer.full = true; /* stop working & signal we hit the end */
} else {
System.arraycopy(b, off, currentBuffer.buffer, currentBuffer.nextPosition, len);
currentBuffer.nextPosition += len;
@@ -688,6 +679,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
private int nextPosition = 0;
private int availableSize;
+ private boolean full = false;
WrappedBuffer(int numPartitions, int size) {
this.partitionPositions = new int[numPartitions];
@@ -713,6 +705,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
nextPosition = 0;
skipSize = 0;
availableSize = size;
+ full = false;
}
void cleanup() {
@@ -721,10 +714,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
}
}
- private static class BufferTooSmallException extends IOException {
- private static final long serialVersionUID = 1L;
- }
-
private class SpillCallback implements FutureCallback<SpillResult> {
private final int spillNumber;
--
2.1.4
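The idea in patch 3/5 above, signalling "buffer full" with a boolean instead of throwing an exception per overflow, can be shown in isolation. This is a simplified sketch loosely modelled on the WrappedBuffer changes in the patch; FlagBuffer and its methods are hypothetical names, and the real writer also handles metadata, partitions, and spilling.

```java
// Hypothetical, simplified buffer: overflow sets a flag instead of throwing.
final class FlagBuffer {
    private final byte[] buffer;
    private int nextPosition = 0;
    private boolean full = false;

    FlagBuffer(int size) {
        this.buffer = new byte[size];
    }

    void write(byte[] b, int off, int len) {
        if (full) {
            // Once full, ignore further writes until reset, so a record
            // that overflowed is never partially appended.
            return;
        }
        if (len > buffer.length - nextPosition) {
            full = true; // signal that we hit the end of the buffer
            return;
        }
        System.arraycopy(b, off, buffer, nextPosition, len);
        nextPosition += len;
    }

    boolean isFull() {
        return full;
    }

    int position() {
        return nextPosition;
    }

    void reset() {
        nextPosition = 0;
        full = false;
    }

    public static void main(String[] args) {
        FlagBuffer buf = new FlagBuffer(16);
        byte[] record = new byte[6];
        for (int i = 0; i < 3; i++) {
            buf.write(record, 0, record.length);
            if (buf.isFull()) {
                // The caller checks the flag after serializing, then moves
                // on to a fresh buffer and retries the same record.
                buf.reset();
                buf.write(record, 0, record.length);
            }
        }
        System.out.println("position after retry: " + buf.position());
    }
}
```

The caller pays for a flag check after each serialization rather than for constructing and unwinding an exception on every buffer boundary, which is where the logging noise mentioned earlier came from.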
>From 4962e0d10fb59375e05e318a198e63465bd80631 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?=
<[email protected]>
Date: Fri, 3 Apr 2015 08:53:36 +0200
Subject: [PATCH 4/5] TEZ-2237-hack.branch6.txt WIP: trying out patch from
[~sseth]
---
.../library/output/OrderedPartitionedKVOutput.java | 26 +++++++++++++++++++++-
.../output/UnorderedPartitionedKVOutput.java | 26 +++++++++++++++++++++-
2 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index b3290a5..a46e994 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -54,6 +54,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
@@ -147,9 +148,32 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
this.endTime = System.nanoTime();
return generateEventsOnClose();
} else {
+ ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
+ ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
+ vmBuilder.setOutputSize(0);
+ VertexManagerEvent vmEvent = VertexManagerEvent.create(
+ getContext().getDestinationVertexName(),
+ vmBuilder.build().toByteString().asReadOnlyByteBuffer());
+
+ ShuffleUserPayloads.DataMovementEventPayloadProto.Builder payloadBuilder =
+ ShuffleUserPayloads.DataMovementEventPayloadProto
+ .newBuilder();
+ BitSet emptyPartitionDetails = new BitSet();
+ emptyPartitionDetails.set(0, getNumPhysicalOutputs(), true);
+ ByteString emptyPartitionsBytesString =
+ TezCommonUtils.compressByteArrayToByteString(
+ TezUtilsInternal.toByteArray(emptyPartitionDetails));
+ payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
+ ShuffleUserPayloads.DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+ ByteBuffer payload = payloadProto.toByteString().asReadOnlyByteBuffer();
+
+ CompositeDataMovementEvent cdme =
+ CompositeDataMovementEvent.create(0, getNumPhysicalOutputs(), payload);
+
+
LOG.warn("Attempting to close output " + getContext().getDestinationVertexName()
+ " before it was started");
- return Collections.emptyList();
+ return Lists.newArrayList(vmEvent, cdme);
}
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 1e39535..9690213 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -18,6 +18,8 @@
package org.apache.tez.runtime.library.output;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -26,22 +28,28 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
/**
* {@link UnorderedPartitionedKVOutput} is a {@link LogicalOutput} which can be used to
@@ -100,7 +108,23 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
if (isStarted.get()) {
return kvWriter.close();
} else {
- return Collections.emptyList();
+ ShuffleUserPayloads.DataMovementEventPayloadProto.Builder payloadBuilder =
+ ShuffleUserPayloads.DataMovementEventPayloadProto
+ .newBuilder();
+ BitSet emptyPartitionDetails = new BitSet();
+ emptyPartitionDetails.set(0, getNumPhysicalOutputs(), true);
+ ByteString emptyPartitionsBytesString =
+ TezCommonUtils.compressByteArrayToByteString(
+ TezUtilsInternal.toByteArray(emptyPartitionDetails));
+ payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
+ ShuffleUserPayloads.DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+ ByteBuffer payload = payloadProto.toByteString().asReadOnlyByteBuffer();
+
+ LOG.warn("Attempting to close output " + getContext().getDestinationVertexName()
+ + " before it was started");
+ Event cdme =
+ CompositeDataMovementEvent.create(0, getNumPhysicalOutputs(), payload);
+ return Lists.newArrayList(cdme);
}
}
--
2.1.4
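Patch 4/5 above makes an output that was never started still emit an event whose payload marks every partition as empty. The bit-set part of that can be sketched with the JDK alone. Note the assumptions: EmptyPartitions is a hypothetical class, BitSet.toByteArray() and BitSet.valueOf() stand in for the Tez helper TezUtilsInternal.toByteArray (whose byte layout may differ), and the compression and protobuf wrapping done in the patch are left out.

```java
import java.util.BitSet;

// Hypothetical JDK-only sketch of the "all partitions empty" marker.
final class EmptyPartitions {
    // Producer side: one bit per partition, all set to "empty".
    static byte[] allEmpty(int numPartitions) {
        BitSet empty = new BitSet(numPartitions);
        empty.set(0, numPartitions, true);
        return empty.toByteArray();
    }

    // Consumer side: a set bit means there is nothing to fetch.
    static boolean isEmpty(byte[] payload, int partition) {
        return BitSet.valueOf(payload).get(partition);
    }

    public static void main(String[] args) {
        byte[] payload = allEmpty(10);
        System.out.println("payload bytes: " + payload.length);
        System.out.println("partition 9 empty: " + isEmpty(payload, 9));
        System.out.println("partition 10 empty: " + isEmpty(payload, 10));
    }
}
```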
>From 3968342f1536d442bb2765f4d188640a430588a1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?=
<[email protected]>
Date: Mon, 6 Apr 2015 12:53:57 +0200
Subject: [PATCH 5/5] TEZ-2237 WIP experimenal patch#2
TEZ-2237.test.2_branch0.6.txt
---
.../tez/runtime/library/output/OrderedPartitionedKVOutput.java | 5 ++++-
.../org/apache/tez/runtime/library/output/UnorderedKVOutput.java | 2 ++
.../tez/runtime/library/output/UnorderedPartitionedKVOutput.java | 5 ++++-
3 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index a46e994..cc7af3d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -105,6 +105,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
@Override
public synchronized void start() throws Exception {
+ LOG.info("Starting output " + OrderedPartitionedKVOutput.class.getSimpleName() + " " + getContext().getDestinationVertexName());
if (!isStarted.get()) {
memoryUpdateCallbackHandler.validateUpdateReceived();
if (this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS,
@@ -142,12 +143,14 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
@Override
public synchronized List<Event> close() throws IOException {
+ LOG.info("Closing output " + OrderedPartitionedKVOutput.class.getSimpleName() + " " + getContext().getDestinationVertexName());
if (sorter != null) {
sorter.flush();
sorter.close();
this.endTime = System.nanoTime();
return generateEventsOnClose();
} else {
+ LOG.info("Output: " + getContext().getDestinationVertexName() + " was not started. Generating empty events");
ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
vmBuilder.setOutputSize(0);
@@ -172,7 +175,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
LOG.warn("Attempting to close output " + getContext().getDestinationVertexName()
- + " before it was started");
+ + " before it was started. Generating empty events.");
return Lists.newArrayList(vmEvent, cdme);
}
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 7bfc397..e4f2f23 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -102,6 +102,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
@Override
public synchronized void start() {
+ LOG.info("Starting output " + UnorderedKVOutput.class.getSimpleName() + " " + getContext().getDestinationVertexName());
}
@Override
@@ -117,6 +118,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
@Override
public synchronized List<Event> close() throws Exception {
+ LOG.info("Closing output " + UnorderedKVOutput.class.getSimpleName() + " " + getContext().getDestinationVertexName());
boolean outputGenerated = this.kvWriter.close();
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 9690213..48b231c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -85,6 +85,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
@Override
public synchronized void start() throws Exception {
+ LOG.info("Starting output " + UnorderedPartitionedKVOutput.class.getSimpleName() + " " + getContext().getDestinationVertexName());
if (!isStarted.get()) {
memoryUpdateCallbackHandler.validateUpdateReceived();
this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, getNumPhysicalOutputs(),
@@ -105,9 +106,11 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
@Override
public synchronized List<Event> close() throws Exception {
+ LOG.info("Closing output " + UnorderedPartitionedKVOutput.class.getSimpleName() +" " + getContext().getDestinationVertexName());
if (isStarted.get()) {
return kvWriter.close();
} else {
+ LOG.info("Output: " + getContext().getDestinationVertexName() + " was not started. Generating empty events");
ShuffleUserPayloads.DataMovementEventPayloadProto.Builder payloadBuilder =
ShuffleUserPayloads.DataMovementEventPayloadProto
.newBuilder();
@@ -121,7 +124,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
ByteBuffer payload = payloadProto.toByteString().asReadOnlyByteBuffer();
LOG.warn("Attempting to close output " + getContext().getDestinationVertexName()
- + " before it was started");
+ + " before it was started. Generating empty events.");
Event cdme =
CompositeDataMovementEvent.create(0, getNumPhysicalOutputs(), payload);
return Lists.newArrayList(cdme);
--
2.1.4