Author: szetszwo
Date: Thu Aug 21 05:22:10 2014
New Revision: 1619293

URL: http://svn.apache.org/r1619293
Log:
Merge r1609845 through r1619277 from trunk.
Added:
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
      - copied unchanged from r1619277, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
      - copied unchanged from r1619277, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
Modified:
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred-config.sh
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/mapred-env.sh
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
    hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java

Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project:r1594376-1619194
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1618764-1619277

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt Thu Aug 21 05:22:10 2014
@@ -86,6 +86,9 @@ Trunk (Unreleased)
     MAPREDUCE-6019. MapReduce changes for exposing YARN/MR endpoints on multiple
     interfaces. (Craig Welch, Milan Potocnik, Arpit Agarwal via xgong)
 
+    MAPREDUCE-6013. [post-HADOOP-9902] mapred version is missing (Akira AJISAKA
+    via aw)
+
   BUG FIXES
 
     MAPREDUCE-5714. Removed forceful JVM exit in shutDownJob.
@@ -151,6 +154,16 @@ Trunk (Unreleased)
     MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to
     ProportionalCapacityPreemptionPolicy (Sunil G via devaraj)
 
+  BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
+
+    MAPREDUCE-5890. Support for encrypting Intermediate
+    data and spills in local filesystem. (asuresh via tucu)
+
+    MAPREDUCE-6007. Add support to distcp to preserve raw.* namespace
+    extended attributes. (clamb)
+
+    MAPREDUCE-6041. Fix TestOptionsParser. (clamb)
+
 Release 2.6.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -236,7 +249,7 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6012. DBInputSplit creates invalid ranges on Oracle.
     (Wei Yan via kasha)
 
-Release 2.5.0 - UNRELEASED
+Release 2.5.0 - 2014-08-11
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt:r1594376-1619194
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1618764-1619277

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred Thu Aug 21 05:22:10 2014
@@ -15,138 +15,134 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-bin=`which $0`
-bin=`dirname ${bin}`
-bin=`cd "$bin" > /dev/null; pwd`
-
-DEFAULT_LIBEXEC_DIR="$bin"/../libexec
-HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
-if [ -e ${HADOOP_LIBEXEC_DIR}/mapred-config.sh ]; then
-  . ${HADOOP_LIBEXEC_DIR}/mapred-config.sh
-else
-  . "$bin/mapred-config.sh"
-fi
-
-function print_usage(){
-  echo "Usage: mapred [--config confdir] COMMAND"
+function hadoop_usage
+{
+  echo "Usage: mapred [--config confdir] [--daemon (start|stop|status)] COMMAND"
   echo "       where COMMAND is one of:"
-  echo "  pipes                run a Pipes job"
-  echo "  job                  manipulate MapReduce jobs"
-  echo "  queue                get information regarding JobQueues"
+
+  echo "  archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive"
   echo "  classpath            prints the class path needed for running"
  echo "                       mapreduce subcommands"
-  echo "  historyserver        run job history servers as a standalone daemon"
   echo "  distcp <srcurl> <desturl> copy file or directories recursively"
-  echo "  archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive"
-  echo "  hsadmin              job history server admin interface"
+  echo "  job                  manipulate MapReduce jobs"
+  echo "  historyserver        run job history servers as a standalone daemon"
+  echo "  pipes                run a Pipes job"
+  echo "  queue                get information regarding JobQueues"
+  echo "  sampler              sampler"
+  echo "  version              print the version"
   echo ""
   echo "Most commands print help when invoked w/o parameters."
 }
 
+this="${BASH_SOURCE-$0}"
+bin=$(cd -P -- "$(dirname -- "${this}")" >/dev/null && pwd -P)
+
+# let's locate libexec...
+if [[ -n "${HADOOP_PREFIX}" ]]; then
+  DEFAULT_LIBEXEC_DIR="${HADOOP_PREFIX}/libexec"
+else
+  DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
+fi
+
+HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}"
+# shellcheck disable=SC2034
+HADOOP_NEW_CONFIG=true
+if [[ -f "${HADOOP_LIBEXEC_DIR}/mapred-config.sh" ]]; then
+  . "${HADOOP_LIBEXEC_DIR}/mapred-config.sh"
+else
+  echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/mapred-config.sh." 2>&1
+  exit 1
+fi
+
+
 if [ $# = 0 ]; then
-  print_usage
-  exit
+  hadoop_exit_with_usage 1
 fi
 
 COMMAND=$1
 shift
 
-case $COMMAND in
-  # usage flags
-  --help|-help|-h)
-    print_usage
-    exit
-    ;;
+case ${COMMAND} in
+  mradmin|jobtracker|tasktracker|groups)
+    echo "Sorry, the ${COMMAND} command is no longer supported."
+    echo "You may find similar functionality with the \"yarn\" shell command."
+    hadoop_exit_with_usage 1
+  ;;
+  archive)
+    CLASS=org.apache.hadoop.tools.HadoopArchives
+    hadoop_add_classpath "${TOOL_PATH}"
+    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+  ;;
+  classpath)
+    hadoop_finalize
+    echo "${CLASSPATH}"
+    exit 0
+  ;;
+  distcp)
+    CLASS=org.apache.hadoop.tools.DistCp
+    hadoop_add_classpath "${TOOL_PATH}"
+    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+  ;;
+  historyserver)
+    daemon="true"
+    CLASS=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
+    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_JOB_HISTORYSERVER_OPTS}"
+    if [ -n "${HADOOP_JOB_HISTORYSERVER_HEAPSIZE}" ]; then
+      JAVA_HEAP_MAX="-Xmx${HADOOP_JOB_HISTORYSERVER_HEAPSIZE}m"
+    fi
+    HADOOP_DAEMON_ROOT_LOGGER=${HADOOP_JHS_LOGGER:-$HADOOP_DAEMON_ROOT_LOGGER}
+  ;;
+  job)
+    CLASS=org.apache.hadoop.mapred.JobClient
+  ;;
+  pipes)
+    CLASS=org.apache.hadoop.mapred.pipes.Submitter
+    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+  ;;
+  queue)
+    CLASS=org.apache.hadoop.mapred.JobQueueClient
+  ;;
+  sampler)
+    CLASS=org.apache.hadoop.mapred.lib.InputSampler
+    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+  ;;
+  version)
+    CLASS=org.apache.hadoop.util.VersionInfo
+    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+  ;;
+  -*|*)
+    hadoop_exit_with_usage 1
+  ;;
 esac
 
-if [ "$COMMAND" = "job" ] ; then
-  CLASS=org.apache.hadoop.mapred.JobClient
-  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "queue" ] ; then
-  CLASS=org.apache.hadoop.mapred.JobQueueClient
-  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "pipes" ] ; then
-  CLASS=org.apache.hadoop.mapred.pipes.Submitter
-  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "sampler" ] ; then
-  CLASS=org.apache.hadoop.mapred.lib.InputSampler
-  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "classpath" ] ; then
-  echo -n
-elif [ "$COMMAND" = "historyserver" ] ; then
-  CLASS=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
-  HADOOP_OPTS="$HADOOP_OPTS -Dmapred.jobsummary.logger=${HADOOP_JHS_LOGGER:-INFO,console} $HADOOP_JOB_HISTORYSERVER_OPTS"
-  if [ "$HADOOP_JOB_HISTORYSERVER_HEAPSIZE" != "" ]; then
-    JAVA_HEAP_MAX="-Xmx""$HADOOP_JOB_HISTORYSERVER_HEAPSIZE""m"
-  fi
-elif [ "$COMMAND" = "mradmin" ] \
-    || [ "$COMMAND" = "jobtracker" ] \
-    || [ "$COMMAND" = "tasktracker" ] \
-    || [ "$COMMAND" = "groups" ] ; then
-  echo "Sorry, the $COMMAND command is no longer supported."
-  echo "You may find similar functionality with the \"yarn\" shell command."
-  print_usage
-  exit 1
-elif [ "$COMMAND" = "distcp" ] ; then
-  CLASS=org.apache.hadoop.tools.DistCp
-  CLASSPATH=${CLASSPATH}:${TOOL_PATH}
-  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "archive" ] ; then
-  CLASS=org.apache.hadoop.tools.HadoopArchives
-  CLASSPATH=${CLASSPATH}:${TOOL_PATH}
-  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "hsadmin" ] ; then
-  CLASS=org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin
-  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-else
-  echo $COMMAND - invalid command
-  print_usage
-  exit 1
-fi
+daemon_outfile="${HADOOP_LOG_DIR}/hadoop-${HADOOP_IDENT_STRING}-${COMMAND}-${HOSTNAME}.out"
+daemon_pidfile="${HADOOP_PID_DIR}/hadoop-${HADOOP_IDENT_STRING}-${COMMAND}.pid"
 
-# for developers, add mapred classes to CLASSPATH
-if [ -d "$HADOOP_MAPRED_HOME/build/classes" ]; then
-  CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/classes
-fi
-if [ -d "$HADOOP_MAPRED_HOME/build/webapps" ]; then
-  CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build
-fi
-if [ -d "$HADOOP_MAPRED_HOME/build/test/classes" ]; then
-  CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/test/classes
-fi
-if [ -d "$HADOOP_MAPRED_HOME/build/tools" ]; then
-  CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/tools
-fi
-# for releases, add core mapred jar & webapps to CLASSPATH
-if [ -d "$HADOOP_PREFIX/${MAPRED_DIR}/webapps" ]; then
-  CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/${MAPRED_DIR}
-fi
-for f in $HADOOP_MAPRED_HOME/${MAPRED_DIR}/*.jar; do
-  CLASSPATH=${CLASSPATH}:$f;
-done
-
-# Need YARN jars also
-for f in $HADOOP_YARN_HOME/${YARN_DIR}/*.jar; do
-  CLASSPATH=${CLASSPATH}:$f;
-done
-
-# add libs to CLASSPATH
-for f in $HADOOP_MAPRED_HOME/${MAPRED_LIB_JARS_DIR}/*.jar; do
-  CLASSPATH=${CLASSPATH}:$f;
-done
-
-# add modules to CLASSPATH
-for f in $HADOOP_MAPRED_HOME/modules/*.jar; do
-  CLASSPATH=${CLASSPATH}:$f;
-done
-
-if [ "$COMMAND" = "classpath" ] ; then
-  echo $CLASSPATH
-  exit
+if [[ "${HADOOP_DAEMON_MODE}" != "default" ]]; then
+  # shellcheck disable=SC2034
+  HADOOP_ROOT_LOGGER="${HADOOP_DAEMON_ROOT_LOGGER}"
+  hadoop_add_param HADOOP_OPTS mapred.jobsummary.logger "-Dmapred.jobsummary.logger=${HADOOP_ROOT_LOGGER}"
+  # shellcheck disable=SC2034
+  HADOOP_LOGFILE="hadoop-${HADOOP_IDENT_STRING}-${COMMAND}-${HOSTNAME}.log"
 fi
 
-HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
+hadoop_add_param HADOOP_OPTS Xmx "${JAVA_HEAP_MAX}"
+hadoop_finalize
 export CLASSPATH
-exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
+
+if [[ -n "${daemon}" ]]; then
+  if [[ -n "${secure_service}" ]]; then
+    hadoop_secure_daemon_handler "${HADOOP_DAEMON_MODE}" "${COMMAND}"\
+    "${CLASS}" "${daemon_pidfile}" "${daemon_outfile}" \
+    "${priv_pidfile}" "${priv_outfile}" "${priv_errfile}" "$@"
+  else
+    hadoop_daemon_handler "${HADOOP_DAEMON_MODE}" "${COMMAND}" "${CLASS}" \
+    "${daemon_pidfile}" "${daemon_outfile}" "$@"
  fi
+  exit $?
+else
+  hadoop_java_exec "${COMMAND}" "${CLASS}" "$@"
+fi

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred-config.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred-config.sh?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred-config.sh (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred-config.sh Thu Aug 21 05:22:10 2014
@@ -18,35 +18,55 @@
 # included in all the mapred scripts with source command
 # should not be executed directly
 
-bin=`which "$0"`
-bin=`dirname "${bin}"`
-bin=`cd "$bin"; pwd`
+function hadoop_subproject_init
+{
+  if [ -e "${HADOOP_CONF_DIR}/mapred-env.sh" ]; then
+    . "${HADOOP_CONF_DIR}/mapred-env.sh"
+  fi
+
+  # at some point in time, someone thought it would be a good idea to
+  # create separate vars for every subproject. *sigh*
+  # let's perform some overrides and setup some defaults for bw compat
+  # this way the common hadoop var's == subproject vars and can be
+  # used interchangeable from here on out
+  # ...
+  # this should get deprecated at some point.
+  HADOOP_LOG_DIR="${HADOOP_MAPRED_LOG_DIR:-$HADOOP_LOG_DIR}"
+  HADOOP_MAPRED_LOG_DIR="${HADOOP_LOG_DIR}"
+
+  HADOOP_LOGFILE="${HADOOP_MAPRED_LOGFILE:-$HADOOP_LOGFILE}"
+  HADOOP_MAPRED_LOGFILE="${HADOOP_LOGFILE}"
+
+  HADOOP_NICENESS="${HADOOP_MAPRED_NICENESS:-$HADOOP_NICENESS}"
+  HADOOP_MAPRED_NICENESS="${HADOOP_NICENESS}"
+
+  HADOOP_STOP_TIMEOUT="${HADOOP_MAPRED_STOP_TIMEOUT:-$HADOOP_STOP_TIMEOUT}"
+  HADOOP_MAPRED_STOP_TIMEOUT="${HADOOP_STOP_TIMEOUT}"
+
+  HADOOP_PID_DIR="${HADOOP_MAPRED_PID_DIR:-$HADOOP_PID_DIR}"
+  HADOOP_MAPRED_PID_DIR="${HADOOP_PID_DIR}"
+
+  HADOOP_ROOT_LOGGER="${HADOOP_MAPRED_ROOT_LOGGER:-INFO,console}"
+  HADOOP_MAPRED_ROOT_LOGGER="${HADOOP_ROOT_LOGGER}"
+
+  HADOOP_MAPRED_HOME="${HADOOP_MAPRED_HOME:-$HADOOP_HOME_DIR}"
+
+  HADOOP_IDENT_STRING="${HADOOP_MAPRED_IDENT_STRING:-$HADOOP_IDENT_STRING}"
+  HADOOP_MAPRED_IDENT_STRING="${HADOOP_IDENT_STRING}"
+}
+
+if [[ -z "${HADOOP_LIBEXEC_DIR}" ]]; then
+  _mc_this="${BASH_SOURCE-$0}"
+  HADOOP_LIBEXEC_DIR=$(cd -P -- "$(dirname -- "${_mc_this}")" >/dev/null && pwd -P)
+fi
 
-DEFAULT_LIBEXEC_DIR="$bin"/../libexec
-HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
-if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then
+if [[ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]]; then
   . "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh"
-elif [ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]; then
-  . "$HADOOP_COMMON_HOME"/libexec/hadoop-config.sh
-elif [ -e "${HADOOP_COMMON_HOME}/bin/hadoop-config.sh" ]; then
-  . "$HADOOP_COMMON_HOME"/bin/hadoop-config.sh
-elif [ -e "${HADOOP_HOME}/bin/hadoop-config.sh" ]; then
-  . "$HADOOP_HOME"/bin/hadoop-config.sh
-elif [ -e "${HADOOP_MAPRED_HOME}/bin/hadoop-config.sh" ]; then
-  . "$HADOOP_MAPRED_HOME"/bin/hadoop-config.sh
+elif [[ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]]; then
+  . "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh"
+elif [[ -e "${HADOOP_HOME}/libexec/hadoop-config.sh" ]]; then
+  . "${HADOOP_HOME}/libexec/hadoop-config.sh"
 else
   echo "Hadoop common not found."
   exit
 fi
-
-# Only set locally to use in HADOOP_OPTS.  No need to export.
-# The following defaults are useful when somebody directly invokes bin/mapred.
-HADOOP_MAPRED_LOG_DIR=${HADOOP_MAPRED_LOG_DIR:-${HADOOP_MAPRED_HOME}/logs}
-HADOOP_MAPRED_LOGFILE=${HADOOP_MAPRED_LOGFILE:-hadoop.log}
-HADOOP_MAPRED_ROOT_LOGGER=${HADOOP_MAPRED_ROOT_LOGGER:-INFO,console}
-
-HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.dir=$HADOOP_MAPRED_LOG_DIR"
-HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.file=$HADOOP_MAPRED_LOGFILE"
-export HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.root.logger=${HADOOP_MAPRED_ROOT_LOGGER}"
-
-

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh Thu Aug 21 05:22:10 2014
@@ -15,133 +15,32 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-#
-# Environment Variables
-#
-#   HADOOP_JHS_LOGGER  Hadoop JobSummary logger.
-#   HADOOP_CONF_DIR  Alternate conf dir. Default is ${HADOOP_MAPRED_HOME}/conf.
-#   HADOOP_MAPRED_PID_DIR   The pid files are stored. /tmp by default.
-#   HADOOP_MAPRED_NICENESS The scheduling priority for daemons. Defaults to 0.
-##
-
-usage="Usage: mr-jobhistory-daemon.sh [--config <conf-dir>] (start|stop) <mapred-command> "
-
-# if no args specified, show usage
-if [ $# -le 1 ]; then
-  echo $usage
-  exit 1
-fi
-
-bin=`dirname "${BASH_SOURCE-$0}"`
-bin=`cd "$bin"; pwd`
-
-DEFAULT_LIBEXEC_DIR="$bin"/../libexec
-HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
-if [ -e ${HADOOP_LIBEXEC_DIR}/mapred-config.sh ]; then
-  . $HADOOP_LIBEXEC_DIR/mapred-config.sh
-fi
-
-# get arguments
-startStop=$1
-shift
-command=$1
-shift
-
-hadoop_rotate_log ()
+function hadoop_usage
 {
-    log=$1;
-    num=5;
-    if [ -n "$2" ]; then
-      num=$2
-    fi
-    if [ -f "$log" ]; then # rotate logs
-      while [ $num -gt 1 ]; do
-        prev=`expr $num - 1`
-        [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
-        num=$prev
-      done
-      mv "$log" "$log.$num";
-    fi
+  echo "Usage: mr-jobhistory-daemon.sh [--config confdir] (start|stop|status) <hadoop-command> <args...>"
 }
 
-if [ "$HADOOP_MAPRED_IDENT_STRING" = "" ]; then
-  export HADOOP_MAPRED_IDENT_STRING="$USER"
-fi
-
-export HADOOP_MAPRED_HOME=${HADOOP_MAPRED_HOME:-${HADOOP_PREFIX}}
-export HADOOP_MAPRED_LOGFILE=mapred-$HADOOP_MAPRED_IDENT_STRING-$command-$HOSTNAME.log
-export HADOOP_MAPRED_ROOT_LOGGER=${HADOOP_MAPRED_ROOT_LOGGER:-INFO,RFA}
-export HADOOP_JHS_LOGGER=${HADOOP_JHS_LOGGER:-INFO,JSA}
-
-if [ -f "${HADOOP_CONF_DIR}/mapred-env.sh" ]; then
-  . "${HADOOP_CONF_DIR}/mapred-env.sh"
-fi
-
-mkdir -p "$HADOOP_MAPRED_LOG_DIR"
-chown $HADOOP_MAPRED_IDENT_STRING $HADOOP_MAPRED_LOG_DIR
-
-if [ "$HADOOP_MAPRED_PID_DIR" = "" ]; then
-  HADOOP_MAPRED_PID_DIR=/tmp
-fi
-
-HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.id.str=$HADOOP_MAPRED_IDENT_STRING"
-
-log=$HADOOP_MAPRED_LOG_DIR/mapred-$HADOOP_MAPRED_IDENT_STRING-$command-$HOSTNAME.out
-pid=$HADOOP_MAPRED_PID_DIR/mapred-$HADOOP_MAPRED_IDENT_STRING-$command.pid
-
-HADOOP_MAPRED_STOP_TIMEOUT=${HADOOP_MAPRED_STOP_TIMEOUT:-5}
-
-# Set default scheduling priority
-if [ "$HADOOP_MAPRED_NICENESS" = "" ]; then
-  export HADOOP_MAPRED_NICENESS=0
+# let's locate libexec...
+if [[ -n "${HADOOP_PREFIX}" ]]; then
+  DEFAULT_LIBEXEC_DIR="${HADOOP_PREFIX}/libexec"
+else
+  this="${BASH_SOURCE-$0}"
+  bin=$(cd -P -- "$(dirname -- "${this}")" >/dev/null && pwd -P)
+  DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
+fi
+
+HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}"
+# shellcheck disable=SC2034
+HADOOP_NEW_CONFIG=true
+if [[ -f "${HADOOP_LIBEXEC_DIR}/yarn-config.sh" ]]; then
+  . "${HADOOP_LIBEXEC_DIR}/yarn-config.sh"
+else
+  echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/yarn-config.sh." 2>&1
+  exit 1
 fi
 
-case $startStop in
-
-  (start)
-
-    mkdir -p "$HADOOP_MAPRED_PID_DIR"
-
-    if [ -f $pid ]; then
-      if kill -0 `cat $pid` > /dev/null 2>&1; then
-        echo $command running as process `cat $pid`.  Stop it first.
-        exit 1
-      fi
-    fi
-
-    hadoop_rotate_log $log
-    echo starting $command, logging to $log
-    cd "$HADOOP_MAPRED_HOME"
-    nohup nice -n $HADOOP_MAPRED_NICENESS "$HADOOP_MAPRED_HOME"/bin/mapred --config $HADOOP_CONF_DIR $command "$@" > "$log" 2>&1 < /dev/null &
-    echo $! > $pid
-    sleep 1; head "$log"
-    ;;
-
-  (stop)
-
-    if [ -f $pid ]; then
-      TARGET_PID=`cat $pid`
-      if kill -0 $TARGET_PID > /dev/null 2>&1; then
-        echo stopping $command
-        kill $TARGET_PID
-        sleep $HADOOP_MAPRED_STOP_TIMEOUT
-        if kill -0 $TARGET_PID > /dev/null 2>&1; then
-          echo "$command did not stop gracefully after $HADOOP_MAPRED_STOP_TIMEOUT seconds: killing with kill -9"
-          kill -9 $TARGET_PID
-        fi
-      else
-        echo no $command to stop
-      fi
-      rm -f $pid
-    else
-      echo no $command to stop
-    fi
-    ;;
-
-  (*)
-    echo $usage
-    exit 1
-    ;;
+daemonmode=$1
+shift
 
-esac
+exec "${HADOOP_MAPRED_HOME}/bin/mapred" \
+--config "${HADOOP_CONF_DIR}" --daemon "${daemonmode}" "$@"

Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/conf:r1594376-1619194
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1609845-1619277

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/mapred-env.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/mapred-env.sh?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/mapred-env.sh (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/mapred-env.sh Thu Aug 21 05:22:10 2014
@@ -13,15 +13,59 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# export JAVA_HOME=/home/y/libexec/jdk1.6.0/
+##
+## THIS FILE ACTS AS AN OVERRIDE FOR hadoop-env.sh FOR ALL
+## WORK DONE BY THE mapred AND RELATED COMMANDS.
+##
+## Precedence rules:
+##
+## mapred-env.sh > hadoop-env.sh > hard-coded defaults
+##
+## MAPRED_xyz > HADOOP_xyz > hard-coded defaults
+##
 
-export HADOOP_JOB_HISTORYSERVER_HEAPSIZE=1000
+###
+# Generic settings for MapReduce
+###
 
-export HADOOP_MAPRED_ROOT_LOGGER=INFO,RFA
+#Override the log4j settings for all MR apps
+# export MAPRED_ROOT_LOGGER="INFO,console"
 
+# Override Hadoop's log directory & file
+# export HADOOP_MAPRED_LOG_DIR=""
+
+# Override Hadoop's pid directory
+# export HADOOP_MAPRED_PID_DIR=
+
+# Override Hadoop's identity string. $USER by default.
+# This is used in writing log and pid files, so keep that in mind!
+# export HADOOP_MAPRED_IDENT_STRING=$USER
+
+# Override Hadoop's process priority
+# Note that sub-processes will also run at this level!
+# export HADOOP_MAPRED_NICENESS=0
+
+###
+# Job History Server specific parameters
+###
+
+# Specify the max heapsize for the Job History Server using a numerical value
+# in the scale of MB. For example, to specify an jvm option of -Xmx1000m, set
+# the value to 1000.
+# This value will be overridden by an Xmx setting specified in either
+# MAPRED_OPTS, HADOOP_OPTS, and/or HADOOP_JOB_HISTORYSERVER_OPTS.
+# If not specified, the default value will be picked from either YARN_HEAPMAX
+# or JAVA_HEAP_MAX with YARN_HEAPMAX as the preferred option of the two.
+#
+#export HADOOP_JOB_HISTORYSERVER_HEAPSIZE=1000
+
+# Specify the JVM options to be used when starting the ResourceManager.
+# These options will be appended to the options specified as YARN_OPTS
+# and therefore may override any similar flags set in YARN_OPTS
 #export HADOOP_JOB_HISTORYSERVER_OPTS=
-#export HADOOP_MAPRED_LOG_DIR="" # Where log files are stored.  $HADOOP_MAPRED_HOME/logs by default.
-#export HADOOP_JHS_LOGGER=INFO,RFA # Hadoop JobSummary logger.
-#export HADOOP_MAPRED_PID_DIR= # The pid files are stored. /tmp by default.
-#export HADOOP_MAPRED_IDENT_STRING= #A string representing this instance of hadoop. $USER by default
-#export HADOOP_MAPRED_NICENESS= #The scheduling priority for daemons. Defaults to 0.
+
+# Specify the log4j settings for the JobHistoryServer
+#export HADOOP_JHS_LOGGER=INFO,RFA
+
+
+

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java Thu Aug 21 05:22:10 2014
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.Merger.S
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 
 /**
  * <code>BackupStore</code> is an utility class that is used to support
@@ -572,7 +574,9 @@ public class BackupStore<K,V> {
 
       file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(), 
           -1, conf);
-      return new Writer<K, V>(conf, fs, file);
+      FSDataOutputStream out = fs.create(file);
+      out = CryptoUtils.wrapIfNecessary(conf, out);
+      return new Writer<K, V>(conf, out, null, null, null, null, true);
     }
   }
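The BackupStore hunk above shows the pattern this merge applies at every
intermediate-data write site: create the raw FSDataOutputStream, let
CryptoUtils.wrapIfNecessary decide from the job configuration whether to layer
an encrypting stream on top, and hand the (possibly wrapped) stream to
IFile.Writer together with an explicit ownership flag. A minimal write-path
sketch, not part of this commit; the path and key/value types are illustrative,
and with encryption enabled a shuffle secret key must already be present in the
task credentials:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.IFile;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    public class SpillWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path spill = new Path("/tmp/example-spill.out"); // illustrative path

        // Create the raw stream first; wrapIfNecessary is a pass-through
        // unless mapreduce.job.encrypted-intermediate-data is set to true.
        FSDataOutputStream out = fs.create(spill);
        out = CryptoUtils.wrapIfNecessary(conf, out);

        // The refactored Writer takes the stream plus an ownOutputStream flag
        // (true = the writer closes the underlying stream on close()).
        IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(
            conf, out, Text.class, Text.class, null, null, true);
        writer.append(new Text("key"), new Text("value"));
        writer.close();
      }
    }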
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java Thu Aug 21 05:22:10 2014
@@ -90,13 +90,11 @@ public class IFile {
 
     DataOutputBuffer buffer = new DataOutputBuffer();
 
-    public Writer(Configuration conf, FileSystem fs, Path file, 
-                  Class<K> keyClass, Class<V> valueClass,
-                  CompressionCodec codec,
-                  Counters.Counter writesCounter) throws IOException {
-      this(conf, fs.create(file), keyClass, valueClass, codec,
-           writesCounter);
-      ownOutputStream = true;
+    public Writer(Configuration conf, FSDataOutputStream out,
+        Class<K> keyClass, Class<V> valueClass,
+        CompressionCodec codec, Counters.Counter writesCounter)
+        throws IOException {
+      this(conf, out, keyClass, valueClass, codec, writesCounter, false);
     }
 
     protected Writer(Counters.Counter writesCounter) {
@@ -105,7 +103,8 @@ public class IFile {
 
     public Writer(Configuration conf, FSDataOutputStream out, 
         Class<K> keyClass, Class<V> valueClass,
-        CompressionCodec codec, Counters.Counter writesCounter)
+        CompressionCodec codec, Counters.Counter writesCounter,
+        boolean ownOutputStream)
         throws IOException {
       this.writtenRecordsCounter = writesCounter;
       this.checksumOut = new IFileOutputStream(out);
@@ -137,11 +136,7 @@ public class IFile {
         this.valueSerializer = serializationFactory.getSerializer(valueClass);
         this.valueSerializer.open(buffer);
       }
-    }
-
-    public Writer(Configuration conf, FileSystem fs, Path file) 
-        throws IOException {
-      this(conf, fs, file, null, null, null, null);
+      this.ownOutputStream = ownOutputStream;
     }
 
     public void close() throws IOException {
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Thu Aug 21 05:22:10 2014
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.lib.m
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -1580,7 +1581,8 @@ public class MapTask extends Task {
       IFile.Writer<K, V> writer = null;
       try {
         long segmentStart = out.getPos();
-        writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
+        FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+        writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                   spilledRecordsCounter);
         if (combinerRunner == null) {
           // spill directly
@@ -1617,8 +1619,8 @@ public class MapTask extends Task {
 
           // record offsets
           rec.startOffset = segmentStart;
-          rec.rawLength = writer.getRawLength();
-          rec.partLength = writer.getCompressedLength();
+          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
           spillRec.putIndex(rec, i);
 
           writer = null;
@@ -1668,7 +1670,8 @@ public class MapTask extends Task {
         try {
           long segmentStart = out.getPos();
           // Create a new codec, don't care!
-          writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
+          FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+          writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
                                          spilledRecordsCounter);
 
           if (i == partition) {
@@ -1682,8 +1685,8 @@ public class MapTask extends Task {
 
           // record offsets
           rec.startOffset = segmentStart;
-          rec.rawLength = writer.getRawLength();
-          rec.partLength = writer.getCompressedLength();
+          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
           spillRec.putIndex(rec, i);
 
           writer = null;
@@ -1825,12 +1828,13 @@ public class MapTask extends Task {
       try {
         for (int i = 0; i < partitions; i++) {
           long segmentStart = finalOut.getPos();
+          FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
          Writer<K, V> writer =
-            new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
+            new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
           writer.close();
           rec.startOffset = segmentStart;
-          rec.rawLength = writer.getRawLength();
-          rec.partLength = writer.getCompressedLength();
+          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
           sr.putIndex(rec, i);
         }
         sr.writeToFile(finalIndexFile, job);
@@ -1879,8 +1883,9 @@ public class MapTask extends Task {
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
+          FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
           Writer<K, V> writer =
-              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
+              new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
                                spilledRecordsCounter);
           if (combinerRunner == null || numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter, job);
@@ -1896,8 +1901,8 @@ public class MapTask extends Task {
 
           // record offsets
           rec.startOffset = segmentStart;
-          rec.rawLength = writer.getRawLength();
-          rec.partLength = writer.getCompressedLength();
+          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
           spillRec.putIndex(rec, parts);
         }
         spillRec.writeToFile(finalIndexFile, job);
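A detail worth calling out in the MapTask hunks: the spill index has to
describe each segment as it exists on disk, so the fixed crypto overhead
reported by CryptoUtils.cryptoPadding is added to both recorded lengths, and
readers (Merger and Fetcher, below) subtract the same amount before handing a
length to an IFile reader. A hypothetical helper making the two directions
explicit (illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    // Illustrative helper: converts between logical IFile segment lengths and
    // on-disk lengths that include the encryption header/padding.
    final class SegmentLengths {
      private SegmentLengths() {}

      // Length to record in the spill index (what the shuffle will serve).
      static long onDisk(Configuration conf, long logicalLength) {
        return logicalLength + CryptoUtils.cryptoPadding(conf);
      }

      // Length to hand to an IFile.Reader once the stream has been unwrapped.
      static long logical(Configuration conf, long onDiskLength) {
        return onDiskLength - CryptoUtils.cryptoPadding(conf);
      }
    }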
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Thu Aug 21 05:22:10 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapred.IFile.Re
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -298,8 +300,12 @@ public class Merger {
     void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
         FSDataInputStream in = fs.open(file);
+
         in.seek(segmentOffset);
-        reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
+        in = CryptoUtils.wrapIfNecessary(conf, in);
+        reader = new Reader<K, V>(conf, in,
+            segmentLength - CryptoUtils.cryptoPadding(conf),
+            codec, readsCounter);
       }
       
       if (mapOutputsCounter != null) {
@@ -714,9 +720,10 @@ public class Merger {
                                                  tmpFilename.toString(),
                                                  approxOutputSize, conf);
 
-        Writer<K, V> writer = 
-          new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
-                           writesCounter);
+        FSDataOutputStream out = fs.create(outputFile);
+        out = CryptoUtils.wrapIfNecessary(conf, out);
+        Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
+            codec, writesCounter, true);
         writeFile(this, writer, reporter, conf);
         writer.close();
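The Segment.init hunk also fixes the read-side ordering: the raw stream is
seeked to the segment offset before wrapping, since each encrypted segment
carries its crypto header at its start, and the reader is then given the
logical length with the padding subtracted. A minimal read-back sketch under
the same assumptions as the write sketch above (illustrative path; pairs with
that example):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.IFile;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    public class SegmentReadSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path spill = new Path("/tmp/example-spill.out"); // illustrative path

        long segmentOffset = 0L; // normally taken from the spill index record
        long segmentLength = fs.getFileStatus(spill).getLen(); // on-disk size

        FSDataInputStream in = fs.open(spill);
        in.seek(segmentOffset);                     // seek BEFORE wrapping:
        in = CryptoUtils.wrapIfNecessary(conf, in); // crypto header is at the
                                                    // segment start

        // The reader gets the logical length, i.e. minus the crypto overhead.
        IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf,
            in, segmentLength - CryptoUtils.cryptoPadding(conf), null, null);

        DataInputBuffer key = new DataInputBuffer();
        DataInputBuffer value = new DataInputBuffer();
        while (reader.nextRawKey(key)) {
          reader.nextRawValue(value);
          // deserialize key/value as needed
        }
        reader.close();
      }
    }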
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Thu Aug 21 05:22:10 2014
@@ -291,7 +291,7 @@ class JobSubmitter {
   /**
    * configure the jobconf of the user with the command line options of 
    * -libjars, -files, -archives.
-   * @param conf
+   * @param job
    * @throws IOException
    */
   private void copyAndConfigureFiles(Job job, Path jobSubmitDir) 
@@ -376,8 +376,13 @@ class JobSubmitter {
       if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
         KeyGenerator keyGen;
         try {
+         
+          int keyLen = CryptoUtils.isShuffleEncrypted(conf)
+              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
+                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
+              : SHUFFLE_KEY_LENGTH;
           keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
-          keyGen.init(SHUFFLE_KEY_LENGTH);
+          keyGen.init(keyLen);
         } catch (NoSuchAlgorithmException e) {
           throw new IOException("Error generating shuffle secret key", e);
         }

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Aug 21 05:22:10 2014
@@ -771,4 +771,18 @@ public interface MRJobConfig {
 
   public static final String TASK_PREEMPTION =
       "mapreduce.job.preemption";
+
+  public static final String MR_ENCRYPTED_INTERMEDIATE_DATA =
+      "mapreduce.job.encrypted-intermediate-data";
+  public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false;
+
+  public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
+      "mapreduce.job.encrypted-intermediate-data-key-size-bits";
+  public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
+      128;
+
+  public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
+      "mapreduce.job.encrypted-intermediate-data.buffer.kb";
+  public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
+      128;
 }
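The three MRJobConfig keys added above are the user-facing switch for the
feature; as the JobSubmitter hunk shows, the generated shuffle secret is sized
from the key-size setting when encryption is on. A hedged client-side example
of enabling it (the job itself is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class EncryptedJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Encrypt map-output spills and shuffled intermediate data.
        conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
        // Optional: key size in bits and crypto buffer in KB (both default 128).
        conf.setInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 128);
        conf.setInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB, 128);

        Job job = Job.getInstance(conf, "example-encrypted-job");
        // ... set mapper/reducer/input/output as usual; JobSubmitter sizes the
        // shuffle key from the key-size setting when it generates one.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }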
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Aug 21 05:22:10 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.task
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.MRCon
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -65,6 +67,7 @@ class Fetcher<K,V> extends Thread {
                        CONNECTION, WRONG_REDUCE}
   
   private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+  private final JobConf jobConf;
   private final Counters.Counter connectionErrs;
   private final Counters.Counter ioErrs;
   private final Counters.Counter wrongLengthErrs;
@@ -104,6 +107,7 @@ class Fetcher<K,V> extends Thread {
                  Reporter reporter, ShuffleClientMetrics metrics,
                  ExceptionReporter exceptionReporter, SecretKey shuffleKey,
                  int id) {
+    this.jobConf = job;
     this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
@@ -396,7 +400,11 @@ class Fetcher<K,V> extends Thread {
         return remaining.toArray(new TaskAttemptID[remaining.size()]);
       }
 
-      
+      InputStream is = input;
+      is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
+      compressedLength -= CryptoUtils.cryptoPadding(jobConf);
+      decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
+      
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, mapId)) {
@@ -433,7 +441,7 @@ class Fetcher<K,V> extends Thread {
         LOG.info("fetcher#" + id + " about to shuffle output of map "
             + mapOutput.getMapId() + " decomp: " + decompressedLength
             + " len: " + compressedLength + " to " + mapOutput.getDescription());
-        mapOutput.shuffle(host, input, compressedLength, decompressedLength,
+        mapOutput.shuffle(host, is, compressedLength, decompressedLength,
             metrics, reporter);
       } catch (java.lang.InternalError e) {
        LOG.warn("Failed to shuffle for fetcher#"+id, e);
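On the shuffle read side the adjustment runs the other way: the fetcher wraps
the incoming stream and then shrinks both header-reported lengths, which the
map side had inflated by the same padding. A schematic of just that
bookkeeping, mirroring the hunk above (the names and the array-based in/out
parameters are illustrative):

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    // Illustrative: the shuffle header reports on-disk sizes (including
    // crypto overhead), so after wrapping the raw stream both sizes are
    // shrunk back to the logical payload sizes.
    final class ShuffleDecryptSketch {
      static InputStream prepare(JobConf job, InputStream raw,
          long[] compressedLength, long[] decompressedLength)
          throws IOException {
        InputStream is =
            CryptoUtils.wrapIfNecessary(job, raw, compressedLength[0]);
        compressedLength[0] -= CryptoUtils.cryptoPadding(job);
        decompressedLength[0] -= CryptoUtils.cryptoPadding(job);
        return is;
      }
    }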
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java Thu Aug 21 05:22:10 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.MapOutpu
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SpillRecord;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 
 /**
  * LocalFetcher is used by LocalJobRunner to perform a local filesystem
@@ -145,6 +146,9 @@ class LocalFetcher<K,V> extends Fetcher<
     // now read the file, seek to the appropriate section, and send it.
     FileSystem localFs = FileSystem.getLocal(job).getRaw();
     FSDataInputStream inStream = localFs.open(mapOutputFileName);
+
+    inStream = CryptoUtils.wrapIfNecessary(job, inStream);
+
     try {
       inStream.seek(ir.startOffset);

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Thu Aug 21 05:22:10 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -54,6 +55,7 @@ import org.apache.hadoop.mapred.Task.Com
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -228,6 +230,10 @@ public class MergeManagerImpl<K, V> impl
     return new InMemoryMerger(this);
   }
 
+  protected MergeThread<CompressAwarePath,K,V> createOnDiskMerger() {
+    return new OnDiskMerger(this);
+  }
+
   TaskAttemptID getReduceId() {
     return reduceId;
   }
@@ -453,11 +459,10 @@ public class MergeManagerImpl<K, V> impl
         mergeOutputSize).suffix(
             Task.MERGED_OUTPUT_PREFIX);
 
-    Writer<K,V> writer = 
-      new Writer<K,V>(jobConf, rfs, outputPath,
-                      (Class<K>) jobConf.getMapOutputKeyClass(),
-                      (Class<V>) jobConf.getMapOutputValueClass(),
-                      codec, null);
+    FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+    Writer<K, V> writer = new Writer<K, V>(jobConf, out,
+        (Class<K>) jobConf.getMapOutputKeyClass(),
+        (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
 
     RawKeyValueIterator rIter = null;
     CompressAwarePath compressAwarePath;
@@ -537,11 +542,12 @@ public class MergeManagerImpl<K, V> impl
     Path outputPath = 
       localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
          approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
-    Writer<K,V> writer = 
-      new Writer<K,V>(jobConf, rfs, outputPath,
-                      (Class<K>) jobConf.getMapOutputKeyClass(),
-                      (Class<V>) jobConf.getMapOutputValueClass(),
-                      codec, null);
+
+    FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+    Writer<K, V> writer = new Writer<K, V>(jobConf, out,
+        (Class<K>) jobConf.getMapOutputKeyClass(),
+        (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
+
     RawKeyValueIterator iter = null;
     CompressAwarePath compressAwarePath;
     Path tmpDir = new Path(reduceId.toString());
@@ -717,8 +723,10 @@ public class MergeManagerImpl<K, V> impl
                         keyClass, valueClass, memDiskSegments, numMemDiskSegments,
                         tmpDir, comparator, reporter, spilledRecordsCounter, null, 
                         mergePhase);
-      Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
-          keyClass, valueClass, codec, null);
+
+      FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
+      Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
+          codec, null, true);
       try {
         Merger.writeFile(rIter, writer, reporter, job);
         writer.close();

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Thu Aug 21 05:22:10 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapred.MapOutputFile;
 
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -75,7 +76,7 @@ class OnDiskMapOutput<K, V> extends MapO
     this.merger = merger;
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
-    disk = fs.create(tmpOutputPath);
+    disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
   }
 
   @VisibleForTesting

Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1594376-1619194
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1618417-1619277

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm Thu Aug 21 05:22:10 2014
@@ -177,6 +177,12 @@ MapReduce Commands Guide
    Creates a hadoop archive. More information can be found at
    {{{./HadoopArchives.html}Hadoop Archives Guide}}.
 
+** <<<version>>>
+
+   Prints the version.
+
+   Usage: <<<mapred version>>>
+
 * Administration Commands
 
    Commands useful for administrators of a hadoop cluster.

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm Thu Aug 21 05:22:10 2014
@@ -191,6 +191,26 @@ $H3 Update and Overwrite
 
     If `-update` is used, `1` is overwritten as well.
 
+$H3 raw Namespace Extended Attribute Preservation
+
+  This section only applies to HDFS.
+
+  If the target and all of the source pathnames are in the /.reserved/raw
+  hierarchy, then 'raw' namespace extended attributes will be preserved.
+  'raw' xattrs are used by the system for internal functions such as encryption
+  meta data. They are only visible to users when accessed through the
+  /.reserved/raw hierarchy.
+
+  raw xattrs are preserved based solely on whether /.reserved/raw prefixes are
+  supplied. The -p (preserve, see below) flag does not impact preservation of
+  raw xattrs.
+
+  To prevent raw xattrs from being preserved, simply do not use the
+  /.reserved/raw prefix on any of the source and target paths.
+
+  If the /.reserved/raw prefix is specified on only a subset of the source and
+  target paths, an error will be displayed and a non-0 exit code returned.
+
 Command Line Options
 --------------------

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java Thu Aug 21 05:22:10 2014
@@ -24,14 +24,16 @@ import static org.mockito.Mockito.doAnsw
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.junit.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,10 +53,16 @@ import org.apache.hadoop.mapred.RawKeyVa
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.junit.After;
@@ -63,40 +71,48 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.Lists;
+
 public class TestMerger {
 
   private Configuration conf;
   private JobConf jobConf;
   private FileSystem fs;
-  
+
   @Before
   public void setup() throws IOException {
     conf = new Configuration();
     jobConf = new JobConf();
     fs = FileSystem.getLocal(conf);
   }
-  
-  @After
-  public void cleanup() throws IOException {
-    fs.delete(new Path(jobConf.getLocalDirs()[0]), true);
+
+
+  @Test
+  public void testEncryptedMerger() throws Throwable {
+    jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+    TokenCache.setShuffleSecretKey(new byte[16], credentials);
+    UserGroupInformation.getCurrentUser().addCredentials(credentials);
+    testInMemoryAndOnDiskMerger();
   }
-  
+
   @Test
-  public void testInMemoryMerger() throws Throwable {
+  public void testInMemoryAndOnDiskMerger() throws Throwable {
     JobID jobId = new JobID("a", 0);
-    TaskAttemptID reduceId = new TaskAttemptID(
         new TaskID(jobId, TaskType.REDUCE, 0), 0);
     TaskAttemptID mapId1 = new TaskAttemptID(
         new TaskID(jobId, TaskType.MAP, 1), 0);
     TaskAttemptID mapId2 = new TaskAttemptID(
         new TaskID(jobId, TaskType.MAP, 2), 0);
-
+
     LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
-
+
     MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
-        reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+        reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
         null, null, new Progress(), new MROutputFiles());
-
+
     // write map outputs
     Map<String, String> map1 = new TreeMap<String, String>();
     map1.put("apple", "disgusting");
@@ -113,32 +129,88 @@ public class TestMerger {
         mapOutputBytes1.length);
     System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
         mapOutputBytes2.length);
-
+
     // create merger and run merge
     MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
         mergeManager.createInMemoryMerger();
-    List<InMemoryMapOutput<Text, Text>> mapOutputs =
+    List<InMemoryMapOutput<Text, Text>> mapOutputs1 =
         new ArrayList<InMemoryMapOutput<Text, Text>>();
-    mapOutputs.add(mapOutput1);
-    mapOutputs.add(mapOutput2);
-
-    inMemoryMerger.merge(mapOutputs);
-
+    mapOutputs1.add(mapOutput1);
+    mapOutputs1.add(mapOutput2);
+
+    inMemoryMerger.merge(mapOutputs1);
+
     Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
-    Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
-
+
+    TaskAttemptID reduceId2 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.REDUCE, 3), 0);
+    TaskAttemptID mapId3 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 4), 0);
+    TaskAttemptID mapId4 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 5), 0);
+    // write map outputs
+    Map<String, String> map3 = new TreeMap<String, String>();
+    map3.put("apple", "awesome");
+    map3.put("carrot", "amazing");
+    Map<String, String> map4 = new TreeMap<String, String>();
+    map4.put("banana", "bla");
+    byte[] mapOutputBytes3 = writeMapOutput(conf, map3);
+    byte[] mapOutputBytes4 = writeMapOutput(conf, map4);
+    InMemoryMapOutput<Text, Text> mapOutput3 = new InMemoryMapOutput<Text, Text>(
+        conf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
+    InMemoryMapOutput<Text, Text> mapOutput4 = new InMemoryMapOutput<Text, Text>(
+        conf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
+    System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0,
+        mapOutputBytes3.length);
+    System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0,
+        mapOutputBytes4.length);
+
+// // create merger and run merge
+    MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger2 =
+        mergeManager.createInMemoryMerger();
+    List<InMemoryMapOutput<Text, Text>> mapOutputs2 =
+        new ArrayList<InMemoryMapOutput<Text, Text>>();
+    mapOutputs2.add(mapOutput3);
+    mapOutputs2.add(mapOutput4);
+
+    inMemoryMerger2.merge(mapOutputs2);
+
+    Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size());
+
+    List<CompressAwarePath> paths = new ArrayList<CompressAwarePath>();
+    Iterator<CompressAwarePath> iterator = mergeManager.onDiskMapOutputs.iterator();
     List<String> keys = new ArrayList<String>();
     List<String> values = new ArrayList<String>();
-    readOnDiskMapOutput(conf, fs, outPath, keys, values);
-    Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
-    Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
+    while (iterator.hasNext()) {
+      CompressAwarePath next = iterator.next();
+      readOnDiskMapOutput(conf, fs, next, keys, values);
+      paths.add(next);
+    }
+    Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot"));
+    Assert.assertEquals(values, Arrays.asList("awesome", "bla", "amazing", "disgusting", "pretty good", "delicious"));
+    mergeManager.close();
+
+    mergeManager = new MergeManagerImpl<Text, Text>(
+        reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+        null, null, new Progress(), new MROutputFiles());
+
+    MergeThread<CompressAwarePath,Text,Text> onDiskMerger = mergeManager.createOnDiskMerger();
+    onDiskMerger.merge(paths);
+
+    Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+    keys = new ArrayList<String>();
+    values = new ArrayList<String>();
+    readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
+    Assert.assertEquals(keys, Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot"));
+    Assert.assertEquals(values, Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious"));
     mergeManager.close();
     Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
     Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
     Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
   }
-
+
   private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
       throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -152,11 +224,13 @@ public class TestMerger {
     writer.close();
     return baos.toByteArray();
   }
-
+
   private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
       List<String> keys, List<String> values) throws IOException {
-    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
-        path, null, null);
+    FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+
+    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
+        fs.getFileStatus(path).getLen(), null, null);
     DataInputBuffer keyBuff = new DataInputBuffer();
     DataInputBuffer valueBuff = new DataInputBuffer();
     Text key = new Text();
@@ -169,17 +243,17 @@ public class TestMerger {
       values.add(value.toString());
     }
   }
-
+
   @Test
   public void testCompressed() throws IOException {
     testMergeShouldReturnProperProgress(getCompressedSegments());
-  }
-
+}
+
   @Test
   public void testUncompressed() throws IOException {
     testMergeShouldReturnProperProgress(getUncompressedSegments());
   }
-
+
   @SuppressWarnings( { "deprecation", "unchecked" })
   public void testMergeShouldReturnProperProgress(
       List<Segment<Text, Text>> segments) throws IOException {
@@ -212,7 +286,7 @@ public class TestMerger {
     }
     return segments;
   }
-
+
   private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
     List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
     for (int i = 1; i < 1; i++) {
@@ -220,7 +294,7 @@ public class TestMerger {
     }
     return segments;
   }
-
+
   private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
     return new Segment<Text, Text>(getReader(i), false);
   }
@@ -258,7 +332,7 @@ public class TestMerger {
     }
   };
 }
-
+
   private Answer<?> getValueAnswer(final String segmentName) {
     return new Answer<Void>() {
       int i = 0;
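The new testEncryptedMerger/readOnDiskMapOutput pattern above reduces to three steps: enable MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, register a shuffle secret with the current user's credentials, and wrap raw file streams with CryptoUtils.wrapIfNecessary. A self-contained sketch of that round trip follows; it is not part of the commit, the local path, payload, and all-zero 16-byte key are placeholders, and the output-stream overload of wrapIfNecessary is assumed from the CryptoUtils class this changeset copies in from trunk.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.CryptoUtils;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public class EncryptedSpillSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Ask for encrypted intermediate data, as testEncryptedMerger does.
        conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);

        // The crypto codepath looks up the shuffle secret in the current
        // user's credentials; the all-zero key is a placeholder.
        Credentials credentials =
            UserGroupInformation.getCurrentUser().getCredentials();
        TokenCache.setShuffleSecretKey(new byte[16], credentials);
        UserGroupInformation.getCurrentUser().addCredentials(credentials);

        FileSystem fs = FileSystem.getLocal(conf);
        Path path = new Path("build/test/encrypted-spill.out"); // placeholder

        // Write side: wrapIfNecessary encrypts only when the flag is set
        // (this overload is assumed from the added CryptoUtils class).
        FSDataOutputStream out = CryptoUtils.wrapIfNecessary(conf, fs.create(path));
        out.writeUTF("intermediate data");
        out.close();

        // Read side: the same helper transparently decrypts, which is how
        // the updated readOnDiskMapOutput reads merged map outputs back.
        FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
        System.out.println(in.readUTF());
        in.close();
      }
    }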
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java Thu Aug 21 05:22:10 2014
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,7 +44,7 @@ public class TestIFile {
     DefaultCodec codec = new GzipCodec();
     codec.setConf(conf);
     IFile.Writer<Text, Text> writer =
-      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+      new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
         codec, null);
     writer.close();
   }
@@ -56,12 +58,15 @@
     Path path = new Path(new Path("build/test.ifile"), "data");
     DefaultCodec codec = new GzipCodec();
     codec.setConf(conf);
+    FSDataOutputStream out = rfs.create(path);
     IFile.Writer<Text, Text> writer =
-        new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+        new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
          codec, null);
     writer.close();
+    FSDataInputStream in = rfs.open(path);
     IFile.Reader<Text, Text> reader =
-      new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
+      new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
+        codec, null);
     reader.close();
 
     // test check sum

Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java Thu Aug 21 05:22:10 2014
@@ -80,7 +80,7 @@ public class TestReduceTask extends Test
     FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
     Path path = new Path(tmpDir, "data.in");
     IFile.Writer<Text, Text> writer =
-      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+      new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
         codec, null);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));
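The TestIFile and TestReduceTask hunks track the same IFile API change: Writer no longer takes a (FileSystem, Path) pair but an already-created output stream, and Reader takes an open input stream plus an explicit length. Handing IFile a stream is what lets callers interpose wrappers such as CryptoUtils before IFile ever sees the bytes. A minimal round trip against the new constructors, as a sketch rather than code from the commit: it is placed in org.apache.hadoop.mapred because IFile is MapReduce-internal, like the tests themselves, and the path is a placeholder.

    package org.apache.hadoop.mapred; // same package as the tests; IFile is internal

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;

    public class IFileStreamSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Raw local FS, mirroring the tests, so no .crc files get in the way.
        FileSystem rfs = FileSystem.getLocal(conf).getRaw();
        Path path = new Path("build/test.ifile"); // placeholder location

        // New-style Writer: the caller creates and supplies the stream.
        FSDataOutputStream out = rfs.create(path);
        IFile.Writer<Text, Text> writer =
            new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
                null, null); // no codec, no spill counter
        writer.append(new Text("apple"), new Text("delicious"));
        writer.close();

        // New-style Reader: the caller supplies the stream and the file
        // length, since the Reader can no longer derive either from a Path.
        FSDataInputStream in = rfs.open(path);
        IFile.Reader<Text, Text> reader =
            new IFile.Reader<Text, Text>(conf, in,
                rfs.getFileStatus(path).getLen(), null, null);
        reader.close();
      }
    }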
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java Thu Aug 21 05:22:10 2014
@@ -95,9 +95,9 @@ public class TestPipeApplication {
         new Counters.Counter(), new Progress());
     FileSystem fs = new RawLocalFileSystem();
     fs.setConf(conf);
-    Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
-        new Path(workSpace + File.separator + "outfile"), IntWritable.class,
-        Text.class, null, null);
+    Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
+        new Path(workSpace + File.separator + "outfile")), IntWritable.class,
+        Text.class, null, null, true);
     output.setWriter(wr);
     // stub for client
     File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub");
@@ -177,9 +177,9 @@
         new Progress());
     FileSystem fs = new RawLocalFileSystem();
     fs.setConf(conf);
-    Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
-        new Path(workSpace.getAbsolutePath() + File.separator + "outfile"),
-        IntWritable.class, Text.class, null, null);
+    Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
+        new Path(workSpace.getAbsolutePath() + File.separator + "outfile")),
+        IntWritable.class, Text.class, null, null, true);
     output.setWriter(wr);
     conf.set(Submitter.PRESERVE_COMMANDFILE, "true");
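The pipes hunks carry one detail beyond the stream-based constructor: a new trailing boolean argument to Writer. The parameter name is not visible in this diff; presumably it tells the Writer that it owns the stream it was handed and should close it when the Writer is closed. A hedged sketch of the new call shape, with a hypothetical workspace path and the same package caveat as above:

    package org.apache.hadoop.mapred; // IFile.Writer is MapReduce-internal

    import java.io.File;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RawLocalFileSystem;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.IFile.Writer;

    public class PipesWriterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        RawLocalFileSystem fs = new RawLocalFileSystem();
        fs.setConf(conf); // same setup the pipes tests use

        String workSpace = "build/test/pipes"; // placeholder directory
        fs.mkdirs(new Path(workSpace));

        // The trailing 'true' is the new argument; it appears to transfer
        // ownership of the created stream to the Writer so that wr.close()
        // also closes the underlying file (an assumption -- the parameter
        // name is not shown in this diff).
        Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf,
            fs.create(new Path(workSpace + File.separator + "outfile")),
            IntWritable.class, Text.class, null, null, true);
        wr.append(new IntWritable(123), new Text("value"));
        wr.close();
      }
    }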