-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53801/
-----------------------------------------------------------
(Updated Nov. 19, 2016, 12:49 a.m.)
Review request for Ambari, Alejandro Fernandez, Madhuvanthi Radhakrishnan,
Sumit Mohanty, and Siddharth Seth.
Changes
-------
Removed the unintended change at ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java:365.
Bugs: AMBARI-18901
https://issues.apache.org/jira/browse/AMBARI-18901
Repository: ambari
Description
-------
AMBARI-18901. Use 'Number of LLAP Nodes' selected as the driver for LLAP config
calculations.
Below is the calculation logic used:
**********************************************************************************************************************************************
**********************************************************************************************************************************************
-----------------------------------------------------------
For use with default setup - Ambari managed queue
Parameters
numRequestedLlapNodes     UserInput
tezAmContainerSize        Computed
memoryPerThread           Computed          // Set as a parameter in HiveConf; user can override in Advanced
numConcurrentQueries      Computed          // user can override in Advanced
maxConcurrentQueries      Computed          // Max value for the slider
numLlapNodes              Computed          // Can be lower for small clusters; user can override in Advanced
sliderAmContainerSize     Computed          // user can override in Advanced
numExecutorsPerNode       Computed          // user can override in Advanced | TODO: This and memoryPerThread are about the same when taking daemonSize and cache into consideration
cacheMemoryPerNode        Computed          // user can override in Advanced
amFraction                1                 // Set to 1 for the Ambari-controlled queue | TODO: Factor into concurrency for user-provided queues
llapQueueFraction         Computed          // Computed by Ambari. (Avoid changing if the current value is > the computed value and the user specified the current value?)
numClusterNodes           ClusterParameter
nmMemoryPerNode           ClusterParameter
nmCpusPerNode             ClusterParameter
minContainerSize          ClusterParameter
CONSTANT DEFAULT_EXECUTOR_TO_AM_RATIO = 20;
CONSTANT MIN_EXECUTOR_TO_AM_RATIO = 10;
CONSTANT MAX_CONCURRENT_QUERIES = 32;
nmMemoryPerNodeNormalized = normalizeDown(nmMemoryPerNode, minContainerSize);
totalClusterCapacity = numClusterNodes * nmMemoryPerNodeNormalized;
totalLlapMemory = numRequestedLlapNodes * nmMemoryPerNodeNormalized;
amCapacityAvailable = totalLlapMemory; // For the LLAP queue, the AM fraction is set to 1.
sliderAmSize -> Current calculations remain unchanged. (<=, < fixes)
llapMemoryTezAMsAndDaemons = totalLlapMemory - sliderAmSize;
FAIL("Not enough capacity available on the cluster to run LLAP") if (llapMemoryTezAMsAndDaemons < 2 * minContainerSize);
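The normalization and capacity steps above can be sketched in Python (the language of stack_advisor.py). This is a minimal illustration, not the actual stack advisor code: normalize_down/normalize_up are assumed to round to the nearest multiple of minContainerSize, and all names are illustrative.

```python
def normalize_down(value, step):
    # Round value down to the nearest multiple of step (e.g. container size).
    return (value // step) * step

def normalize_up(value, step):
    # Round value up to the nearest multiple of step.
    return ((value + step - 1) // step) * step

def llap_capacity(num_cluster_nodes, nm_memory_per_node, min_container_size,
                  num_requested_llap_nodes, slider_am_size):
    # Normalize per-node NM memory, then derive cluster and LLAP capacity.
    nm_mem_normalized = normalize_down(nm_memory_per_node, min_container_size)
    total_cluster_capacity = num_cluster_nodes * nm_mem_normalized
    total_llap_memory = num_requested_llap_nodes * nm_mem_normalized
    # Memory left for Tez AMs and LLAP daemons after the Slider AM.
    llap_memory_tez_ams_and_daemons = total_llap_memory - slider_am_size
    if llap_memory_tez_ams_and_daemons < 2 * min_container_size:
        raise ValueError("Not enough capacity available on the cluster to run LLAP")
    return total_cluster_capacity, total_llap_memory, llap_memory_tez_ams_and_daemons
```

For example, with 10 nodes of 10000 MB, a 1024 MB minimum container, 4 requested LLAP nodes, and a 1024 MB Slider AM, per-node memory normalizes to 9216 MB.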
tezAmContainerSize = (totalClusterCapacity, minContainerSize) {
desiredSize = { // This part is unchanged from current calculations.
if (totalClusterCapacity <= 4096) {
return 256;
} else if (totalClusterCapacity <= 73728) {
return 512;
} else {
return 1536;
}
}
return normalizeUp(desiredSize, minContainerSize);
}
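The tezAmContainerSize tiering above, as a small Python sketch (helper and function names are illustrative):

```python
def normalize_up(value, step):
    # Round value up to the nearest multiple of step.
    return ((value + step - 1) // step) * step

def tez_am_container_size(total_cluster_capacity, min_container_size):
    # Tier the desired AM size by total cluster capacity, then normalize
    # up to a valid container size.
    if total_cluster_capacity <= 4096:
        desired = 256
    elif total_cluster_capacity <= 73728:
        desired = 512
    else:
        desired = 1536
    return normalize_up(desired, min_container_size)
```

Note that on clusters with a large minimum container size the normalized result can exceed the tier value (e.g. a 512 MB tier rounds up to 1024 MB when minContainerSize is 1024).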
memoryPerThread = (nmMemoryPerNodeNormalized, nmCpusPerNode) { // TODO: Not linear, e.g. 1024 to 1025 goes from 2 executors to 1.
if (userSpecifiedValue) {
return userSpecifiedValue;
} else if (nmMemoryPerNodeNormalized <= 1024) {
return Math.min(512, nmMemoryPerNodeNormalized);
} else if (nmMemoryPerNodeNormalized <= 4096 ) {
return 1024;
} else if (nmMemoryPerNodeNormalized <= 10240) {
return 2048;
} else if (nmMemoryPerNodeNormalized <= 24576) {
return 3072;
} else {
return 4096;
}
}
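A direct Python sketch of the memoryPerThread tiering (illustrative names; the user override corresponds to the Advanced-config value mentioned above). It also shows the non-linearity flagged in the TODO: 1024 MB yields a 512 MB thread, while 1025 MB jumps to 1024 MB.

```python
def memory_per_thread(nm_memory_per_node_normalized, user_specified=None):
    # A user-specified value (Advanced config) takes precedence.
    if user_specified:
        return user_specified
    if nm_memory_per_node_normalized <= 1024:
        return min(512, nm_memory_per_node_normalized)
    elif nm_memory_per_node_normalized <= 4096:
        return 1024
    elif nm_memory_per_node_normalized <= 10240:
        return 2048
    elif nm_memory_per_node_normalized <= 24576:
        return 3072
    else:
        return 4096
```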
numConcurrentQueries, maxConcurrentQueries = (nmMemoryPerNodeNormalized,
nmCpusPerNode, memoryPerThread, numRequestedLlapNodes,
llapMemoryTezAMsAndDaemons, tezAmContainerSize, amCapacityAvailable) {
maxExecutorsPerNode = getMaxExecutorsPerNode(nmMemoryPerNodeNormalized,
nmCpusPerNode, memoryPerThread);
FAIL if maxExecutorsPerNode < 1;
// Default 1 AM for every 20 executor threads.
// The second part of the min calculates based on the memory required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM, making use of total memory.
// However, it's possible that total memory will not be used and numExecutors is instead limited by #CPUs; use maxExecutorsPerNode to factor this in.
numConcurrentQueriesLimit = Math.min(floor(maxExecutorsPerNode * numRequestedLlapNodes / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES);
numConcurrentQueries = Math.min(numConcurrentQueriesLimit, floor(llapMemoryTezAMsAndDaemons / (DEFAULT_EXECUTOR_TO_AM_RATIO * memoryPerThread + tezAmContainerSize)));
if (numConcurrentQueries == 0) {
numConcurrentQueries = 1;
}
if (numConcurrentQueries * tezAmContainerSize > amCapacityAvailable) {
numConcurrentQueries = floor(amCapacityAvailable /
tezAmContainerSize);
FAIL if numConcurrentQueries < 1;
}
maxConcurrentQueriesLimit = Math.min(floor(maxExecutorsPerNode * numRequestedLlapNodes / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES);
maxConcurrentQueries = Math.min(maxConcurrentQueriesLimit, floor(llapMemoryTezAMsAndDaemons / (MIN_EXECUTOR_TO_AM_RATIO * memoryPerThread + tezAmContainerSize)));
if (maxConcurrentQueries == 0) {
maxConcurrentQueries = 1;
}
if (maxConcurrentQueries * tezAmContainerSize > amCapacityAvailable) {
maxConcurrentQueries = floor(amCapacityAvailable /
tezAmContainerSize);
FAIL if maxConcurrentQueries < 1;
}
}
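The default/max concurrency computation can be sketched as one Python function, since the two results differ only in the executor-to-AM ratio. Names are illustrative; the clamp-to-AM-capacity step follows the pseudocode above.

```python
DEFAULT_EXECUTOR_TO_AM_RATIO = 20
MIN_EXECUTOR_TO_AM_RATIO = 10
MAX_CONCURRENT_QUERIES = 32

def concurrency(max_executors_per_node, num_requested_llap_nodes,
                llap_memory_tez_ams_and_daemons, tez_am_container_size,
                memory_per_thread, am_capacity_available):
    def bounded(ratio):
        # Limit by executor threads per AM, capped at the global maximum.
        limit = min(max_executors_per_node * num_requested_llap_nodes // ratio,
                    MAX_CONCURRENT_QUERIES)
        # Also limit by memory for `ratio` executors plus one AM.
        n = min(limit,
                llap_memory_tez_ams_and_daemons //
                (ratio * memory_per_thread + tez_am_container_size))
        n = max(n, 1)
        # Clamp to the AM capacity actually available on the queue.
        if n * tez_am_container_size > am_capacity_available:
            n = am_capacity_available // tez_am_container_size
            if n < 1:
                raise ValueError("Not enough AM capacity for a single query")
        return n
    return (bounded(DEFAULT_EXECUTOR_TO_AM_RATIO),
            bounded(MIN_EXECUTOR_TO_AM_RATIO))
```

For example, 10 executors/node on 10 nodes with 204800 MB for AMs + daemons, a 1024 MB Tez AM, and 2048 MB per thread gives a default concurrency of 4 and a slider maximum of 9.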
numLlapNodes, daemonMemoryPerNode, numExecutorsPerNode, executorMemoryPerNode, cacheMemoryPerNode = (desiredConcurrency, tezAmContainerSize, llapMemoryTezAMsAndDaemons, minContainerSize, numRequestedLlapNodes, memoryPerThread) {
amMemoryRequired = desiredConcurrency * tezAmContainerSize;
llapMemoryDaemons = llapMemoryTezAMsAndDaemons - amMemoryRequired;
FAIL if llapMemoryDaemons < minContainerSize;
FAIL("Not enough memory available for executors") if (llapMemoryDaemons < memoryPerThread || llapMemoryDaemons < minContainerSize);
daemonMemoryPerNode = normalizeDown(llapMemoryDaemons / numRequestedLlapNodes, minContainerSize);
if (daemonMemoryPerNode == 0) {
// Small cluster. No capacity left on a node after running AMs.
daemonMemoryPerNode = memoryPerThread;
numLlapNodes = floor(llapMemoryDaemons / memoryPerThread)
} else if (daemonMemoryPerNode < memoryPerThread) {
// The previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately, reduce memory per node.)
daemonMemoryPerNode = memoryPerThread;
numLlapNodes = floor(llapMemoryDaemons / memoryPerThread);
} else {
// All good. We have a proper value for memoryPerNode.
numLlapNodes = numRequestedLlapNodes
}
maxExecutorsPerNode = getMaxExecutorsPerNode(nmMemoryPerNodeNormalized,
nmCpusPerNode, memoryPerThread);
FAIL if maxExecutorsPerNode < 1;
// numExecutorsPerNode is not necessarily the max, since some capacity would have been reserved for AMs if this value were based on memory.
numExecutorsPerNode = Math.min(floor(daemonMemoryPerNode / memoryPerThread), maxExecutorsPerNode);
// Now figure out how much of the memory will be used by the executors, and how much by the cache.
executorMemoryPerNode = numExecutorsPerNode * memoryPerThread;
cacheMemoryPerNode = daemonMemoryPerNode - executorMemoryPerNode;
Assert numExecutorsPerNode > 0
return (numLlapNodes, daemonMemoryPerNode, numExecutorsPerNode,
executorMemoryPerNode, cacheMemoryPerNode);
}
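The node/daemon sizing above can be sketched in Python. One simplification in this sketch: the "daemonMemoryPerNode == 0" and "daemonMemoryPerNode < memoryPerThread" branches collapse into one, since both set daemonMemoryPerNode to memoryPerThread and recompute the node count. Names are illustrative, not the actual stack_advisor API.

```python
def size_llap_nodes(desired_concurrency, tez_am_container_size,
                    llap_memory_tez_ams_and_daemons, min_container_size,
                    num_requested_llap_nodes, memory_per_thread,
                    max_executors_per_node):
    def normalize_down(v, step):
        return (v // step) * step

    # Reserve memory for the Tez AMs first; daemons get the remainder.
    am_memory_required = desired_concurrency * tez_am_container_size
    llap_memory_daemons = llap_memory_tez_ams_and_daemons - am_memory_required
    if llap_memory_daemons < max(min_container_size, memory_per_thread):
        raise ValueError("Not enough memory available for executors")

    daemon_memory_per_node = normalize_down(
        llap_memory_daemons // num_requested_llap_nodes, min_container_size)
    if daemon_memory_per_node < memory_per_thread:
        # Small cluster: keep memory per thread and cut the node count instead.
        daemon_memory_per_node = memory_per_thread
        num_llap_nodes = llap_memory_daemons // memory_per_thread
    else:
        num_llap_nodes = num_requested_llap_nodes

    # Not necessarily the max: capacity may have been reserved for AMs.
    num_executors_per_node = min(daemon_memory_per_node // memory_per_thread,
                                 max_executors_per_node)
    assert num_executors_per_node > 0
    # Split daemon memory between executors and cache.
    executor_memory_per_node = num_executors_per_node * memory_per_thread
    cache_memory_per_node = daemon_memory_per_node - executor_memory_per_node
    return (num_llap_nodes, daemon_memory_per_node, num_executors_per_node,
            executor_memory_per_node, cache_memory_per_node)
```

The second assertion below exercises the small-cluster path, where the requested 8 nodes are cut to 4 and the cache ends up empty.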
llapQueueFraction = numRequestedLlapNodes / numClusterNodes;
def getMaxExecutorsPerNode(nmMemoryPerNodeNormalized, nmCpusPerNode) {
// TODO Special case VMs: 1 CPU doesn't necessarily mean 1 executor.
return nmCpusPerNode;
}
def getMaxExecutorsPerNode(nmMemoryPerNodeNormalized, nmCpusPerNode,
memoryPerThread) {
// This potentially takes up the entire node leaving no space for AMs.
return Math.min(floor(nmMemoryPerNodeNormalized / memoryPerThread),
getMaxExecutorsPerNode(nmMemoryPerNodeNormalized, nmCpusPerNode));
}
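The two getMaxExecutorsPerNode overloads, sketched in Python (illustrative names; the VM special-casing remains a TODO as noted above):

```python
def max_executors_per_node_cpu(nm_cpus_per_node):
    # CPU bound: one executor per CPU.
    # TODO: Special-case VMs, where 1 CPU doesn't necessarily mean 1 executor.
    return nm_cpus_per_node

def max_executors_per_node(nm_memory_per_node_normalized, nm_cpus_per_node,
                           memory_per_thread):
    # Memory- and CPU-bound; this can consume the entire node,
    # leaving no space for AMs.
    return min(nm_memory_per_node_normalized // memory_per_thread,
               max_executors_per_node_cpu(nm_cpus_per_node))
```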
----------------------------------------------------------
For User Provided Queues
queueFraction User Provided (from capacity-scheduler.xml)
queueAmFraction User Provided (from capacity-scheduler.xml)
numRequestedLlapNodes = floor(queueFraction * numClusterNodes)
amCapacityAvailable = queueAmFraction * queueFraction * totalClusterCapacity;
Rest of the calculations remain unchanged.
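A tiny sketch of how the user-provided-queue inputs feed the same calculations; queueFraction and queueAmFraction would come from capacity-scheduler.xml, and the function name is illustrative:

```python
def user_queue_inputs(queue_fraction, queue_am_fraction,
                      num_cluster_nodes, total_cluster_capacity):
    # Derive the node count from the queue's share of the cluster.
    num_requested_llap_nodes = int(queue_fraction * num_cluster_nodes)
    # AM capacity is the AM fraction of the queue's share of total capacity.
    am_capacity_available = (queue_am_fraction * queue_fraction *
                             total_cluster_capacity)
    return num_requested_llap_nodes, am_capacity_available
```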
----------------------------------------------------------
**********************************************************************************************************************************************
**********************************************************************************************************************************************
Diffs (updated)
-----
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java
e81568c
ambari-server/src/main/resources/stacks/HDP/2.5/services/HIVE/configuration/hive-interactive-env.xml
1fd72eb
ambari-server/src/main/resources/stacks/HDP/2.5/services/HIVE/configuration/hive-interactive-site.xml
0207e49
ambari-server/src/main/resources/stacks/HDP/2.5/services/HIVE/configuration/tez-interactive-site.xml
9e588e9
ambari-server/src/main/resources/stacks/HDP/2.5/services/HIVE/themes/theme.json
452537d
ambari-server/src/main/resources/stacks/HDP/2.5/services/YARN/configuration/capacity-scheduler.xml
9ff8484
ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
dfb7b0c
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java
14fc20b
Diff: https://reviews.apache.org/r/53801/diff/
Testing
-------
Manual Testing on deployed cluster.
Thanks,
Swapan Shridhar