[GitHub] spark issue #16894: [SPARK-17897] [SQL] [BACKPORT-2.0] Fixed IsNotNull Const...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16894

**[Test build #72734 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72734/consoleFull)** for PR 16894 at commit [`11d2684`](https://github.com/apache/spark/commit/11d2684b1def705ef72b0a64b13c93c8a09d3efc).
[GitHub] spark pull request #16894: [SPARK-17897] [SQL] [BACKPORT-2.0] Fixed IsNotNul...
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/16894

[SPARK-17897] [SQL] [BACKPORT-2.0] Fixed IsNotNull Constraint Inference Rule

### What changes were proposed in this pull request?

This PR backports https://github.com/apache/spark/pull/16067 to Spark 2.0.

The `constraints` of an operator are the expressions that evaluate to `true` for all the rows it produces. That means the expression result is neither `false` nor `unknown` (NULL). Thus, we can infer `IsNotNull` on all the constraints, which are generated by the operator's own predicates or propagated from its children. A constraint can be a complex expression. To make better use of these constraints, we try to push `IsNotNull` down to the lowest-level expressions (i.e., `Attribute`s). `IsNotNull` can be pushed through an expression when the expression is null-intolerant (a null-intolerant expression evaluates to NULL whenever its input is NULL). Below is the existing code we have for `IsNotNull` pushdown.

```Scala
private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
  case a: Attribute => Seq(a)
  case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
    expr.children.flatMap(scanNullIntolerantExpr)
  case _ => Seq.empty[Attribute]
}
```

**`IsNotNull` itself is not null-intolerant.** It converts `null` to `false`. If the expression does not include any `Not`-like expression, the code above works; otherwise, it can generate a wrong result. This PR fixes the function above by removing `IsNotNull` from the inference. After the fix, when a constraint contains an `IsNotNull` expression, we infer new attribute-specific `IsNotNull` constraints if and only if `IsNotNull` appears at the root. Without the fix, the following test case returns an empty result:

```Scala
val data = Seq[java.lang.Integer](1, null).toDF("key")
data.filter("not key is not null").show()
```

Before the fix, the optimized plan is:

```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter (isnotnull(value#1) && NOT isnotnull(value#1))
   +- LocalRelation [value#1]
```

After the fix, the optimized plan is:

```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter NOT isnotnull(value#1)
   +- LocalRelation [value#1]
```

### How was this patch tested?

Added a test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark isNotNull2.0

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16894.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16894

commit 11d2684b1def705ef72b0a64b13c93c8a09d3efc
Author: Xiao Li
Date: 2017-02-11T07:49:17Z

    fix.
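For reference, a minimal sketch of the corrected inference (a reconstruction for illustration; see #16067 for the authoritative change): the `IsNotNull(_: NullIntolerant)` case is dropped from the recursion, and attribute-level `IsNotNull` constraints are generated only when `IsNotNull` appears at the root of a constraint.

```Scala
// The scan no longer looks through IsNotNull, so a constraint like
// NOT isnotnull(a) can no longer (incorrectly) yield isnotnull(a).
private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
  case a: Attribute => Seq(a)
  case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantExpr)
  case _ => Seq.empty[Attribute]
}

// IsNotNull is pushed through null-intolerant children only when it appears
// at the root of the constraint; any other constraint is itself known to
// evaluate to a non-NULL value, so it can still be scanned directly.
private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
  constraint match {
    case IsNotNull(expr) => scanNullIntolerantExpr(expr).map(IsNotNull(_))
    case _ => scanNullIntolerantExpr(constraint).map(IsNotNull(_))
  }
```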
[GitHub] spark issue #16893: [SPARK-19555][SQL] Improve the performance of StringUtil...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16893

**[Test build #72733 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72733/testReport)** for PR 16893 at commit [`e68eab0`](https://github.com/apache/spark/commit/e68eab0841bde02c773fb169191361f4eb55d742).
[GitHub] spark pull request #16893: [SPARK-19555][SQL] Improve the performance of Str...
GitHub user lins05 opened a pull request: https://github.com/apache/spark/pull/16893

[SPARK-19555][SQL] Improve the performance of StringUtils.escapeLikeRegex method

## What changes were proposed in this pull request?

Copied from [SPARK-19555](https://issues.apache.org/jira/browse/SPARK-19555):

Spark's `StringUtils.escapeLikeRegex()` method is written inefficiently, performing tons of object allocations due to its use of `zip()`, `flatMap()`, and `mkString`. Instead, I think the method should be rewritten in an imperative style using a Java string builder. This method can become a performance bottleneck in cases where regex patterns are used with non-constant-foldable expressions (e.g., the pattern comes from the data rather than being part of the query).

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lins05/spark spark-19555-improve-escape-like-regex

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16893.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16893

commit bbcdeda98c14705b6de3efab70f2c58bc4539bb9
Author: Shuai Lin
Date: 2017-02-11T07:46:34Z

    [SPARK-19555][SQL] Improve the performance of StringUtils.escapeLikeRegex
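To illustrate the proposed direction, here is a minimal sketch only, assuming the usual LIKE semantics (`\` escapes the next character, `_` matches exactly one character, `%` matches any sequence) and simplifying the escape handling of the real patch:

```Scala
import java.util.regex.Pattern

// Sketch: one pass over the pattern with a single StringBuilder, avoiding the
// per-character tuple and Seq allocations of the zip()/flatMap() version.
def escapeLikeRegex(pattern: String): String = {
  val out = new java.lang.StringBuilder("(?s)") // DOTALL, matching the original's prefix
  var i = 0
  while (i < pattern.length) {
    val c = pattern.charAt(i)
    if (c == '\\' && i + 1 < pattern.length) {
      // Escaped character: emit the next character literally.
      out.append(Pattern.quote(String.valueOf(pattern.charAt(i + 1))))
      i += 2
    } else {
      c match {
        case '_' => out.append('.')  // LIKE '_' matches exactly one character
        case '%' => out.append(".*") // LIKE '%' matches any sequence
        case other => out.append(Pattern.quote(String.valueOf(other)))
      }
      i += 1
    }
  }
  out.toString
}
```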
[GitHub] spark issue #16891: [SPARK-19318][SQL] Fix to treat JDBC connection properti...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16891

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72731/
[GitHub] spark issue #16891: [SPARK-19318][SQL] Fix to treat JDBC connection properti...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16891

Merged build finished. Test PASSed.
[GitHub] spark issue #16891: [SPARK-19318][SQL] Fix to treat JDBC connection properti...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16891

**[Test build #72731 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72731/testReport)** for PR 16891 at commit [`41d3362`](https://github.com/apache/spark/commit/41d336251cb70d7be26171c6a1f484e742ba83bd).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #16067: [SPARK-17897] [SQL] Fixed IsNotNull Constraint Inference...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/16067

Sure, let me backport it now.
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16620

Also, if you implement the new change I proposed, I think it's relatively straightforward to write a new test in DAGSchedulerSuite for the new behavior (which will be pretty similar to the test I modified in #16892).
[GitHub] spark issue #16892: [SPARK-19560] Improve DAGScheduler tests.
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16892

cc @mateiz, whose test I deleted / rolled into the existing one.
[GitHub] spark pull request #16876: [SPARK-19537] Move pendingPartitions to ShuffleMa...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16876
[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16876

Thanks all for the review; merged this into master.
[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16876

@jinxing64 I wrote a long comment on your other PR to attempt to explain (my current understanding of) these issues!
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16620

tl;dr I don't think Mark's change is quite correct, which is why the tests were failing. Instead, I think we need to replace the failedEpoch if/else statement and the pendingPartitions update in DAGScheduler.handleTaskCompletion with:

```scala
if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
  // This task was for the currently running attempt of the stage. Since the task
  // completed successfully from the perspective of the TaskSetManager, mark it as
  // no longer pending (the TaskSetManager may consider the task complete even
  // when the output needs to be ignored because the task's epoch is too small below).
  shuffleStage.pendingPartitions -= task.partitionId
}

if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
  logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
  // The epoch of the task is acceptable (i.e., the task was launched after the most
  // recent failure we're aware of for the executor), so mark the task's output as
  // available.
  shuffleStage.addOutputLoc(smt.partitionId, status)
  // Remove the task's partition from pending partitions. This may have already been
  // done above, but will not have been done yet in cases where the task attempt was
  // from an earlier attempt of the stage (i.e., not the attempt that's currently
  // running). This allows the DAGScheduler to mark the stage as complete when one
  // copy of each task has finished successfully, even if the currently active stage
  // still has tasks running.
  shuffleStage.pendingPartitions -= task.partitionId
}
```

I submitted #16892 to attempt to clarify the test case where Mark's change originally failed (this PR shouldn't block on that -- that's just to clarify things for ourselves in the future), and also wrote a very long write-up of what's going on below.

-----

There are three relevant pieces of state to consider here:

(1) The tasks that the TaskSetManager (TSM) considers currently pending. The TSM encodes these pending tasks in its "successful" array. When a task set is launched, all of its tasks are considered pending, and all of the entries in the successful array are False. Tasks are no longer considered pending (and are marked as True in the "successful" array) if either (a) a copy of the task finishes successfully or (b) a copy of the task fails with a fetch failure (in which case the TSM assumes that the task will never complete successfully, because the previous stage needs to be re-run). Additionally, a task that previously completed successfully can be re-marked as pending if the stage is a shuffle map stage and the executor where the task ran died (this is because the map output needs to be re-generated, and the TSM will re-schedule the task). The TSM notifies the DAGScheduler that the stage has completed if either (a) the stage fails (e.g., there's a fetch failure) or (b) all of the entries in "successful" are true (i.e., there are no more pending tasks).

(2) ShuffleMapStage.pendingPartitions. This variable is used by the DAGScheduler to track the pending tasks for a stage, and is mostly consistent with the TSM's pending tasks (described above). When a stage begins, the DAGScheduler marks all of the partitions that need to be computed as pending, and then removes them from pendingPartitions as the TSM notifies the DAGScheduler that tasks have successfully completed. When a TSM determines that a task needs to be re-run (because it's a shuffle map task that ran on a now-dead executor), the TSM sends a Resubmitted task completion event to the DAGScheduler, which causes the DAGScheduler to re-add the task to pendingPartitions (in doing so, the DAGScheduler is keeping pendingPartitions consistent with the TSM's pending tasks).

I believe there are two scenarios (currently) where ShuffleMapStage.pendingPartitions and the TSM's pending tasks become inconsistent:

- Scenario A (performance optimization, as discussed here already): This happens if a ShuffleMapStage gets re-run (e.g., because the first time it ran, it encountered a fetch failure, so the previous stage needed to be re-run to generate the missing output). Call the original attempt #0 and the currently running attempt #1. If there's a task from attempt #0 that's still running, and it is running on an executor that *was not* marked as failed (this is the condition captured by the failedEpoch if-statement), and it completes successfully, this event will be handled by the TSM for attempt #0. When the DAGScheduler hears that the task completed s
[GitHub] spark issue #16892: [SPARK-19560] Improve DAGScheduler tests.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16892

**[Test build #72732 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72732/testReport)** for PR 16892 at commit [`06b26d3`](https://github.com/apache/spark/commit/06b26d3738403da30bbf2c7ef2bfdb86039baa02).
[GitHub] spark pull request #16892: [SPARK-19560] Improve DAGScheduler tests.
GitHub user kayousterhout opened a pull request: https://github.com/apache/spark/pull/16892

[SPARK-19560] Improve DAGScheduler tests.

This commit improves the tests that check the case when a ShuffleMapTask completes successfully on an executor that has failed. It improves the commenting around the existing test for this, and adds some additional checks to make it clearer what went wrong if the tests fail (the fact that these tests are hard to understand came up in the context of @markhamstra's proposed fix for #16620). This commit also removes a test that I realized tested exactly the same functionality.

@markhamstra, I verified that the new version of the test still fails (and in a more helpful way) for your proposed change for #16620.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kayousterhout/spark-1 SPARK-19560

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16892.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16892

commit 06b26d3738403da30bbf2c7ef2bfdb86039baa02
Author: Kay Ousterhout
Date: 2017-02-11T06:04:11Z

    [SPARK-19560] Improve DAGScheduler tests.

    This commit improves the tests that check the case when a ShuffleMapTask
    completes successfully on an executor that has failed. This commit improves
    the commenting around the existing test for this, and adds some additional
    checks to make it more clear what went wrong if the tests fail (the fact
    that these tests are hard to understand came up in the context of #16620).
    This commit also removes a test that I realized tested exactly the same
    functionality.
[GitHub] spark issue #16785: [SPARK-19443][SQL] The function to generate constraints ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/16785

@cloud-fan Yeah, I agree with you and @hvanhovell. Regarding the too-slow constraint propagation: besides attacking `getAliasedConstraints` as this change does, maybe we can find another way to improve the constraint-propagation process. If we can't, then for such long lineages I think we should use checkpointing to fix it, as in #16775.
[GitHub] spark issue #16724: [SPARK-19352][SQL] Keep sort order of rows after externa...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/16724

@cloud-fan Is the current change not suitable? We can change it to only preserve data order when partitioning is specified and there is no bucketing for the output. This change only adds a new constructor to `UnsafeKVExternalSorter`; no other API change, I think. Since the output data goes through this external sorter, the data order definitely changes without this change. I think we may not be able to preserve data order with a workaround that doesn't touch `UnsafeKVExternalSorter`.
[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r100660551

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala ---
@@ -0,0 +1,623 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * @param plan a LogicalPlan node that must be an instance of Filter
+ * @param catalystConf a configuration showing if CBO is enabled
+ */
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding ColumnStat
+   * for a column after we apply a predicate condition. For example, column c has
+   * [min, max] value as [0, 100]. In a range condition such as (c > 40 AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we evaluate the
+   * first condition c > 40. We need to set the column's [min, max] value to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = mutable.Map.empty
+
+  /**
+   * Returns an option of Statistics for a Filter logical plan node.
+   * For a given compound expression condition, this method computes filter selectivity
+   * (or the percentage of rows meeting the filter condition), which
+   * is used to compute row count, size in bytes, and the updated statistics after a given
+   * predicate is applied.
+   *
+   * @return Option[Statistics] When there is no statistics collected, it returns None.
+   */
+  def estimate: Option[Statistics] = {
+    val stats: Statistics = plan.child.stats(catalystConf)
+    if (stats.rowCount.isEmpty) return None
+
+    // save a mutable copy of colStats so that we can later change it recursively
+    mutableColStats = mutable.Map(stats.attributeStats.map(kv => (kv._1.exprId, kv._2)).toSeq: _*)
+
+    // estimate selectivity of this filter predicate
+    val filterSelectivity: Double = calculateConditions(plan.condition)
+
+    // attributeStats has mapping Attribute-to-ColumnStat.
+    // mutableColStats has mapping ExprId-to-ColumnStat.
+    // We use an ExprId-to-Attribute map to facilitate the mapping Attribute-to-ColumnStat
+    val expridToAttrMap: Map[ExprId, Attribute] =
+      stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
+    // copy mutableColStats contents to an immutable AttributeMap.
+    val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
+      mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
+    val newColStats = AttributeMap(mutableAttributeStats.toSeq)
+
+    val filteredRowCount: BigInt =
+      EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * filterSelectivity)
+    val filteredSizeInBytes: BigInt = EstimationUtils.ceil(BigDecimal(
+      EstimationUtils.getOutputSize(plan.output, filteredRowCount, newColStats)
+    ))
+
+    Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = Some(filteredRowCount),
+      attributeStats = newColStats))
+  }
+
+  /**
+   * Returns a percentage of rows meeting a compound condition in Filter node.
+   * A compound condition is decomposed into multiple single conditions linked with AND, OR, NOT.
+   * For logical AND conditions, we
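The scaladoc above breaks off while describing how single-condition selectivities are combined. For intuition, the standard combination rules under an attribute-independence assumption (a reconstruction of the usual CBO arithmetic, not text quoted from the patch) are:

$$S(A \wedge B) = S(A)\,S(B), \qquad S(A \vee B) = S(A) + S(B) - S(A)\,S(B), \qquad S(\lnot A) = 1 - S(A)$$

For example, using the scaladoc's column with [min, max] = [0, 100]: if `c > 40` has selectivity 0.6 and `c <= 50` has selectivity 0.5, then `(c > 40 AND c <= 50)` is estimated to select 0.6 * 0.5 = 0.3 of the input rows.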
[GitHub] spark pull request #16228: [SPARK-17076] [SQL] Cardinality estimation for jo...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/16228#discussion_r100660535

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala ---
@@ -0,0 +1,117 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.math.{BigDecimal => JDecimal}
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{BooleanType, DateType, TimestampType, _}
+
+
+/** Value range of a column. */
+trait Range
+
+/** For simplicity we use decimal to unify operations of numeric ranges. */
+case class NumericRange(min: JDecimal, max: JDecimal) extends Range
+
+/**
+ * This version of Spark does not have min/max for binary/string types, we define their default
+ * behaviors by this class.
+ */
+class DefaultRange extends Range
+
+/** This is for columns with only null values. */
+class NullRange extends Range
+
+object Range {
+  def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range = dataType match {
+    case StringType | BinaryType => new DefaultRange()
+    case _ if min.isEmpty || max.isEmpty => new NullRange()
+    case _ => toNumericRange(min.get, max.get, dataType)
+  }
+
+  def isIntersected(r1: Range, r2: Range): Boolean = (r1, r2) match {
+    case (_, _: DefaultRange) | (_: DefaultRange, _) =>
+      // Skip overlapping check for binary/string types
+      true
+    case (_, _: NullRange) | (_: NullRange, _) =>
+      false
+    case (n1: NumericRange, n2: NumericRange) =>
+      n1.min.compareTo(n2.max) <= 0 && n1.max.compareTo(n2.min) >= 0
+  }
+
+  /** This is only for two overlapped ranges. */
--- End diff --

ok I'll improve the doc
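For intuition about the intersection predicate, here is a standalone mirror of the diff's `NumericRange` check (an illustration only, not the Spark class itself): two closed ranges overlap exactly when each range's minimum does not exceed the other's maximum.

```Scala
import java.math.{BigDecimal => JDecimal}

// Standalone mirror of the diff's NumericRange, for illustration.
case class NumericRange(min: JDecimal, max: JDecimal)

def isIntersected(r1: NumericRange, r2: NumericRange): Boolean =
  r1.min.compareTo(r2.max) <= 0 && r1.max.compareTo(r2.min) >= 0

// [0, 100] and [40, 50] overlap; [0, 10] and [20, 30] do not.
assert(isIntersected(NumericRange(new JDecimal(0), new JDecimal(100)),
                     NumericRange(new JDecimal(40), new JDecimal(50))))
assert(!isIntersected(NumericRange(new JDecimal(0), new JDecimal(10)),
                      NumericRange(new JDecimal(20), new JDecimal(30))))
```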
[GitHub] spark pull request #16228: [SPARK-17076] [SQL] Cardinality estimation for jo...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/16228#discussion_r100660524

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
@@ -0,0 +1,314 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
+import org.apache.spark.sql.types.DataType
+
+
+object JoinEstimation extends Logging {
+  /**
+   * Estimate statistics after join. Return `None` if the join type is not supported, or we don't
+   * have enough statistics for estimation.
+   */
+  def estimate(conf: CatalystConf, join: Join): Option[Statistics] = {
+    join.joinType match {
+      case Inner | Cross | LeftOuter | RightOuter | FullOuter =>
+        InnerOuterEstimation(conf, join).doEstimate()
+      case LeftSemi | LeftAnti =>
+        LeftSemiAntiEstimation(conf, join).doEstimate()
+      case _ =>
+        logDebug(s"[CBO] Unsupported join type: ${join.joinType}")
+        None
+    }
+  }
+}
+
+case class InnerOuterEstimation(conf: CatalystConf, join: Join) extends Logging {
+
+  private val leftStats = join.left.stats(conf)
+  private val rightStats = join.right.stats(conf)
+
+  /**
+   * Estimate output size and number of rows after a join operator, and update output column stats.
+   */
+  def doEstimate(): Option[Statistics] = join match {
+    case _ if !rowCountsExist(conf, join.left, join.right) =>
+      None
+
+    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
+      // 1. Compute join selectivity
+      val joinKeyPairs = extractJoinKeys(leftKeys, rightKeys)
+      val selectivity = joinSelectivity(joinKeyPairs, leftStats, rightStats)
+
+      // 2. Estimate the number of output rows
+      val leftRows = leftStats.rowCount.get
+      val rightRows = rightStats.rowCount.get
+      val innerRows = ceil(BigDecimal(leftRows * rightRows) * selectivity)
+
+      // Make sure outputRows won't be too small based on join type.
+      val outputRows = joinType match {
+        case LeftOuter =>
+          // All rows from left side should be in the result.
+          leftRows.max(innerRows)
+        case RightOuter =>
+          // All rows from right side should be in the result.
+          rightRows.max(innerRows)
+        case FullOuter =>
+          // Simulate full outer join as obtaining the number of elements in the union of two
+          // finite sets: A \cup B = A + B - A \cap B => A FOJ B = A + B - A IJ B.
+          // But the "inner join" part can be much larger than A \cap B, making the simulated
+          // result much smaller. To prevent this, we choose the larger one between the simulated
+          // part and the inner part.
+          (leftRows + rightRows - innerRows).max(innerRows)
+        case _ =>
+          // Don't change for inner or cross join
+          innerRows
+      }
+
+      // 3. Update statistics based on the output of join
+      val intersectedStats = if (selectivity == 0) {
+        AttributeMap[ColumnStat](Nil)
+      } else {
+        updateIntersectedStats(joinKeyPairs, leftStats, rightStats)
+      }
+      val inputAttrStats = AttributeMap(
+        leftStats.attributeStats.toSeq ++ rightSt
[GitHub] spark pull request #16228: [SPARK-17076] [SQL] Cardinality estimation for jo...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/16228#discussion_r100660451

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
@@ -0,0 +1,314 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
+import org.apache.spark.sql.types.DataType
+
+
+object JoinEstimation extends Logging {
+  /**
+   * Estimate statistics after join. Return `None` if the join type is not supported, or we don't
+   * have enough statistics for estimation.
+   */
+  def estimate(conf: CatalystConf, join: Join): Option[Statistics] = {
+    join.joinType match {
+      case Inner | Cross | LeftOuter | RightOuter | FullOuter =>
+        InnerOuterEstimation(conf, join).doEstimate()
+      case LeftSemi | LeftAnti =>
+        LeftSemiAntiEstimation(conf, join).doEstimate()
+      case _ =>
+        logDebug(s"[CBO] Unsupported join type: ${join.joinType}")
+        None
+    }
+  }
+}
+
+case class InnerOuterEstimation(conf: CatalystConf, join: Join) extends Logging {
+
+  private val leftStats = join.left.stats(conf)
+  private val rightStats = join.right.stats(conf)
+
+  /**
+   * Estimate output size and number of rows after a join operator, and update output column stats.
+   */
+  def doEstimate(): Option[Statistics] = join match {
+    case _ if !rowCountsExist(conf, join.left, join.right) =>
+      None
+
+    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
+      // 1. Compute join selectivity
+      val joinKeyPairs = extractJoinKeys(leftKeys, rightKeys)
+      val selectivity = joinSelectivity(joinKeyPairs, leftStats, rightStats)
+
+      // 2. Estimate the number of output rows
+      val leftRows = leftStats.rowCount.get
+      val rightRows = rightStats.rowCount.get
+      val innerRows = ceil(BigDecimal(leftRows * rightRows) * selectivity)
+
+      // Make sure outputRows won't be too small based on join type.
+      val outputRows = joinType match {
+        case LeftOuter =>
+          // All rows from left side should be in the result.
+          leftRows.max(innerRows)
+        case RightOuter =>
+          // All rows from right side should be in the result.
+          rightRows.max(innerRows)
+        case FullOuter =>
+          // Simulate full outer join as obtaining the number of elements in the union of two
+          // finite sets: A \cup B = A + B - A \cap B => A FOJ B = A + B - A IJ B.
+          // But the "inner join" part can be much larger than A \cap B, making the simulated
+          // result much smaller. To prevent this, we choose the larger one between the simulated
+          // part and the inner part.
+          (leftRows + rightRows - innerRows).max(innerRows)
+        case _ =>
+          // Don't change for inner or cross join
+          innerRows
+      }
+
+      // 3. Update statistics based on the output of join
+      val intersectedStats = if (selectivity == 0) {
+        AttributeMap[ColumnStat](Nil)
+      } else {
+        updateIntersectedStats(joinKeyPairs, leftStats, rightStats)
+      }
+      val inputAttrStats = AttributeMap(
+        leftStats.attributeStats.toSeq ++ rightSt
[GitHub] spark pull request #16228: [SPARK-17076] [SQL] Cardinality estimation for jo...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/16228#discussion_r100660417 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.statsEstimation + +import scala.collection.mutable + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.CatalystConf +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ +import org.apache.spark.sql.types.DataType + + +object JoinEstimation extends Logging { + /** + * Estimate statistics after join. Return `None` if the join type is not supported, or we don't + * have enough statistics for estimation. + */ + def estimate(conf: CatalystConf, join: Join): Option[Statistics] = { +join.joinType match { + case Inner | Cross | LeftOuter | RightOuter | FullOuter => +InnerOuterEstimation(conf, join).doEstimate() + case LeftSemi | LeftAnti => +LeftSemiAntiEstimation(conf, join).doEstimate() + case _ => +logDebug(s"[CBO] Unsupported join type: ${join.joinType}") +None +} + } +} + +case class InnerOuterEstimation(conf: CatalystConf, join: Join) extends Logging { + + private val leftStats = join.left.stats(conf) + private val rightStats = join.right.stats(conf) + + /** + * Estimate output size and number of rows after a join operator, and update output column stats. + */ + def doEstimate(): Option[Statistics] = join match { +case _ if !rowCountsExist(conf, join.left, join.right) => + None + +case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) => + // 1. Compute join selectivity + val joinKeyPairs = extractJoinKeys(leftKeys, rightKeys) + val selectivity = joinSelectivity(joinKeyPairs, leftStats, rightStats) + + // 2. Estimate the number of output rows + val leftRows = leftStats.rowCount.get + val rightRows = rightStats.rowCount.get + val innerRows = ceil(BigDecimal(leftRows * rightRows) * selectivity) + + // Make sure outputRows won't be too small based on join type. + val outputRows = joinType match { +case LeftOuter => + // All rows from left side should be in the result. + leftRows.max(innerRows) +case RightOuter => + // All rows from right side should be in the result. 
+ rightRows.max(innerRows) +case FullOuter => + // Simulate full outer join as obtaining the number of elements in the union of two + // finite sets: A \cup B = A + B - A \cap B => A FOJ B = A + B - A IJ B. --- End diff -- It's a set operation. Please ignore this, because I'll update computing equation for FullOuter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
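Restating the estimate being discussed (as it appears in the diff, including its guard), where $|A \bowtie B|$ denotes the estimated inner-join cardinality:

$$|A \text{ FOJ } B| \approx \max\bigl(|A| + |B| - |A \bowtie B|,\; |A \bowtie B|\bigr)$$

For example (illustrative numbers only): with $|A| = 100$, $|B| = 80$, and an estimated inner part of 30 rows, the full-outer estimate is $\max(100 + 80 - 30,\ 30) = 150$ rows. The $\max$ guards against cases where the estimated inner part exceeds $|A| + |B| - |A \bowtie B|$, which would otherwise shrink the result below the inner-join estimate itself.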
[GitHub] spark pull request #16228: [SPARK-17076] [SQL] Cardinality estimation for jo...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/16228#discussion_r100660406

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -340,14 +340,22 @@ case class Join(
     case _ => resolvedExceptNatural
   }

-  override def computeStats(conf: CatalystConf): Statistics = joinType match {
-    case LeftAnti | LeftSemi =>
-      // LeftSemi and LeftAnti won't ever be bigger than left
-      left.stats(conf).copy()
-    case _ =>
-      // make sure we don't propagate isBroadcastable in other joins, because
-      // they could explode the size.
-      super.computeStats(conf).copy(isBroadcastable = false)
+  override def computeStats(conf: CatalystConf): Statistics = {
+    def simpleEstimation: Statistics = joinType match {
+      case LeftAnti | LeftSemi =>
+        // LeftSemi and LeftAnti won't ever be bigger than left
+        left.stats(conf)
+      case _ =>
+        // Make sure we don't propagate isBroadcastable in other joins, because
+        // they could explode the size.
+        super.computeStats(conf).copy(isBroadcastable = false)
--- End diff --

We can't guarantee that the behavior of super.computeStats(conf) won't change, so I think it's safer to do this here.
[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...
Github user sureshthalamati commented on the issue: https://github.com/apache/spark/pull/16847

@cloud-fan @gatorsmile I created two PRs for the same issue, as I was not sure which is the best approach to fix it. I would appreciate your feedback. Thanks.
[GitHub] spark issue #16891: [SPARK-19318][SQL] Fix to treat JDBC connection properti...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16891

**[Test build #72731 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72731/testReport)** for PR 16891 at commit [`41d3362`](https://github.com/apache/spark/commit/41d336251cb70d7be26171c6a1f484e742ba83bd).
[GitHub] spark issue #16715: [Spark-18080][ML] Python API & Examples for Locality Sen...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/16715

Could you please add the tag "[PYTHON]" to the PR title? Also, please remove "Please review http://spark.apache.org/contributing.html before opening a pull request." from the PR description, since that will become part of the commit message. Thanks!
[GitHub] spark pull request #16891: [SPARK-19318][SQL] Fix to treat JDBC connection p...
GitHub user sureshthalamati opened a pull request: https://github.com/apache/spark/pull/16891

[SPARK-19318][SQL] Fix to treat JDBC connection properties specified by the user in case-sensitive manner.

## What changes were proposed in this pull request?

The reason for the test failure is that the property "oracle.jdbc.mapDateToTimestamp" set by the test was getting converted to all lower case. The Oracle database expects this property name in a case-sensitive manner. This test was passing in previous releases because connection properties were sent exactly as the user specified them. The fix that handles all options uniformly in a case-insensitive manner also converted the JDBC connection properties to lower case.

This PR enhances CaseInsensitiveMap to keep track of the input keys in their original case, and uses those keys when creating the connection properties that are passed to the JDBC connection. An alternative approach, PR https://github.com/apache/spark/pull/16847, is to pass the original input keys to the JDBC data source by adding a check in the data source class and handling case-insensitivity in the JDBC source code.

## How was this patch tested?

Added new test cases to JdbcSuite and OracleIntegrationSuite. Ran the docker integration tests on my laptop; all tests passed successfully.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sureshthalamati/spark jdbc_case_senstivity_props_fix-SPARK-19318

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16891.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16891

commit b2eba09bff1015d9eccc15a1c2ed7c09b6a9
Author: sureshthalamati
Date: 2017-02-09T22:36:54Z

    [SPARK-19318][SQL] Fix to keep track of JDBC connection properties in the user specified options in case-sensitive manner.

commit 41d336251cb70d7be26171c6a1f484e742ba83bd
Author: sureshthalamati
Date: 2017-02-11T04:28:00Z

    removed unnecessary getOrElse as the map is an internal one, and the key should exist
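A minimal sketch of the idea (illustrative only; the names and shape are assumptions, not the actual Spark class): the map answers lookups case-insensitively but remembers the caller's original keys, so those can be forwarded to the JDBC driver unchanged.

```Scala
// Illustrative sketch, not Spark's CaseInsensitiveMap: lookups ignore case,
// but originalEntries preserves the user's exact spelling (e.g.
// "oracle.jdbc.mapDateToTimestamp") for forwarding to the JDBC driver.
class CaseInsensitiveMap[T](original: Map[String, T]) {
  private val lowerCased: Map[String, T] =
    original.map { case (k, v) => k.toLowerCase -> v }

  def get(key: String): Option[T] = lowerCased.get(key.toLowerCase)

  /** Entries keyed exactly as the user supplied them. */
  def originalEntries: Map[String, T] = original
}

// Usage: option lookup is case-insensitive, but the driver sees the original key.
val opts = new CaseInsensitiveMap(Map("oracle.jdbc.mapDateToTimestamp" -> "false"))
assert(opts.get("ORACLE.JDBC.MAPDATETOTIMESTAMP").contains("false"))
assert(opts.originalEntries.keySet.contains("oracle.jdbc.mapDateToTimestamp"))
```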
[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16870

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72730/
[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16870

Merged build finished. Test PASSed.
[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16870

**[Test build #72730 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72730/testReport)** for PR 16870 at commit [`2dc241e`](https://github.com/apache/spark/commit/2dc241e7c83ca265d0037a1144393478dd2b66aa).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16847

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72728/
[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16847

Merged build finished. Test PASSed.
[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16847

**[Test build #72728 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72728/testReport)** for PR 16847 at commit [`81e9060`](https://github.com/apache/spark/commit/81e906052ea766fe4f228b84db5caea22860dae4).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16386

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72729/
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16386

Merged build finished. Test FAILed.
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16386 **[Test build #72729 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72729/testReport)** for PR 16386 at commit [`b0a5cc8`](https://github.com/apache/spark/commit/b0a5cc8f0c6475b4fa603036f11a9c1248295b40). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16826: [WIP][SPARK-19540][SQL] Add ability to clone SparkSessio...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16826 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72727/ Test PASSed.
[GitHub] spark issue #16826: [WIP][SPARK-19540][SQL] Add ability to clone SparkSessio...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16826 Merged build finished. Test PASSed.
[GitHub] spark issue #16826: [WIP][SPARK-19540][SQL] Add ability to clone SparkSessio...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16826 **[Test build #72727 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72727/testReport)** for PR 16826 at commit [`6da6bda`](https://github.com/apache/spark/commit/6da6bdadc526d71ce887b0bcbef57272a713d6b2). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16871: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16871 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72724/ Test PASSed.
[GitHub] spark issue #16871: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16871 Merged build finished. Test PASSed.
[GitHub] spark issue #16871: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16871 **[Test build #72724 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72724/testReport)** for PR 16871 at commit [`7f8a2cb`](https://github.com/apache/spark/commit/7f8a2cbc984d61c0bbbc98cc4b4f3355bf8ecb73). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16889: [SPARK-17668][SQL] Use Expressions for conversions to/fr...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16889 FYI the test that failed here seems to be flaky and probably not related to this PR: https://issues.apache.org/jira/browse/SPARK-19559
[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16870 **[Test build #72730 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72730/testReport)** for PR 16870 at commit [`2dc241e`](https://github.com/apache/spark/commit/2dc241e7c83ca265d0037a1144393478dd2b66aa).
[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15009 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72723/ Test PASSed.
[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15009 Merged build finished. Test PASSed.
[GitHub] spark pull request #16870: [SPARK-19496][SQL]to_date udf to return null when...
Github user windpiger commented on a diff in the pull request: https://github.com/apache/spark/pull/16870#discussion_r100656533 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---

```diff
@@ -465,15 +465,15 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
     copy(timeZoneId = Option(timeZoneId))

   override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
-    val df = DateTimeUtils.newDateFormat(format.toString, timeZone)
+    val df = DateTimeUtils.newDateFormat(format.toString, timeZone, isLenient = true)
     UTF8String.fromString(df.format(new java.util.Date(timestamp.asInstanceOf[Long] / 1000)))
   }

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
     val tz = ctx.addReferenceMinorObj(timeZone)
     defineCodeGen(ctx, ev, (timestamp, format) => {
-      s"""UTF8String.fromString($dtu.newDateFormat($format.toString(), $tz)
+      s"""UTF8String.fromString($dtu.newDateFormat($format.toString(), $tz, false)
```

--- End diff -- oh, sorry let me fix it, thanks!
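For context on what the `isLenient` flag controls (the interpreted path above passes `true` while the generated code passes `false`, which is the inconsistency being fixed), here is a standalone JDK illustration using plain `java.text.SimpleDateFormat`, not Spark code: a lenient formatter silently rolls invalid fields over, while a strict one rejects them.

```scala
import java.text.{ParseException, SimpleDateFormat}

object LeniencyDemo {
  def main(args: Array[String]): Unit = {
    // Lenient (the JDK default): month 13 rolls over into January of the next year.
    val lenient = new SimpleDateFormat("yyyy-MM-dd")
    lenient.setLenient(true)
    println(lenient.parse("2016-13-12")) // prints a date in January 2017

    // Strict: the same input is rejected with a ParseException.
    val strict = new SimpleDateFormat("yyyy-MM-dd")
    strict.setLenient(false)
    try strict.parse("2016-13-12")
    catch { case e: ParseException => println(s"strict parse failed: ${e.getMessage}") }
  }
}
```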
[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15009 **[Test build #72723 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72723/testReport)** for PR 15009 at commit [`cc2c0be`](https://github.com/apache/spark/commit/cc2c0be9aab069b827fc6d46fb8201a1902c87ba). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16686 I have recently noticed a few flaky failures of KafkaSourceSuite.subscribing topic by pattern with topic deletions (e.g., https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72725/testReport/org.apache.spark.sql.kafka010/KafkaSourceSuite/subscribing_topic_by_pattern_with_topic_deletions/). Is it possible those were caused by this PR?
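For context, the flaky suite drives Spark's Kafka source with a topic *pattern*, so topics can be created and deleted while the query runs. A minimal sketch of such a query (hypothetical broker address and pattern; assumes the spark-sql-kafka-0-10 package and a SparkSession named `spark`):

```scala
// Subscribe by pattern rather than a fixed topic list: the matched topic set
// can change at runtime, which is exactly what the "topic deletions" test exercises.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
  .option("subscribePattern", "topic-.*")              // hypothetical pattern
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```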
[GitHub] spark issue #16890: when a column uses an alias, the order by result is wrong
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16890 Can one of the admins verify this patch?
[GitHub] spark pull request #16890: when a column uses an alias, the order by result is w...
GitHub user muyannian opened a pull request: https://github.com/apache/spark/pull/16890 when a column uses an alias, the order by result is wrong

I wrote two SQL queries. The first ORDER BY result is wrong, but the second is right; that may be a bug?

---sql 1 (ORDER BY the underlying column `amtlong`)
```
select amtlong as yasname ,usernick,count(*) as cnt,sum(amtdouble) as amt from ydb_import_txt group by usernick, amtlong order by amt desc,cnt,nick,amtlong limit 230

220@ 9189 å¥é¸¿ç 1 99.97
221@ 7105 å¥é¸¿ç 1 99.97
```
--sql2 (ORDER BY the alias `yasname`)
```
select amtlong as yasname ,usernick,count(*) as cnt,sum(amtdouble) as amt from ydb_import_txt group by usernick, amtlong order by amt desc,cnt,nick,yasname limit 230

220@ 7105 å¥é¸¿ç 1 99.97
221@ 9189 å¥é¸¿ç 1 99.97
```

You can merge this pull request into a Git repository by running: $ git pull https://github.com/apache/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16890.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16890

commit 923e594844a7ad406195b91877f0fb374d5a454b Author: Dongjoon Hyun Date: 2017-01-08T02:55:01Z [SPARK-18941][SQL][DOC] Add a new behavior document on `CREATE/DROP TABLE` with `LOCATION` ## What changes were proposed in this pull request? This PR adds a new behavior change description on `CREATE TABLE ... LOCATION` at `sql-programming-guide.md` clearly under `Upgrading From Spark SQL 1.6 to 2.0`. This change was introduced in Apache Spark 2.0.0 as [SPARK-15276](https://issues.apache.org/jira/browse/SPARK-15276). ## How was this patch tested? ``` SKIP_API=1 jekyll build ``` **Newly Added Description** https://cloud.githubusercontent.com/assets/9700541/21743606/7efe2b12-d4ba-11e6-8a0d-551222718ea2.png Author: Dongjoon Hyun Closes #16400 from dongjoon-hyun/SPARK-18941.

commit 6b6b555a1e667a9f03dfe4a21e56c513a353a58d Author: Yanbo Liang Date: 2017-01-08T09:10:36Z [SPARK-18862][SPARKR][ML] Split SparkR mllib.R into multiple files ## What changes were proposed in this pull request? SparkR ```mllib.R``` is getting bigger as we add more ML wrappers; I'd like to split it into multiple files to make it easier to maintain: * mllib_classification.R * mllib_clustering.R * mllib_recommendation.R * mllib_regression.R * mllib_stat.R * mllib_tree.R * mllib_utils.R Note: Only a reorg, no actual code change. ## How was this patch tested? Existing tests. Author: Yanbo Liang Closes #16312 from yanboliang/spark-18862.

commit cd1d00adaff65e8adfebc2342dd422c53f98166b Author: zuotingbing Date: 2017-01-08T09:29:01Z [SPARK-19026] SPARK_LOCAL_DIRS (multiple directories on different disks) cannot be deleted JIRA Issue: https://issues.apache.org/jira/browse/SPARK-19026 SPARK_LOCAL_DIRS (Standalone) can be a comma-separated list of multiple directories on different disks, e.g. SPARK_LOCAL_DIRS=/dir1,/dir2,/dir3. If there is an IOException when creating a sub-directory on dir3, the sub-directories which have been created successfully on dir1 and dir2 cannot be deleted anymore when the application finishes. 
So we should catch the IOException in Utils.createDirectory; otherwise the variable "appDirectories(appId)", which the function maybeCleanupApplication uses, will not be set, and then dir1 and dir2 will not be cleaned up. Author: zuotingbing Closes #16439 from zuotingbing/master. commit 4351e62207957bec663108a571cff2bfaaa9e7d5 Author: Dilip Biswal Date: 2017-01-08T22:09:07Z [SPARK-19093][SQL] Cached tables are not used in SubqueryExpression ## What changes were proposed in this pull request? Consider the plans inside subquery expressions while looking up the cache manager to make use of cached data. Currently CacheManager.useCachedData does not consider the subquery expressions in the plan. SQL ``` select * from rows where not exists (select * from rows) ``` Before the fix ``` == Optimized Logical Plan == Join LeftAnti :- InMemoryRelation [_1#3775, _2#3776], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) : +- *FileScan parquet [_1#3775,_2#3776] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/rows], PartitionFilte
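The SPARK-19026 fix described above amounts to tolerating one failed disk during local-directory creation so the surviving directories stay tracked for cleanup. A simplified sketch of the idea (not Spark's actual Utils/Worker code; names are illustrative):

```scala
import java.io.{File, IOException}

// Try to create the per-application sub-directory on every configured local
// dir; skip a dir that throws instead of aborting, so the directories that
// were created successfully remain recorded and can be deleted at app exit.
def createAppDirs(localDirs: Seq[String], appId: String): Seq[File] =
  localDirs.flatMap { root =>
    try {
      val dir = new File(root, appId)
      if (!dir.isDirectory && !dir.mkdirs()) {
        throw new IOException(s"Failed to create directory $dir")
      }
      Some(dir)
    } catch {
      case _: IOException => None // bad disk: skip it, keep the others
    }
  }
```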
[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...
Github user erenavsarogullari commented on the issue: https://github.com/apache/spark/pull/15604 The build failure looks unrelated (due to a timeout at the KafkaSourceSuite level). Jenkins, retest this please
[GitHub] spark issue #16868: [SPARK-19115] [SQL] Supporting Create External Table Lik...
Github user ouyangxiaochen commented on the issue: https://github.com/apache/spark/pull/16868 I have run the test cases successfully. Please run the test cases again. Thanks a lot! @SparkQA
[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15604 Merged build finished. Test FAILed.
[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15604 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72725/ Test FAILed.
[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15604 **[Test build #72725 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72725/testReport)** for PR 15604 at commit [`f84abe7`](https://github.com/apache/spark/commit/f84abe74dd4426cf520b38f6380dff693dacc92b). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16386 Merged build finished. Test FAILed.
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16386 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72726/ Test FAILed.
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16386 **[Test build #72726 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72726/testReport)** for PR 16386 at commit [`0f4686d`](https://github.com/apache/spark/commit/0f4686d39890e84dd8542297eeb688746f80ee55). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16777: [SPARK-19435][SQL] Type coercion between ArrayTypes
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/16777 Let me check other DBMSs and get back.
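For context, SPARK-19435 concerns finding a common type for two `ArrayType`s, analogous to how atomic types are widened. A rough sketch of the rule under discussion (simplified to one widening pair; not the PR's actual implementation):

```scala
import org.apache.spark.sql.types._

// Widen two array types to a common element type, merging nullability.
// Real coercion would recurse over the full type-widening table; this
// sketch only handles Int/Double and exact matches for illustration.
def widerArrayType(a: ArrayType, b: ArrayType): Option[ArrayType] =
  (a.elementType, b.elementType) match {
    case (IntegerType, DoubleType) | (DoubleType, IntegerType) =>
      Some(ArrayType(DoubleType, a.containsNull || b.containsNull))
    case (x, y) if x == y =>
      Some(ArrayType(x, a.containsNull || b.containsNull))
    case _ => None
  }
```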
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 @cloud-fan I just pushed a few more changes to address some of your comments. I'll be back later next week to continue work.
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16386 **[Test build #72729 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72729/testReport)** for PR 16386 at commit [`b0a5cc8`](https://github.com/apache/spark/commit/b0a5cc8f0c6475b4fa603036f11a9c1248295b40).
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100653879 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---

```scala
  test("SPARK-18352: Handle corrupt documents") {
    withTempPath { dir =>
      val path = dir.getCanonicalPath
      val corruptRecordCount = additionalCorruptRecords.count().toInt
      assert(corruptRecordCount === 5)

      additionalCorruptRecords
        .toDF("value")
        .repartition(corruptRecordCount * 4)
        .write
        .text(path)

      val jsonDF = spark.read.option("wholeFile", true).json(path)
```

--- End diff -- Probably, but I'm out of time for today. I'll be out for a few days and can pick this back up on Thursday or Friday next week.
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100653835 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---

```scala
      additionalCorruptRecords
        .toDF("value")
        .repartition(corruptRecordCount * 4)
```

--- End diff -- Probably, but I'm out of time for today. I'll be out for a few days and can pick this back up on Thursday or Friday next week.
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100653757 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---

```scala
      val jsonDF = spark.read.option("wholeFile", true).json(path)
      assert(jsonDF.count() === corruptRecordCount)
      assert(jsonDF.schema === new StructType()
        .add("_corrupt_record", StringType)
        .add("dummy", StringType))
      val counts = jsonDF
        .join(
          additionalCorruptRecords.toDF("value"),
          F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === F.trim($"value"),
```

--- End diff -- This is the same as `F.trim` but it works on all whitespace characters, not just 0x20 (ascii space)... if trim removed line endings and not just spaces it would have worked instead.
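The point about whitespace can be checked with plain JVM strings, independent of Spark (standalone sketch; the literal here is an arbitrary example record):

```scala
// regexp_replace with (^\s+|\s+$) strips ALL leading/trailing whitespace,
// including the trailing newline; trimming only ascii spaces does not.
val record = " {\"dummy\": 1}\n"
val regexTrimmed = record.replaceAll("(^\\s+|\\s+$)", "") // {"dummy": 1}
val spaceTrimmed = record.replaceAll("(^ +| +$)", "")     // {"dummy": 1}\n
assert(regexTrimmed != spaceTrimmed)
```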
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100653580 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---

```scala
      additionalCorruptRecords
        .toDF("value")
        .repartition(corruptRecordCount * 4)
```

--- End diff -- Yep, I'll fix.
[GitHub] spark issue #16880: [SPARK-19542][SS]Delete the temp checkpoint if a query i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16880 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72721/ Test PASSed.
[GitHub] spark issue #16880: [SPARK-19542][SS]Delete the temp checkpoint if a query i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16880 Merged build finished. Test PASSed.
[GitHub] spark issue #16880: [SPARK-19542][SS]Delete the temp checkpoint if a query i...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16880 **[Test build #72721 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72721/testReport)** for PR 16880 at commit [`cae981f`](https://github.com/apache/spark/commit/cae981f1ab9ee6f3d0d3cd4b53ccf6431551a0c0). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100652445 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala ---

```scala
  final def infer(
      sparkSession: SparkSession,
      inputPaths: Seq[FileStatus],
      parsedOptions: JSONOptions): Option[StructType] = {
    if (inputPaths.nonEmpty) {
      val jsonSchema = JsonInferSchema.infer(
        createBaseRdd(sparkSession, inputPaths),
        parsedOptions,
        createParser)
      checkConstraints(jsonSchema)
      Some(jsonSchema)
    } else {
      None
    }
  }

  /** Constraints to be imposed on schema to be stored. */
  private def checkConstraints(schema: StructType): Unit = {
```

--- End diff -- IIRC this was a check added because some of the backends (maybe parquet?) were writing corrupt files... if this is checked globally now it should be fine to remove
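For context, the constraint being discussed is typically a duplicate-column-name check. Since the quoted diff elides the method body, this is only a guess at the shape of such a check (illustrative; not the PR's actual code):

```scala
import org.apache.spark.sql.types.StructType

// Reject schemas whose field names collide case-insensitively, the kind of
// schema some writers historically turned into corrupt output files.
def checkNoDuplicateColumns(schema: StructType): Unit = {
  val names = schema.fieldNames.map(_.toLowerCase)
  val duplicates = names.groupBy(identity).collect {
    case (name, group) if group.length > 1 => name
  }
  require(duplicates.isEmpty,
    s"Duplicate column(s) found: ${duplicates.mkString(", ")}")
}
```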
[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16847 **[Test build #72728 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72728/testReport)** for PR 16847 at commit [`81e9060`](https://github.com/apache/spark/commit/81e906052ea766fe4f228b84db5caea22860dae4).
[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...
Github user dilipbiswal commented on the issue: https://github.com/apache/spark/pull/16847 retest this please
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100652259 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---

```scala
  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
    withTempPath { dir =>
      val path = dir.getCanonicalPath
      primitiveFieldAndType
```

--- End diff -- Right, it's a single value that spans multiple lines. The Python test is reusing some Python-specific test data.
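In other words, each input file holds one pretty-printed JSON document spanning several lines, which only parses with the PR's wholeFile mode. A minimal usage sketch (hypothetical path; assumes a SparkSession named `spark`):

```scala
// A file at /tmp/multiline-json/part-0 might contain:
// {
//   "string": "a",
//   "long": 1
// }
// Line-based JSON reading would flag this as corrupt; wholeFile reads it whole.
val df = spark.read.option("wholeFile", true).json("/tmp/multiline-json")
df.printSchema()
```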
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100652192 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---

```scala
      val jsonCopy = spark.read
```

--- End diff -- I'm not sure what you mean?
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100651990 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---

```scala
      val jsonDF = spark.read.option("wholeFile", true).json(path)
      val jsonDir = new File(dir, "json").getCanonicalPath
      jsonDF.coalesce(1).write
        .format("json")
        .option("compression", "gZiP")
```

--- End diff -- Right
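The odd casing in the test ("GzIp", "gZiP") is deliberate: compression codec names are resolved case-insensitively. A toy sketch of that kind of resolution (illustrative map, not Spark's actual codec registry):

```scala
// Normalize the user-supplied name before lookup, so any casing works.
val codecs = Map("gzip" -> "org.apache.hadoop.io.compress.GzipCodec")

def resolveCodec(name: String): Option[String] = codecs.get(name.toLowerCase)

assert(resolveCodec("gZiP") == resolveCodec("GzIp"))
```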
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100651910 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---

```scala
      new File(path).listFiles() match {
        case compressedFiles =>
```

--- End diff -- For a reason that is no longer relevant, I'll switch this
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100651706 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala ---

```diff
@@ -79,7 +80,7 @@ private[sql] object JsonInferSchema {
   private[this] val structFieldComparator = new Comparator[StructField] {
     override def compare(o1: StructField, o2: StructField): Int = {
-      o1.name.compare(o2.name)
+      o1.name.compareTo(o2.name)
```

--- End diff -- `.compare` is a very expensive way of comparing two strings. `compare`:

```
public int compare(org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField);
  Code:
     0: new           #14 // class scala/collection/immutable/StringOps
     3: dup
     4: getstatic     #20 // Field scala/Predef$.MODULE$:Lscala/Predef$;
     7: aload_1
     8: invokevirtual #26 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
    11: invokevirtual #30 // Method scala/Predef$.augmentString:(Ljava/lang/String;)Ljava/lang/String;
    14: invokespecial #34 // Method scala/collection/immutable/StringOps."<init>":(Ljava/lang/String;)V
    17: aload_2
    18: invokevirtual #26 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
    21: invokevirtual #37 // Method scala/collection/immutable/StringOps.compare:(Ljava/lang/String;)I
    24: ireturn
```

`compareTo`:

```
public int compare(org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField);
  Code:
     0: aload_1
     1: invokevirtual #18 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
     4: aload_2
     5: invokevirtual #18 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
     8: invokevirtual #24 // Method java/lang/String.compareTo:(Ljava/lang/String;)I
    11: ireturn
```
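A rough way to feel the difference is a toy timing loop (hypothetical micro-benchmark, not from the PR; absolute numbers will vary, and the point is only that `compareTo` skips the per-call `StringOps` wrapper shown in the bytecode above):

```scala
// Crude timing of the two string-comparison paths (Scala 2.11/2.12 era).
object CompareBench extends App {
  val a = "left-field-name"
  val b = "right-field-name"

  def time(label: String)(f: => Int): Unit = {
    val start = System.nanoTime()
    var i = 0
    var sink = 0
    while (i < 10000000) { sink += f; i += 1 }
    println(s"$label: ${(System.nanoTime() - start) / 1e6} ms (sink=$sink)")
  }

  time("StringOps.compare")(new scala.collection.immutable.StringOps(a).compare(b))
  time("String.compareTo")(a.compareTo(b))
}
```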
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100651385

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
     assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      primitiveFieldAndType
+        .toDF("value")
+        .write
+        .option("compression", "GzIp")
+        .text(path)
+
+      new File(path).listFiles() match {
+        case compressedFiles =>
+          assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+      }
+
+      val jsonDF = spark.read.option("wholeFile", true).json(path)
+      val jsonDir = new File(dir, "json").getCanonicalPath
+      jsonDF.coalesce(1).write
+        .format("json")
+        .option("compression", "gZiP")
+        .save(jsonDir)
+
+      new File(jsonDir).listFiles() match {
+        case compressedFiles =>
+          assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+      }
+
+      val jsonCopy = spark.read
+        .format("json")
+        .load(jsonDir)
+
+      assert(jsonCopy.count === jsonDF.count)
+      val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+      val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+      checkAnswer(jsonCopySome, jsonDFSome)
+    }
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      primitiveFieldAndType
+        .toDF("value")
+        .write
+        .text(path)
+
+      val jsonDF = spark.read.option("wholeFile", true).json(path)
+      val jsonDir = new File(dir, "json").getCanonicalPath
+      jsonDF.coalesce(1).write
+        .format("json")
+        .save(jsonDir)
+
+      val compressedFiles = new File(jsonDir).listFiles()
+      assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+      val jsonCopy = spark.read
+        .format("json")
+        .load(jsonDir)
+
+      assert(jsonCopy.count === jsonDF.count)
+      val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+      val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+      checkAnswer(jsonCopySome, jsonDFSome)
+    }
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+    // the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token.
+    // this might not be the optimal behavior but this test verifies that only the first value
+    // is parsed and the rest are discarded.
+
+    // alternatively the parser could continue parsing following objects, which may further reduce
+    // allocations by skipping the line reader entirely
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark
+        .createDataFrame(Seq(Tuple1("{}{invalid}")))
+        .coalesce(1)
+        .write
+        .text(path)
+
+      val jsonDF = spark.read.option("wholeFile", true).json(path)
+      // no corrupt record column should be created
+      assert(jsonDF.schema === StructType(Seq()))
+      // only the first object should be read
+      assert(jsonDF.count() === 1)
+    }
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val corruptRecordCount = additionalCorruptRecords.count().toInt
+      assert(corruptRecordCount === 5)
+
+      additionalCorruptRecords
+        .toDF("value")
+        .repartition(corruptRecordCount * 4)
+        .write
+        .text(path)
+
+      val jsonDF = spark.read.option("wholeFile", true).json(path)
+      assert(jsonDF.count() === corruptRecordCount)
+      assert(jsonDF.schema === new StructType()
+        .add("_corrupt_record", StringType)
+        .add("dummy", StringType))
+      val counts = jsonDF
+        .join(
+          additionalCorruptRecords.toDF("value"),
+          F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === F.trim($"value"),
--- End diff --

why do we need to remove all the whitespace in `_corrupt_record`?
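As a side note on that join condition: the pattern `(^\s+|\s+$)` strips only leading and trailing whitespace, not interior whitespace. A plain-JVM sketch (the record text here is hypothetical, no Spark required) makes that concrete:

```scala
// "(^\\s+|\\s+$)" matches runs of whitespace anchored at the start or end of
// the string, so replaceAll removes outer padding but keeps interior spaces.
val raw = "\n  {\"dummy\": \"value\"}  \n"
val stripped = raw.replaceAll("(^\\s+|\\s+$)", "")
assert(stripped == "{\"dummy\": \"value\"}")
assert(stripped == raw.trim) // for whitespace-only padding, same as trim
```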
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100651178

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
  ...
+  test("SPARK-18352: Handle corrupt documents") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val corruptRecordCount = additionalCorruptRecords.count().toInt
+      assert(corruptRecordCount === 5)
+
+      additionalCorruptRecords
+        .toDF("value")
+        .repartition(corruptRecordCount * 4)
--- End diff --

do you want the 5 rows to be distributed to 5 files? how about `repartitionBy("value")`?
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100651103

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
  ...
+  test("SPARK-18352: Handle corrupt documents") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val corruptRecordCount = additionalCorruptRecords.count().toInt
+      assert(corruptRecordCount === 5)
+
+      additionalCorruptRecords
+        .toDF("value")
+        .repartition(corruptRecordCount * 4)
+        .write
+        .text(path)
+
+      val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

shall we test all the parse modes?
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100651055

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
  ...
+  test("SPARK-18352: Expect one JSON document per file") {
+    // the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token.
+    // this might not be the optimal behavior but this test verifies that only the first value
+    // is parsed and the rest are discarded.
+
+    // alternatively the parser could continue parsing following objects, which may further reduce
+    // allocations by skipping the line reader entirely
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark
+        .createDataFrame(Seq(Tuple1("{}{invalid}")))
+        .coalesce(1)
+        .write
+        .text(path)
+
+      val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

shall we test all the parse modes?
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100650756

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
  ...
+      val jsonDF = spark.read.option("wholeFile", true).json(path)
+      val jsonDir = new File(dir, "json").getCanonicalPath
+      jsonDF.coalesce(1).write
+        .format("json")
+        .option("compression", "gZiP")
+        .save(jsonDir)
+
+      new File(jsonDir).listFiles() match {
+        case compressedFiles =>
+          assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+      }
+
+      val jsonCopy = spark.read
--- End diff --

I'd rather write the whole-file JSON once, then read it back and check the answer; writing it out and reading it in a second time is too complicated.
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100650577

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
  ...
+      val jsonDF = spark.read.option("wholeFile", true).json(path)
+      val jsonDir = new File(dir, "json").getCanonicalPath
+      jsonDF.coalesce(1).write
+        .format("json")
+        .option("compression", "gZiP")
--- End diff --

we don't support `wholeFile` on the write side, right?
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100650500

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
  ...
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      primitiveFieldAndType
--- End diff --

I checked `primitiveFieldAndType`: it's not a JSON array but a single value wrapped in `{ }`. This differs from the [python test](https://github.com/apache/spark/pull/16386/files#diff-e8e190e27ba3aee32a59b787696b34c6R1). So if the whole file is not a JSON array, this file will only produce a single row, right?
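A sketch of the behavior being confirmed here, assuming the `wholeFile` option works as this thread describes (the file names and contents below are hypothetical): a root-level object yields one row, a root-level array yields one row per element.

```scala
// single.json : {"a": 1}                  -- root is one object
// array.json  : [{"a": 1}, {"a": 2}]      -- root is an array
val single = spark.read.option("wholeFile", true).json("/tmp/single.json")
val array = spark.read.option("wholeFile", true).json("/tmp/array.json")
assert(single.count() == 1) // one row for the whole document
assert(array.count() == 2)  // one row per top-level array element
```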
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100650450

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to generate
+   *                      the corrupt record text instead of record.toString
    */
-  def parse(input: String): Seq[InternalRow] = {
-    if (input.trim.isEmpty) {
-      Nil
-    } else {
-      try {
-        Utils.tryWithResource(factory.createParser(input)) { parser =>
-          parser.nextToken()
-          rootConverter.apply(parser) match {
-            case null => failedRecord(input)
-            case row: InternalRow => row :: Nil
-            case array: ArrayData =>
-              // Here, as we support reading top level JSON arrays and take every element
-              // in such an array as a row, this case is possible.
-              if (array.numElements() == 0) {
-                Nil
-              } else {
-                array.toArray[InternalRow](schema)
-              }
-            case _ =>
-              failedRecord(input)
+  def parse[T](
+      record: T,
+      createParser: (JsonFactory, T) => JsonParser,
+      recordLiteral: PartialFunction[T, UTF8String] = PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

Passing a function in instead of a closure saves an allocation that would otherwise be held for the duration of parsing, and is likely to be promoted to a later GC generation. If we went the closure route, the function signature should be this:

```scala
def parse(
    createParser: JsonFactory => JsonParser,
    recordLiteral: => UTF8String): Seq[InternalRow]
```
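A sketch of the two shapes being compared, with signatures simplified to avoid Spark-internal types (these are not the actual PR signatures): shape 1 lets the caller pass one long-lived function value, shape 2 forces a fresh closure per record.

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

// Shape 1 (what the PR does): createParser is a standalone function value that
// can be allocated once and reused for every record; nothing per-record is
// captured, the record travels as an ordinary argument.
def parse[T](
    record: T,
    createParser: (JsonFactory, T) => JsonParser): Unit = ???

// Shape 2 (the closure route): each record requires a fresh closure capturing
// that record; the closure stays live for the whole parse of the record, so
// under load it is more likely to be promoted out of the young GC generation.
def parseWithClosure(createParser: JsonFactory => JsonParser): Unit = ???
```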
[GitHub] spark issue #16889: [SPARK-17668][SQL] Use Expressions for conversions to/fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16889

Merged build finished. Test FAILed.
[GitHub] spark issue #16889: [SPARK-17668][SQL] Use Expressions for conversions to/fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16889

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72720/ Test FAILed.
[GitHub] spark issue #16889: [SPARK-17668][SQL] Use Expressions for conversions to/fr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16889

**[Test build #72720 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72720/testReport)** for PR 16889 at commit [`ac09ad5`](https://github.com/apache/spark/commit/ac09ad519437fe8efb071f354cc4387a4a95c206).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100650024

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
  ...
+      primitiveFieldAndType
+        .toDF("value")
+        .write
+        .option("compression", "GzIp")
+        .text(path)
+
+      new File(path).listFiles() match {
+        case compressedFiles =>
--- End diff --

why add this pattern match? is it the same as `assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))`?
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100649836

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
@@ -48,69 +47,110 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from `JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private[this] var isWarningPrinted: Boolean = false
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private def printWarningForMalformedRecord(record: () => UTF8String): Unit = {
+    def sampleRecord: String = {
+      if (options.wholeFile) {
+        ""
+      } else {
+        s"Sample record: ${record()}\n"
+      }
+    }
+
+    def footer: String = {
+      s"""Code example to print all malformed records (scala):
+         |===
+         |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}.
+         |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+         |
+       """.stripMargin
+    }
+
+    if (options.permissive) {
+      logWarning(
+        s"""Found at least one malformed record. The JSON reader will replace
+           |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
+           |To find out which corrupted records have been replaced with null, please use the
+           |default inferred schema instead of providing a custom schema.
+           |
+           |${sampleRecord ++ footer}
+           |
+         """.stripMargin)
+    } else if (options.dropMalformed) {
+      logWarning(
+        s"""Found at least one malformed record. The JSON reader will drop
+           |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
+           |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
+           |mode and use the default inferred schema.
+           |
+           |${sampleRecord ++ footer}
+           |
+         """.stripMargin)
+    }
+  }
+
+  @transient
+  private def printWarningIfWholeFile(): Unit = {
+    if (options.wholeFile && corruptFieldIndex.isDefined) {
+      logWarning(
+        s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result
+           |in very large allocations or OutOfMemoryExceptions being raised.
+           |
+         """.stripMargin)
+    }
+  }
 
   /**
    * This function deals with the cases it fails to parse. This function will be called
    * when exceptions are caught during converting. This functions also deals with `mode` option.
    */
-  private def failedRecord(record: String): Seq[InternalRow] = {
-    // create a row even if no corrupt record column is present
-    if (options.failFast) {
-      throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
-    }
-    if (options.dropMalformed) {
-      if (!isWarningPrintedForMalformedRecord) {
-        logWarning(
-          s"""Found at least one malformed records (sample: $record). The JSON reader will drop
-             |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
-             |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
-             |mode and use the default inferred schema.
-             |
-             |Code example to print all malformed records (scala):
-             |===
-             |// The corrupted record exists in column ${columnNameOfCorruptRecord}
-             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
-             |
-           """.stripMargin)
-        isWarningPrintedForMalformedRecord = true
-      }
-      Nil
-    } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-      if (!isWarningPrintedForMa
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100649847

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala ---
@@ -79,7 +80,7 @@ private[sql] object JsonInferSchema {
   private[this] val structFieldComparator = new Comparator[StructField] {
     override def compare(o1: StructField, o2: StructField): Int = {
-      o1.name.compare(o2.name)
+      o1.name.compareTo(o2.name)
--- End diff --

why this change?
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100649748

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala ---
@@ -97,46 +91,13 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
-    val parsedOptions: JSONOptions = new JSONOptions(options)
-    val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
-      .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    val parsedOptions = new JSONOptions(options,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    val readFile = JsonDataSource(parsedOptions).readFile _
--- End diff --

this closure has a reference to the outer pointer, so we will still broadcast the `JsonDataSource`. how about:
```
val columnNameOfCorruptRecord = sparkSession.sessionState.conf.columnNameOfCorruptRecord
(file: PartitionedFile) => {
  val parsedOptions = ...
  val parser = new JacksonParser(requiredSchema, parsedOptions)
  JsonDataSource(parsedOptions).readFile...
}
```
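The capture problem cloud-fan describes can be reproduced outside Spark. Here is a self-contained sketch (all names hypothetical) of why eta-expanding an instance method drags the whole instance into the resulting function value, while constructing the object inside the lambda does not:

```scala
class Outer {
  // Any method reference on this instance drags this field along with it.
  val heavy = new Array[Byte](1 << 20)

  def readFile(file: String): Iterator[String] = Iterator(file)
}

// Eta-expansion captures the Outer instance: serializing or broadcasting `f`
// would ship `heavy` too, because the function value holds `this`.
val f: String => Iterator[String] = new Outer().readFile _

// Rebuilding the object inside the lambda keeps the function value small:
// `Outer` is constructed where the lambda runs, not captured and shipped.
val g: String => Iterator[String] = { file =>
  val outer = new Outer()
  outer.readFile(file)
}
```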
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100649635

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala ---
@@ -31,10 +31,17 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: CaseInsensitiveMap)
+    @transient private val parameters: CaseInsensitiveMap,
+    defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable {
 
-  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+  def this(
+      parameters: Map[String, String],
+      defaultColumnNameOfCorruptRecord: String = "") = {
--- End diff --

Yes, it's really not a good solution, but it doesn't make sense to have a corrupt-record column name in all use cases, and picking another sentinel could inadvertently conflict with a real column. It should be `Option[String] = None`, but that winds up being a large change that deserves a separate pull request.
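A sketch of the `Option[String]` shape suggested above, reduced to standalone types (this is not the actual `JSONOptions` class): absence of a corrupt-record column becomes explicit instead of an empty-string sentinel that could collide with a real column name.

```scala
// Hypothetical simplified options class illustrating the Option-based design.
class JsonOptionsSketch(
    parameters: Map[String, String],
    defaultColumnNameOfCorruptRecord: Option[String] = None) {

  // None now unambiguously means "no corrupt-record column", with no sentinel
  // value that a user-defined column could accidentally match.
  val columnNameOfCorruptRecord: Option[String] =
    parameters.get("columnNameOfCorruptRecord")
      .orElse(defaultColumnNameOfCorruptRecord)
}
```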
[GitHub] spark issue #11887: [SPARK-13041][Mesos]add driver sandbox uri to the dispat...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/11887

@HyukjinKwon probably. Let me ask Michael. @mgummelt what do you think? what options do we have?
[GitHub] spark issue #16161: [SPARK-18717][SQL] Make code generation for Scala Map wo...
Github user liancheng commented on the issue: https://github.com/apache/spark/pull/16161

Thanks. Backported to branch-2.1.
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100649228

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
@@ -394,36 +447,32 @@ class JacksonParser(
  ...
+  def parse[T](
+      record: T,
+      createParser: (JsonFactory, T) => JsonParser,
+      recordLiteral: PartialFunction[T, UTF8String] = PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

seems the only caller is https://github.com/apache/spark/pull/16386/files#diff-5ac20b8d75a20117deaa9ba4af814090R211 , which doesn't pass a parameter, so `PartialFunction` is not a good choice.
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100649101

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
@@ -394,36 +447,32 @@ class JacksonParser(
  ...
+  def parse[T](
+      record: T,
+      createParser: (JsonFactory, T) => JsonParser,
+      recordLiteral: PartialFunction[T, UTF8String] = PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

how about `wholeTextRecord: => Option[UTF8String] = None`?
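A sketch of how the suggested by-name `Option` would behave, with the parsing machinery stubbed out (hypothetical names, simplified types): a by-name argument is only evaluated where it is referenced, so the corrupt-record text is never materialized for rows that parse cleanly.

```scala
import org.apache.spark.unsafe.types.UTF8String

// By-name parameters may carry defaults in Scala; `wholeTextRecord` is only
// forced on the failure path below.
def parseSketch[A](
    doParse: () => Seq[A],
    wholeTextRecord: => Option[UTF8String] = None): Seq[A] = {
  try doParse() catch {
    case _: RuntimeException =>
      // Evaluated only here, after a parse failure.
      wholeTextRecord.foreach(r => println(s"malformed record: $r"))
      Seq.empty
  }
}
```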
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100648863

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
@@ -394,36 +447,32 @@ class JacksonParser(
  ...
+    } catch {
+      case (_: JsonProcessingException) | (_: SparkSQLJsonProcessingException) =>
--- End diff --

nit: I think the parentheses are not needed
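The nit, illustrated with standalone exception types so it compiles on its own: alternatives in a type pattern need no surrounding parentheses.

```scala
// Equivalent to the parenthesized form in the diff, just less noisy.
def classify(e: Throwable): String = e match {
  case _: IllegalArgumentException | _: NumberFormatException => "parse failure"
  case _ => "other"
}
```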
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100648524

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
@@ -394,36 +447,32 @@ class JacksonParser(
  ...
+    } catch {
+      case (_: JsonProcessingException) | (_: SparkSQLJsonProcessingException) =>
+        failedRecord(() => recordLiteral.applyOrElse[T, UTF8String](
--- End diff --

When I do that I usually get review comments asking to make call-by-name parameters explicit...