Github user jkbradley commented on a diff in the pull request:
https://github.com/apache/spark/pull/19439#discussion_r148694771
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala
---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import scala.language.existentials
+import scala.util.Random
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.sql.SparkSession
+
+private object RecursiveFlag {
+ /**
+ * Sets the spark recursive flag and then restores it.
+ *
+ * @param value Value to set
+ * @param spark Existing spark session
+ * @param f The function to evaluate after setting the flag
+ * @return Returns the evaluation result T of the function
+ */
+ def withRecursiveFlag[T](value: Boolean, spark: SparkSession)(f: => T):
T = {
+ val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+ val hadoopConf = spark.sparkContext.hadoopConfiguration
+ val old = Option(hadoopConf.get(flagName))
+ hadoopConf.set(flagName, value.toString)
+ try f finally {
+ old match {
+ case Some(v) => hadoopConf.set(flagName, v)
+ case None => hadoopConf.unset(flagName)
+ }
+ }
+ }
+}
+
+/**
+ * Filter that allows loading a fraction of HDFS files.
+ */
+private class SamplePathFilter extends Configured with PathFilter {
--- End diff --
Tell me if this SamplePathFilter has already been discussed; I may have
missed it in the many comments above. I'm worried about it being
deterministic, but I'm also not that familiar with the Hadoop APIs being used
here.
* If the DataFrame is reloaded (recomputed), or if a task fails and that
partition is recomputed, then will random.nextDouble() really produce the same
results?
* I'd expect we'd need to set a seed, as @thunterdb suggested. I'm fine
with a fixed seed, though it'd be nice to have it configurable in the future.
* Even if we set a seed, then is random.nextDouble computed in a fixed
order over each partition?
We've run into a lot of issues in both RDD and DataFrame sampling methods
with non-deterministic results, so I want to be careful here.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]