[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r181543985 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -304,45 +304,14 @@ case class LoadDataCommand( } } -val loadPath = +val loadPath = { if (isLocal) { val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. --- End diff -- @wzhfy All test cases in the Windows query suite are passing. In the previous code, to read files matching a wildcard we first read the file's parent directory, listed all the files inside that folder, and then matched the pattern against each file; that is why the parent path had to be checked explicitly for unsupported characters such as '*'. Now we pass the path with the wildcard directly to HDFS's globStatus() API, which should pattern-match regardless of whether it points to a directory or a file. I can also see that globStatus has special handling for Windows paths; I will look into this in more detail. ![image](https://user-images.githubusercontent.com/12999161/38765114-20a47d7e-3fd9-11e8-9863-59d179c4a2d8.png) Thanks all for the valuable feedback. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
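[Editor's note] For readers following along, a minimal Scala sketch (not the actual patch) of the approach described above: the wildcard path is handed straight to Hadoop's `globStatus`, so no manual listing of the parent directory is needed. The example path is hypothetical.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object GlobLoadPathSketch {
  // globStatus expands the wildcard itself, so there is no need to list the
  // parent directory and match each file manually. It may return null (a
  // non-existent literal path) or an empty array (a glob with no matches).
  def resolveLoadPath(path: String): Array[FileStatus] = {
    val pattern = new Path(path)
    val fs: FileSystem = pattern.getFileSystem(new Configuration())
    Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical input path, not taken from the PR.
    val matches = resolveLoadPath("/user/hive/warehouse/input/part-*.csv")
    require(matches.nonEmpty, "LOAD DATA input path does not match any file")
  }
}
```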
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181540952 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Couldn't it also be writing to an `OffHeapColumnVector`? https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L199 If so, I think the copy is 1MB at a time: https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L189 I agree that ByteBuffer shouldn't be supported in this PR. But there's an opportunity to use the bulk copy APIs which would benefit from any future optimization that happens. Plus even if the copy does eventually become a loop inside the column vector implementation, there's more chance of the JIT unrolling the loop since it's smaller. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20704: [SPARK-23551][BUILD] Exclude `hadoop-mapreduce-client-co...
Github user megaserg commented on the issue: https://github.com/apache/spark/pull/20704 Thank you @dongjoon-hyun! This was also affecting our Spark job performance! We're using `mapreduce.fileoutputcommitter.algorithm.version=2` in our Spark job config, as recommended e.g. here: http://spark.apache.org/docs/latest/cloud-integration.html. We're using user-provided Hadoop 2.9.0. However, since this 2.6.5 JAR was in spark/jars, it was given priority on the classpath over the Hadoop-provided 2.9.0 JAR. The 2.6.5 JAR silently ignored the `mapreduce.fileoutputcommitter.algorithm.version` setting and used the default, slow algorithm (I believe hadoop-mapreduce-client-core only had the one, slow algorithm until 2.7.0). I believe this affects everyone who uses any MapReduce settings with Spark 2.3.0. Great job! Can we double-check that this JAR is no longer present in the "without-hadoop" Spark distribution? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
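[Editor's note] A small sketch of where such a setting is typically passed; the config key is the one quoted above, while the app name is illustrative. Whether the setting actually takes effect depends on which hadoop-mapreduce-client-core JAR wins on the classpath, which is exactly what this fix addresses.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("committer-v2-example")  // illustrative name
  // "spark.hadoop."-prefixed keys are copied into the Hadoop configuration.
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()

// Read the value back from the Hadoop configuration to confirm what was set.
val effective = spark.sparkContext.hadoopConfiguration
  .get("mapreduce.fileoutputcommitter.algorithm.version")
println(s"committer algorithm version = $effective")
```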
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r181538566 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -304,45 +304,14 @@ case class LoadDataCommand( } } -val loadPath = +val loadPath = { if (isLocal) { val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. --- End diff -- yeah, this is what I was worried about. We need to be careful to change this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20992: [SPARK-23779][SQL] TaskMemoryManager and UnsafeSorter re...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20992 Good point. I will add benchmark results. Let me leave a TODO in the description. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19868: [SPARK-22676] Avoid iterating all partition paths...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/19868#discussion_r181538066 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala --- @@ -176,12 +176,13 @@ class HadoopTableReader( val matches = fs.globStatus(pathPattern) matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString) } -// convert /demo/data/year/month/day to /demo/data/*/*/*/ +// convert /demo/data/year/month/day to /demo/data/year/month/*/ --- End diff -- Sure, that would be great. Is there an existing PR/JIRA working on that? If not, I can make the change :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
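[Editor's note] A minimal sketch of the path-pattern idea in the diff above (the helper name and paths are illustrative, not from the patch): keep the concrete values of the leading partition directories and wildcard only the last level, so a single `globStatus` call has far fewer paths to expand.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: wildcard only the last partition-level directory.
def lastLevelPattern(partitionPath: String): Path = {
  val trimmed = partitionPath.stripSuffix("/")
  new Path(trimmed.substring(0, trimmed.lastIndexOf('/')) + "/*")
}

// "/demo/data/year/month/day" becomes "/demo/data/year/month/*"
val pattern = lastLevelPattern("/demo/data/year/month/day")
val fs: FileSystem = pattern.getFileSystem(new Configuration())
val existingPaths = Option(fs.globStatus(pattern))
  .getOrElse(Array.empty)
  .map(_.getPath.toString)
  .toSet
```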
[GitHub] spark issue #20894: [SPARK-23786][SQL] Checking column names of csv headers
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20894 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89360/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20894: [SPARK-23786][SQL] Checking column names of csv headers
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20894 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20894: [SPARK-23786][SQL] Checking column names of csv headers
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20894 **[Test build #89360 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89360/testReport)** for PR 20894 at commit [`b43a7c7`](https://github.com/apache/spark/commit/b43a7c7ec50e03aaf4990e9bbb6989cdb2c076ef). * This patch **fails from timeout after a configured wait of `300m`**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20992: [SPARK-23779][SQL] TaskMemoryManager and UnsafeSorter re...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/20992 What are the performance improvements? Without additional data this seems like just an invasive change without any real benefits ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21065: [SPARK-23979][SQL] MultiAlias should not be a Cod...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21065 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21065: [SPARK-23979][SQL] MultiAlias should not be a CodegenFal...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21065 LGTM, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20988#discussion_r181535823 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic /** * A pattern that finds the partitioned table relation node inside the given plan, and returns a - * pair of the partition attributes and the table relation node. + * pair of the partition attributes, partition filters, and the table relation node. * * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with * deterministic expressions, and returns result after reaching the partitioned table relation * node. */ - object PartitionedRelation { - -def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match { - case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) -if fsRelation.partitionSchema.nonEmpty => -val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) -Some((AttributeSet(partAttrs), l)) - - case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty => -val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) -Some((AttributeSet(partAttrs), relation)) - - case p @ Project(projectList, child) if projectList.forall(_.deterministic) => -unapply(child).flatMap { case (partAttrs, relation) => - if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None -} + object PartitionedRelation extends PredicateHelper { + +def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = { + plan match { +case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) + if fsRelation.partitionSchema.nonEmpty => + val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) + Some((AttributeSet(partAttrs), Nil, l)) + +case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty => + val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) + Some((AttributeSet(partAttrs), Nil, relation)) + +case p @ Project(projectList, child) if projectList.forall(_.deterministic) => + unapply(child).flatMap { case (partAttrs, filters, relation) => +if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None --- End diff -- I'd propose something top-down like ``` def getPartitionedRelation( plan: LogicalPlan, predicates: Seq[Expression]): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = { plan match { case Filter(condition, child) if condition.deterministic => getPartitionedRelation(child, predicates ++ splitConjunctivePredicates(condition)) case Project(projectList, child) if projectList.forall(_.deterministic) => getPartitionedRelation(child, predicates.filter(_.references.subsetOf(child.outputSet))) case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) if fsRelation.partitionSchema.nonEmpty => val partAttrs = ... val partitionFilters = predicates.filter(_.references.subsetOf(partAttrs)) Some(...) case _ => None } } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21031: [SPARK-23923][SQL] Add cardinality function
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21031 As I understand it, these changes are meant to improve compatibility with other DBs (like Presto); see https://issues.apache.org/jira/browse/SPARK-23899 and https://issues.apache.org/jira/browse/SPARK-23923. As you pointed out, `cardinality` and `size` are the same except for the return data type, so I reused the same implementation. @gatorsmile what do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21060 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89363/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21060 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20988: [SPARK-23877][SQL]: Use filter predicates to prun...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20988#discussion_r181535484 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic /** * A pattern that finds the partitioned table relation node inside the given plan, and returns a - * pair of the partition attributes and the table relation node. + * pair of the partition attributes, partition filters, and the table relation node. * * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with * deterministic expressions, and returns result after reaching the partitioned table relation * node. */ - object PartitionedRelation { - -def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match { - case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) -if fsRelation.partitionSchema.nonEmpty => -val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) -Some((AttributeSet(partAttrs), l)) - - case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty => -val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) -Some((AttributeSet(partAttrs), relation)) - - case p @ Project(projectList, child) if projectList.forall(_.deterministic) => -unapply(child).flatMap { case (partAttrs, relation) => - if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None -} + object PartitionedRelation extends PredicateHelper { + +def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = { + plan match { +case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) + if fsRelation.partitionSchema.nonEmpty => + val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) + Some((AttributeSet(partAttrs), Nil, l)) + +case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty => + val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) + Some((AttributeSet(partAttrs), Nil, relation)) + +case p @ Project(projectList, child) if projectList.forall(_.deterministic) => + unapply(child).flatMap { case (partAttrs, filters, relation) => +if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None --- End diff -- what about `Filter(p > 1, Project(a, p, Table(a, b, p, partitioned by p)))`? `p > 1` should also be a partition filter. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
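[Editor's note] To make the shape of that plan concrete, a small illustration follows; the table and column names are hypothetical and a running SparkSession is assumed.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// t(a, b, p), partitioned by p. The filter on the partition column sits above
// a Project, i.e. Filter(p > 1, Project(a, p, Relation(t))). A bottom-up match
// that stops collecting predicates at the Project would miss `p > 1`, even
// though it is a valid partition filter for the underlying scan.
val df = spark.table("t").select("a", "p").filter("p > 1")
df.explain(true)
```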
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21060 **[Test build #89363 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89363/testReport)** for PR 21060 at commit [`4656724`](https://github.com/apache/spark/commit/4656724d27c208d794f99691cfbf93b4bb118d93). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181535144 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- It looks that way, but it actually replaces a similar loop: https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java#L283-L291 The main problem is that ByteBuffer isn't supported in the column vectors. That seems beyond the scope of this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20988: [SPARK-23877][SQL]: Use filter predicates to prune parti...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20988 can we add a test? We can use `HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount()` to check if this patch can really reduce the number of partitions being fetched. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
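[Editor's note] A sketch of the kind of test being suggested, assuming a Hive-enabled session; the table layout and the assertion bound are hypothetical, and the assertion describes the behaviour this patch aims for rather than current behaviour.

```scala
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")

// Hypothetical partitioned table with four partitions.
spark.sql("CREATE TABLE t (a INT) PARTITIONED BY (p INT)")
(1 to 4).foreach(p => spark.sql(s"ALTER TABLE t ADD PARTITION (p=$p)"))

HiveCatalogMetrics.reset()
spark.sql("SELECT MAX(p) FROM t WHERE p >= 3").collect()

// With filter-aware pruning, only the partitions matching `p >= 3` should be
// fetched from the metastore; without it, all four are.
val fetched = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount()
assert(fetched < 4)
```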
[GitHub] spark issue #21071: [SPARK-21962][CORE] Distributed Tracing in Spark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21071 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21071: [SPARK-21962][CORE] Distributed Tracing in Spark
GitHub user devaraj-kavali opened a pull request: https://github.com/apache/spark/pull/21071 [SPARK-21962][CORE] Distributed Tracing in Spark
## What changes were proposed in this pull request?
This PR integrates Spark with HTrace: it sends traces for the application and its tasks when span receivers are configured. The trace configuration can be supplied along with the Spark configuration by adding the prefix 'spark.htrace.' to the HTrace configuration keys, for example:
`spark.htrace.span.receiver.classes` org.apache.htrace.core.LocalFileSpanReceiver;org.apache.htrace.impl.HTracedSpanReceiver;org.apache.htrace.impl.ZipkinSpanReceiver
`spark.htrace.htraced.receiver.address` IP:PORT
`spark.htrace.local.file.span.receiver.path` /path/local-span-file
`spark.htrace.sampler.classes` org.apache.htrace.core.AlwaysSampler
It also adds a configuration, `spark.app.spanId`, for receiving a parent span: if `spark.app.spanId` is set, it is taken as the parent span; otherwise a new span is started for each application.
## How was this patch tested?
Verified with the existing tests plus an added test, and also verified manually in the deployment modes below, with the different tracers individually and together. 1. Local and local-cluster 2. Standalone Client and Cluster modes 3. Yarn Client and Cluster modes 4. Mesos Client and Cluster modes
You can merge this pull request into a Git repository by running: $ git pull https://github.com/devaraj-kavali/spark SPARK-21962 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21071.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21071 commit 254e4ed38411d45cc8c2ba8cdace069da219c359 Author: Devaraj K Date: 2018-04-14T00:06:36Z [SPARK-21962][CORE] Distributed Tracing in Spark --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
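[Editor's note] For reference, a sketch of how the proposed configuration might be assembled in application code; the keys are the ones listed in the description above, while the receiver choice, file path, and app name are only illustrative.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("traced-app")  // illustrative
  .set("spark.htrace.span.receiver.classes",
    "org.apache.htrace.core.LocalFileSpanReceiver")
  .set("spark.htrace.local.file.span.receiver.path", "/tmp/spark-spans.json")
  .set("spark.htrace.sampler.classes", "org.apache.htrace.core.AlwaysSampler")

// Optionally attach the application to an existing parent span:
// conf.set("spark.app.spanId", "<parent-span-id>")
```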
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r181531212 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.orc + +import java.net.URI +import java.util.Locale + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.orc.{OrcConf, OrcFile} +import org.apache.orc.mapred.OrcStruct +import org.apache.orc.mapreduce.OrcInputFormat + +import org.apache.spark.TaskContext +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, JoinedRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils} +import org.apache.spark.sql.execution.datasources.v2.ColumnarBatchFileSourceReader +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{AtomicType, StructType} +import org.apache.spark.util.SerializableConfiguration + +class OrcDataSourceV2 extends DataSourceV2 with ReadSupport with ReadSupportWithSchema { + override def createReader(options: DataSourceOptions): DataSourceReader = { +new OrcDataSourceReader(options, None) + } + + override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = { +new OrcDataSourceReader(options, Some(schema)) + } +} + +case class OrcDataSourceReader(options: DataSourceOptions, userSpecifiedSchema: Option[StructType]) + extends ColumnarBatchFileSourceReader + with SupportsPushDownCatalystFilters { + + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = { +OrcUtils.readSchema(sparkSession, files) + } + + private var pushedFiltersArray: Array[Expression] = Array.empty + + override def readFunction: PartitionedFile => Iterator[InternalRow] = { --- End diff -- With #21029, we can get rid of this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21031: [SPARK-23923][SQL] Add cardinality function
Github user rxin commented on the issue: https://github.com/apache/spark/pull/21031 If there is already size, why do we need to create a new implementation? Why can't we just rewrite cardinality to size? Also I wouldn't add any programming API for this, since there is already size. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
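[Editor's note] One way to express the rewrite rxin is suggesting — a sketch only, not how Spark actually registers built-in functions — is a rule that replaces the new expression with the existing `Size` during analysis/optimization. `Cardinality` below is a hypothetical stand-in for the expression added in this PR.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Size, UnaryExpression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{DataType, IntegerType}

// Hypothetical placeholder for the expression this PR adds; it is never
// evaluated directly because the rule below rewrites it away.
case class Cardinality(child: Expression) extends UnaryExpression with Unevaluable {
  override def dataType: DataType = IntegerType
}

// Rewrite cardinality(x) to size(x) so only `size` needs a real implementation.
object RewriteCardinalityToSize extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    case Cardinality(child) => Size(child)
  }
}
```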
[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21048 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21056: [SPARK-23849][SQL] Tests for samplingRatio of jso...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/21056#discussion_r181530121 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2128,38 +2128,60 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - test("SPARK-23849: schema inferring touches less data if samplingRation < 1.0") { -val predefinedSample = Set[Int](2, 8, 15, 27, 30, 34, 35, 37, 44, 46, + val sampledTestData = (row: Row) => { +val value = row.getLong(0) +val predefinedSample = Set[Long](2, 8, 15, 27, 30, 34, 35, 37, 44, 46, 57, 62, 68, 72) -withTempPath { path => - val writer = Files.newBufferedWriter(Paths.get(path.getAbsolutePath), -StandardCharsets.UTF_8, StandardOpenOption.CREATE_NEW) - for (i <- 0 until 100) { -if (predefinedSample.contains(i)) { - writer.write(s"""{"f1":${i.toString}}""" + "\n") -} else { - writer.write(s"""{"f1":${(i.toDouble + 0.1).toString}}""" + "\n") -} - } - writer.close() +if (predefinedSample.contains(value)) { + s"""{"f1":${value.toString}}""" +} else { + s"""{"f1":${(value.toDouble + 0.1).toString}}""" +} + } - val ds = spark.read.option("samplingRatio", 0.1).json(path.getCanonicalPath) + test("SPARK-23849: schema inferring touches less data if samplingRatio < 1.0") { +// Set default values for the DataSource parameters to make sure +// that whole test file is mapped to only one partition. This will guarantee +// reliable sampling of the input file. +withSQLConf( + "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString, + "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString +)(withTempPath { path => + val rdd = spark.sqlContext.range(0, 100, 1, 1).map(sampledTestData) + rdd.write.text(path.getAbsolutePath) + + val ds = spark.read +.option("inferSchema", true) +.option("samplingRatio", 0.1) +.json(path.getCanonicalPath) assert(ds.schema == new StructType().add("f1", LongType)) -} +}) } - test("SPARK-23849: usage of samplingRation while parsing of dataset of strings") { -val dstr = spark.sparkContext.parallelize(0 until 100, 1).map { i => - val predefinedSample = Set[Int](2, 8, 15, 27, 30, 34, 35, 37, 44, 46, -57, 62, 68, 72) - if (predefinedSample.contains(i)) { -s"""{"f1":${i.toString}}""" + "\n" - } else { -s"""{"f1":${(i.toDouble + 0.1).toString}}""" + "\n" - } -}.toDS() -val ds = spark.read.option("samplingRatio", 0.1).json(dstr) + test("SPARK-23849: usage of samplingRatio while parsing a dataset of strings") { +val rdd = spark.sqlContext.range(0, 100, 1, 1).map(sampledTestData) +val ds = spark.read + .option("inferSchema", true) + .option("samplingRatio", 0.1) + .json(rdd) assert(ds.schema == new StructType().add("f1", LongType)) } + + test("SPARK-23849: samplingRatio is out of the range (0, 1.0]") { +val dstr = spark.sparkContext.parallelize(0 until 100, 1).map(_.toString).toDS() --- End diff -- can you just use spark.range? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21053: [SPARK-23924][SQL] Add element_at function
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/21053#discussion_r181529978 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -413,6 +413,78 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { ) } + test("element at function") { --- End diff -- also the function is element_at, not "element at" ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21048 I am merging this to master. Once again, thank you for your reviews. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21053: [SPARK-23924][SQL] Add element_at function
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/21053#discussion_r181529901 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -413,6 +413,78 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { ) } + test("element at function") { --- End diff -- why do we need so many test cases here? this is just to verify the api works end to end. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r181529318 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.orc + +import java.net.URI +import java.util.Locale + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.orc.{OrcConf, OrcFile} +import org.apache.orc.mapred.OrcStruct +import org.apache.orc.mapreduce.OrcInputFormat + +import org.apache.spark.TaskContext +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, JoinedRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils} +import org.apache.spark.sql.execution.datasources.v2.ColumnarBatchFileSourceReader +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{AtomicType, StructType} +import org.apache.spark.util.SerializableConfiguration + +class OrcDataSourceV2 extends DataSourceV2 with ReadSupport with ReadSupportWithSchema { + override def createReader(options: DataSourceOptions): DataSourceReader = { +new OrcDataSourceReader(options, None) + } + + override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = { +new OrcDataSourceReader(options, Some(schema)) + } +} + +case class OrcDataSourceReader(options: DataSourceOptions, userSpecifiedSchema: Option[StructType]) + extends ColumnarBatchFileSourceReader + with SupportsPushDownCatalystFilters { + + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = { +OrcUtils.readSchema(sparkSession, files) + } + + private var pushedFiltersArray: Array[Expression] = Array.empty + + override def readFunction: PartitionedFile => Iterator[InternalRow] = { --- End diff -- btw i think it's also ok if we know what we want in the final version, and the intermediate change tries to minimize code changes (i haven't looked at the pr at all so don't interpret this comment as endorsing or not endorsing the pr design) --- - To 
unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user henryr commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r181528729 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +for (int i = 0; i < total; i += 1) { --- End diff -- Here and elsewhere a bulk copy has been replaced by many smaller copies. It would be better to be able to use the bulk version. I think it would be preferable to at least have: if (buffer.hasArray()) { c.putIntsLittleEndian(rowId, total, buffer.array(), 0); } else { for (int i = 0 // ... etc } --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20888: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20888 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20888: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20888 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89358/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20888: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20888 **[Test build #89358 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89358/testReport)** for PR 20888 at commit [`67332fe`](https://github.com/apache/spark/commit/67332fe39f61c1bcefca4fcaf226a1e04f884218). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17359: [SPARK-20028][SQL] Add aggreagate expression nGrams
Github user sijunhe commented on the issue: https://github.com/apache/spark/pull/17359 Would love to see this feature in Spark SQL. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21048 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89357/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21048 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21048 **[Test build #89357 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89357/testReport)** for PR 21048 at commit [`c5b0c98`](https://github.com/apache/spark/commit/c5b0c98257e39d6af2dd8f702b8cbc9f9e6fabe9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21068 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21068 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89355/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21068 **[Test build #89355 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89355/testReport)** for PR 21068 at commit [`e49bd0d`](https://github.com/apache/spark/commit/e49bd0de5c25df4eb65ba975e948e043c0e076cf). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20998: [SPARK-23888][CORE] speculative task should not run on a...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/20998 @squito I completely agree that the comment is inaccurate. Note that this is for a specific taskset, so the impact is limited to that taskset (w.r.t. using executors for speculative execution). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21069: [SPARK-23920][SQL]add array_remove to remove all ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21069#discussion_r181525550 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -287,3 +287,44 @@ case class ArrayContains(left: Expression, right: Expression) override def prettyName: String = "array_contains" } + +/** + * Remove all elements that equal to element from the given array + */ +@ExpressionDescription( + usage = "_FUNC_(array, element) - Remove all elements that equal to element from array.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3, null, 3), 3); + [1,2,null] + """, since = "2.4.0") +case class ArrayRemove(left: Expression, right: Expression) + extends BinaryExpression with ImplicitCastInputTypes with CodegenFallback { --- End diff -- As the same reason at https://github.com/apache/spark/pull/21061#discussion_r181399858, I think we should avoid using `CodegenFallback` if possible. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21048 @steveloughran Thanks for your comments :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r181524647 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -287,3 +288,80 @@ case class ArrayContains(left: Expression, right: Expression) override def prettyName: String = "array_contains" } + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) + extends BinaryExpression with ExpectsInputTypes with CodegenFallback { --- End diff -- Whole-stage codegen doesn't support `CodegenFallback`. So even if codegen for this expression has no performance advantage by itself, it can still make a difference, because a fallback expression splits the query into whole-stage-codegen and non-whole-stage-codegen parts. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
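[Editor's note] A quick way to see this effect in practice — a sketch that assumes the `array_union` function from this PR is available; `debugCodegen()` prints the whole-stage-generated code, so a `CodegenFallback` expression shows up as a break in the generated stages.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Inspect the generated code for a query using the new function; if the
// expression falls back, the surrounding operators are split out of
// whole-stage codegen and that is visible in the printed stages.
val df = Seq((Array(1, 2, 3), Array(1, 3, 5))).toDF("x", "y")
  .selectExpr("array_union(x, y)")
df.debugCodegen()
```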
[GitHub] spark pull request #20923: [SPARK-23807][BUILD] Add Hadoop 3.1 profile with ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20923#discussion_r181524292 --- Diff: hadoop-cloud/pom.xml --- @@ -38,7 +38,32 @@ hadoop-cloud + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + --- End diff -- Is this still needed after you removed the committer code? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20923: [SPARK-23807][BUILD] Add Hadoop 3.1 profile with ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20923#discussion_r181524069 --- Diff: assembly/pom.xml --- @@ -254,6 +254,14 @@ spark-hadoop-cloud_${scala.binary.version} ${project.version} + + + org.eclipse.jetty --- End diff -- This kinda sucks. Doesn't this also end up pulling up a bunch of other jetty stuff into the packaging? I guess there's no way around it until Hadoop itself shades jetty in some way... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20923: [SPARK-23807][BUILD] Add Hadoop 3.1 profile with ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20923#discussion_r181524354 --- Diff: hadoop-cloud/pom.xml --- @@ -38,7 +38,32 @@ hadoop-cloud + --- End diff -- Is this still needed after you removed the committer code? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21065: [SPARK-23979][SQL] MultiAlias should not be a CodegenFal...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21065 cc @cloud-fan @hvanhovell --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21069: [SPARK-23920][SQL]add array_remove to remove all element...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21069 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21069: [SPARK-23920][SQL]add array_remove to remove all element...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21069 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89354/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21069: [SPARK-23920][SQL]add array_remove to remove all element...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21069 **[Test build #89354 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89354/testReport)** for PR 21069 at commit [`cd99694`](https://github.com/apache/spark/commit/cd9969442f780e5a0dad74aa61e49151dd2b2250). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class ArrayRemove(left: Expression, right: Expression)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21033: [SPARK-19320][MESOS]allow specifying a hard limit on num...
Github user yanji84 commented on the issue: https://github.com/apache/spark/pull/21033 Is there anything else we need to do to get this change merged? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, and numF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21044 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89361/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, and numF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21044 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, and numF...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21044 **[Test build #89361 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89361/testReport)** for PR 21044 at commit [`b3c7fec`](https://github.com/apache/spark/commit/b3c7fec0fda9056b832d1d35e829e9946218e504). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21068#discussion_r181513236 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/FailureWithinTimeIntervalTracker.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.yarn + +import scala.collection.mutable + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.{Clock, SystemClock} + +private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging { + + private var clock: Clock = new SystemClock + + private val executorFailuresValidityInterval = + sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L) + + // Queue to store the timestamp of failed executors for each host + private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]() + + private val sumFailedExecutorsTimeStamps = new mutable.Queue[Long]() --- End diff -- why is this called "sum"? I think the old name `failedExecutorTimestamps` is more appropriate, same for the other places you added "sum" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21068: [SPARK-16630][YARN] Blacklist a node if executors...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21068#discussion_r181515465 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.yarn + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.{Clock, SystemClock, Utils} + +private[spark] class YarnAllocatorBlacklistTracker( +sparkConf: SparkConf, +amClient: AMRMClient[ContainerRequest], +failureWithinTimeIntervalTracker: FailureWithinTimeIntervalTracker) + extends Logging { + + private val DEFAULT_TIMEOUT = "1h" + + private val BLACKLIST_TIMEOUT_MILLIS = + sparkConf.get(BLACKLIST_TIMEOUT_CONF).getOrElse(Utils.timeStringAsMs(DEFAULT_TIMEOUT)) + + private val IS_YARN_ALLOCATION_BLACKLIST_ENABLED = +sparkConf.get(YARN_ALLOCATION_BLACKLIST_ENABLED).getOrElse(false) + + private val BLACKLIST_MAX_FAILED_EXEC_PER_NODE = sparkConf.get(MAX_FAILED_EXEC_PER_NODE) + + private val BLACKLIST_SIZE_LIMIT = sparkConf.get(YARN_BLACKLIST_SIZE_LIMIT) + + private val BLACKLIST_SIZE_DEFAULT_WEIGHT = sparkConf.get(YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT) + + private var clock: Clock = new SystemClock + + private val allocationBlacklistedNodesWithExpiry = new HashMap[String, Long]() + + private var currentBlacklistedYarnNodes = Set.empty[String] + + private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long] + + private var numClusterNodes = (Int.MaxValue / BLACKLIST_SIZE_DEFAULT_WEIGHT).toInt + + def setNumClusterNodes(numClusterNodes: Int): Unit = { +this.numClusterNodes = numClusterNodes + } + + /** + * Use a different clock. This is mainly used for testing. 
+ */ + def setClock(newClock: Clock): Unit = { +clock = newClock + } + + def handleResourceAllocationFailure(hostOpt: Option[String]): Unit = { +hostOpt match { + case Some(hostname) => +// failures on a already blacklisted nodes are not even tracked +// otherwise such failures could shutdown the application +// as resource requests are asynchronous +// and a late failure response could exceed MAX_EXECUTOR_FAILURES +if (!schedulerBlacklistedNodesWithExpiry.contains(hostname) && + !allocationBlacklistedNodesWithExpiry.contains(hostname)) { + failureWithinTimeIntervalTracker.registerFailureOnHost(hostname) + updateAllocationBlacklistedNodes(hostname) +} + case None => +failureWithinTimeIntervalTracker.registerSumExecutorFailure() +} + } + + private def updateAllocationBlacklistedNodes(hostname: String): Unit = { +if (IS_YARN_ALLOCATION_BLACKLIST_ENABLED) { + val failuresOnHost = failureWithinTimeIntervalTracker.getNumExecutorFailuresOnHost(hostname) + if (failuresOnHost > BLACKLIST_MAX_FAILED_EXEC_PER_NODE) { +logInfo("blacklisting host as YARN allocation failed: %s".format(hostname)) +allocationBlacklistedNodesWithExpiry.put( + hostname, + clock.getTimeMillis() + BLACKLIST_TIMEOUT_MILLIS) +refreshBlacklistedNodes() + } +} + } + + def setSchedulerBlacklistedNodes(schedulerBlacklistedNodesWithExpiry: Map[String, Long]): Unit = { +this.schedulerBlacklistedNodesWithExpiry = schedulerBlacklistedNodesWithExpiry +refreshBlacklistedNodes() + } + +
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21060 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21060 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2328/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21060 **[Test build #89363 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89363/testReport)** for PR 21060 at commit [`4656724`](https://github.com/apache/spark/commit/4656724d27c208d794f99691cfbf93b4bb118d93). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070 Upstream benchmarks for buffer management changes are here: https://github.com/apache/parquet-mr/pull/390#issuecomment-338505426 That doesn't show the GC benefit for smaller buffer allocations because of the heap size. It is just to show that the changes do no harm. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/21060 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21070 Could you share the performance numbers? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21070 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21070 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21070 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89362/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21070 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2327/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21070 **[Test build #89362 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89362/testReport)** for PR 21070 at commit [`4df17a6`](https://github.com/apache/spark/commit/4df17a6e9726cb22e499d479a9ab48f5db18a538). * This patch **fails build dependency tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: SPARK-23972: Update Parquet to 1.10.0.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21070 **[Test build #89362 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89362/testReport)** for PR 21070 at commit [`4df17a6`](https://github.com/apache/spark/commit/4df17a6e9726cb22e499d479a9ab48f5db18a538). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21043: [SPARK-23963] [SQL] Properly handle large number ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21043 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/21070 SPARK-23972: Update Parquet to 1.10.0. ## What changes were proposed in this pull request? This updates Parquet to 1.10.0 and updates the vectorized path for buffer management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet to break allocations into smaller chunks that are better for garbage collection. ## How was this patch tested? Existing Parquet tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-23972-update-parquet-to-1.10.0 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21070.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21070 commit 4df17a6e9726cb22e499d479a9ab48f5db18a538 Author: Ryan Blue Date: 2017-12-01T01:25:53Z SPARK-23972: Update Parquet to 1.10.0. This updates the vectorized path for changes in Parquet 1.10.0, which uses ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet to break allocations into smaller chunks that are better for garbage collection. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
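For readers skimming the digest, the following is a minimal Scala sketch of the allocation pattern the PR description refers to: reading a payload as a sequence of bounded ByteBuffer slices instead of one monolithic byte array, so individual allocations stay small and are friendlier to the garbage collector. The names and structure are invented for illustration and are not the parquet-mr ByteBufferInputStream API.

```scala
import java.io.{EOFException, InputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

object ChunkedReadSketch {
  // Read `total` bytes from `in` as buffers of at most `chunkSize` bytes each,
  // instead of a single large contiguous byte[] allocation.
  def readChunked(in: InputStream, total: Int, chunkSize: Int = 1 << 20): Seq[ByteBuffer] = {
    val chunks = ArrayBuffer.empty[ByteBuffer]
    var remaining = total
    while (remaining > 0) {
      val len = math.min(chunkSize, remaining)
      val bytes = new Array[Byte](len)
      var off = 0
      while (off < len) {
        val n = in.read(bytes, off, len - off)
        if (n < 0) throw new EOFException(s"expected $remaining more bytes")
        off += n
      }
      chunks += ByteBuffer.wrap(bytes)
      remaining -= len
    }
    chunks.toSeq
  }
}
```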
[GitHub] spark issue #21043: [SPARK-23963] [SQL] Properly handle large number of colu...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21043 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r181509305 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -368,8 +368,7 @@ case class FileSourceScanExec( val bucketed = selectedPartitions.flatMap { p => p.files.map { f => - val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen) --- End diff -- Yeah, I think the commit itself would be a self-contained reorganization. The motivation is to refactor for this PR, which is okay. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r181507712 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1185,6 +1185,13 @@ object SQLConf { .stringConf .createWithDefault("") + val DISABLED_V2_DATA_SOURCE_READERS = buildConf("spark.sql.disabledV2DataSourceReaders") --- End diff -- We need a better name. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r181507520 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} +import org.apache.kafka.common.{KafkaException, TopicPartition} + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging + +private[kafka010] sealed trait KafkaDataConsumer[K, V] { + /** + * Get the record for the given offset if available. + * + * @param offset the offset to fetch. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = { +internalConsumer.get(offset, pollTimeoutMs) + } + + /** + * Start a batch on a compacted topic + * + * @param offset the offset to fetch. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = { +internalConsumer.compactedStart(offset, pollTimeoutMs) + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + * + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = { +internalConsumer.compactedNext(pollTimeoutMs) + } + + /** + * Rewind to previous record in the batch from a compacted topic. + * + * @throws NoSuchElementException if no previous element + */ + def compactedPrevious(): ConsumerRecord[K, V] = { +internalConsumer.compactedPrevious() + } + + /** + * Release this consumer from being further used. Depending on its implementation, + * this consumer will be either finalized, or reset for reuse later. + */ + def release(): Unit + + /** Reference to the internal implementation that this wrapper delegates to */ + protected def internalConsumer: InternalKafkaConsumer[K, V] +} + + +/** + * A wrapper around Kafka's KafkaConsumer. + * This is not for direct use outside this file. 
+ */ +private[kafka010] +class InternalKafkaConsumer[K, V]( + val groupId: String, + val topicPartition: TopicPartition, + val kafkaParams: ju.Map[String, Object]) extends Logging { + + require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), +"groupId used for cache key must match the groupId in kafkaParams") + + @volatile private var consumer = createConsumer + + /** indicates whether this consumer is in use or not */ + @volatile var inUse = true + + /** indicate whether this consumer is going to be stopped in the next release */ + @volatile var markedForClose = false + + // TODO if the buffer was kept around as a random-access structure, + // could possibly optimize re-calculating of an RDD in the same batch + @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]() + @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET + + override def toString: String = { +"InternalKafkaConsumer(" + + s"hash=${Integer.toHexString(hashCode)}, " + + s"groupId=$groupId, " + + s"topicPartition=$topicPartition)" + } + + /** Create a KafkaConsumer to fetch records for `topicPartition` */ + private def createConsumer: KafkaConsumer[K, V] = { +val c = new KafkaConsumer[K, V](kafkaParams) +val tps = new ju
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r181506863 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} +import org.apache.kafka.common.{KafkaException, TopicPartition} + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging + +private[kafka010] sealed trait KafkaDataConsumer[K, V] { + /** + * Get the record for the given offset if available. + * + * @param offset the offset to fetch. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = { +internalConsumer.get(offset, pollTimeoutMs) + } + + /** + * Start a batch on a compacted topic + * + * @param offset the offset to fetch. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = { +internalConsumer.compactedStart(offset, pollTimeoutMs) + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + * + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = { +internalConsumer.compactedNext(pollTimeoutMs) + } + + /** + * Rewind to previous record in the batch from a compacted topic. + * + * @throws NoSuchElementException if no previous element + */ + def compactedPrevious(): ConsumerRecord[K, V] = { +internalConsumer.compactedPrevious() + } + + /** + * Release this consumer from being further used. Depending on its implementation, + * this consumer will be either finalized, or reset for reuse later. + */ + def release(): Unit + + /** Reference to the internal implementation that this wrapper delegates to */ + protected def internalConsumer: InternalKafkaConsumer[K, V] +} + + +/** + * A wrapper around Kafka's KafkaConsumer. + * This is not for direct use outside this file. 
+ */ +private[kafka010] +class InternalKafkaConsumer[K, V]( + val groupId: String, + val topicPartition: TopicPartition, + val kafkaParams: ju.Map[String, Object]) extends Logging { + + require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), +"groupId used for cache key must match the groupId in kafkaParams") + + @volatile private var consumer = createConsumer + + /** indicates whether this consumer is in use or not */ + @volatile var inUse = true + + /** indicate whether this consumer is going to be stopped in the next release */ + @volatile var markedForClose = false + + // TODO if the buffer was kept around as a random-access structure, + // could possibly optimize re-calculating of an RDD in the same batch + @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]() + @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET + + override def toString: String = { +"InternalKafkaConsumer(" + + s"hash=${Integer.toHexString(hashCode)}, " + + s"groupId=$groupId, " + + s"topicPartition=$topicPartition)" + } + + /** Create a KafkaConsumer to fetch records for `topicPartition` */ + private def createConsumer: KafkaConsumer[K, V] = { +val c = new KafkaConsumer[K, V](kafkaParams) +val tps = new ju
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r181506582 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala --- @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} +import org.apache.kafka.common.{KafkaException, TopicPartition} + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging + +private[kafka010] sealed trait KafkaDataConsumer[K, V] { + /** + * Get the record for the given offset if available. + * + * @param offset the offset to fetch. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = { +internalConsumer.get(offset, pollTimeoutMs) + } + + /** + * Start a batch on a compacted topic + * + * @param offset the offset to fetch. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = { +internalConsumer.compactedStart(offset, pollTimeoutMs) + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + * + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = { +internalConsumer.compactedNext(pollTimeoutMs) + } + + /** + * Rewind to previous record in the batch from a compacted topic. + * + * @throws NoSuchElementException if no previous element + */ + def compactedPrevious(): ConsumerRecord[K, V] = { +internalConsumer.compactedPrevious() + } + + /** + * Release this consumer from being further used. Depending on its implementation, + * this consumer will be either finalized, or reset for reuse later. + */ + def release(): Unit + + /** Reference to the internal implementation that this wrapper delegates to */ + protected def internalConsumer: InternalKafkaConsumer[K, V] +} + + +/** + * A wrapper around Kafka's KafkaConsumer. + * This is not for direct use outside this file. 
+ */ +private[kafka010] +class InternalKafkaConsumer[K, V]( + val groupId: String, + val topicPartition: TopicPartition, + val kafkaParams: ju.Map[String, Object]) extends Logging { + + require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), +"groupId used for cache key must match the groupId in kafkaParams") + + @volatile private var consumer = createConsumer + + /** indicates whether this consumer is in use or not */ + @volatile var inUse = true + + /** indicate whether this consumer is going to be stopped in the next release */ + @volatile var markedForClose = false + + // TODO if the buffer was kept around as a random-access structure, + // could possibly optimize re-calculating of an RDD in the same batch + @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]() + @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET + + override def toString: String = { +"InternalKafkaConsumer(" + + s"hash=${Integer.toHexString(hashCode)}, " + + s"groupId=$groupId, " + + s"topicPartition=$topicPartition)" + } + + /** Create a KafkaConsumer to fetch records for `topicPartition` */ + private def createConsumer: KafkaConsumer[K, V] = { +val c = new KafkaConsumer[K, V](kafkaParams) +val tps = new ju
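As a rough usage sketch of the `KafkaDataConsumer` trait quoted in the diffs above: the caller fetches records by offset and must call `release()` when done. The `readRange` helper and the way `consumer` is obtained are assumptions for illustration; the acquisition and caching side lives in the part of the file not shown in this excerpt.

```scala
// Sketch only: assumes a KafkaDataConsumer[K, V] obtained elsewhere.
def readRange[K, V](
    consumer: KafkaDataConsumer[K, V],
    fromOffset: Long,
    untilOffset: Long,
    pollTimeoutMs: Long): Unit = {
  try {
    var offset = fromOffset
    while (offset < untilOffset) {
      val record = consumer.get(offset, pollTimeoutMs) // one record at this offset
      // ... process record ...
      offset = record.offset + 1
    }
  } finally {
    // Depending on the implementation, this either closes the underlying
    // KafkaConsumer or returns it to a cache for later reuse.
    consumer.release()
  }
}
```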
[GitHub] spark issue #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, and numF...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21044 **[Test build #89361 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89361/testReport)** for PR 21044 at commit [`b3c7fec`](https://github.com/apache/spark/commit/b3c7fec0fda9056b832d1d35e829e9946218e504). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21053: [SPARK-23924][SQL] Add element_at function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21053 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21053: [SPARK-23924][SQL] Add element_at function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21053 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89356/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21053: [SPARK-23924][SQL] Add element_at function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21053 **[Test build #89356 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89356/testReport)** for PR 21053 at commit [`35844f8`](https://github.com/apache/spark/commit/35844f8402093e3a2cbe0cd57f3236128a5edf8c). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/20997#discussion_r181502862 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.kafka.clients.consumer.ConsumerConfig._ +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark._ + +class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { + + private var testUtils: KafkaTestUtils = _ + + override def beforeAll { +super.beforeAll() +testUtils = new KafkaTestUtils +testUtils.setup() + } + + override def afterAll { +if (testUtils != null) { + testUtils.teardown() + testUtils = null +} +super.afterAll() + } + + test("concurrent use of KafkaDataConsumer") { --- End diff -- Reuse test added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20997 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20997 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89359/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20997 **[Test build #89359 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89359/testReport)** for PR 20997 at commit [`7aa3257`](https://github.com/apache/spark/commit/7aa32578950476e7d409be9ba461623e47f4714d). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll ` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21011: [SPARK-23916][SQL] Add array_join function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21011 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21011: [SPARK-23916][SQL] Add array_join function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21011 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89351/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21011: [SPARK-23916][SQL] Add array_join function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21011 **[Test build #89351 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89351/testReport)** for PR 21011 at commit [`ad0d4aa`](https://github.com/apache/spark/commit/ad0d4aa5d671b3a99fa1bd30dc833a8b75444f6c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21063: [SPARK-23886][Structured Streaming][WIP] Update query st...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21063 I guess we might not even need to make an API change, just document that these flags only mean anything for microbatch execution. In any case that's a separate discussion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21063: [SPARK-23886][Structured Streaming][WIP] Update query st...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/21063 I'm not sure isDataAvailable makes sense in the context of continuous processing; it seems fundamentally tied to the microbatch execution model. I think the best option is to just leave it and isTriggerActive always set to false, and take a TODO to restructure StreamingQueryStatus to eliminate the assumption that they're meaningful. (The latter would be an API change, so we'd definitely want a separate PR for it - fortunately StreamingQueryStatus isn't stable yet.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
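To make the user-facing implication concrete, a small sketch (assuming `query` is a started `StreamingQuery`): under the option described above, code that polls these status flags during continuous execution would simply always see them as false, which is why the TODO to restructure `StreamingQueryStatus` is worth taking.

```scala
// Sketch: `query` is assumed to be a running StreamingQuery.
// Under continuous processing these two flags would stay false, so this branch
// is only informative for microbatch execution.
val status = query.status
if (status.isDataAvailable || status.isTriggerActive) {
  println(s"microbatch activity: ${status.message}")
}
```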
[GitHub] spark issue #20894: [SPARK-23786][SQL] Checking column names of csv headers
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20894 **[Test build #89360 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89360/testReport)** for PR 20894 at commit [`b43a7c7`](https://github.com/apache/spark/commit/b43a7c7ec50e03aaf4990e9bbb6989cdb2c076ef). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20997 **[Test build #89359 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89359/testReport)** for PR 20997 at commit [`7aa3257`](https://github.com/apache/spark/commit/7aa32578950476e7d409be9ba461623e47f4714d). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21045: [WIP][SPARK-23931][SQL] Adds zip function to sparksql
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/21045 @DylanGuedes the first suggestion I can give you is: don't use spark-shell for testing; write UTs and run them with a debugger. Then you can set a breakpoint to inspect the generated code (or set the debug log level so it is written to stderr, or whatever way you prefer): looking at the generated code makes it easy to understand the compile error and fix it. Apart from that, your `doGenCode` method doesn't make sense: there you seem to be emitting the output you expect to see in the shell, but instead you should generate valid Java code that performs the same operations as your `eval` method (or, in this case, `nullSafeEval`): please refer to other functions for inspiration and to understand how it works. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
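A minimal sketch of the pairing described above, using a hypothetical `AddInts` expression invented purely for illustration (it is not the zip function from this PR): the string returned from the codegen path must be valid Java source that computes exactly what `nullSafeEval` computes.

```scala
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType}

// Hypothetical expression used only to illustrate the interpreted/codegen symmetry.
case class AddInts(left: Expression, right: Expression)
  extends BinaryExpression with ExpectsInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType, IntegerType)
  override def dataType: DataType = IntegerType

  // Interpreted path: inputs are already null-checked by BinaryExpression.eval.
  override protected def nullSafeEval(a: Any, b: Any): Any =
    a.asInstanceOf[Int] + b.asInstanceOf[Int]

  // Codegen path: emit a valid Java expression with the same semantics;
  // defineCodeGen wraps it in the surrounding null checks.
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, (a, b) => s"$a + $b")
}
```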
[GitHub] spark issue #21045: [WIP][SPARK-23931][SQL] Adds zip function to sparksql
Github user DylanGuedes commented on the issue: https://github.com/apache/spark/pull/21045 Ok, so it works fine in spark-shell, but in pyspark I get this error:
```shell
File "/home/dguedes/Workspace/spark/python/pyspark/sql/functions.py", line 2155, in pyspark.sql.functions.zip
Failed example:
    df.select(zip(df.vals1, df.vals2).alias('zipped')).collect()
Exception raised:
    Traceback (most recent call last):
      File "/usr/lib64/python2.7/doctest.py", line 1315, in __run
        compileflags, 1) in test.globs
      File "", line 1, in
        df.select(zip(df.vals1, df.vals2).alias('zipped')).collect()
      File "/home/dguedes/Workspace/spark/python/pyspark/sql/dataframe.py", line 466, in collect
        port = self._jdf.collectToPython()
      File "/home/dguedes/Workspace/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/home/dguedes/Workspace/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/home/dguedes/Workspace/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
        format(target_id, ".", name), value)
    Py4JJavaError: An error occurred while calling o2240.collectToPython.
    : java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 41, Column 2: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 41, Column 2: Unexpected token "[" in primary
```
The problem is in the `doGenCode` function, but I can't see what's wrong. Any suggestions are welcome :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20888: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20888 **[Test build #89358 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89358/testReport)** for PR 20888 at commit [`67332fe`](https://github.com/apache/spark/commit/67332fe39f61c1bcefca4fcaf226a1e04f884218). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21068 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89350/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21068 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21068 **[Test build #89350 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89350/testReport)** for PR 21068 at commit [`fd1923e`](https://github.com/apache/spark/commit/fd1923ef3a9b7ab5355e13ddf3d3f537ac00c704). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org