[GitHub] spark issue #22429: [SPARK-25440][SQL] Dumping query execution info to a fil...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22429 I took a super quick pass; the change actually looks quite okay to me in general. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r232604420 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -176,9 +176,9 @@ case class TakeOrderedAndProjectExec( override def outputPartitioning: Partitioning = SinglePartition - override def simpleString: String = { -val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]") -val outputString = Utils.truncatedString(output, "[", ",", "]") + override def simpleString(maxFields: Option[Int]): String = { --- End diff -- Can we just get rid of the `maxFields`? I think this makes the PR hard to read.
[GitHub] spark issue #22429: [SPARK-25440][SQL] Dumping query execution info to a fil...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22429 @MaxGekk, a couple of questions about the implementation from a cursory look. The implementation is complicated here: 1. It tries to use a writer to avoid constructing the string. I think it's unlikely that a single plan causes an OOM. I think you can just iterate over the plan and write out the trees. 2. It introduces a `None` concept to indicate no limit, which makes this PR hard to read. I think it's okay to expose that number to `toFile`. I think this is an internal API, right? People could just set whatever number they want, I guess. Correct me if I misread. If we get rid of those two points, I think the PR will be drastically smaller.
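Both suggestions can be sketched together in Java. The sketch uses a hypothetical `PlanNode` type, not Spark's real `TreeNode` API. The tree is iterated and written out one node per line. `maxFields` is a plain `int`, with `Integer.MAX_VALUE` playing the role that `None` plays in the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a plan node; this is not Spark's TreeNode API.
final class PlanNode {
    final String name;
    final List<String> fields;
    final List<PlanNode> children;

    PlanNode(String name, List<String> fields, PlanNode... children) {
        this.name = name;
        this.fields = fields;
        this.children = Arrays.asList(children);
    }
}

final class PlanWriter {
    // Plain int limit: pass Integer.MAX_VALUE for "no limit" instead of None.
    static String truncatedFields(List<String> fields, int maxFields) {
        if (fields.size() <= maxFields) {
            return "[" + String.join(",", fields) + "]";
        }
        List<String> shown = new ArrayList<>(fields.subList(0, Math.max(0, maxFields)));
        shown.add("... " + (fields.size() - shown.size()) + " more fields");
        return "[" + String.join(",", shown) + "]";
    }

    // Iterates the tree depth-first and writes one indented line per node.
    static void writeTree(PlanNode node, Writer out, int maxFields, int depth) {
        try {
            for (int i = 0; i < depth; i++) {
                out.write("  ");
            }
            out.write(node.name + " " + truncatedFields(node.fields, maxFields) + "\n");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (PlanNode child : node.children) {
            writeTree(child, out, maxFields, depth + 1);
        }
    }
}
```

A caller such as a hypothetical `toFile` would simply pass its own number through. There is no `Option` to unwrap at each node.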
[GitHub] spark pull request #22779: [SPARK-25786][CORE]If the ByteBuffer.hasArray is ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22779#discussion_r232587038 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -338,7 +338,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boole // Make these lazy vals to avoid creating a buffer unless we use them. private lazy val output = ks.newKryoOutput() - private lazy val input = if (useUnsafe) new KryoUnsafeInput() else new KryoInput() + private lazy val input = if (useUnsafe) new KryoUnsafeInput(4096) else new KryoInput(4096) --- End diff -- Why do we need to set the buffer size here when it's already set by `setBuffer`? Can't we set it only when the `ByteBuffer` is backed by an accessible byte array?
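The check being suggested hinges on `ByteBuffer.hasArray()`. A minimal Java sketch follows; `ArraySlice` and `InputBuffers` are hypothetical names. When the buffer exposes its backing array, the slice is reused as-is. Only a direct or read-only buffer gets a separately allocated `byte[]`. The resulting (array, offset, length) triple is the kind of thing that could then be passed to something like Kryo's `Input.setBuffer(bytes, offset, count)`.

```java
import java.nio.ByteBuffer;

// Hypothetical view of the bytes to hand to a deserializer input.
final class ArraySlice {
    final byte[] array;
    final int offset;
    final int length;

    ArraySlice(byte[] array, int offset, int length) {
        this.array = array;
        this.offset = offset;
        this.length = length;
    }
}

final class InputBuffers {
    // Leaves buf's position untouched in both branches.
    static ArraySlice sliceOf(ByteBuffer buf) {
        if (buf.hasArray()) {
            // Accessible backing array: reuse it directly, no extra buffer needed.
            return new ArraySlice(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
        }
        // Direct or read-only buffer: only here is a separate byte[] allocated.
        byte[] copy = new byte[buf.remaining()];
        buf.duplicate().get(copy);
        return new ArraySlice(copy, 0, copy.length);
    }
}
```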
[GitHub] spark issue #23011: [SPARK-26013][R][BUILD] Upgrade R tools version from 3.4...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/23011 retest this please
[GitHub] spark pull request #22809: [SPARK-19851][SQL] Add support for EVERY and ANY ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22809#discussion_r232564281 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Max.scala --- @@ -57,3 +57,34 @@ case class Max(child: Expression) extends DeclarativeAggregate { override lazy val evaluateExpression: AttributeReference = max } + +abstract class AnyAggBase(arg: Expression) + extends UnevaluableAggrgate with ImplicitCastInputTypes { + + override def children: Seq[Expression] = arg :: Nil + + override def dataType: DataType = BooleanType + + override def inputTypes: Seq[AbstractDataType] = Seq(BooleanType) + + override def checkInputDataTypes(): TypeCheckResult = { +arg.dataType match { + case dt if dt != BooleanType => +TypeCheckResult.TypeCheckFailure(s"Input to function '$prettyName' should have been " + + s"${BooleanType.simpleString}, but it's [${arg.dataType.catalogString}].") + case _ => TypeCheckResult.TypeCheckSuccess +} + } +} + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is true.") --- End diff -- (BTW, let's add `since` to `ExpressionDescription` wherever possible.)
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22693#discussion_r232556859 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) => + val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil) + computeTableStats(relation, predicates) case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => - val table = relation.tableMeta - val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { -try { - val hadoopConf = session.sessionState.newHadoopConf() - val tablePath = new Path(table.location) - val fs: FileSystem = tablePath.getFileSystem(hadoopConf) - fs.getContentSummary(tablePath).getLength -} catch { - case e: IOException => -logWarning("Failed to get table size from hdfs.", e) -session.sessionState.conf.defaultSizeInBytes -} - } else { -session.sessionState.conf.defaultSizeInBytes + computeTableStats(relation) + } + + private def computeTableStats( + relation: HiveTableRelation, + predicates: Seq[Expression] = Nil): LogicalPlan = { +val table = relation.tableMeta +val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { + try { +val hadoopConf = session.sessionState.newHadoopConf() +val tablePath = new Path(table.location) +val fs: FileSystem = tablePath.getFileSystem(hadoopConf) +BigInt(fs.getContentSummary(tablePath).getLength) + } catch { +case e: IOException => + logWarning("Failed to get table size from hdfs.", e) + getSizeInBytesFromTablePartitions(table.identifier, predicates) } +} else { + 
getSizeInBytesFromTablePartitions(table.identifier, predicates) +} +val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes))) +relation.copy(tableMeta = withStats) + } - val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes - relation.copy(tableMeta = withStats) + private def getSizeInBytesFromTablePartitions( + tableIdentifier: TableIdentifier, + predicates: Seq[Expression] = Nil): BigInt = { +session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match { --- End diff -- How does https://github.com/apache/spark/pull/22743 solve this problem? That PR aims to invalidate the cache when configurations change, whereas this PR aims to compute stats from HDFS when they are not available.
[GitHub] spark pull request #23012: [SPARK-26014][R] Deprecate R prior to version 3.4...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/23012#discussion_r232549234 --- Diff: docs/index.md --- @@ -31,7 +31,8 @@ Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy locally on one machine --- all you need is to have `java` installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation. -Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}} +Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. R prior to version 3.4 is deprecated as of Spark 3.0. --- End diff -- Ah, yea, I switched this to deprecating it for now. I was a bit curious about that.
[GitHub] spark pull request #23012: [SPARK-26014][R] Deprecate R prior to version 3.4...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/23012#discussion_r232549053 --- Diff: docs/index.md --- @@ -31,7 +31,8 @@ Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy locally on one machine --- all you need is to have `java` installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation. -Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}} +Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. R prior to version 3.4 is deprecated as of Spark 3.0. --- End diff -- Hm .. I was thinking we could change them when we actually drop support. Technically it still supports 3.1+, although 3.1, 3.2, and 3.3 are deprecated.
[GitHub] spark pull request #22939: [SPARK-25446][R] Add schema_of_json() and schema_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22939#discussion_r232540931 --- Diff: R/pkg/R/functions.R --- @@ -2230,6 +2237,32 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") column(jc) }) +#' @details +#' \code{schema_of_json}: Parses a JSON string and infers its schema in DDL format. +#' +#' @rdname column_collection_functions +#' @aliases schema_of_json schema_of_json,characterOrColumn-method +#' @examples +#' +#' \dontrun{ +#' json <- '{"name":"Bob"}' +#' df <- sql("SELECT * FROM range(1)") +#' head(select(df, schema_of_json(json)))} +#' @note schema_of_json since 3.0.0 +setMethod("schema_of_json", signature(x = "characterOrColumn"), + function(x, ...) { +if (class(x) == "character") { + col <- callJStatic("org.apache.spark.sql.functions", "lit", x) +} else { + col <- x@jc --- End diff -- Hmm .. do you mind if we go ahead with this one and discuss it later within 3.0? If I'm not mistaken, we're going to deal with this problem within 3.0. I need to make one follow-up after this anyway.
[GitHub] spark issue #23012: [SPARK-26014][R] Deprecate R prior to version 3.4 in Spa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/23012 adding @srowen too.
[GitHub] spark issue #23012: [SPARK-26014][R] Deprecate R prior to version 3.4 in Spa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/23012 Tests will probably fail since it produces warnings. cc @felixcheung. @shaneknapp, @viirya, @shivaram, @falaki, @mengxr, @yanboliang FYI. This PR is made per the discussion at http://apache-spark-developers-list.1001551.n3.nabble.com/discuss-SparkR-CRAN-feasibility-check-server-problem-td25605.html
[GitHub] spark pull request #23012: [SPARK-26014][R] Deprecate R prior to version 3.4...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/23012 [SPARK-26014][R] Deprecate R prior to version 3.4 in SparkR ## What changes were proposed in this pull request? This PR proposes to bump up the minimum version of R from 3.1 to 3.4. R 3.1.x is too old: it was released 4.5 years ago, while R 3.4.0 was released 1.5 years ago. Considering the timing of Spark 3.0, deprecating lower versions and bumping up R to 3.4 might be a reasonable option. It should be good to deprecate and then drop support for R < 3.4. In practice, nothing in particular is required within the R code as far as I can tell, except: 1. https://github.com/apache/spark/blob/master/R/pkg/src-native/string_hash_code.c 2. `env` becomes immutable, whereas in some lower versions it is mutable (if I remember correctly); this shouldn't be a big deal on the SparkR side. 3. We will need to upgrade Jenkins's R version to 3.4, which means we're not going to test R 3.1. This should be okay because we already don't test R 3.2, 3.3, and 3.4: we test 3.5 in AppVeyor and 3.1 in Jenkins. ## How was this patch tested? Jenkins tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HyukjinKwon/spark SPARK-26014 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23012.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23012 commit dc2dbd923a1396ca5a7a950df35da57cc70c2ab8 Author: hyukjinkwon Date: 2018-11-12T05:39:14Z Deprecate R prior to version 3.4 in SparkR
[GitHub] spark issue #23011: [SPARK-26013][R][BUILD] Upgrade R tools version from 3.4...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/23011 cc @felixcheung
[GitHub] spark pull request #23011: [SPARK-26013][R][BUILD] Upgrade R tools version f...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/23011 [SPARK-26013][R][BUILD] Upgrade R tools version from 3.4.0 to 3.5.1 in AppVeyor build ## What changes were proposed in this pull request? R tools 3.5.1 was released a few months ago, and Spark currently uses 3.4.0. We had better upgrade it in AppVeyor. ## How was this patch tested? AppVeyor builds. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HyukjinKwon/spark SPARK-26013 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23011.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23011 commit b94d04ac80052ed50830239b06a08bf5b07603e6 Author: hyukjinkwon Date: 2018-11-12T05:02:23Z Upgrade R tools version to 3.5.1 in AppVeyor build
[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix barrier task run without Barr...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22962 Looks like it makes sense to me in general.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r232528655 --- Diff: python/pyspark/tests.py --- @@ -618,10 +618,13 @@ def test_barrier_with_python_worker_reuse(self): """ Verify that BarrierTaskContext.barrier() with reused python worker. """ +self.sc._conf.set("spark.python.work.reuse", "true") --- End diff -- @xuanyuanking, this will probably need a separate test suite, since it's also related to whether we start the worker or not. You can make a new class, run a simple job to make sure workers are created and reused, test it, and stop.
[GitHub] spark issue #22954: [SPARK-25981][R] Enables Arrow optimization from R DataF...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22954 Yea .. I will do the follow-up work right away after this one gets merged. Thanks @felixcheung. Let me address the rest of the comments and wait for the Arrow release. @BryanCutler BTW, do you know the rough expected timing of the Arrow 0.12.0 release?
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232525184 --- Diff: R/pkg/tests/fulltests/test_sparkSQL.R --- @@ -307,6 +307,64 @@ test_that("create DataFrame from RDD", { unsetHiveContext() }) +test_that("createDataFrame Arrow optimization", { + skip_if_not_installed("arrow") + skip_if_not_installed("withr") --- End diff -- Maybe we should hold off on it for now .. because I realised the R API for Arrow requires R 3.5.x, and Jenkins's R is 3.1.x if I remember correctly. Ideally, we could probably do that via AppVeyor if everything goes fine.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232525068 --- Diff: R/pkg/tests/fulltests/test_sparkSQL.R --- @@ -307,6 +307,64 @@ test_that("create DataFrame from RDD", { unsetHiveContext() }) +test_that("createDataFrame Arrow optimization", { + skip_if_not_installed("arrow") + skip_if_not_installed("withr") + + conf <- callJMethod(sparkSession, "conf") + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] + + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false") + tryCatch({ --- End diff -- Just to inject the finally .. :-) ..
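The `tryCatch` is there for its `finally` clause. The test flips `spark.sql.execution.arrow.enabled` and has to restore the original value whatever happens. Below is a Java sketch of that save, set and restore pattern. A plain `Map` stands in for the session conf, and `ConfScope.withConf` is a hypothetical helper. The actual test is R and goes through `callJMethod(conf, "set", ...)`.

```java
import java.util.Map;

final class ConfScope {
    // Sets key=value, runs body, and always restores the previous value,
    // even if body throws (the job `finally` does in the R test).
    static void withConf(Map<String, String> conf, String key, String value, Runnable body) {
        String previous = conf.get(key);
        conf.put(key, value);
        try {
            body.run();
        } finally {
            if (previous == null) {
                conf.remove(key);
            } else {
                conf.put(key, previous);
            }
        }
    }
}
```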
[GitHub] spark pull request #22979: [SPARK-25977][SQL] Parsing decimals from CSV usin...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22979#discussion_r232520110 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala --- @@ -149,8 +156,8 @@ class UnivocityParser( case dt: DecimalType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => -val value = new BigDecimal(datum.replaceAll(",", "")) -Decimal(value, dt.precision, dt.scale) +val bigDecimal = decimalParser.parse(datum).asInstanceOf[BigDecimal] --- End diff -- Sounds good if that's not difficult.
[GitHub] spark issue #23008: [SPARK-22674][PYTHON] Removed the namedtuple pickling pa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/23008 ok to test
[GitHub] spark pull request #23006: [SPARK-26007][SQL] DataFrameReader.csv() respects...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/23006#discussion_r232494906 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -491,7 +491,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val parsedOptions: CSVOptions = new CSVOptions( extraOptions.toMap, sparkSession.sessionState.conf.csvColumnPruning, - sparkSession.sessionState.conf.sessionLocalTimeZone) + sparkSession.sessionState.conf.sessionLocalTimeZone, + sparkSession.sessionState.conf.columnNameOfCorruptRecord) --- End diff -- Hmm. NVM, let's deal with it next time, somewhere else.
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r232489729 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -202,8 +204,12 @@ private[parquet] class ParquetRowConverter( override def start(): Unit = { var i = 0 -while (i < currentRow.numFields) { +while (i < fieldConverters.length) { fieldConverters(i).updater.start() + i += 1 +} +i = 0 +while (i < currentRow.numFields) { --- End diff -- Can we loop once, with an `if`?
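A single loop with guards could look roughly like the Java sketch below. It assumes the two loops in the diff are independent, so interleaving them does not change the result. The converters are modelled as hypothetical `Runnable`s and the row cells as an `Object[]`. The second loop's body is cut off in the quoted diff, so the per-cell reset here is also an assumption.

```java
final class RowStart {
    // One pass instead of two: i walks the longer of the two arrays and each
    // step is guarded by its own bound.
    static void start(Runnable[] converterStarts, Object[] rowCells) {
        int n = Math.max(converterStarts.length, rowCells.length);
        for (int i = 0; i < n; i++) {
            if (i < converterStarts.length) {
                converterStarts[i].run(); // e.g. fieldConverters(i).updater.start()
            }
            if (i < rowCells.length) {
                rowCells[i] = null; // hypothetical per-cell reset; the diff's second loop body is elided
            }
        }
    }
}
```

If the Parquet field count never exceeds the Catalyst field count, `n` is always `rowCells.length`. Only the converter call then needs the `if`.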
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r232489624 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -130,8 +130,8 @@ private[parquet] class ParquetRowConverter( extends ParquetGroupConverter(updater) with Logging { assert( -parquetType.getFieldCount == catalystType.length, -s"""Field counts of the Parquet schema and the Catalyst schema don't match: +parquetType.getFieldCount <= catalystType.length, +s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema: --- End diff -- Can we assert this only when this pruning is enabled?
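One reading of this suggestion, sketched in Java: keep the old strict equality unless pruning is on, and relax it to `<=` only in that case. `FieldCountCheck` and the `pruningEnabled` flag are hypothetical. In Spark the flag would presumably be derived from the schema-pruning setting.

```java
final class FieldCountCheck {
    // Relaxed check only when pruning is on; strict equality otherwise.
    static void check(int parquetFieldCount, int catalystFieldCount, boolean pruningEnabled) {
        boolean ok = pruningEnabled
                ? parquetFieldCount <= catalystFieldCount
                : parquetFieldCount == catalystFieldCount;
        if (!ok) {
            throw new IllegalStateException(
                    "Parquet schema has " + parquetFieldCount + " fields but Catalyst schema has "
                            + catalystFieldCount + " (pruning enabled: " + pruningEnabled + ")");
        }
    }
}
```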
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r232489418 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -182,18 +182,20 @@ private[parquet] class ParquetRowConverter( // Converters for each field. private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { -parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map { - case ((parquetFieldType, catalystField), ordinal) => -// Converted field value should be set to the `ordinal`-th cell of `currentRow` -newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) +parquetType.getFields.asScala.map { --- End diff -- also .. nit: `parquetType.getFields.asScala.map { parquetField =>` per https://github.com/databricks/scala-style-guide#pattern-matching
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r232489371 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -93,13 +141,14 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) log.debug(s"Preparing for read Parquet file with message type: $fileSchema") val parquetRequestedSchema = readContext.getRequestedSchema -logInfo { - s"""Going to read the following fields from the Parquet file: - | - |Parquet form: +log.info { + s"""Going to read the following fields from the Parquet file with the following schema: + |Parquet file schema: + |$fileSchema + |Parquet read schema: --- End diff -- Yea, maybe we should change this to debug level for them. I would additionally log them somewhere at debug level.
[GitHub] spark pull request #22880: [SPARK-25407][SQL] Ensure we pass a compatible pr...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22880#discussion_r232489340 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -93,13 +141,14 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) log.debug(s"Preparing for read Parquet file with message type: $fileSchema") val parquetRequestedSchema = readContext.getRequestedSchema -logInfo { - s"""Going to read the following fields from the Parquet file: - | - |Parquet form: +log.info { + s"""Going to read the following fields from the Parquet file with the following schema: + |Parquet file schema: + |$fileSchema + |Parquet read schema: --- End diff -- Yea, maybe we should change this to debug level.
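Moving the schema dump to debug level also means the potentially large schema strings need not be built at all in normal runs. Below is a Java sketch using `java.util.logging`, where `FINE` is the debug-like level. `SchemaLogging` is a hypothetical class. On the Scala side the equivalent would presumably be `logDebug` from Spark's `Logging` trait, whose message argument is by-name.

```java
import java.util.function.Supplier;
import java.util.logging.Logger;

final class SchemaLogging {
    static final Logger LOG = Logger.getLogger(SchemaLogging.class.getName());

    // Logged at FINE. The Supplier overload of Logger.fine only builds the
    // message, and so only renders the schemas, when FINE is enabled.
    static void logReadSchemas(Supplier<String> fileSchema, Supplier<String> readSchema) {
        LOG.fine(() -> "Going to read the following fields from the Parquet file with the following schema:\n"
                + "Parquet file schema:\n" + fileSchema.get() + "\n"
                + "Parquet read schema:\n" + readSchema.get());
    }
}
```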
[GitHub] spark issue #22880: [SPARK-25407][SQL] Ensure we pass a compatible pruned sc...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22880 Looks good. I or someone else should take a closer look before getting this in.
[GitHub] spark pull request #22979: [SPARK-25977][SQL] Parsing decimals from CSV usin...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22979#discussion_r232487851 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala --- @@ -149,8 +156,8 @@ class UnivocityParser( case dt: DecimalType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => -val value = new BigDecimal(datum.replaceAll(",", "")) -Decimal(value, dt.precision, dt.scale) +val bigDecimal = decimalParser.parse(datum).asInstanceOf[BigDecimal] --- End diff -- For instance, there was a similar attempt to change the date parsing library. I already knew the difference was quite breaking, so I suggested adding a configuration or a fallback for now. We should probably similarly just document the behaviour change in the migration guide, but I'm actually not sure yet even about this. Anyway, I will take another look shortly.
[GitHub] spark pull request #22979: [SPARK-25977][SQL] Parsing decimals from CSV usin...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22979#discussion_r232487778 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala --- @@ -149,8 +156,8 @@ class UnivocityParser( case dt: DecimalType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => -val value = new BigDecimal(datum.replaceAll(",", "")) -Decimal(value, dt.precision, dt.scale) +val bigDecimal = decimalParser.parse(datum).asInstanceOf[BigDecimal] --- End diff -- Ah, right, I see: the previous code throws an exception anyway. One thing I am a little unsure about is how different the behaviour is. For instance, it looks like the previous code handles sign characters as well (`+` and `-`). Let me take a closer look; I think I need to.
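A small Java comparison sketch helps show how much the behaviour differs. `legacy` reproduces the removed line from the diff. `localeAware` is a hypothetical stricter `DecimalFormat` variant, not the PR's exact code: it uses `setParseBigDecimal(true)` plus a check that the whole string was consumed.

```java
import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.ParsePosition;
import java.util.Locale;

final class DecimalParsers {
    // Pre-change behaviour from the diff: drop ',' and let BigDecimal parse the rest.
    static BigDecimal legacy(String s) {
        return new BigDecimal(s.replaceAll(",", ""));
    }

    // Locale-aware parse that must consume the whole input; returns null otherwise.
    static BigDecimal localeAware(String s, Locale locale) {
        DecimalFormat format = new DecimalFormat("", DecimalFormatSymbols.getInstance(locale));
        format.setParseBigDecimal(true);
        ParsePosition pos = new ParsePosition(0);
        Number n = format.parse(s, pos);
        return (n instanceof BigDecimal && pos.getIndex() == s.length()) ? (BigDecimal) n : null;
    }
}
```

With `Locale.US`, both accept a leading `-`, but only the legacy path accepts a leading `+`. Note also that `DecimalFormat.parse` on its own stops at `1.2` in `1.2.3` instead of failing. That is why the sketch checks the `ParsePosition`.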
[GitHub] spark issue #23006: [SPARK-26007][SQL] DataFrameReader.csv() respects to spa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/23006 Looks good otherwise. I or someone else should take a closer look.
[GitHub] spark issue #22954: [SPARK-25981][R] Enables Arrow optimization from R DataF...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22954 Hm .. the CRAN check passed locally for me. Let me work around it for now.
[GitHub] spark issue #22973: [SPARK-25972][PYTHON] Missed JSON options in streaming.p...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22973 Merged to master.
[GitHub] spark pull request #23006: [SPARK-26007][SQL] DataFrameReader.csv() respects...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/23006#discussion_r232487030 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -491,7 +491,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val parsedOptions: CSVOptions = new CSVOptions( extraOptions.toMap, sparkSession.sessionState.conf.csvColumnPruning, - sparkSession.sessionState.conf.sessionLocalTimeZone) + sparkSession.sessionState.conf.sessionLocalTimeZone, + sparkSession.sessionState.conf.columnNameOfCorruptRecord) --- End diff -- Yea. Is it a lot of change? I was thinking the constructor with a default value makes it more error-prone.
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22305 @icexelloss, while we are here, would you mind fixing the example in the PR description so that it is a self-contained, working example?
[GitHub] spark pull request #22979: [SPARK-25977][SQL] Parsing decimals from CSV usin...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22979#discussion_r232486778 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -9,6 +9,8 @@ displayTitle: Spark SQL Upgrading Guide ## Upgrading From Spark SQL 2.4 to 3.0 + - Since Spark 3.0, to parse decimals in locale specific format from CSV, set the `locale` option to proper value. --- End diff -- While we are here, let's format this migration guide item like the others, e.g.: "In Spark version 2.4 and earlier, it only deals with locale-specific decimal notation like `,`. Since Spark 3.0, the locale can be set by `locale` and the default locale is ..." Please feel free to change it as you see fit.
[GitHub] spark pull request #22979: [SPARK-25977][SQL] Parsing decimals from CSV usin...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22979#discussion_r232486670 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala --- @@ -149,8 +156,8 @@ class UnivocityParser( case dt: DecimalType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => -val value = new BigDecimal(datum.replaceAll(",", "")) -Decimal(value, dt.precision, dt.scale) +val bigDecimal = decimalParser.parse(datum).asInstanceOf[BigDecimal] --- End diff -- @MaxGekk, is it safe to assume that this `Number` is a `BigDecimal`? It looks like it can return other types in some cases.
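The concern is real for a default `DecimalFormat`. Its `parse` returns a `Long` where the value fits and has no fraction, and a `Double` otherwise. Only after `setParseBigDecimal(true)` does it return `BigDecimal`, and even then NaN and infinities come back as `Double`. The Java sketch below shows this. `ParseResultTypes` is a hypothetical helper that checks the runtime type instead of casting blindly.

```java
import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.ParsePosition;
import java.util.Locale;

final class ParseResultTypes {
    static Number parse(String s, boolean parseBigDecimal) {
        DecimalFormat format = new DecimalFormat("", DecimalFormatSymbols.getInstance(Locale.US));
        format.setParseBigDecimal(parseBigDecimal);
        return format.parse(s, new ParsePosition(0));
    }

    // Checks the runtime type instead of an unconditional cast.
    static BigDecimal toBigDecimal(Number n) {
        if (n instanceof BigDecimal) {
            return (BigDecimal) n;
        }
        throw new IllegalArgumentException(
                "Expected BigDecimal but got " + (n == null ? "null" : n.getClass().getName()));
    }
}
```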
[GitHub] spark pull request #22979: [SPARK-25977][SQL] Parsing decimals from CSV usin...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22979#discussion_r232486599 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala --- @@ -104,6 +105,12 @@ class UnivocityParser( requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray } + private val decimalParser = { +val df = new DecimalFormat("", new DecimalFormatSymbols(options.locale)) --- End diff -- Not a big deal, but I would just name it `decimalFormat`.
[GitHub] spark pull request #23006: [SPARK-26007][SQL] DataFrameReader.csv() respects...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/23006#discussion_r232486474 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -491,7 +491,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val parsedOptions: CSVOptions = new CSVOptions( extraOptions.toMap, sparkSession.sessionState.conf.csvColumnPruning, - sparkSession.sessionState.conf.sessionLocalTimeZone) + sparkSession.sessionState.conf.sessionLocalTimeZone, + sparkSession.sessionState.conf.columnNameOfCorruptRecord) --- End diff -- @MaxGekk, would you mind splitting the current constructor in `CSVOptions` into two, one requiring `defaultColumnNameOfCorruptRecord` and one where `defaultColumnNameOfCorruptRecord` defaults to `""`, and adding some comments about when each one is called?
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22905#discussion_r232486218 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -306,7 +306,15 @@ case class FileSourceScanExec( withOptPartitionCount } -withSelectedBucketsCount +val withOptColumnCount = relation.fileFormat match { + case columnar: ColumnarFileFormat => +val sqlConf = relation.sparkSession.sessionState.conf +val columnCount = columnar.columnCountForSchema(sqlConf, requiredSchema) +withSelectedBucketsCount + ("ColumnCount" -> columnCount.toString) --- End diff -- 1. Column pruning is currently specific to Parquet, so it's source-specific for now. 2. I really think logging is the more appropriate way to check whether something works as expected. > That's speaking from experience, not conjecture. I am not underestimating your statement. Let's be very clear about why it should be put in metadata rather than in logs. How and why would it be useful, and in what cases?
[GitHub] spark pull request #22905: [SPARK-25894][SQL] Add a ColumnarFileFormat type ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22905#discussion_r232486141 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -306,7 +306,15 @@ case class FileSourceScanExec( withOptPartitionCount } -withSelectedBucketsCount +val withOptColumnCount = relation.fileFormat match { + case columnar: ColumnarFileFormat => +val sqlConf = relation.sparkSession.sessionState.conf +val columnCount = columnar.columnCountForSchema(sqlConf, requiredSchema) +withSelectedBucketsCount + ("ColumnCount" -> columnCount.toString) --- End diff -- > Who wants that? If someone wants to put metadata somewhere in the physical plan, let them open a PR and make a case for it. No, I don't think we should add it just because it was requested once. These look like the same kind of case to me: if this one is added, I will have no argument when other people later ask to add others. We should make it clear why this one specifically should be added; we're not going to add every piece of information to the metadata on request. If the purpose is to check whether pushdown is actually working, logging sounds appropriate for that.
[GitHub] spark issue #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send u...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22275 Thanks for asking me. I will take a look within a few days. To be clear, please don't block on me.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232485612 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * It is very similar to [[WindowExec]] and has similar logic. The main difference is that this + * node doesn't not compute any window aggregation values. Instead, it computes the lower and + * upper bound for each window (i.e. window bounds) and pass the data and indices to python work + * to do the actual window aggregation. + * + * It currently materialize all data associated with the same partition key and pass them to --- End diff -- tiny typo: `materialize` -> `materializes` and `pass` -> `passes`.
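The class comment says the node computes a lower and upper bound for each window and passes the data and indices to the Python worker. The sketch below shows what such bounds could look like for a row-based frame. It assumes a `ROWS BETWEEN lower AND upper` frame with integer offsets relative to the current row and half-open `[begin, end)` indices. The helper name and the half-open convention are assumptions for illustration, not taken from the PR.

```python
from typing import List, Tuple


def row_frame_bounds(num_rows: int, lower: int, upper: int) -> Tuple[List[int], List[int]]:
    """Half-open [begin, end) row indices for a ROWS BETWEEN frame (hypothetical helper).

    `lower` and `upper` are offsets from the current row, e.g. (-1, 1) means
    "one row before through one row after".
    """
    begins, ends = [], []
    for i in range(num_rows):
        # Clamp the frame to the partition; an out-of-range frame becomes empty (begin == end).
        begin = min(max(0, i + lower), num_rows)
        end = max(min(num_rows, i + upper + 1), begin)
        begins.append(begin)
        ends.append(end)
    return begins, ends
```

For a four-row partition, `row_frame_bounds(4, -1, 1)` gives begins `[0, 0, 1, 2]` and ends `[2, 3, 4, 4]`. These are the per-row index pairs a worker would then use to slice its input columns.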
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232485777 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -73,68 +118,151 @@ case class WindowInPandasExec( } /** - * Create the resulting projection. - * - * This method uses Code Generation. It can only be used on the executor side. + * Get all relevant helper functions and data structures for window bounds * - * @param expressions unbound ordered function expressions. - * @return the final resulting projection. + * This function returns: + * (1) Total number of window bound indices in the python input row + * (2) Function from frame index to its lower bound column index in the python input row + * (3) Function from frame index to its upper bound column index in the python input row + * (4) Function indicates whether a frame requires window bound indices + * (5) Function from frame index to its eval type */ - private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { -val references = expressions.zipWithIndex.map { case (e, i) => - // Results of window expressions will be on the right side of child's output - BoundReference(child.output.size + i, e.dataType, e.nullable) + private def computeWindowBoundHelpers( + factories: Seq[InternalRow => WindowFunctionFrame] + ): (Int, Int => Int, Int => Int, Int => Boolean, Int => Int) = { --- End diff -- Hm, let's at least define a `type` alias for it. The bare tuple type looks a bit confusing.
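In Scala, the suggestion could be as small as `private type WindowBoundHelpers = (Int, Int => Int, Int => Int, Int => Boolean, Int => Int)`. The loose Python analogue below goes one step further and names each component, using the five items listed in the doc comment. `WindowBoundHelpers`, its field names and the sample eval-type values are all hypothetical.

```python
from typing import Callable, NamedTuple


class WindowBoundHelpers(NamedTuple):
    # (1) Total number of window bound indices in the python input row
    num_bound_indices: int
    # (2) Frame index -> its lower bound column index in the python input row
    lower_bound_index: Callable[[int], int]
    # (3) Frame index -> its upper bound column index in the python input row
    upper_bound_index: Callable[[int], int]
    # (4) Frame index -> whether the frame requires window bound indices
    requires_bounds: Callable[[int], bool]
    # (5) Frame index -> its eval type (placeholder ints here, not Spark's constants)
    eval_type: Callable[[int], int]


# Illustrative values: frame 0 is unbounded, frame 1 is bounded and uses columns 0 and 1.
helpers = WindowBoundHelpers(
    num_bound_indices=2,
    lower_bound_index=lambda frame: 0,
    upper_bound_index=lambda frame: 1,
    requires_bounds=lambda frame: frame == 1,
    eval_type=lambda frame: 100 if frame == 1 else 200,
)
```

Callers can then write `helpers.requires_bounds(i)` instead of relying on positional access such as `helpers[3](i)`.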
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22305 adding @hvanhovell
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232485476 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -63,7 +65,7 @@ private[spark] object PythonEvalType { */ private[spark] abstract class BasePythonRunner[IN, OUT]( funcs: Seq[ChainedPythonFunctions], -evalType: Int, +evalTypes: Seq[Int], --- End diff -- If that's going to require other necessary changes, then please go ahead. At least two committers here understood why it was introduced.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232485435 --- Diff: python/pyspark/worker.py --- @@ -154,6 +154,47 @@ def wrapped(*series): return lambda *a: (wrapped(*a), arrow_return_type) +def wrap_bounded_window_agg_pandas_udf(f, return_type): +arrow_return_type = to_arrow_type(return_type) + +def wrapped(begin_index, end_index, *series): +import numpy as np +import pandas as pd +result = [] +for i in range(0, len(begin_index)): +begin = begin_index[i] +end = end_index[i] +range_index = np.arange(begin, end) +# Note: Create a slice from a series is actually pretty expensive to +# do for each window. However, there is no way to reduce/eliminate +# the cost of creating sub series here AFAIK. +# TODO: s.take might be the best way to create sub series +series_slices = [s.take(range_index) for s in series] +result.append(f(*series_slices)) +return pd.Series(result) + +return lambda *a: (wrapped(*a), arrow_return_type) + + +def wrap_bounded_window_agg_pandas_udf_np(f, return_type): --- End diff -- Let's get rid of it, then. It looks like we're going to handle that separately.
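The `wrapped` function above applies the UDF once per output row, to the `[begin, end)` slice of every input series. The plain-Python sketch below shows the same control flow without pandas or NumPy, with lists standing in for `pd.Series`. `apply_bounded_windows` and `mean` are hypothetical names used only for this illustration.

```python
from typing import Callable, List, Sequence


def apply_bounded_windows(
    f: Callable[..., float],
    begin_index: Sequence[int],
    end_index: Sequence[int],
    *series: Sequence[float],
) -> List[float]:
    result = []
    for begin, end in zip(begin_index, end_index):
        # One slice per input column for this row's window [begin, end).
        slices = [s[begin:end] for s in series]
        result.append(f(*slices))
    return result


def mean(values: Sequence[float]) -> float:
    return sum(values) / len(values)


# Sliding window of one row before and one row after, over v = [1, 2, 3, 4].
means = apply_bounded_windows(mean, [0, 0, 1, 2], [2, 3, 4, 4], [1.0, 2.0, 3.0, 4.0])
```

This gives `[1.5, 2.0, 3.0, 3.5]`. In pandas, `s.iloc[begin:end]` is a positional slice that avoids building an index array per window. Whether it is actually cheaper than `s.take(range_index)` would need measuring.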
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232485374 --- Diff: python/pyspark/sql/tests.py --- @@ -89,6 +89,7 @@ from pyspark.sql.types import _merge_type from pyspark.tests import QuietTest, ReusedPySparkTestCase, PySparkTestCase, SparkSubmitTests from pyspark.sql.functions import UserDefinedFunction, sha2, lit +import pyspark.sql.functions as F --- End diff -- nit: it looks like this isn't used anywhere.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232485348 --- Diff: python/pyspark/sql/tests.py --- @@ -7064,12 +7098,104 @@ def test_invalid_args(self): foo_udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP) df.withColumn('v2', foo_udf(df['v']).over(w)) -with QuietTest(self.sc): -with self.assertRaisesRegexp( -AnalysisException, -'.*Only unbounded window frame is supported.*'): -df.withColumn('mean_v', mean_udf(df['v']).over(ow)) +def test_bounded_simple(self): +from pyspark.sql.functions import mean, max, min, count + +df = self.data +w1 = self.sliding_row_window +w2 = self.shrinking_range_window + +plus_one = self.python_plus_one +count_udf = self.pandas_agg_count_udf +mean_udf = self.pandas_agg_mean_udf +max_udf = self.pandas_agg_max_udf +min_udf = self.pandas_agg_min_udf + +result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1))\ +.withColumn('count_v', count_udf(df['v']).over(w2)) \ +.withColumn('max_v', max_udf(df['v']).over(w2)) \ +.withColumn('min_v', min_udf(df['v']).over(w1)) \ + +expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1))\ +.withColumn('count_v', count(df['v']).over(w2)) \ +.withColumn('max_v', max(df['v']).over(w2)) \ +.withColumn('min_v', min(df['v']).over(w1)) \ + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_growing_window(self): +from pyspark.sql.functions import mean + +df = self.data +w1 = self.growing_row_window +w2 = self.growing_range_window +mean_udf = self.pandas_agg_mean_udf + +result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ +.withColumn('m2', mean_udf(df['v']).over(w2)) + +expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ +.withColumn('m2', mean(df['v']).over(w2)) + +self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) + +def test_sliding_window(self): +from pyspark.sql.functions import mean + +df = self.data +w1 = self.sliding_row_window +w2 = 
self.sliding_range_window + +mean_udf = self.pandas_agg_mean_udf + +result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \ +.withColumn('m2', mean_udf(df['v']).over(w2)) + +expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \ +.withColumn('m2', mean(df['v']).over(w2)) + +result1.show() +expected1.show() --- End diff -- nit: this looks like a mistake.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r232485316 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -63,7 +65,7 @@ private[spark] object PythonEvalType { */ private[spark] abstract class BasePythonRunner[IN, OUT]( funcs: Seq[ChainedPythonFunctions], -evalType: Int, +evalTypes: Seq[Int], --- End diff -- Yeah, making `evalTypes` a `Seq` doesn't look like a great idea. I agree with https://github.com/apache/spark/pull/22305/files#r223774544.
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22305 @icexelloss, let's take the NumPy discussion out of this PR. Its scope is much bigger than this.
[GitHub] spark pull request #22938: [SPARK-25935][SQL] Prevent null rows from JSON pa...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22938#discussion_r232484880 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -550,15 +550,33 @@ case class JsonToStructs( s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.") } - // This converts parsed rows to the desired output by the given schema. @transient - lazy val converter = nullableSchema match { -case _: StructType => - (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null -case _: ArrayType => - (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null -case _: MapType => - (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null + private lazy val castRow = nullableSchema match { +case _: StructType => (row: InternalRow) => row +case _: ArrayType => (row: InternalRow) => + if (row.isNullAt(0)) { +new GenericArrayData(Array()) --- End diff -- I think it's okay to return `null` for map and array.
[GitHub] spark pull request #22979: [SPARK-25977][SQL] Parsing decimals from CSV usin...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22979#discussion_r232484798 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala --- @@ -226,4 +227,17 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P InternalRow(17836)) // number of days from 1970-01-01 } } + + test("parse decimals using locale") { +Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach { langTag => + val schema = new StructType().add("d", DecimalType(10, 5)) + val options = Map("locale" -> langTag, "sep" -> "|") + val expected = Decimal(1000.001, 10, 5) + val df = new DecimalFormat("", new DecimalFormatSymbols(Locale.forLanguageTag(langTag))) + val input = df.format(expected.toBigDecimal) + checkEvaluation( +CsvToStructs(schema, options, Literal.create(input), gmtId), +InternalRow(expected)) +} + } --- End diff -- @MaxGekk, there's `UnivocityParserSuite`.
[GitHub] spark pull request #22979: [SPARK-25977][SQL] Parsing decimals from CSV usin...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22979#discussion_r232484751 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -9,6 +9,8 @@ displayTitle: Spark SQL Upgrading Guide ## Upgrading From Spark SQL 2.4 to 3.0 + - Since Spark 3.0, to parse decimals in locale specific format from CSV, set the `locale` option to proper value. --- End diff -- @MaxGekk, it's not a behaviour change I guess.
[GitHub] spark pull request #22973: [SPARK-25972][PYTHON] Missed JSON options in stre...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22973#discussion_r232484720 --- Diff: python/pyspark/sql/streaming.py --- @@ -467,11 +468,18 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: allows to forcibly set one of standard basic or extended encoding for + the JSON files. For example UTF-16BE, UTF-32LE. If None is set, + the encoding of input JSON will be detected automatically + when the multiLine option is set to ``true``. :param lineSep: defines the line separator that should be used for parsing. If None is set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set, it uses the default value, ``en-US``. For instance, ``locale`` is used while parsing dates and timestamps. +:param dropFieldIfAllNull: whether to ignore column of all null values or empty + array/struct during schema inference. If None is set, it + uses the default value, ``false``. --- End diff -- @MaxGekk, let's make the order of the documented params match the order of the function parameters.
[GitHub] spark issue #20788: [SPARK-23647][PYTHON][SQL] Adds more types for hint in p...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20788 ok to test
[GitHub] spark issue #20788: [SPARK-23647][PYTHON][SQL] Adds more types for hint in p...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20788 Simply calling it should be enough. See https://github.com/apache/spark/pull/21649/files
[GitHub] spark issue #22954: [SPARK-25981][R] Enables Arrow optimization from R DataF...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22954 Let me hide some comments that have been addressed (the thread looks messy). Please unhide any comments I mistakenly hid that are not addressed yet.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232478997 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) --- End diff -- Not sure. I think the intention is the same. Let me stick to the R one for now.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232476906 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) --- End diff -- Let me try to reuse the R side slicing logic.
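For reference, the R slicing in `writeToTempFileInArrow` uses a fixed chunk size of `ceiling(nrow(rdf) / numPartitions)` rows. As a result, it can produce fewer slices than `numPartitions`. The small Python sketch below reproduces that arithmetic so the slice counts are easy to check. The function name `slice_like_r` is hypothetical.

```python
import math
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def slice_like_r(rows: Sequence[T], num_partitions: int) -> List[List[T]]:
    """Split rows into consecutive chunks of ceil(n / num_partitions) rows each."""
    n = len(rows)
    if n == 0:
        return []  # Guard the empty case; the chunk size would be 0 here.
    chunk = math.ceil(n / num_partitions)
    # Equivalent to grouping by rep(1:ceiling(n / chunk), each = chunk)[1:n] in R.
    return [list(rows[i:i + chunk]) for i in range(0, n, chunk)]
```

With 10 rows and 4 partitions this yields slices of 3, 3, 3 and 1 rows. With 6 rows and 4 partitions it yields only 3 slices of 2 rows each.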
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232475881 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) { --- End diff -- Looks okay in this case specifically:

```r
> any(sapply(head(data.frame(list(list(a=NA))), 1), is.raw))
[1] FALSE
> any(sapply(head(data.frame(list(list(a=NA))), 1), function(x) is(x, "POSIXct")))
[1] FALSE
> any(sapply(head(data.frame(list(list(a=1))), 1), is.raw))
[1] FALSE
> any(sapply(head(data.frame(list(list(a="a"))), 1), function(x) is(x, "POSIXct")))
[1] FALSE
> any(sapply(head(data.frame(list(list(a=raw(1)))), 1), is.raw))
[1] TRUE
> any(sapply(head(data.frame(list(list(a=as.POSIXct("2000-01-01")))), 1), function(x) is(x, "POSIXct")))
[1] TRUE
```
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232475777 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") +} + } + firstRow <- do.call(mapply, append(args, dataHead))[[1]] + fileName <- writeToTempFileInArrow(data, numPartitions) + tryCatch( +jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName), + finally = { +file.remove(fileName) --- End diff -- I believe either way is fine.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232475752 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { --- End diff -- Yeah, I managed to at least get rid of this hack itself. Will push soon.
[GitHub] spark issue #23001: [INFRA] Close stale PRs
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/23001 Merged to master.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473761 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") --- End diff -- I suspect it happens when an R `numeric` (a value like `1.0`) is cast to float type, so I think it's related to the casting behaviour. Let me take a look and file a JIRA on the Arrow side, but if you don't mind, I will focus on matching the exact type cases for now.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473723 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ --- End diff -- Yup, let me try.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473716 --- Diff: R/pkg/R/SQLContext.R --- @@ -172,10 +221,10 @@ getDefaultSqlSource <- function() { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() - + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" --- End diff -- Yea, I checked that it always has the default value. I initially left the default value in but took it out after double-checking.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473705 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) + stream_writer <- NULL + for (rdf_slice in rdf_slices) { +batch <- record_batch(rdf_slice) +if (is.null(stream_writer)) { + # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks + # there's no exposed API for it. Here's a workaround but ideally this should + # be removed. + close_on_exit <- get("close_on_exit", envir = asNamespace("arrow"), inherits = FALSE) --- End diff -- Hm, possibly yea. Let me try it. 
In this way, we could get rid of `require`.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473697 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) --- End diff -- This resembles the PySpark-side logic: https://github.com/apache/spark/blob/d367bdcf521f564d2d7066257200be26b27ea926/python/pyspark/sql/session.py#L554-L556 Let me check the differences between them.
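For comparison, a rough Python sketch of the ceiling-division slicing that the R snippet above performs: each slice gets `ceil(n / numPartitions)` rows and the last slice may be shorter, so the result can have fewer slices than requested. The function name `slice_rows` is hypothetical, and the fallback to a single partition when none is given mirrors the R snippet, not necessarily the PySpark side.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def slice_rows(rows: Sequence[T], num_partitions: Optional[int] = None) -> List[Sequence[T]]:
    """Split rows into at most num_partitions contiguous slices of ceil(n / p) rows."""
    # Mirror the R snippet: fall back to a single partition when none is given.
    p = num_partitions if num_partitions is not None else 1
    if p < 1:
        raise ValueError("num_partitions must be positive")
    n = len(rows)
    if n == 0:
        # range() with a step of 0 would raise; an empty input yields no slices.
        return []
    step = -(-n // p)  # integer ceiling division, i.e. ceil(n / p)
    return [rows[start:start + step] for start in range(0, n, step)]


print(slice_rows(list(range(10)), 3))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

With 5 rows and 4 requested partitions, for example, the chunk size is 2 and only 3 slices come out.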
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473669 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 --- End diff -- We should; however, this follows the original code path's behaviour. I kept it the same so that we can compare performance under the same conditions. If you don't mind, I will fix both in a separate PR.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473643 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala --- @@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging { } sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } + + /** + * R callable function to read a file in Arrow stream format and create a `RDD` + * using each serialized ArrowRecordBatch as a partition. + */ + def readArrowStreamFromFile( + sparkSession: SparkSession, + filename: String): JavaRDD[Array[Byte]] = { +ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename) --- End diff -- Hmhmhm .. yea. What I was trying to do is put the SQL-related code that R calls on the JVM here when it is not an official API, so that changing internal Scala APIs does not cause R test failures. I was trying to do something similar on the PySpark side.
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473611 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { --- End diff -- Yup .. it is. From a cursory look at Arrow's R API, it looks like we shouldn't get this error. Maybe it will go away once I use an official release of the R Arrow package. Let me check it later.
[GitHub] spark issue #23001: [INFRA] Close stale PRs
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/23001 I took a quick pass. Would you mind adding these, please? #22539 #21868 #21514 #21402 #21322 #21257 #20163 #19691 #18697 #18636 #17176
[GitHub] spark issue #18697: [SPARK-16683][SQL] Repeated joins to same table can leak...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/18697 Let's close this then.
[GitHub] spark issue #20163: [SPARK-22966][PYTHON][SQL] Python UDFs with returnType=S...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20163 Let's leave this closed.
[GitHub] spark issue #20168: [SPARK-22730][ML] Add ImageSchema support for all OpenCv...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20168 @tomasatdatabricks, mind updating this? I happened to look at this a few times lately. I will try to review.
[GitHub] spark issue #20503: [SPARK-23299][SQL][PYSPARK] Fix __repr__ behaviour for R...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20503 ping @ashashwat to update
[GitHub] spark issue #20788: [SPARK-23647][PYTHON][SQL] Adds more types for hint in p...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20788 @DylanGuedes let's add tests.
[GitHub] spark issue #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a p...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21257 ping @zheh12 to address the comments. I am going to suggest closing this for now while I identify PRs to close.
[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21322 Shall we leave this PR closed and start with a design doc? Let me suggest closing this for now while I look through old PRs. @JeetKunDoug, please feel free to create a clone of this PR if there's any reason to keep this open that I missed. No objection. Thanks.
[GitHub] spark issue #21402: SPARK-24355 Spark external shuffle server improvement to...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21402 @redsanket you should close it yourself.
[GitHub] spark issue #21514: [SPARK-22860] [SPARK-24621] [Core] [WebUI] - hide key pa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21514 I'm going to suggest closing this. The review comments have not been addressed for more than a few months, and there's little point in keeping inactive PRs open. Feel free to take this over if any of you here is interested. Or @tooptoop4, please recreate a PR after addressing the review comments here. Thanks.
[GitHub] spark issue #21514: [SPARK-22860] [SPARK-24621] [Core] [WebUI] - hide key pa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21514 ping @tooptoop4
[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21868 I think we should fix this. Basically, the dynamic estimation logic is too flaky, and I think we need this for the current status. Let's not add it for now. While I am revisiting old PRs, I am suggesting closing the ones that do not look likely to be merged. Let me suggest closing this for now, but please feel free to recreate a PR if you strongly feel this is needed in Spark. No objection.
[GitHub] spark issue #22060: [DO NOT MERGE][TEST ONLY] Add once-policy rule check
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22060 hey @maryannxue, where are we with this? Let's close it if it's going to stay inactive for another couple of weeks.
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22144 So, 2.4 is out. Where are we? Rereading the comments above, it looks like we should: 1. find the root cause, 2. officially drop it if the workaround is not easy, 3. fix it if the workaround is simple, 4. add a test. If not, I would just document that we dropped this.
[GitHub] spark issue #22184: [SPARK-25132][SQL][DOC] Add migration doc for case-insen...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22184 @seancxmao, so this behaviour change description is only valid when upgrading from Spark 2.3 to 2.4? Then we can add it to `Upgrading From Spark SQL 2.3 to 2.4`.
[GitHub] spark issue #21363: [SPARK-19228][SQL] Migrate on Java 8 time from FastDateF...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21363 I am going to suggest closing this since it has been inactive for more than a few weeks. It would still be good to fix. Let me cc some people who might be interested, just FYI. Feel free to take this over when you find some time or are interested. Adding @xuanyuanking, @mgaido91, @viirya, @MaxGekk, @softmanu, whom I could think of for now. Feel free to ignore my cc if you are busy or working on more important fixes.
[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix BarrierTaskContext while pyth...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22962 Please fix the PR title to describe what it fixes.
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix BarrierTaskContext whi...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r232448083 --- Diff: python/pyspark/taskcontext.py --- @@ -144,10 +144,19 @@ def __init__(self): """Construct a BarrierTaskContext, use get instead""" pass +def __new__(cls): --- End diff -- Why should we override `__new__`? Can't we do this in `_getOrCreate` as well?
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix BarrierTaskContext whi...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r232447967 --- Diff: python/pyspark/taskcontext.py --- @@ -144,10 +144,19 @@ def __init__(self): """Construct a BarrierTaskContext, use get instead""" pass +def __new__(cls): +""" +Rewrite __new__ method to BarrierTaskContext for _getOrCreate called when _taskContext +is not instance of BarrierTaskContext. +""" +if not isinstance(cls._taskContext, BarrierTaskContext): +cls._taskContext = object.__new__(cls) +return cls._taskContext --- End diff -- Why should we override `__new__`?
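A simplified, self-contained Python sketch (not the actual `pyspark.taskcontext` module) of the alternative the review asks about: doing the type check in `_getOrCreate` instead of overriding `__new__`. It assumes, as the diff suggests, that the context is cached in a class attribute `_taskContext`. Because `BarrierTaskContext` inherits that attribute, a reused worker can find a plain `TaskContext` left over from an earlier task. `NaiveBarrierTaskContext` is a hypothetical class that shows this failure mode.

```python
class TaskContext:
    """Simplified stand-in that caches one context in a class attribute."""

    _taskContext = None

    @classmethod
    def _getOrCreate(cls):
        if cls._taskContext is None:
            cls._taskContext = object.__new__(cls)
        return cls._taskContext


class NaiveBarrierTaskContext(TaskContext):
    """Hypothetical subclass that relies on the inherited _getOrCreate."""


class BarrierTaskContext(TaskContext):
    @classmethod
    def _getOrCreate(cls):
        # cls._taskContext falls back to TaskContext._taskContext, which may hold
        # a plain TaskContext from a previous (non-barrier) task in a reused worker.
        # Replace it unless it is already a BarrierTaskContext.
        if not isinstance(cls._taskContext, BarrierTaskContext):
            cls._taskContext = object.__new__(cls)
        return cls._taskContext


# Simulate a reused worker: a normal task runs first, then a barrier task.
plain = TaskContext._getOrCreate()
print(type(NaiveBarrierTaskContext._getOrCreate()).__name__)  # TaskContext
print(type(BarrierTaskContext._getOrCreate()).__name__)  # BarrierTaskContext
```

Keeping the check in the classmethod leaves object construction untouched, which is one reason the review questions overriding `__new__`.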
[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix BarrierTaskContext while pyth...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22962 @xuanyuanking, mind explaining in the PR description how and why it happens, rather than just what happens?
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix BarrierTaskContext whi...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r232447862 --- Diff: python/pyspark/tests.py --- @@ -614,6 +614,18 @@ def context_barrier(x): times = rdd.barrier().mapPartitions(f).map(context_barrier).collect() self.assertTrue(max(times) - min(times) < 1) +def test_barrier_with_python_worker_reuse(self): +""" +Verify that BarrierTaskContext.barrier() with reused python worker. +""" +rdd = self.sc.parallelize(range(4), 4) --- End diff -- Let's explicitly set `spark.python.worker.reuse`, or at least assert it.
[GitHub] spark issue #22932: [SPARK-25102][SQL] Write Spark version to ORC/Parquet fi...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22932 Double-checked. A late LGTM from me too.
[GitHub] spark issue #22994: [BUILD] refactor dev/lint-python in to something readabl...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22994 I agree with this change. The current script is a total mess. I will try to help take a look once the tests pass. BTW, it would be awesome if the PR description explained what this PR tries to fix, once the tests pass.
[GitHub] spark issue #22963: [SPARK-25962][BUILD][PYTHON] Specify minimum versions fo...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22963 I also agree with @srowen's (https://github.com/apache/spark/pull/22963#issuecomment-437133365)
[GitHub] spark issue #22954: [SPARK-25981][R] Enables Arrow optimization from R DataF...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22954 Hey guys, thanks for reviewing! Will address them soon.
[GitHub] spark issue #22963: [SPARK-25962][BUILD][PYTHON] Specify minimum versions fo...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22963 OMG, I don't know why I missed these comments. I will read them tomorrow (it's 6 am now and I should get some sleep ..)
[GitHub] spark pull request #22989: [SPARK-25986][Build] Banning throw new OutOfMemor...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r232289280 --- Diff: scalastyle-config.xml --- @@ -240,6 +240,18 @@ This file is divided into 3 sections: ]]> + --- End diff -- Not a big deal, but I would name it `nothrowoutofmemory`.
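Because the quoted diff drops the actual scalastyle entry, here is a rough Python approximation of how a regex-based rule that bans `throw new OutOfMemoryError` might behave. The pattern, the `find_violations` helper, and the use of `nothrowoutofmemory` as an id are illustrative assumptions, not the rule added in the PR.

```python
import re
from typing import List, Tuple

# Hypothetical rule id, following the naming suggested in the review.
RULE_ID = "nothrowoutofmemory"
# Matches `throw new OutOfMemoryError` with any whitespace between the tokens.
PATTERN = re.compile(r"\bthrow\s+new\s+OutOfMemoryError\b")


def find_violations(source: str) -> List[Tuple[int, str]]:
    """Return (1-based line number, rule id) for every line that matches PATTERN."""
    return [
        (lineno, RULE_ID)
        for lineno, line in enumerate(source.splitlines(), start=1)
        if PATTERN.search(line)
    ]


scala = """val buf = allocate(size)
if (buf == null) throw new OutOfMemoryError("cannot allocate")
throw new SparkOutOfMemoryError("not flagged: different class name")"""
print(find_violations(scala))  # [(2, 'nothrowoutofmemory')]
```

A purely textual check like this cannot distinguish code from comments or strings, which is a general limitation of regex-based style rules.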