Repository: spark Updated Branches: refs/heads/master 8ac09108f -> 8cdf143f4
[SPARK-18103][FOLLOW-UP][SQL][MINOR] Rename `MetadataLogFileCatalog` to `MetadataLogFileIndex` ## What changes were proposed in this pull request? This is a follow-up to https://github.com/apache/spark/pull/15634. ## How was this patch tested? N/A Author: Liwei Lin <lwl...@gmail.com> Closes #15712 from lw-lin/18103. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cdf143f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cdf143f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cdf143f Branch: refs/heads/master Commit: 8cdf143f4b1ca5c6bc0256808e6f42d9ef299cbd Parents: 8ac0910 Author: Liwei Lin <lwl...@gmail.com> Authored: Tue Nov 1 11:17:35 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Tue Nov 1 11:17:35 2016 -0700 ---------------------------------------------------------------------- .../streaming/MetadataLogFileCatalog.scala | 60 -------------------- .../streaming/MetadataLogFileIndex.scala | 60 ++++++++++++++++++++ 2 files changed, 60 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8cdf143f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala deleted file mode 100644 index aeaa134..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import scala.collection.mutable - -import org.apache.hadoop.fs.{FileStatus, Path} - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.datasources._ - - -/** - * A [[FileIndex]] that generates the list of files to processing by reading them from the - * metadata log files generated by the [[FileStreamSink]]. - */ -class MetadataLogFileIndex(sparkSession: SparkSession, path: Path) - extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) { - - private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) - logInfo(s"Reading streaming file log from $metadataDirectory") - private val metadataLog = - new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString) - private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory) - private var cachedPartitionSpec: PartitionSpec = _ - - override protected val leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { - new mutable.LinkedHashMap ++= allFilesFromLog.map(f => f.getPath -> f) - } - - override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { - allFilesFromLog.toArray.groupBy(_.getPath.getParent) - } - - override def rootPaths: Seq[Path] = path :: Nil - - override def refresh(): Unit = { } - - override def partitionSpec(): PartitionSpec = { - if (cachedPartitionSpec == null) { - cachedPartitionSpec = inferPartitioning() - } - cachedPartitionSpec - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/8cdf143f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala new file mode 100644 index 0000000..aeaa134 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources._ + + +/** + * A [[FileIndex]] that generates the list of files to processing by reading them from the + * metadata log files generated by the [[FileStreamSink]]. + */ +class MetadataLogFileIndex(sparkSession: SparkSession, path: Path) + extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) { + + private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) + logInfo(s"Reading streaming file log from $metadataDirectory") + private val metadataLog = + new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, metadataDirectory.toUri.toString) + private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory) + private var cachedPartitionSpec: PartitionSpec = _ + + override protected val leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { + new mutable.LinkedHashMap ++= allFilesFromLog.map(f => f.getPath -> f) + } + + override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { + allFilesFromLog.toArray.groupBy(_.getPath.getParent) + } + + override def rootPaths: Seq[Path] = path :: Nil + + override def refresh(): Unit = { } + + override def partitionSpec(): PartitionSpec = { + if (cachedPartitionSpec == null) { + cachedPartitionSpec = inferPartitioning() + } + cachedPartitionSpec + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org