This is an automated email from the ASF dual-hosted git repository.

dmollitor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 40cd40d  HIVE-21426: Remove Utilities Global Random (David Mollitor, reviewed by Peter Vary)
40cd40d is described below

commit 40cd40d5e212860868d7dd15335c0085568416ce
Author: David Mollitor <dmolli...@apache.org>
AuthorDate: Mon Oct 21 18:46:42 2019 -0400

    HIVE-21426: Remove Utilities Global Random (David Mollitor, reviewed by Peter Vary)
---
 .../hadoop/hive/ql/exec/SparkHashTableSinkOperator.java   |  3 ++-
 ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java | 15 +++++++++------
 .../org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java     | 10 +++++++---
 .../org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java |  4 +++-
 .../hive/ql/io/rcfile/truncate/ColumnTruncateTask.java    |  5 +++--
 .../ql/parse/spark/SparkPartitionPruningSinkOperator.java |  5 +++--
 6 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index 78ae9a1..10144a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@ -22,6 +22,7 @@ import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.io.FileExistsException;
 import org.apache.hadoop.conf.Configuration;
@@ -140,7 +141,7 @@ public class SparkHashTableSinkOperator
     fs.mkdirs(path); // Create the folder and its parents if not there
     while (true) {
       path = new Path(path, getOperatorId()
-        + "-" + Math.abs(Utilities.randGen.nextInt()));
+        + "-" + Math.abs(ThreadLocalRandom.current().nextInt()));
       try {
         // This will guarantee file name uniqueness.
         if (fs.createNewFile(path)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 7fd42c1..a7770b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -67,6 +67,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -257,8 +258,6 @@ public final class Utilities {
   @Deprecated
   protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max";
 
-  public static Random randGen = new Random();
-
   private static final Object INPUT_SUMMARY_LOCK = new Object();
 
   private static final Object ROOT_HDFS_DIR_LOCK = new Object();
@@ -751,7 +750,8 @@ public final class Utilities {
   public static String getTaskId(Configuration hconf) {
     String taskid = (hconf == null) ? null : hconf.get("mapred.task.id");
     if (StringUtils.isEmpty(taskid)) {
-      return (Integer.toString(randGen.nextInt(Integer.MAX_VALUE)));
+      return (Integer
+          .toString(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)));
     } else {
       /*
        * extract the task and attempt id from the hadoop taskid. in version 17 the leading component
@@ -2894,7 +2894,8 @@ public final class Utilities {
         if (failures >= maxRetries) {
           throw e;
         }
-        long waitTime = getRandomWaitTime(baseWindow, failures, randGen);
+        long waitTime = getRandomWaitTime(baseWindow, failures,
+            ThreadLocalRandom.current());
         try {
           Thread.sleep(waitTime);
         } catch (InterruptedException iex) {
@@ -2933,7 +2934,8 @@
       LOG.error("Error during JDBC connection.", e);
       throw e;
     }
-      long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, randGen);
+      long waitTime = Utilities.getRandomWaitTime(waitWindow, failures,
+          ThreadLocalRandom.current());
       try {
         Thread.sleep(waitTime);
       } catch (InterruptedException e1) {
@@ -2972,7 +2974,8 @@
       LOG.error("Error preparing JDBC Statement {}", stmt, e);
       throw e;
     }
-      long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, randGen);
+      long waitTime = Utilities.getRandomWaitTime(waitWindow, failures,
+          ThreadLocalRandom.current());
       try {
         Thread.sleep(waitTime);
       } catch (InterruptedException e1) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index ab1b52e..cd4f2a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.exec.AddToClassPathAction;
@@ -308,7 +309,8 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
 
     if (noName) {
       // This is for a special case to ensure unit tests pass
-      job.set(MRJobConfig.JOB_NAME, "JOB" + Utilities.randGen.nextInt());
+      job.set(MRJobConfig.JOB_NAME,
+          "JOB" + ThreadLocalRandom.current().nextInt());
     }
 
     try{
@@ -807,8 +809,10 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     // working dirs and system dirs
     // Workaround is to rename map red working dir to a temp dir in such cases
     if (hadoopLocalMode) {
-      tempConf.set(hadoopSysDir, hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt());
-      tempConf.set(hadoopWorkDir, hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt());
+      tempConf.set(hadoopSysDir, hconf.get(hadoopSysDir) + "/"
+          + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
+      tempConf.set(hadoopWorkDir, hconf.get(hadoopWorkDir) + "/"
+          + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
     }
 
     try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index acc52af..7eb2ef3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Task for fast merging of ORC and RC files.
@@ -124,7 +125,8 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
     if (noName) {
       // This is for a special case to ensure unit tests pass
       job.set(MRJobConfig.JOB_NAME,
-          jobName != null ? jobName : "JOB" + Utilities.randGen.nextInt());
+          jobName != null ? jobName
+              : "JOB" + ThreadLocalRandom.current().nextInt());
     }
 
     // add input path
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 8f21f7c..17b5b4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.rcfile.truncate;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
@@ -157,8 +158,8 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri
 
     if (noName) {
       // This is for a special case to ensure unit tests pass
-      job.set(MRJobConfig.JOB_NAME,
-          jobName != null ? jobName : "JOB" + Utilities.randGen.nextInt());
+      job.set(MRJobConfig.JOB_NAME, jobName != null ? jobName
+          : "JOB" + ThreadLocalRandom.current().nextInt());
     }
 
     try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index 1de7a45..1334a26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,7 +34,6 @@ import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -161,7 +161,8 @@ public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPr
     fs.mkdirs(path);
 
     while (true) {
-      path = new Path(path, String.valueOf(Utilities.randGen.nextInt()));
+      path = new Path(path, String
+          .valueOf(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)));
       if (!fs.exists(path)) {
         break;
       }
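
For context on the pattern this commit applies throughout: a single shared java.util.Random is thread-safe, but every caller updates the same internal seed through a compare-and-swap loop, so under many concurrent tasks the shared instance becomes a synchronization hot spot. ThreadLocalRandom.current() instead hands each thread its own generator with no shared state. The standalone sketch below is illustrative only (the class and method names are not part of the Hive codebase); it also shows why the bounded nextInt(Integer.MAX_VALUE) form used in several hunks is preferable to wrapping nextInt() in Math.abs:

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class RandomSuffixSketch {

  // Before: one static generator shared by every thread. Thread-safe, but
  // all threads CAS on the same seed, which serializes hot code paths.
  private static final Random SHARED_RAND = new Random();

  static String sharedSuffix() {
    // nextInt() can return any int, so Math.abs is required; note that
    // Math.abs(Integer.MIN_VALUE) is still negative -- a rare latent bug.
    return "JOB" + Math.abs(SHARED_RAND.nextInt());
  }

  // After: ThreadLocalRandom.current() returns the calling thread's own
  // generator, so there is no contention and no shared mutable state.
  static String threadLocalSuffix() {
    // The bounded form yields a value in [0, Integer.MAX_VALUE), so no
    // Math.abs is needed and the MIN_VALUE edge case disappears.
    return "JOB" + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
  }

  public static void main(String[] args) {
    System.out.println(sharedSuffix());
    System.out.println(threadLocalSuffix());
  }
}

One design note: ThreadLocalRandom must always be fetched via current() at the point of use, as the patched code does; caching the returned instance in a field and sharing it across threads would defeat the per-thread isolation.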