[GitHub] spark issue #16611: [SPARK-17967][SPARK-17878][SQL][PYTHON] Support for arra...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16611 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16611: [SPARK-17967][SPARK-17878][SQL][PYTHON] Support for arra...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16611 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73041/ Test PASSed.
[GitHub] spark issue #16611: [SPARK-17967][SPARK-17878][SQL][PYTHON] Support for arra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16611 **[Test build #73041 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73041/testReport)** for PR 16611 at commit [`60c7e25`](https://github.com/apache/spark/commit/60c7e25947d62800abcd9e96d60b859a82ff0a0d). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698347 --- Diff: mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.ml.fpm + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ + +class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + + @transient var dataset: Dataset[_] = _ + + override def beforeAll(): Unit = { +super.beforeAll() +dataset = FPGrowthSuite.getFPGrowthData(spark) + } + + test("FPGrowth fit and transform with different data types") { +Array(IntegerType, StringType, ShortType, LongType, ByteType).foreach { dt => + val intData = dataset.withColumn("features", col("features").cast(ArrayType(dt))) + val model = new FPGrowth().setMinSupport(0.8).fit(intData) + val generatedRules = model.setMinConfidence(0.8).getAssociationRules + val expectedRules = spark.createDataFrame(Seq( +(Array("2"), Array("1"), 1.0), +(Array("1"), Array("2"), 1.0) + )).toDF("antecedent", "consequent", "confidence") +.withColumn("antecedent", col("antecedent").cast(ArrayType(dt))) +.withColumn("consequent", col("consequent").cast(ArrayType(dt))) + + assert(expectedRules.sort("antecedent").rdd.collect().sameElements( +generatedRules.sort("antecedent").rdd.collect())) + val transformed = model.transform(intData) + assert(transformed.count() == 3) +} + } + + test("FPGrowth getFreqItems") { +val model = new FPGrowth().setMinSupport(0.8).fit(dataset) +val expectedFreq = spark.createDataFrame(Seq( + (Array("1"), 3L), + (Array("2"), 3L), + (Array("1", "2"), 3L) +)).toDF("items", "freq") +val freqItems = model.getFreqItemsets +assert(freqItems.sort("items").rdd.collect() + .sameElements(expectedFreq.sort("items").rdd.collect())) + } + + test("FPGrowth get Association Rules") { +val model = new FPGrowth().setMinSupport(0.8).fit(dataset) +val expectedRules = spark.createDataFrame(Seq( + 
(Array("2"), Array("1"), 1.0), + (Array("1"), Array("2"), 1.0) +)).toDF("antecedent", "consequent", "confidence") +val associationRules = model.getAssociationRules + +assert(associationRules.sort("antecedent").rdd.collect() + .sameElements(expectedRules.sort("antecedent").rdd.collect())) + } + + test("FPGrowth parameter check") { +val fpGrowth = new FPGrowth().setMinSupport(0.4567) +val model = fpGrowth.fit(dataset) + .setMinConfidence(0.5678) +assert(fpGrowth.getMinSupport === 0.4567) +assert(model.getMinConfidence === 0.5678) + } + + test("read/write") { +def checkModelData(model: FPGrowthModel, model2: FPGrowthModel): Unit = { + assert(model.freqItemsets.sort("items").collect() === +model2.freqItemsets.sort("items").collect()) +} +val fPGrowth = new FPGrowth() +testEstimatorAndModelReadWrite( + fPGrowth, dataset, FPGrowthSuite.allParamSettings, checkModelData) + } + +} + +object FPGrowthSuite { + + def getFPGrowthData(spark: SparkSession): DataFrame = { +spark.createDataFrame(Seq( + (0, Array("1", "2", "3", "5")), + (0, Array("1", "2", "3", "6")), + (0, Array("1", "2", "7")) +)).toDF("id", "features") + } + + /** + * Mapping from all Params to valid settings which differ from the defaults. + * This is
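The expected values in the FPGrowthSuite tests quoted above can be checked by hand. The following is a minimal brute-force sketch in plain Scala (no Spark) and is not FP-growth: it enumerates every candidate itemset of the suite's three transactions, keeps those whose count is at least `ceil(minSupport * n)`, and derives rules with a single-item consequent, where confidence is `freq(antecedent + item) / freq(antecedent)`. The `ceil` threshold and the single-item consequent are assumptions modeled on how `spark.mllib` is understood to behave; `FreqItemsetSketch` and its methods are hypothetical names.

```scala
// Brute-force sketch, NOT FP-growth: hypothetical helper for checking
// FPGrowthSuite's expected values on tiny data. The ceil(minSupport * n)
// threshold and single-item consequents are assumptions.
object FreqItemsetSketch {
  type Itemset = Set[String]

  // The three transactions from FPGrowthSuite.getFPGrowthData.
  val transactions: Seq[Itemset] = Seq(
    Set("1", "2", "3", "5"),
    Set("1", "2", "3", "6"),
    Set("1", "2", "7"))

  /** Every itemset whose count is at least ceil(minSupport * data.size). */
  def frequentItemsets(data: Seq[Itemset], minSupport: Double): Map[Itemset, Long] = {
    val minCount = math.ceil(minSupport * data.size).toLong
    val items = data.flatten.distinct
    // Enumerate all non-empty subsets of the item universe (exponential; toy data only).
    val candidates = (1 to items.size).iterator
      .flatMap(k => items.combinations(k))
      .map(_.toSet)
    candidates
      .map(c => c -> data.count(t => c.subsetOf(t)).toLong)
      .filter { case (_, freq) => freq >= minCount }
      .toMap
  }

  /** Rules antecedent => {item} whose confidence is at least minConfidence. */
  def associationRules(
      freq: Map[Itemset, Long],
      minConfidence: Double): Seq[(Itemset, Itemset, Double)] =
    for {
      (itemset, count) <- freq.toSeq
      if itemset.size > 1
      item <- itemset.toSeq
      antecedent = itemset - item
      // Every subset of a frequent itemset is frequent, so this lookup succeeds.
      antCount <- freq.get(antecedent).toSeq
      confidence = count.toDouble / antCount
      if confidence >= minConfidence
    } yield (antecedent, Set(item), confidence)
}
```

With minSupport = 0.8 and n = 3 the threshold is ceil(2.4) = 3, so only {1}, {2} and {1, 2} (each present in all three transactions) survive, and both rules {1} => {2} and {2} => {1} have confidence 3 / 3 = 1.0, matching `expectedFreq` and `expectedRules` in the suite. Enumerating all subsets is exponential in the number of distinct items, which is the cost FP-growth's tree-based approach is designed to avoid.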
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698262 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.fpm + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules, +FPGrowth => MLlibFPGrowth} --- End diff -- style: indent with 2 spaces
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698270 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.fpm + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules, +FPGrowth => MLlibFPGrowth} +import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +/** + * Common params for FPGrowth and FPGrowthModel + */ +private[fpm] trait FPGrowthParams extends Params with HasFeaturesCol with HasPredictionCol { + + /** + * Validates and transforms the input schema. 
+ * @param schema input schema + * @return output schema + */ + protected def validateAndTransformSchema(schema: StructType): StructType = { --- End diff -- Since annotation
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698275 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.fpm + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules, +FPGrowth => MLlibFPGrowth} +import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +/** + * Common params for FPGrowth and FPGrowthModel + */ +private[fpm] trait FPGrowthParams extends Params with HasFeaturesCol with HasPredictionCol { + + /** + * Validates and transforms the input schema. 
+ * @param schema input schema + * @return output schema + */ + protected def validateAndTransformSchema(schema: StructType): StructType = { +val inputType = schema($(featuresCol)).dataType +require(inputType.isInstanceOf[ArrayType], + s"The input column must be ArrayType, but got $inputType.") +SchemaUtils.appendColumn(schema, $(predictionCol), schema($(featuresCol)).dataType) + } + + /** + * Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears + * more than (minSupport * size-of-the-dataset) times will be output + * Default: 0.3 + * @group param + */ + @Since("2.2.0") + val minSupport: DoubleParam = new DoubleParam(this, "minSupport", +"the minimal support level of the frequent pattern (Default: 0.3)", +ParamValidators.inRange(0.0, 1.0)) + setDefault(minSupport -> 0.3) + + /** @group getParam */ + @Since("2.2.0") + def getMinSupport: Double = $(minSupport) + + /** + * Number of partitions used by parallel FP-growth --- End diff -- Document what it means when this is not set and that this must be >= 1. Also say this is not set by default.
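The reviewer's request can be made concrete with a small sketch of the semantics the Scaladoc would describe, assuming the ml param follows `spark.mllib`'s `FPGrowth`, where a non-positive `numPartitions` falls back to the partition count of the input data. Under that assumption the doc could read roughly: "Number of partitions (at least 1) used by parallel FP-growth. By default the param is not set, and the partition number of the input dataset is used." `NumPartitionsSketch.resolve` is a hypothetical helper, not part of the PR.

```scala
// Hypothetical helper (not part of the PR) illustrating the assumed semantics:
// an unset numPartitions falls back to the input's partition count.
object NumPartitionsSketch {
  /**
   * @param numPartitions      the param value, or None when it is not set (the default)
   * @param inputNumPartitions the number of partitions of the input dataset
   * @return the partition count parallel FP-growth would use
   */
  def resolve(numPartitions: Option[Int], inputNumPartitions: Int): Int = {
    // Same constraint as ParamValidators.gtEq[Int](1) in the quoted diff.
    numPartitions.foreach(n => require(n >= 1, s"numPartitions must be >= 1, but got $n"))
    numPartitions.getOrElse(inputNumPartitions)
  }
}
```

Modeling the unset state as `Option` rather than a sentinel such as -1 keeps "not set" distinct from an invalid value; in the ml Params API that distinction is what `isSet` and `isDefined` provide.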
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698338 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.fpm + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules, +FPGrowth => MLlibFPGrowth} +import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +/** + * Common params for FPGrowth and FPGrowthModel + */ +private[fpm] trait FPGrowthParams extends Params with HasFeaturesCol with HasPredictionCol { + + /** + * Validates and transforms the input schema. 
+ * @param schema input schema + * @return output schema + */ + protected def validateAndTransformSchema(schema: StructType): StructType = { +val inputType = schema($(featuresCol)).dataType +require(inputType.isInstanceOf[ArrayType], + s"The input column must be ArrayType, but got $inputType.") +SchemaUtils.appendColumn(schema, $(predictionCol), schema($(featuresCol)).dataType) + } + + /** + * Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears + * more than (minSupport * size-of-the-dataset) times will be output + * Default: 0.3 + * @group param + */ + @Since("2.2.0") + val minSupport: DoubleParam = new DoubleParam(this, "minSupport", +"the minimal support level of the frequent pattern (Default: 0.3)", +ParamValidators.inRange(0.0, 1.0)) + setDefault(minSupport -> 0.3) + + /** @group getParam */ + @Since("2.2.0") + def getMinSupport: Double = $(minSupport) + + /** + * Number of partitions used by parallel FP-growth + * @group expertParam + */ + @Since("2.2.0") + val numPartitions: IntParam = new IntParam(this, "numPartitions", +"Number of partitions used by parallel FP-growth", ParamValidators.gtEq[Int](1)) + + /** @group expertGetParam */ + @Since("2.2.0") + def getNumPartitions: Int = $(numPartitions) + + /** + * minimal confidence for generating Association Rule + * Default: 0.8 + * @group param + */ + @Since("2.2.0") + val minConfidence: DoubleParam = new DoubleParam(this, "minConfidence", +"minimal confidence for generating Association Rule (Default: 0.8)", +ParamValidators.inRange(0.0, 1.0)) + setDefault(minConfidence -> 0.8) + + /** @group getParam */ + @Since("2.2.0") + def getMinConfidence: Double = $(minConfidence) + +} + +/** + * :: Experimental :: + * A parallel FP-growth algorithm to mine frequent itemsets. 
+ * + * @see [[http://dx.doi.org/10.1145/1454008.1454027 Li et al., PFP: Parallel FP-Growth for Query + * Recommendation]] + */ +@Since("2.2.0") +@Experimental +class FPGrowth @Since("2.2.0") ( +@Since("2.2.0") override val uid: String) + extends Estimator[FPGrowthModel] with FPGrowthParams with DefaultParamsWritable { + + @Since("2.2.0") + def this() = this(Identifiable.randomUID("fpgrowth")) + + /** @group setParam */ + @Since("2.2.0") + def setMinSupport(value: Double): this.type = set(minSupport, value) + + /** @group expertSetParam */ + @Since("2.2.0") + def setNumPartitions(value: Int): this.type = set(numPartitions, value) + + /** @group setParam + * minConfidence has not effect during
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698323 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.fpm + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules, +FPGrowth => MLlibFPGrowth} +import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +/** + * Common params for FPGrowth and FPGrowthModel + */ +private[fpm] trait FPGrowthParams extends Params with HasFeaturesCol with HasPredictionCol { + + /** + * Validates and transforms the input schema. 
+ * @param schema input schema + * @return output schema + */ + protected def validateAndTransformSchema(schema: StructType): StructType = { +val inputType = schema($(featuresCol)).dataType +require(inputType.isInstanceOf[ArrayType], + s"The input column must be ArrayType, but got $inputType.") +SchemaUtils.appendColumn(schema, $(predictionCol), schema($(featuresCol)).dataType) + } + + /** + * Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears + * more than (minSupport * size-of-the-dataset) times will be output + * Default: 0.3 + * @group param + */ + @Since("2.2.0") + val minSupport: DoubleParam = new DoubleParam(this, "minSupport", +"the minimal support level of the frequent pattern (Default: 0.3)", +ParamValidators.inRange(0.0, 1.0)) + setDefault(minSupport -> 0.3) + + /** @group getParam */ + @Since("2.2.0") + def getMinSupport: Double = $(minSupport) + + /** + * Number of partitions used by parallel FP-growth + * @group expertParam + */ + @Since("2.2.0") + val numPartitions: IntParam = new IntParam(this, "numPartitions", +"Number of partitions used by parallel FP-growth", ParamValidators.gtEq[Int](1)) + + /** @group expertGetParam */ + @Since("2.2.0") + def getNumPartitions: Int = $(numPartitions) + + /** + * minimal confidence for generating Association Rule + * Default: 0.8 + * @group param + */ + @Since("2.2.0") + val minConfidence: DoubleParam = new DoubleParam(this, "minConfidence", +"minimal confidence for generating Association Rule (Default: 0.8)", +ParamValidators.inRange(0.0, 1.0)) + setDefault(minConfidence -> 0.8) + + /** @group getParam */ + @Since("2.2.0") + def getMinConfidence: Double = $(minConfidence) + +} + +/** + * :: Experimental :: + * A parallel FP-growth algorithm to mine frequent itemsets. 
+ * + * @see [[http://dx.doi.org/10.1145/1454008.1454027 Li et al., PFP: Parallel FP-Growth for Query + * Recommendation]] + */ +@Since("2.2.0") +@Experimental +class FPGrowth @Since("2.2.0") ( +@Since("2.2.0") override val uid: String) + extends Estimator[FPGrowthModel] with FPGrowthParams with DefaultParamsWritable { + + @Since("2.2.0") + def this() = this(Identifiable.randomUID("fpgrowth")) + + /** @group setParam */ + @Since("2.2.0") + def setMinSupport(value: Double): this.type = set(minSupport, value) + + /** @group expertSetParam */ + @Since("2.2.0") + def setNumPartitions(value: Int): this.type = set(numPartitions, value) + + /** @group setParam + * minConfidence has not effect during
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698331 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.fpm + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules, +FPGrowth => MLlibFPGrowth} +import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +/** + * Common params for FPGrowth and FPGrowthModel + */ +private[fpm] trait FPGrowthParams extends Params with HasFeaturesCol with HasPredictionCol { + + /** + * Validates and transforms the input schema. 
+ * @param schema input schema + * @return output schema + */ + protected def validateAndTransformSchema(schema: StructType): StructType = { +val inputType = schema($(featuresCol)).dataType +require(inputType.isInstanceOf[ArrayType], + s"The input column must be ArrayType, but got $inputType.") +SchemaUtils.appendColumn(schema, $(predictionCol), schema($(featuresCol)).dataType) + } + + /** + * Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears + * more than (minSupport * size-of-the-dataset) times will be output + * Default: 0.3 + * @group param + */ + @Since("2.2.0") + val minSupport: DoubleParam = new DoubleParam(this, "minSupport", +"the minimal support level of the frequent pattern (Default: 0.3)", +ParamValidators.inRange(0.0, 1.0)) + setDefault(minSupport -> 0.3) + + /** @group getParam */ + @Since("2.2.0") + def getMinSupport: Double = $(minSupport) + + /** + * Number of partitions used by parallel FP-growth + * @group expertParam + */ + @Since("2.2.0") + val numPartitions: IntParam = new IntParam(this, "numPartitions", +"Number of partitions used by parallel FP-growth", ParamValidators.gtEq[Int](1)) + + /** @group expertGetParam */ + @Since("2.2.0") + def getNumPartitions: Int = $(numPartitions) + + /** + * minimal confidence for generating Association Rule + * Default: 0.8 + * @group param + */ + @Since("2.2.0") + val minConfidence: DoubleParam = new DoubleParam(this, "minConfidence", +"minimal confidence for generating Association Rule (Default: 0.8)", +ParamValidators.inRange(0.0, 1.0)) + setDefault(minConfidence -> 0.8) + + /** @group getParam */ + @Since("2.2.0") + def getMinConfidence: Double = $(minConfidence) + +} + +/** + * :: Experimental :: + * A parallel FP-growth algorithm to mine frequent itemsets. 
+ * + * @see [[http://dx.doi.org/10.1145/1454008.1454027 Li et al., PFP: Parallel FP-Growth for Query + * Recommendation]] + */ +@Since("2.2.0") +@Experimental +class FPGrowth @Since("2.2.0") ( +@Since("2.2.0") override val uid: String) + extends Estimator[FPGrowthModel] with FPGrowthParams with DefaultParamsWritable { + + @Since("2.2.0") + def this() = this(Identifiable.randomUID("fpgrowth")) + + /** @group setParam */ + @Since("2.2.0") + def setMinSupport(value: Double): this.type = set(minSupport, value) + + /** @group expertSetParam */ + @Since("2.2.0") + def setNumPartitions(value: Int): this.type = set(numPartitions, value) + + /** @group setParam + * minConfidence has not effect during
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698335 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.fpm + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules, +FPGrowth => MLlibFPGrowth} +import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +/** + * Common params for FPGrowth and FPGrowthModel + */ +private[fpm] trait FPGrowthParams extends Params with HasFeaturesCol with HasPredictionCol { + + /** + * Validates and transforms the input schema. 
+ * @param schema input schema + * @return output schema + */ + protected def validateAndTransformSchema(schema: StructType): StructType = { +val inputType = schema($(featuresCol)).dataType +require(inputType.isInstanceOf[ArrayType], + s"The input column must be ArrayType, but got $inputType.") +SchemaUtils.appendColumn(schema, $(predictionCol), schema($(featuresCol)).dataType) + } + + /** + * Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears + * more than (minSupport * size-of-the-dataset) times will be output + * Default: 0.3 + * @group param + */ + @Since("2.2.0") + val minSupport: DoubleParam = new DoubleParam(this, "minSupport", +"the minimal support level of the frequent pattern (Default: 0.3)", +ParamValidators.inRange(0.0, 1.0)) + setDefault(minSupport -> 0.3) + + /** @group getParam */ + @Since("2.2.0") + def getMinSupport: Double = $(minSupport) + + /** + * Number of partitions used by parallel FP-growth + * @group expertParam + */ + @Since("2.2.0") + val numPartitions: IntParam = new IntParam(this, "numPartitions", +"Number of partitions used by parallel FP-growth", ParamValidators.gtEq[Int](1)) + + /** @group expertGetParam */ + @Since("2.2.0") + def getNumPartitions: Int = $(numPartitions) + + /** + * minimal confidence for generating Association Rule + * Default: 0.8 + * @group param + */ + @Since("2.2.0") + val minConfidence: DoubleParam = new DoubleParam(this, "minConfidence", +"minimal confidence for generating Association Rule (Default: 0.8)", +ParamValidators.inRange(0.0, 1.0)) + setDefault(minConfidence -> 0.8) + + /** @group getParam */ + @Since("2.2.0") + def getMinConfidence: Double = $(minConfidence) + +} + +/** + * :: Experimental :: + * A parallel FP-growth algorithm to mine frequent itemsets. 
+ * + * @see [[http://dx.doi.org/10.1145/1454008.1454027 Li et al., PFP: Parallel FP-Growth for Query + * Recommendation]] + */ +@Since("2.2.0") +@Experimental +class FPGrowth @Since("2.2.0") ( +@Since("2.2.0") override val uid: String) + extends Estimator[FPGrowthModel] with FPGrowthParams with DefaultParamsWritable { + + @Since("2.2.0") + def this() = this(Identifiable.randomUID("fpgrowth")) + + /** @group setParam */ + @Since("2.2.0") + def setMinSupport(value: Double): this.type = set(minSupport, value) + + /** @group expertSetParam */ + @Since("2.2.0") + def setNumPartitions(value: Int): this.type = set(numPartitions, value) + + /** @group setParam + * minConfidence has not effect during
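The `minSupport` and `minConfidence` params quoted in the diff above come down to two small formulas. An itemset is frequent when it occurs in enough transactions relative to (minSupport * size-of-the-dataset). A rule X => Y has confidence support(X union Y) / support(X). The sketch below illustrates both under stated assumptions and is not Spark's parallel FP-growth. `SupportConfidenceSketch` and its methods are hypothetical names. Rounding the threshold up and comparing with "at least" is also an assumption, based on a reading of the RDD-based `mllib` implementation; the quoted doc comment says "more than".

```scala
// Hypothetical brute-force sketch of support and confidence; NOT Spark's parallel FP-growth.
object SupportConfidenceSketch {
  // Number of transactions that contain every item of `itemset`.
  def supportCount(transactions: Seq[Set[String]], itemset: Set[String]): Long =
    transactions.count(t => itemset.subsetOf(t)).toLong

  // Minimum occurrence count implied by a fractional minSupport.
  // Assumption: rounds up, as the RDD-based mllib FPGrowth appears to do.
  def minCount(minSupport: Double, numTransactions: Long): Long =
    math.ceil(minSupport * numTransactions).toLong

  // Assumption: "frequent" means support count >= minCount.
  def isFrequent(transactions: Seq[Set[String]], itemset: Set[String], minSupport: Double): Boolean =
    supportCount(transactions, itemset) >= minCount(minSupport, transactions.size.toLong)

  // Confidence of the rule antecedent => consequent: support(X union Y) / support(X).
  def confidence(
      transactions: Seq[Set[String]],
      antecedent: Set[String],
      consequent: Set[String]): Double =
    supportCount(transactions, antecedent ++ consequent).toDouble /
      supportCount(transactions, antecedent).toDouble

  def main(args: Array[String]): Unit = {
    val transactions = Seq(
      Set("a", "b", "c"), Set("a", "b"), Set("a", "c"), Set("b", "c"), Set("a", "b", "c"))
    val ab = Set("a", "b")
    val conf = confidence(transactions, Set("a"), Set("b"))
    // With 5 transactions and minSupport = 0.3, the threshold is ceil(1.5) = 2.
    println(s"minCount=${minCount(0.3, 5L)} support(ab)=${supportCount(transactions, ab)}")
    println(s"frequent(ab)=${isFrequent(transactions, ab, 0.3)} confidence(a => b)=$conf")
  }
}
```

In this toy data, the rule a => b has confidence 3/4. It would therefore be dropped under the default `minConfidence` of 0.8 quoted in the diff.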
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698340 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698315 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698326 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@
[GitHub] spark pull request #15415: [SPARK-14503][ML] spark.ml API for FPGrowth
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/15415#discussion_r101698328 --- Diff: mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala --- @@ -0,0 +1,327 @@
[GitHub] spark issue #16956: [SPARK-19598][SQL]Remove the alias parameter in Unresolv...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16956 **[Test build #73044 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73044/testReport)** for PR 16956 at commit [`d2864b6`](https://github.com/apache/spark/commit/d2864b691154364e800030a8538f385cc4fcb5e6).
[GitHub] spark pull request #16956: [SPARK-19598][SQL]Remove the alias parameter in U...
Github user windpiger commented on a diff in the pull request: https://github.com/apache/spark/pull/16956#discussion_r101698406 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -645,17 +645,21 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * }}} */ override def visitTable(ctx: TableContext): LogicalPlan = withOrigin(ctx) { -UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier), None) +UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier)) } /** * Create an aliased table reference. This is typically used in FROM clauses. */ override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) { -val table = UnresolvedRelation( - visitTableIdentifier(ctx.tableIdentifier), - Option(ctx.strictIdentifier).map(_.getText)) -table.optionalMap(ctx.sample)(withSample) --- End diff -- Yes, I cannot modify the original default expression order between alias and sample.
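The ordering question in this thread comes down to which wrapper ends up outermost. In the removed lines, the alias was passed into `UnresolvedRelation` and `withSample` was applied afterwards, so the sample wrapped the aliased relation. The toy model below shows that swapping the two steps produces a different tree. `Relation`, `Alias`, `Sample`, and `PlanOps` are hypothetical stand-ins, not Catalyst classes. The `optionalMap` helper only mirrors the shape of the one in the diff: it applies a function when the optional clause is present, modelled here with `Option` rather than a nullable parse-tree context. The sketch makes no claim about which order the PR finally kept.

```scala
// Toy model only: these case classes are hypothetical stand-ins, not Catalyst's LogicalPlan classes.
object AliasSampleOrderSketch {
  sealed trait Plan
  final case class Relation(table: String) extends Plan
  final case class Alias(name: String, child: Plan) extends Plan
  final case class Sample(fraction: Double, child: Plan) extends Plan

  // Same shape as the parser helper used in the diff: apply `f` only when the optional
  // clause is present (modelled with Option instead of a nullable parse-tree context).
  implicit class PlanOps(val plan: Plan) extends AnyVal {
    def optionalMap[C](ctx: Option[C])(f: (C, Plan) => Plan): Plan =
      ctx.fold(plan)(c => f(c, plan))
  }

  def withAlias(name: String, child: Plan): Plan = Alias(name, child)
  def withSample(fraction: Double, child: Plan): Plan = Sample(fraction, child)

  // Alias first, then sample: the sample wraps the aliased relation.
  def aliasThenSample(table: String, alias: Option[String], fraction: Option[Double]): Plan =
    Relation(table).optionalMap(alias)(withAlias).optionalMap(fraction)(withSample)

  // Sample first, then alias: the alias wraps the sampled relation.
  def sampleThenAlias(table: String, alias: Option[String], fraction: Option[Double]): Plan =
    Relation(table).optionalMap(fraction)(withSample).optionalMap(alias)(withAlias)

  def main(args: Array[String]): Unit = {
    println(aliasThenSample("t", Some("x"), Some(0.1)))
    println(sampleThenAlias("t", Some("x"), Some(0.1)))
  }
}
```

When neither clause is present, both functions return the bare `Relation`. The order only matters once both an alias and a sample appear in the same table reference.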
[GitHub] spark issue #16611: [SPARK-17967][SPARK-17878][SQL][PYTHON] Support for arra...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16611 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73039/ Test FAILed.
[GitHub] spark issue #16611: [SPARK-17967][SPARK-17878][SQL][PYTHON] Support for arra...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16611 Merged build finished. Test FAILed.
[GitHub] spark issue #16611: [SPARK-17967][SPARK-17878][SQL][PYTHON] Support for arra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16611 **[Test build #73039 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73039/testReport)** for PR 16611 at commit [`d7b202e`](https://github.com/apache/spark/commit/d7b202e92fd10ac2c9c6d97e19e70a4c1a725fb8). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16945: [SPARK-19616][SparkR]:weightCol and aggregationDepth sho...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16945 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73043/ Test PASSed.
[GitHub] spark issue #16945: [SPARK-19616][SparkR]:weightCol and aggregationDepth sho...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16945 Merged build finished. Test PASSed.
[GitHub] spark issue #16945: [SPARK-19616][SparkR]:weightCol and aggregationDepth sho...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16945 **[Test build #73043 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73043/testReport)** for PR 16945 at commit [`0e94467`](https://github.com/apache/spark/commit/0e94467866159360c830a9c1eb04af5239ba8fcc). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #12524: [SPARK-12524][Core]DagScheduler may submit a task set fo...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/12524 @JoshRosen This is interesting; thanks for the details! On the face of it, I think @markhamstra's comment about #16620 should apply, but given the additional details, it might be possible to reproduce this consistently? I am hoping we can create a repeatable test to trigger it, which should greatly speed up the debugging. The earlier case was not reproducible when I tried, but we have more info now...
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r101693864 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer +import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator} + +/** + * An append-only array for [[UnsafeRow]]s that spills content to disk when there a predefined + * threshold of rows is reached. + * + * Setting spill threshold faces following trade-off: + * + * - If the spill threshold is too high, the in-memory array may occupy more memory than is + * available, resulting in OOM. + * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes. 
+ * This may lead to a performance regression compared to the normal case of using an + * [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging { --- End diff -- Good point. Before writing the benchmark, my guess was that `ArrayBuffer` would be superior to `ExternalUnsafeSorter`, so I went with an impl that initially behaves like `ArrayBuffer` but later switches to `ExternalUnsafeSorter`. I won't make this call based solely on the micro-benchmark results, because they might not reflect what actually happens when a query runs: other operations take place while the buffer is populated and accessed.
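The spill-threshold trade-off described in the quoted doc comment can be sketched with plain strings and a temp file. Rows are buffered in memory like an `ArrayBuffer`, and the buffer moves to disk once a threshold is crossed. This is a hypothetical illustration only. `SpillingAppendOnlyBuffer` is not Spark's `ExternalAppendOnlyUnsafeRowArray`: it uses no `UnsafeRow`, memory manager, or `UnsafeExternalSorter`, and it assumes rows contain no line breaks.

```scala
import java.io.{File, PrintWriter}

import scala.collection.mutable.ArrayBuffer
import scala.io.Source

// Hypothetical sketch, NOT Spark's ExternalAppendOnlyUnsafeRowArray: string rows, no UnsafeRow,
// no memory manager, no UnsafeExternalSorter. Assumes rows contain no line breaks.
final class SpillingAppendOnlyBuffer(spillThreshold: Int) {
  require(spillThreshold > 0, "spillThreshold must be positive")

  private val inMemory = ArrayBuffer.empty[String]
  private var spillFile: Option[File] = None
  private var writer: Option[PrintWriter] = None
  private var numRows = 0

  // Rows stay in memory until `spillThreshold` rows are buffered; the next append moves
  // everything to a temp file, and all later appends go straight to that file.
  def add(row: String): Unit = {
    if (writer.isEmpty && inMemory.length >= spillThreshold) spill()
    writer match {
      case Some(w) => w.println(row)
      case None => inMemory += row
    }
    numRows += 1
  }

  private def spill(): Unit = {
    val file = File.createTempFile("spill-sketch", ".txt")
    file.deleteOnExit()
    val w = new PrintWriter(file, "UTF-8")
    inMemory.foreach(r => w.println(r))
    inMemory.clear()
    spillFile = Some(file)
    writer = Some(w)
  }

  def isSpilled: Boolean = spillFile.isDefined
  def length: Int = numRows

  // Returns all rows in insertion order, reading them back from disk if the buffer has spilled.
  def rows: List[String] = spillFile match {
    case None => inMemory.toList
    case Some(file) =>
      writer.foreach(_.flush())
      val source = Source.fromFile(file, "UTF-8")
      try source.getLines().toList finally source.close()
  }

  def close(): Unit = {
    writer.foreach(_.close())
    spillFile.foreach(_.delete())
  }
}
```

Keeping the first rows in an in-memory buffer makes the small case cheap, while the threshold bounds memory use. As the comment above notes, choosing where that threshold sits is better judged against real query workloads than against a micro-benchmark alone.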
[GitHub] spark issue #16969: [SPARK-19639][SPARKR][Example]:Add spark.svmLinear examp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16969 Merged build finished. Test PASSed.
[GitHub] spark issue #16969: [SPARK-19639][SPARKR][Example]:Add spark.svmLinear examp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16969 **[Test build #73040 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73040/testReport)** for PR 16969 at commit [`959b0a1`](https://github.com/apache/spark/commit/959b0a11892dd54035783595109366199486d544). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16945: [SPARK-19616][SparkR]:weightCol and aggregationDepth sho...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16945 **[Test build #73043 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73043/testReport)** for PR 16945 at commit [`0e94467`](https://github.com/apache/spark/commit/0e94467866159360c830a9c1eb04af5239ba8fcc).
[GitHub] spark issue #16611: [SPARK-17967][SPARK-17878][SQL][PYTHON] Support for arra...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/16611 Per 2f78cc7, I ran a build with Scala 2.10 as well.
[GitHub] spark issue #16611: [SPARK-17967][SPARK-17878][SQL][PYTHON] Support for arra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16611 **[Test build #73042 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73042/testReport)** for PR 16611 at commit [`2f78cc7`](https://github.com/apache/spark/commit/2f78cc7fcbeab8a899475cc6c2ce4176641e9f1b).
[GitHub] spark issue #16945: [SPARK-19616][SparkR]:weightCol and aggregationDepth sho...
Github user wangmiao1981 commented on the issue: https://github.com/apache/spark/pull/16945 I added a test of weightCol for spark.logit.
[GitHub] spark issue #16611: [SPARK-17967][SPARK-17878][SQL][PYTHON] Support for arra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16611 **[Test build #73041 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73041/testReport)** for PR 16611 at commit [`60c7e25`](https://github.com/apache/spark/commit/60c7e25947d62800abcd9e96d60b859a82ff0a0d).
[GitHub] spark issue #16972: [SPARK-19556][CORE][WIP] Broadcast data is not encrypted...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16972 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73036/ Test FAILed.
[GitHub] spark issue #16972: [SPARK-19556][CORE][WIP] Broadcast data is not encrypted...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16972 Merged build finished. Test FAILed.
[GitHub] spark issue #16972: [SPARK-19556][CORE][WIP] Broadcast data is not encrypted...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16972 **[Test build #73036 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73036/testReport)** for PR 16972 at commit [`f9a91d6`](https://github.com/apache/spark/commit/f9a91d63af3191b853ef88bd48293bcc19f3ec4c). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #16476: [SPARK-19084][SQL] Implement expression field
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r101690256 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -340,3 +341,91 @@ object CaseKeyWhen { CaseWhen(cases, elseValue) } } + +/** + * A function that returns the index of str in (str1, str2, ...) list or 0 if not found. + * It takes at least 2 parameters, and all parameters' types should be subtypes of AtomicType. + */ +@ExpressionDescription( + usage = "_FUNC_(str, str1, str2, ...) - Returns the index of str in the str1,str2,... or 0 if not found.", + extended = """ +Examples: + > SELECT _FUNC_(10, 9, 3, 10, 4); + 3 + """) +case class Field(children: Seq[Expression]) extends Expression { + + override def nullable: Boolean = false + override def foldable: Boolean = children.forall(_.foldable) + + private lazy val ordering = TypeUtils.getInterpretedOrdering(children(0).dataType) + + override def checkInputDataTypes(): TypeCheckResult = { +if (children.length <= 1) { + TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 arguments") +} else if (!children.forall(_.dataType.isInstanceOf[AtomicType])) { + TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to be of AtomicType") +} else + TypeCheckResult.TypeCheckSuccess + } + + override def dataType: DataType = IntegerType + + override def eval(input: InternalRow): Any = { +val target = children.head.eval(input) +val targetDataType = children.head.dataType +def findEqual(target: Any, params: Seq[Expression], index: Int): Int = { + params.toList match { +case Nil => 0 +case head::tail if targetDataType == head.dataType + && head.eval(input) != null && ordering.equiv(target, head.eval(input)) => index +case _ => findEqual(target, params.tail, index + 1) + } +} +if(target == null) + 0 +else + findEqual(target, children.tail, 1) + } + + protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val evalChildren = children.map(_.genCode(ctx)) +val target = evalChildren(0) +val targetDataType = children(0).dataType +val rest = evalChildren.drop(1) +val restDataType = children.drop(1).map(_.dataType) + +def updateEval(evalWithIndex: ((ExprCode, DataType), Int)): String = { + val ((eval, dataType), index) = evalWithIndex + s""" +${eval.code} +if (${dataType.equals(targetDataType)} + && ${ctx.genEqual(targetDataType, eval.value, target.value)}) { + ${ev.value} = ${index}; +} + """ +} + +def genIfElseStructure(code1: String, code2: String): String = { --- End diff -- I think what's already present in the code is OK. Given that there is no better option without adding more complexity, let's stick with it.
[GitHub] spark issue #16969: [SPARK-19639][SPARKR][Example]:Add spark.svmLinear examp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16969 **[Test build #73040 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73040/testReport)** for PR 16969 at commit [`959b0a1`](https://github.com/apache/spark/commit/959b0a11892dd54035783595109366199486d544).
[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...
Github user wangmiao1981 commented on the issue: https://github.com/apache/spark/pull/15770 @thunterdb Thanks for your review! I will address the comments soon.
[GitHub] spark issue #16951: [SPARK-18285][SPARKR] SparkR approxQuantile supports inp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16951 Merged build finished. Test PASSed.
[GitHub] spark issue #16951: [SPARK-18285][SPARKR] SparkR approxQuantile supports inp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16951 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73038/ Test PASSed.
[GitHub] spark issue #16951: [SPARK-18285][SPARKR] SparkR approxQuantile supports inp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16951 **[Test build #73038 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73038/testReport)** for PR 16951 at commit [`f9e5b5d`](https://github.com/apache/spark/commit/f9e5b5db5416528e4bd4dee14d761144647bdab6). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16611: [SPARK-17967][SPARK-17878][SQL][PYTHON] Support for arra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16611 **[Test build #73039 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73039/testReport)** for PR 16611 at commit [`d7b202e`](https://github.com/apache/spark/commit/d7b202e92fd10ac2c9c6d97e19e70a4c1a725fb8).
[GitHub] spark pull request #16476: [SPARK-19084][SQL] Implement expression field
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r101688665 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -340,3 +341,91 @@ object CaseKeyWhen { CaseWhen(cases, elseValue) } } + +/** + * A function that returns the index of str in (str1, str2, ...) list or 0 if not found. + * It takes at least 2 parameters, and all parameters' types should be subtypes of AtomicType. + */ +@ExpressionDescription( + usage = "_FUNC_(str, str1, str2, ...) - Returns the index of str in the str1,str2,... or 0 if not found.", + extended = """ +Examples: + > SELECT _FUNC_(10, 9, 3, 10, 4); + 3 + """) +case class Field(children: Seq[Expression]) extends Expression { + + override def nullable: Boolean = false + override def foldable: Boolean = children.forall(_.foldable) + + private lazy val ordering = TypeUtils.getInterpretedOrdering(children(0).dataType) + + override def checkInputDataTypes(): TypeCheckResult = { +if (children.length <= 1) { + TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 arguments") +} else if (!children.forall(_.dataType.isInstanceOf[AtomicType])) { + TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to be of AtomicType") +} else + TypeCheckResult.TypeCheckSuccess + } + + override def dataType: DataType = IntegerType + + override def eval(input: InternalRow): Any = { +val target = children.head.eval(input) +val targetDataType = children.head.dataType +def findEqual(target: Any, params: Seq[Expression], index: Int): Int = { + params.toList match { +case Nil => 0 +case head::tail if targetDataType == head.dataType + && head.eval(input) != null && ordering.equiv(target, head.eval(input)) => index +case _ => findEqual(target, params.tail, index + 1) + } +} +if(target == null) + 0 +else + findEqual(target, children.tail, 1) + } + + protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val evalChildren = children.map(_.genCode(ctx)) +val target = evalChildren(0) +val targetDataType = children(0).dataType +val rest = evalChildren.drop(1) +val restDataType = children.drop(1).map(_.dataType) + +def updateEval(evalWithIndex: ((ExprCode, DataType), Int)): String = { + val ((eval, dataType), index) = evalWithIndex + s""" +${eval.code} +if (${dataType.equals(targetDataType)} + && ${ctx.genEqual(targetDataType, eval.value, target.value)}) { + ${ev.value} = ${index}; +} + """ +} + +def genIfElseStructure(code1: String, code2: String): String = { --- End diff -- I think it looks like: ${evalChildren.zip(dataTypes).zipWithIndex.tail.filter { x => dataTypeMatchIndex.contains(x._2) }.foldRight("") { (code: String, evalWithIndex: ((ExprCode, DataType), Int)) => val ((eval, _), index) = evalWithIndex val condition = ctx.genEqual(targetDataType, eval.value, target.value) s""" ${eval.code} if ($condition) { ${ev.value} = ${index}; } else { $code } """ } You can do this with a function like you did before. It will have an empty "else" block at the end. However, this doesn't affect the functionality; it only affects how the code looks. I don't have a strong opinion about this.
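The nested `if`/`else` chain suggested in this comment can be sketched in plain Scala, outside Spark, to see why `foldRight` avoids a floating `else`. This is a hypothetical model, not Spark code: `EvalCode` stands in for Spark's `ExprCode`, `NestedIfElse.gen` is an illustrative name, and the Java `==` comparison is a placeholder for whatever `ctx.genEqual` would emit. Note that Scala's `foldRight` passes `(element, accumulator)` to its closure, so the already-generated code for the later candidates arrives as the second argument.

```scala
// Hypothetical stand-in for Spark's ExprCode: the Java statements that
// compute a child's value, and the name of the variable that holds it.
final case class EvalCode(code: String, value: String)

object NestedIfElse {
  /** Emits one nested if/else chain; `candidates` pairs each child with its index. */
  def gen(target: String, result: String, candidates: Seq[(EvalCode, Int)]): String =
    candidates.foldRight("") { case ((eval, index), rest) =>
      // foldRight hands us (element, accumulator): `rest` is the code already
      // generated for every later candidate, so it nests inside this `else`.
      // `==` is a placeholder for the comparison ctx.genEqual would produce.
      s"${eval.code}\n" +
        s"if ($target == ${eval.value}) {\n" +
        s"  $result = $index;\n" +
        s"} else {\n$rest\n}"
    }
}
```

Because the fold is seeded with `""`, the innermost branch is an empty `else { }`, which is exactly the harmless trailing block mentioned in the comment; every `else` is emitted together with its own `if`, so no snippet ever starts with a bare `else`.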
[GitHub] spark issue #16785: [SPARK-19443][SQL] The function to generate constraints ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16785 Merged build finished. Test PASSed.
[GitHub] spark issue #16785: [SPARK-19443][SQL] The function to generate constraints ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16785 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73035/ Test PASSed.
[GitHub] spark issue #16785: [SPARK-19443][SQL] The function to generate constraints ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16785 **[Test build #73035 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73035/testReport)** for PR 16785 at commit [`278c31c`](https://github.com/apache/spark/commit/278c31cf8aa27c71e0f5178bebcb426ec5fba6ce). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16785: [SPARK-19443][SQL] The function to generate constraints ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16785 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73034/ Test PASSed.
[GitHub] spark issue #16785: [SPARK-19443][SQL] The function to generate constraints ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16785 Merged build finished. Test PASSed.
[GitHub] spark issue #16785: [SPARK-19443][SQL] The function to generate constraints ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16785 **[Test build #73034 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73034/testReport)** for PR 16785 at commit [`4ba93fe`](https://github.com/apache/spark/commit/4ba93fecfcbdeebecc9526a90b5800c98b3f35a7). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16971: [SPARK-19573][SQL] Make NaN/null handling consistent in ...
Github user zhengruifeng commented on the issue: https://github.com/apache/spark/pull/16971 @gatorsmile @jkbradley
[GitHub] spark pull request #16476: [SPARK-19084][SQL] Implement expression field
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r101688198 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -340,3 +343,105 @@ object CaseKeyWhen { CaseWhen(cases, elseValue) } } + +/** + * A function that returns the index of expr in (expr1, expr2, ...) list or 0 if not found. + * It takes at least 2 parameters, and all parameters should be subtype of AtomicType or NullType. + * It's also acceptable to give parameters of different types. When the parameters have different + * types, comparing will be done based on type firstly. For example, ''999'' 's type is StringType, + * while 999's type is IntegerType, so that no further comparison need to be done since they have + * different types. + * If the search expression is NULL, the return value is 0 because NULL fails equality comparison + * with any value. + * To also point out, no implicit cast will be done in this expression. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in the expr1, expr2, ... or 0 if not found.", + extended = """ +Examples: + > SELECT _FUNC_(10, 9, 3, 10, 4); + 3 + > SELECT _FUNC_('a', 'b', 'c', 'd', 'a'); + 4 + > SELECT _FUNC_('999', 'a', 999, 9.99, '999'); + 4 + """) +// scalastyle:on line.size.limit +case class Field(children: Seq[Expression]) extends Expression { + + /** Even if expr is not found in (expr1, expr2, ...) list, the value will be 0, not null */ + override def nullable: Boolean = false + override def foldable: Boolean = children.forall(_.foldable) + + private lazy val ordering = TypeUtils.getInterpretedOrdering(children(0).dataType) + + private val dataTypeMatchIndex: Array[Int] = children.zipWithIndex.tail.filter( +_._1.dataType.sameType(children.head.dataType)).map(_._2).toArray + + override def checkInputDataTypes(): TypeCheckResult = { +if (children.length <= 1) { + TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 arguments") +} else if (!children.forall( +e => e.dataType.isInstanceOf[AtomicType] || e.dataType.isInstanceOf[NullType])) { + TypeCheckResult.TypeCheckFailure( +s"FIELD requires all arguments to be of AtomicType or explicitly indicating NULL") +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override def dataType: DataType = IntegerType + override def eval(input: InternalRow): Any = { +val target = children.head.eval(input) +@tailrec def findEqual(index: Int): Int = { + if (index == dataTypeMatchIndex.length) { +0 + } else { +val value = children(dataTypeMatchIndex(index)).eval(input) +if (value != null && ordering.equiv(target, value)) { + dataTypeMatchIndex(index) +} else { + findEqual(index + 1) +} + } +} +if (target == null) 0 else findEqual(index = 0) + } + + protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val evalChildren = children.map(_.genCode(ctx)) +val target = evalChildren(0) +val targetDataType = children(0).dataType +val dataTypes = children.map(_.dataType) + +def updateEval(evalWithIndex: ((ExprCode, DataType), Int)): String = { + val ((eval, _), index) = evalWithIndex + s""" +${eval.code} +if (${ctx.genEqual(targetDataType, eval.value, target.value)}) { + ${ev.value} = ${index}; +} + """ +} + +def genIfElseStructure(code1: String, code2: String): String = { + s""" + ${code1} + else { --- End diff -- This is the floating `else`. As @tejasapatil said, it looks weird.
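The lookup semantics in this revision's doc comment (1-based result, 0 when the value is absent or the search expression is NULL, comparison only against candidates of the same type, no implicit casts) can be modelled in plain Scala. This is a sketch under stated assumptions, not Spark's implementation: `FieldModel` is a hypothetical name, SQL values are modelled as `Any`, NULL as `null`, and "same data type" as "same runtime class", whereas Spark decides type matches once from the declared `dataType`s. Like the diff's `dataTypeMatchIndex`, it precomputes the type-matching positions and then scans only those with a tail-recursive `findEqual`.

```scala
import scala.annotation.tailrec

object FieldModel {
  /** 1-based position of `target` among `candidates`, or 0 if absent or NULL. */
  def field(target: Any, candidates: Any*): Int =
    if (target == null) 0 // NULL never compares equal to anything
    else {
      // Positions whose runtime class matches the target (a stand-in for the
      // declared-type check); null candidates have no class and are skipped.
      val typeMatchIndex: Array[Int] = candidates.zipWithIndex.collect {
        case (c, i) if c != null && c.getClass == target.getClass => i + 1
      }.toArray

      // Scan only the type-matching positions; no casts are attempted.
      @tailrec def findEqual(k: Int): Int =
        if (k == typeMatchIndex.length) 0
        else if (candidates(typeMatchIndex(k) - 1) == target) typeMatchIndex(k)
        else findEqual(k + 1)

      findEqual(0)
    }
}
```

For example, `FieldModel.field("999", "a", 999, 9.99, "999")` returns 4, matching the third example in the doc comment: the integer `999` and the double `9.99` are never compared against the string because their types differ.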
[GitHub] spark issue #16971: [SPARK-19573][SQL] Make NaN/null handling consistent in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16971 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73033/ Test PASSed.
[GitHub] spark issue #16971: [SPARK-19573][SQL] Make NaN/null handling consistent in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16971 Merged build finished. Test PASSed.
[GitHub] spark issue #16971: [SPARK-19573][SQL] Make NaN/null handling consistent in ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16971 **[Test build #73033 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73033/testReport)** for PR 16971 at commit [`d5e79a8`](https://github.com/apache/spark/commit/d5e79a809b1edd91a7e0c1d8046bb8bfec2ba4c9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16952: [SPARK-19620][SQL]Fix incorrect exchange coordinator id ...
Github user carsonwang commented on the issue: https://github.com/apache/spark/pull/16952 cc @yhuai
[GitHub] spark issue #16951: [SPARK-18285][SPARKR] SparkR approxQuantile supports inp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16951 **[Test build #73038 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73038/testReport)** for PR 16951 at commit [`f9e5b5d`](https://github.com/apache/spark/commit/f9e5b5db5416528e4bd4dee14d761144647bdab6).
[GitHub] spark pull request #16962: [SPARK-18120][SPARK-19557][SQL] Call QueryExecuti...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16962
[GitHub] spark pull request #15125: [SPARK-5484][GraphX] Periodically do checkpoint i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15125#discussion_r101687644 --- Diff: project/MimaExcludes.scala --- @@ -943,7 +948,7 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.numTrees"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.setFeatureSubsetStrategy"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.numTrees"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") --- End diff -- huh, why doesn't Scalastyle complain about this?
[GitHub] spark pull request #16664: [SPARK-18120 ][SQL] Call QueryExecutionListener c...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16664
[GitHub] spark issue #16962: [SPARK-18120][SPARK-19557][SQL] Call QueryExecutionListe...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/16962 thanks for the review, merging to master!
[GitHub] spark issue #16961: [SPARK-19534][EXAMPLES] Convert Java tests to use lambda...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/16961 whoa, nice.
[GitHub] spark issue #16949: [SPARK-16122][CORE] Add rest api for job environment
Github user ajbozarth commented on the issue: https://github.com/apache/spark/pull/16949 Thanks @uncleGen and after seeing his code I agree with @vanzin
[GitHub] spark pull request #15125: [SPARK-5484][GraphX] Periodically do checkpoint i...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/15125#discussion_r101687206 --- Diff: graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala --- @@ -87,10 +87,10 @@ private[mllib] class PeriodicGraphCheckpointer[VD, ED]( override protected def persist(data: Graph[VD, ED]): Unit = { if (data.vertices.getStorageLevel == StorageLevel.NONE) { - data.vertices.persist() --- End diff -- isn't persist better? this could potentially support a different storage level later
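For context, Spark's `RDD.cache()` is shorthand for `persist()`, which uses `StorageLevel.MEMORY_ONLY`, so the two calls currently behave the same; the argument for `persist` is that a level can be threaded through later. The sketch below mimics that relationship with a hypothetical `FakeRdd` class and `persistIfNeeded` helper (neither is Spark API) to show where a configurable level would plug in.

```scala
object StorageLevelSketch {
  // Hypothetical stand-ins for Spark's StorageLevel values.
  sealed trait Level
  case object NONE extends Level
  case object MEMORY_ONLY extends Level
  case object MEMORY_AND_DISK extends Level

  // Hypothetical mimic of RDD's caching methods (not Spark code).
  final class FakeRdd {
    private var level: Level = NONE
    def getStorageLevel: Level = level
    def persist(newLevel: Level): this.type = { level = newLevel; this }
    // As in Spark's RDD, the no-argument forms are fixed to MEMORY_ONLY.
    def persist(): this.type = persist(MEMORY_ONLY)
    def cache(): this.type = persist()
  }

  // Hypothetical checkpointer hook: taking the level as a parameter leaves room
  // for a different storage level later, which a hard-coded cache() cannot express.
  def persistIfNeeded(data: FakeRdd, level: Level = MEMORY_ONLY): Unit =
    if (data.getStorageLevel == NONE) data.persist(level)
}
```

As in the quoted diff, data that already has a storage level is left untouched.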
[GitHub] spark pull request #15125: [SPARK-5484][GraphX] Periodically do checkpoint i...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/15125#discussion_r101687030 --- Diff: graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala --- @@ -362,12 +362,14 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali def pregel[A: ClassTag]( initialMsg: A, maxIterations: Int = Int.MaxValue, + checkpointInterval: Int = 25, --- End diff -- also, why is this `25`?
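One way to reason about the default: a periodic checkpointer that checkpoints on every `interval`-th update bounds the un-checkpointed lineage at `interval` updates, at the cost of one checkpoint write per `interval` iterations. The sketch below models only that arithmetic. The modulo rule and treating a non-positive interval as "disabled" are assumptions about the checkpointer's behavior, and both helper names are hypothetical; nothing here says why `25` was picked.

```scala
object CheckpointIntervalSketch {
  /** 1-based update counts at which a checkpoint would be written (assumed modulo rule). */
  def checkpointedUpdates(totalUpdates: Int, interval: Int): Seq[Int] =
    if (interval <= 0) Seq.empty // assumption: a non-positive interval disables checkpointing
    else (1 to totalUpdates).filter(_ % interval == 0)

  /** Longest run of updates whose lineage is not truncated by a checkpoint. */
  def longestUncheckpointedRun(totalUpdates: Int, interval: Int): Int = {
    val marks = (0 +: checkpointedUpdates(totalUpdates, interval)) :+ totalUpdates
    marks.zip(marks.tail).map { case (a, b) => b - a }.max
  }
}
```

A smaller interval shortens lineage (cheaper recovery, smaller task closures) but writes more checkpoints; a larger one does the opposite.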
[GitHub] spark pull request #15125: [SPARK-5484][GraphX] Periodically do checkpoint i...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/15125#discussion_r101687075 --- Diff: graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala --- @@ -122,27 +126,37 @@ object Pregel extends Logging { require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," + s" but got ${maxIterations}") -var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache() +var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)) +val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED]( + checkpointInterval, graph.vertices.sparkContext) +graphCheckpointer.update(g) + // compute the messages var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) +val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)]( + checkpointInterval, graph.vertices.sparkContext) +messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) var activeMessages = messages.count() + // Loop var prevG: Graph[VD, ED] = null var i = 0 while (activeMessages > 0 && i < maxIterations) { // Receive the messages and update the vertices. prevG = g - g = g.joinVertices(messages)(vprog).cache() + g = g.joinVertices(messages)(vprog) + graphCheckpointer.update(g) val oldMessages = messages // Send new messages, skipping edges where neither side received a message. We must cache // messages so it can be materialized on the next line, allowing us to uncache the previous // iteration. messages = GraphXUtils.mapReduceTriplets( -g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() +g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))) // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages --- End diff -- should the comment here be updated?
[GitHub] spark pull request #15125: [SPARK-5484][GraphX] Periodically do checkpoint i...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/15125#discussion_r101687037 --- Diff: graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala --- @@ -113,7 +116,8 @@ object Pregel extends Logging { (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue, - activeDirection: EdgeDirection = EdgeDirection.Either) + activeDirection: EdgeDirection = EdgeDirection.Either, + checkpointInterval: Int = 25) --- End diff -- ditto, why `25`?
[GitHub] spark pull request #15125: [SPARK-5484][GraphX] Periodically do checkpoint i...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/15125#discussion_r101686985 --- Diff: graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala --- @@ -362,12 +362,14 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali def pregel[A: ClassTag]( initialMsg: A, maxIterations: Int = Int.MaxValue, + checkpointInterval: Int = 25, --- End diff -- isn't it a breaking change to add a param into the middle of the list? also, this should be documented above, right?
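The source-compatibility concern can be seen without GraphX. In the sketch below (hypothetical `pregelV1`/`pregelV2` functions and `Direction` values, not the real `GraphOps.pregel` or `EdgeDirection`), appending the new defaulted parameter keeps an existing positional call compiling and binding as before. Inserting it before `activeDirection`, as the `GraphOps` diff does, would turn the same call into a type error. Adding a parameter in either position still changes the compiled method signature, so binary compatibility is a separate question for MiMa.

```scala
object ParamOrderSketch {
  // Hypothetical stand-ins for EdgeDirection values.
  sealed trait Direction
  case object EitherDir extends Direction
  case object OutDir extends Direction

  // Shape before the change: (initialMsg, maxIterations, activeDirection).
  def pregelV1(initialMsg: Int, maxIterations: Int = Int.MaxValue,
               activeDirection: Direction = EitherDir): String =
    s"msg=$initialMsg iters=$maxIterations dir=$activeDirection"

  // New parameter appended last: old positional calls still bind the same way.
  def pregelV2(initialMsg: Int, maxIterations: Int = Int.MaxValue,
               activeDirection: Direction = EitherDir,
               checkpointInterval: Int = 25): String =
    s"msg=$initialMsg iters=$maxIterations dir=$activeDirection ckpt=$checkpointInterval"

  // Inserting the parameter in the middle instead, i.e.
  //   def pregelV3(initialMsg: Int, maxIterations: Int = Int.MaxValue,
  //                checkpointInterval: Int = 25, activeDirection: Direction = EitherDir)
  // would make the old call pregelV3(0, 10, OutDir) fail to compile,
  // because OutDir would be passed where an Int is expected.
}
```

Callers that use named arguments (`activeDirection = OutDir`) are unaffected by either ordering; positional callers are the ones at risk.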
[GitHub] spark pull request #15125: [SPARK-5484][GraphX] Periodically do checkpoint i...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/15125#discussion_r101687170 --- Diff: graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala --- @@ -155,6 +169,8 @@ object Pregel extends Logging { i += 1 } messages.unpersist(blocking = false) +graphCheckpointer.deleteAllCheckpoints() +messageCheckpointer.deleteAllCheckpoints() --- End diff -- shouldn't this be inside a finally clause to make sure checkpoint data is cleaned up even in error cases?
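A minimal sketch of the suggested shape, using a hypothetical `FakeCheckpointer` in place of `PeriodicGraphCheckpointer`/`PeriodicRDDCheckpointer`: cleanup placed in `finally` runs whether the loop finishes normally or throws.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the periodic checkpointers; it only records what it "wrote".
final class FakeCheckpointer {
  val live = ArrayBuffer.empty[Int]
  var cleanups = 0
  def update(iteration: Int): Unit = live += iteration
  def deleteAllCheckpoints(): Unit = { live.clear(); cleanups += 1 }
}

object FinallyCleanupSketch {
  /** Runs `iterations` steps, throwing at `failAt` if given, and always cleans up. */
  def run(ckpt: FakeCheckpointer, iterations: Int, failAt: Option[Int]): Unit = {
    try {
      var i = 0
      while (i < iterations) {
        if (failAt.contains(i)) throw new IllegalStateException(s"failed at iteration $i")
        ckpt.update(i)
        i += 1
      }
    } finally {
      // Executes on normal exit and on exception, so no checkpoint data is left behind.
      ckpt.deleteAllCheckpoints()
    }
  }
}
```

The original exception still propagates to the caller; `finally` only guarantees that the cleanup runs first.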
[GitHub] spark pull request #15125: [SPARK-5484][GraphX] Periodically do checkpoint i...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/15125#discussion_r101687092 --- Diff: graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala --- @@ -155,6 +169,8 @@ object Pregel extends Logging { i += 1 } messages.unpersist(blocking = false) --- End diff -- yes
[GitHub] spark pull request #15125: [SPARK-5484][GraphX] Periodically do checkpoint i...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/15125#discussion_r101687268 --- Diff: project/MimaExcludes.scala --- @@ -943,7 +948,7 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.numTrees"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.setFeatureSubsetStrategy"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.numTrees"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") --- End diff -- remove trailing whitespace
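For a quick local check of the kind of trailing whitespace flagged here, a small helper can report the offending lines. `TrailingWhitespaceSketch` is hypothetical and is not part of Spark's build or its scalastyle configuration.

```scala
object TrailingWhitespaceSketch {
  /** 1-based numbers of lines that end in a space or a tab (CRLF endings are tolerated). */
  def linesWithTrailingWhitespace(source: String): Seq[Int] =
    source.split("\n", -1).toSeq
      .map(_.stripSuffix("\r"))
      .zipWithIndex
      .collect { case (line, idx) if line.endsWith(" ") || line.endsWith("\t") => idx + 1 }
}
```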
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16386
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/16386 thanks, merging to master!
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101683820 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write --- End diff -- shall we call `.coalesce(1)` to make sure we only write to a single file?
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101683991 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + assert(new File(path).listFiles().exists(_.getName.endsWith(".gz"))) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.option("compression", "gZiP") +.json(jsonDir) + + assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write.json(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) +} + } + + test("SPARK-18352: Expect one JSON document per file") { +// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY 
token. +// this might not be the optimal behavior but this test verifies that only the first value +// is parsed and the rest are discarded. + +// alternatively the parser could continue parsing following objects, which may further reduce +// allocations by skipping the line reader entirely + +withTempPath { dir => + val path = dir.getCanonicalPath + spark +.createDataFrame(Seq(Tuple1("{}{invalid}"))) --- End diff -- nit: `Seq(Tuple1("{}{invalid}")).toDF("value")`
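The behavior this test pins down (only the first top-level value in `{}{invalid}` is read, and the rest is ignored) can be illustrated with a simplified brace-matching scanner. This is not how Jackson works internally: `firstTopLevelValue` is a hypothetical helper that only tracks nesting depth and string literals, and it does not check that `{` is closed by `}` rather than `]`.

```scala
object FirstValueSketch {
  /** Text of the first balanced top-level {...} or [...] value, ignoring whatever follows it. */
  def firstTopLevelValue(s: String): Option[String] = {
    var depth = 0
    var start = -1
    var inString = false
    var escaped = false
    var i = 0
    while (i < s.length) {
      val ch = s.charAt(i)
      if (inString) {
        // Inside a string literal, braces are data; only track escapes and the closing quote.
        if (escaped) escaped = false
        else if (ch == '\\') escaped = true
        else if (ch == '"') inString = false
      } else {
        ch match {
          case '"' => inString = true
          case '{' | '[' =>
            if (depth == 0) start = i
            depth += 1
          case '}' | ']' if depth > 0 =>
            depth -= 1
            // Stop at the closer that matches the first opener, like the parser in the test.
            if (depth == 0) return Some(s.substring(start, i + 1))
          case _ => // other characters, and stray closers at depth 0, are ignored
        }
      }
      i += 1
    }
    None
  }
}
```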
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101684712 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.json + +import java.io.InputStream + +import scala.reflect.ClassTag + +import com.fasterxml.jackson.core.{JsonFactory, JsonParser} +import com.google.common.io.ByteStreams +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat} + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} +import org.apache.spark.sql.execution.datasources.{CodecStreams, HadoopFileLinesReader, PartitionedFile} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils + +/** + * Common functions for parsing JSON files + * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or [[String]] + */ +abstract class JsonDataSource[T] extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances + */ + def readFile( +conf: Configuration, +file: PartitionedFile, +parser: JacksonParser): Iterator[InternalRow] + + /** + * Create an [[RDD]] that handles the preliminary parsing of [[T]] records + */ + protected def createBaseRdd( +sparkSession: SparkSession, +inputPaths: Seq[FileStatus]): RDD[T] + + /** + * A generic wrapper to invoke the correct [[JsonFactory]] method to allocate a [[JsonParser]] + * for an instance of [[T]] + */ + def createParser(jsonFactory: JsonFactory, value: T): JsonParser + + final def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + 
parsedOptions: JSONOptions): Option[StructType] = { +if (inputPaths.nonEmpty) { + val jsonSchema = JsonInferSchema.infer( +createBaseRdd(sparkSession, inputPaths), +parsedOptions, +createParser) + checkConstraints(jsonSchema) + Some(jsonSchema) +} else { + None +} + } + + /** Constraints to be imposed on schema to be stored. */ + private def checkConstraints(schema: StructType): Unit = { +if (schema.fieldNames.length != schema.fieldNames.distinct.length) { + val duplicateColumns = schema.fieldNames.groupBy(identity).collect { +case (x, ys) if ys.length > 1 => "\"" + x + "\"" + }.mkString(", ") + throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " + +s"cannot save to JSON format") +} + } +} + +object JsonDataSource { + def apply(options: JSONOptions): JsonDataSource[_] = { +if (options.wholeFile) { + WholeFileJsonDataSource +} else { + TextInputJsonDataSource +} + } + + /** + * Create a new [[RDD]] via the supplied callback if there is at least one file to process, + * otherwise an [[org.apache.spark.rdd.EmptyRDD]] will be returned. + */ + def createBaseRdd[T : ClassTag]( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus])( + fn: (Configuration, String) => RDD[T]): RDD[T] = { +val paths
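To make the quoted `checkConstraints` logic concrete, the sketch below reproduces its duplicate-name detection on a plain `Seq[String]`. It departs from the quoted code in two labeled ways: it returns the message as an `Option[String]` instead of throwing `AnalysisException`, and it sorts the duplicates so the output is deterministic (`groupBy` returns an unordered `Map`). `DuplicateColumnSketch` is a hypothetical name.

```scala
object DuplicateColumnSketch {
  /** Message mirroring the quoted checkConstraints, or None when all names are distinct. */
  def duplicateColumnError(fieldNames: Seq[String]): Option[String] =
    if (fieldNames.length == fieldNames.distinct.length) None
    else {
      val duplicateColumns = fieldNames.groupBy(identity).collect {
        case (x, ys) if ys.length > 1 => "\"" + x + "\""
      }.toSeq.sorted.mkString(", ")
      Some(s"Duplicate column(s) : $duplicateColumns found, cannot save to JSON format")
    }
}
```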
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101683719 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -394,36 +447,30 @@ class JacksonParser( } /** - * Parse the string JSON input to the set of [[InternalRow]]s. + * Parse the JSON input to the set of [[InternalRow]]s. + * + * @param recordLiteral an optional function that will be used to generate + * the corrupt record text instead of record.toString */ - def parse(input: String): Seq[InternalRow] = { -if (input.trim.isEmpty) { - Nil -} else { - try { -Utils.tryWithResource(factory.createParser(input)) { parser => - parser.nextToken() - rootConverter.apply(parser) match { -case null => failedRecord(input) -case row: InternalRow => row :: Nil -case array: ArrayData => - // Here, as we support reading top level JSON arrays and take every element - // in such an array as a row, this case is possible. - if (array.numElements() == 0) { -Nil - } else { -array.toArray[InternalRow](schema) - } -case _ => - failedRecord(input) + def parse[T]( + record: T, + createParser: (JsonFactory, T) => JsonParser, + recordLiteral: T => UTF8String): Seq[InternalRow] = { --- End diff -- nit: `recordToString`?
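The new signature makes `parse` generic over the record type `T`: one function opens a parser over the record, and another renders the record as text for the corrupt-record column, which is what the suggested name `recordToString` describes. The sketch below is a simplified, Jackson-free analogue (hypothetical `GenericParseSketch` that parses comma-separated integers instead of JSON) showing the same shape working for both `String` and `Array[Byte]` records.

```scala
import java.nio.charset.StandardCharsets

object GenericParseSketch {
  sealed trait Parsed
  final case class Good(values: Seq[Int]) extends Parsed
  final case class Corrupt(text: String) extends Parsed

  /**
   * `openText` plays the role of createParser (it turns a T into parser input), and
   * `recordToString` is only consulted to render the record when parsing fails.
   */
  def parse[T](record: T, openText: T => String, recordToString: T => String): Parsed =
    try Good(openText(record).split(",").toSeq.map(_.trim.toInt))
    catch { case _: NumberFormatException => Corrupt(recordToString(record)) }

  /** Decoder for byte-array records, e.g. whole files read as bytes. */
  def utf8(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
}
```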
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101685228 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + assert(new File(path).listFiles().exists(_.getName.endsWith(".gz"))) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.option("compression", "gZiP") +.json(jsonDir) + + assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write.json(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) +} + } + + test("SPARK-18352: Expect one JSON document per file") { +// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY 
token. +// this might not be the optimal behavior but this test verifies that only the first value +// is parsed and the rest are discarded. + +// alternatively the parser could continue parsing following objects, which may further reduce +// allocations by skipping the line reader entirely + +withTempPath { dir => + val path = dir.getCanonicalPath + spark +.createDataFrame(Seq(Tuple1("{}{invalid}"))) +.coalesce(1) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + // no corrupt record column should be created + assert(jsonDF.schema === StructType(Seq())) + // only the first object should be read + assert(jsonDF.count() === 1) +} + } + + test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") { +withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt + assert(corruptRecordCount === 5) + + additionalCorruptRecords +.toDF("value") +// this is the minimum partition count that avoids hash collisions +.repartition(corruptRecordCount * 4, F.hash($"value")) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).option("mode", "PERMISSIVE").json(path) + assert(jsonDF.count() === corruptRecordCount) + assert(jsonDF.schema === new StructType() +.add("_corrupt_record", StringType) +.add("dummy", StringType)) + val counts = jsonDF +.join( + additionalCorruptRecords.toDF("value"), + F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === F.trim($"value"), + "outer") +.agg( + F.count($"dummy").as("valid"), + F.count($"_corrupt_record").as("corrupt"), + F.count("*").as("count")) + checkAnswer(counts, Row(1, 4, 6)) --- End diff -- why is `count(*)` 6?
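A likely explanation for 6: the join is a full outer join, and the valid row's `_corrupt_record` is null, so SQL equality with it never holds. The 4 corrupt rows each match their source record, while the 1 valid parsed row and the 1 source text it was parsed from each survive unmatched. That gives 4 + 1 + 1 = 6 rows, of which 1 has `dummy` and 4 have `_corrupt_record`. The Spark-free simulation below replays that counting with hypothetical stand-in records (the real contents of `additionalCorruptRecords` are not shown in this thread) and trims both sides in place of the `regexp_replace`/`trim` pair.

```scala
object OuterJoinCountSketch {
  type Row = (Option[String], Option[String]) // (_corrupt_record, dummy); None plays SQL NULL

  /** (count(dummy), count(_corrupt_record), count(*)) after a full outer join on trimmed text. */
  def counts(parsed: Seq[Row], sources: Seq[String]): (Int, Int, Int) = {
    // A NULL _corrupt_record never satisfies the equality, so only Some(...) can match.
    def matches(row: Row, src: String): Boolean = row._1.exists(_.trim == src.trim)

    val matched: Seq[Row] = for {
      row <- parsed
      src <- sources if matches(row, src)
    } yield row
    val leftOnly: Seq[Row] = parsed.filterNot(row => sources.exists(matches(row, _)))
    // Unmatched source rows carry NULLs in every column that comes from the parsed side.
    val rightOnly: Seq[Row] = sources
      .filterNot(src => parsed.exists(matches(_, src)))
      .map(_ => (Option.empty[String], Option.empty[String]))

    val joined = matched ++ leftOnly ++ rightOnly
    (joined.count(_._2.isDefined), joined.count(_._1.isDefined), joined.size)
  }
}
```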
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101683953 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + assert(new File(path).listFiles().exists(_.getName.endsWith(".gz"))) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.option("compression", "gZiP") +.json(jsonDir) + + assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write.json(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() --- End diff -- it's not compressed, let's just call it `jsonFiles`
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101683631 --- Diff: core/src/main/scala/org/apache/spark/input/PortableDataStream.scala --- @@ -193,6 +196,10 @@ class PortableDataStream( } } + @Since("1.2.0") def getPath(): String = path + + @Since("2.2.0") + def getConfiguration: Configuration = conf --- End diff -- nit: we should rename it to `getConf`, `getConfiguration` is too verbose.
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101683916 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + assert(new File(path).listFiles().exists(_.getName.endsWith(".gz"))) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.option("compression", "gZiP") +.json(jsonDir) + + assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write --- End diff -- same here
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101684619

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala ---
@@ -37,29 +32,30 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
+  override val shortName: String = "json"
-  override def shortName(): String = "json"
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    val parsedOptions = new JSONOptions(
+      options,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    val jsonDataSource = JsonDataSource(parsedOptions)
+    jsonDataSource.isSplitable && super.isSplitable(sparkSession, options, path)
+  }
   override def inferSchema(
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    if (files.isEmpty) {
-      None
-    } else {
-      val parsedOptions: JSONOptions =
-        new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
-      val columnNameOfCorruptRecord =
-        parsedOptions.columnNameOfCorruptRecord
-          .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-      val jsonSchema = JsonInferSchema.infer(
-        createBaseRdd(sparkSession, files),
-        columnNameOfCorruptRecord,
-        parsedOptions)
-      checkConstraints(jsonSchema)
-
-      Some(jsonSchema)
-    }
+    val parsedOptions = new JSONOptions(
+      options,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    JsonDataSource(parsedOptions).infer(
+      sparkSession, files, parsedOptions)
--- End diff --

we can merge it into the previous line
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101684365 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + assert(new File(path).listFiles().exists(_.getName.endsWith(".gz"))) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.option("compression", "gZiP") +.json(jsonDir) + + assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write.json(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) +} + } + + test("SPARK-18352: Expect one JSON document per file") { +// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY 
token. +// this might not be the optimal behavior but this test verifies that only the first value +// is parsed and the rest are discarded. + +// alternatively the parser could continue parsing following objects, which may further reduce +// allocations by skipping the line reader entirely + +withTempPath { dir => + val path = dir.getCanonicalPath + spark +.createDataFrame(Seq(Tuple1("{}{invalid}"))) +.coalesce(1) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + // no corrupt record column should be created + assert(jsonDF.schema === StructType(Seq())) + // only the first object should be read + assert(jsonDF.count() === 1) +} + } + + test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") { +withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt + assert(corruptRecordCount === 5) + + additionalCorruptRecords --- End diff --

I have a more robust solution:

```
additionalCorruptRecords.collect().zipWithIndex.foreach { case (str, index) =>
  Seq(str).toDF("value").write.text(new File(dir, index + ".json").getCanonicalPath)
}
...
// dir will have 5 files: 0.json, 1.json, ... 4.json
```
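The suggestion writes every record to its own file, presumably so that each file holds exactly one document in wholeFile mode regardless of how partitioning spreads records across output files. A minimal plain-Scala sketch of that one-record-per-file layout, assuming `java.nio` instead of Spark's text writer; the `OnePerFile` object and `writeOnePerFile` helper are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object OnePerFile {
  // Hypothetical helper: writes each record to its own file named <index>.json
  // inside `dir`, so every file holds exactly one (possibly corrupt) document.
  def writeOnePerFile(dir: Path, records: Seq[String]): Seq[Path] =
    records.zipWithIndex.map { case (record, index) =>
      Files.write(dir.resolve(s"$index.json"), record.getBytes(StandardCharsets.UTF_8))
    }
}
```

With five records this yields `0.json` through `4.json`, matching the layout described in the comment.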
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101684508 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + new File(path).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + } + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.option("compression", "gZiP") +.save(jsonDir) + + new File(jsonDir).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".json.gz"))) + } + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.save(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + 
assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Expect one JSON document per file") { +// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token. +// this might not be the optimal behavior but this test verifies that only the first value +// is parsed and the rest are discarded. + +// alternatively the parser could continue parsing following objects, which may further reduce +// allocations by skipping the line reader entirely + +withTempPath { dir => + val path = dir.getCanonicalPath + spark +.createDataFrame(Seq(Tuple1("{}{invalid}"))) +.coalesce(1) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + // no corrupt record column should be created + assert(jsonDF.schema === StructType(Seq())) + // only the first object should be read + assert(jsonDF.count() === 1) +} + } + + test("SPARK-18352: Handle corrupt documents") { +withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt + assert(corruptRecordCount === 5) + + additionalCorruptRecords +.toDF("value") +.repartition(corruptRecordCount * 4) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + assert(jsonDF.count() === corruptRecordCount) + assert(jsonDF.schema === new StructType() +.add("_corrupt_record", StringType) +.add("dummy", StringType)) + val counts = jsonDF +.join( + additionalCorruptRecords.toDF("value"), + F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === F.trim($"value"), --- End diff --

why do we have extra line endings in `_corrupt_record`?
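The join condition strips leading and trailing whitespace from `_corrupt_record` with the pattern `(^\s+|\s+$)` before comparing it with the input. One plausible source of the extra line endings is the newline written after each row by the text writer, which wholeFile mode would keep as part of the document; that is an assumption, not something the diff states. A plain-Scala sketch of the same normalization (the `CorruptRecordNormalizer` name is hypothetical):

```scala
object CorruptRecordNormalizer {
  // Same pattern as the test: remove whitespace (including newlines) at the
  // very start (^\s+) or very end (\s+$) of the string. Without the MULTILINE
  // flag, ^ and $ anchor to the whole input, so interior line breaks survive.
  private val edgeWhitespace = "(^\\s+|\\s+$)"

  def normalize(record: String): String = record.replaceAll(edgeWhitespace, "")
}
```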
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101683666 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -48,69 +47,110 @@ class JacksonParser( // A `ValueConverter` is responsible for converting a value from `JsonParser` // to a value in a field for `InternalRow`. - private type ValueConverter = (JsonParser) => Any + private type ValueConverter = JsonParser => AnyRef // `ValueConverter`s for the root schema for all fields in the schema - private val rootConverter: ValueConverter = makeRootConverter(schema) + private val rootConverter = makeRootConverter(schema) private val factory = new JsonFactory() options.setJacksonOptions(factory) private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) + private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) + corruptFieldIndex.foreach(idx => require(schema(idx).dataType == StringType)) + + @transient + private[this] var isWarningPrinted: Boolean = false + @transient - private[this] var isWarningPrintedForMalformedRecord: Boolean = false + private def printWarningForMalformedRecord(record: () => UTF8String): Unit = { +def sampleRecord: String = { + if (options.wholeFile) { +"" + } else { +s"Sample record: ${record()}\n" + } +} + +def footer: String = { + s"""Code example to print all malformed records (scala): + |=== + |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}. + |val parsedJson = spark.read.json("/path/to/json/file/test.json") + | + """.stripMargin +} + +if (options.permissive) { + logWarning( +s"""Found at least one malformed record. The JSON reader will replace + |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode. + |To find out which corrupted records have been replaced with null, please use the + |default inferred schema instead of providing a custom schema. 
+ | + |${sampleRecord ++ footer} + | + """.stripMargin) +} else if (options.dropMalformed) { + logWarning( +s"""Found at least one malformed record. The JSON reader will drop + |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which + |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE + |mode and use the default inferred schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} + } + + @transient + private def printWarningIfWholeFile(): Unit = { +if (options.wholeFile && corruptFieldIndex.isDefined) { + logWarning( +s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result + |in very large allocations or OutOfMemoryExceptions being raised. + | --- End diff -- nit: unnecessary line
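The parser keeps an `isWarningPrinted` flag and builds the sample record lazily through a `() => UTF8String` thunk, presumably so the malformed-record warning is logged once per parser rather than once per bad record. A minimal sketch of that guard, assuming a pluggable `log` function in place of Spark's `logWarning`; the `WarnOnce` class is hypothetical.

```scala
// Hypothetical stand-in for the parser's warning logic: the message is passed
// by name, so it is only built if it is actually logged, and it is logged at
// most once per instance.
final class WarnOnce(log: String => Unit) {
  private[this] var isWarningPrinted: Boolean = false

  def warn(message: => String): Unit = {
    if (!isWarningPrinted) {
      log(message)
      isWarningPrinted = true
    }
  }
}
```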
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101684686 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.json + +import java.io.InputStream + +import scala.reflect.ClassTag + +import com.fasterxml.jackson.core.{JsonFactory, JsonParser} +import com.google.common.io.ByteStreams +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat} + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} +import org.apache.spark.sql.execution.datasources.{CodecStreams, HadoopFileLinesReader, PartitionedFile} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils + +/** + * Common functions for parsing JSON files + * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or [[String]] + */ +abstract class JsonDataSource[T] extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances + */ + def readFile( +conf: Configuration, +file: PartitionedFile, +parser: JacksonParser): Iterator[InternalRow] + + /** + * Create an [[RDD]] that handles the preliminary parsing of [[T]] records + */ + protected def createBaseRdd( +sparkSession: SparkSession, +inputPaths: Seq[FileStatus]): RDD[T] + + /** + * A generic wrapper to invoke the correct [[JsonFactory]] method to allocate a [[JsonParser]] + * for an instance of [[T]] + */ + def createParser(jsonFactory: JsonFactory, value: T): JsonParser + + final def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + 
parsedOptions: JSONOptions): Option[StructType] = { +if (inputPaths.nonEmpty) { + val jsonSchema = JsonInferSchema.infer( +createBaseRdd(sparkSession, inputPaths), +parsedOptions, +createParser) + checkConstraints(jsonSchema) + Some(jsonSchema) +} else { + None +} + } + + /** Constraints to be imposed on schema to be stored. */ + private def checkConstraints(schema: StructType): Unit = { +if (schema.fieldNames.length != schema.fieldNames.distinct.length) { + val duplicateColumns = schema.fieldNames.groupBy(identity).collect { +case (x, ys) if ys.length > 1 => "\"" + x + "\"" + }.mkString(", ") + throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " + +s"cannot save to JSON format") +} + } +} + +object JsonDataSource { + def apply(options: JSONOptions): JsonDataSource[_] = { +if (options.wholeFile) { + WholeFileJsonDataSource +} else { + TextInputJsonDataSource +} + } + + /** + * Create a new [[RDD]] via the supplied callback if there is at least one file to process, + * otherwise an [[org.apache.spark.rdd.EmptyRDD]] will be returned. + */ + def createBaseRdd[T : ClassTag]( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus])( + fn: (Configuration, String) => RDD[T]): RDD[T] = { +val paths
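`checkConstraints` rejects schemas with repeated field names by grouping the names and keeping those that occur more than once. The same logic can be sketched in plain Scala, returning an `Option[String]` instead of throwing `AnalysisException` so it runs without Spark; the `DuplicateColumns` object is hypothetical.

```scala
object DuplicateColumns {
  // Mirrors the check in the diff: if any name repeats, group names by identity
  // and report each duplicated one, quoted and comma-separated.
  def find(fieldNames: Seq[String]): Option[String] =
    if (fieldNames.length != fieldNames.distinct.length) {
      Some(fieldNames.groupBy(identity).collect {
        case (name, occurrences) if occurrences.length > 1 => "\"" + name + "\""
      }.mkString(", "))
    } else {
      None
    }
}
```

Because `groupBy` returns a `Map`, the order of several reported duplicates is not guaranteed.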
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101685374 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + assert(new File(path).listFiles().exists(_.getName.endsWith(".gz"))) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.option("compression", "gZiP") +.json(jsonDir) + + assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write.json(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) +} + } + + test("SPARK-18352: Expect one JSON document per file") { +// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY 
token. +// this might not be the optimal behavior but this test verifies that only the first value +// is parsed and the rest are discarded. + +// alternatively the parser could continue parsing following objects, which may further reduce +// allocations by skipping the line reader entirely + +withTempPath { dir => + val path = dir.getCanonicalPath + spark +.createDataFrame(Seq(Tuple1("{}{invalid}"))) +.coalesce(1) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + // no corrupt record column should be created + assert(jsonDF.schema === StructType(Seq())) + // only the first object should be read + assert(jsonDF.count() === 1) +} + } + + test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") { +withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt --- End diff --

The name is misleading: this dataset also contains a good record, doesn't it?
[GitHub] spark issue #13467: [SPARK-15642][SQL] Bug report: metadata gets lost when s...
Github user zommerfelds commented on the issue: https://github.com/apache/spark/pull/13467 The bug is still present. To be honest, I don't think I have time to work on this right now, so we can close this PR if you want. I tried to find the code related to this functionality but haven't found it yet, so I'm not sure whom to contact. Someone familiar with it might be able to tell quickly whether this is a bug and what has to be done to fix it.
[GitHub] spark issue #16973: [SPARKR][EXAMPLES] update examples to stop spark session
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16973 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73037/
[GitHub] spark issue #16973: [SPARKR][EXAMPLES] update examples to stop spark session
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16973 **[Test build #73037 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73037/testReport)** for PR 16973 at commit [`4d0506b`](https://github.com/apache/spark/commit/4d0506bd20010bd1f1f61eaaa0cf664f7b02a499).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #16951: [SPARK-18285][SPARKR] SparkR approxQuantile supports inp...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/16951 LGTM
[GitHub] spark pull request #16951: [SPARK-18285][SPARKR] SparkR approxQuantile suppo...
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/16951#discussion_r101685201

--- Diff: R/pkg/R/stats.R ---
@@ -171,12 +175,17 @@ setMethod("freqItems", signature(x = "SparkDataFrame", cols = "character"),
 #' }
 #' @note approxQuantile since 2.0.0
 setMethod("approxQuantile",
-          signature(x = "SparkDataFrame", col = "character",
+          signature(x = "SparkDataFrame", cols = "character",
                     probabilities = "numeric", relativeError = "numeric"),
-          function(x, col, probabilities, relativeError) {
+          function(x, cols, probabilities, relativeError) {
             statFunctions <- callJMethod(x@sdf, "stat")
-            callJMethod(statFunctions, "approxQuantile", col,
-                        as.list(probabilities), relativeError)
+            quantiles <- callJMethod(statFunctions, "approxQuantile", as.list(cols),
+                                     as.list(probabilities), relativeError)
+            if (length(quantiles) == 1) {
--- End diff --

nit: would it be better to check `length(cols) == 1` instead of `length(quantiles) == 1`?
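The nit is about deciding the shape of the result from the requested input (`cols`) rather than from the returned value. A Scala sketch of that design choice, assuming one quantile sequence is returned per requested column; the `QuantileShape` object and its `Either` encoding are hypothetical and stand in for R's dynamically typed return value.

```scala
object QuantileShape {
  // Hypothetical helper: `perColumn` holds one Seq of quantiles per requested
  // column. The branch depends on how many columns were asked for, so a single
  // requested column always yields the flat form.
  def unwrap(cols: Seq[String], perColumn: Seq[Seq[Double]]): Either[Seq[Double], Seq[Seq[Double]]] = {
    require(cols.length == perColumn.length, "expected one result per requested column")
    if (cols.length == 1) Left(perColumn.head) else Right(perColumn)
  }
}
```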
[GitHub] spark issue #16973: [SPARKR][EXAMPLES] update examples to stop spark session
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16973 Merged build finished. Test PASSed.
[GitHub] spark pull request #16951: [SPARK-18285][SPARKR] SparkR approxQuantile suppo...
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/16951#discussion_r101685081

--- Diff: R/pkg/R/stats.R ---
@@ -149,15 +149,19 @@ setMethod("freqItems", signature(x = "SparkDataFrame", cols = "character"),
 #' This method implements a variation of the Greenwald-Khanna algorithm (with some speed
 #' optimizations). The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670
 #' Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna.
+#' Note that rows containing any NA values will be removed before calculation.
 #'
 #' @param x A SparkDataFrame.
-#' @param col The name of the numerical column.
+#' @param cols A single column name, or a list of names for multiple columns.
--- End diff --

Typically, renaming a parameter like this would be a breaking API change that we should avoid. However, because R partially matches argument names, a call that passes `col` still matches `cols`, so this is in fact compatible.
[GitHub] spark issue #16330: [SPARK-18817][SPARKR][SQL] change derby log output and m...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/16330 Hi @gatorsmile @cloud-fan @yhuai, could you give us some feedback on this? This is blocking the release of Spark to R/CRAN. We don't need a very detailed review, just your thoughts on whether this is heading in the right direction. Thanks! And #16290 too, please.
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16386 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73032/
[GitHub] spark pull request #16966: [SPARK-18409][ML]LSH approxNearestNeighbors shoul...
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/16966#discussion_r101684524

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala ---
@@ -147,6 +148,15 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
     modelSubsetWithDistCol.sort(distCol).limit(numNearestNeighbors)
   }
+  private[feature] def approxNearestNeighbors(
+      dataset: Dataset[_],
+      key: Vector,
+      numNearestNeighbors: Int,
+      singleProbe: Boolean,
+      distCol: String): Dataset[_] = {
+    approxNearestNeighbors(dataset, key, numNearestNeighbors, singleProbe, distCol, 0.05)
--- End diff --

What is `0.05`? Should we document why that value was chosen?
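One way to address the question in code is to replace the bare literal in the convenience overload with a named, documented constant. The diff does not say what the sixth argument means, so the name `DefaultTolerance`, the `NearestNeighborsDefaults` object, and the toy method bodies below are all hypothetical; only the delegation pattern is the point.

```scala
object NearestNeighborsDefaults {
  /**
   * Hypothetical named constant for the 0.05 literal in the diff.
   * Record here why this value was chosen once the rationale is known.
   */
  val DefaultTolerance: Double = 0.05

  // Full overload: the extra parameter is explicit (toy body for illustration).
  def approxNearestNeighbors(numNearestNeighbors: Int, tolerance: Double): String =
    s"k=$numNearestNeighbors, tolerance=$tolerance"

  // Convenience overload delegates with the named default instead of a magic number.
  def approxNearestNeighbors(numNearestNeighbors: Int): String =
    approxNearestNeighbors(numNearestNeighbors, DefaultTolerance)
}
```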
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16386 Merged build finished. Test PASSed.
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16386 **[Test build #73032 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73032/testReport)** for PR 16386 at commit [`b801ab0`](https://github.com/apache/spark/commit/b801ab096c3bf426c7d2044291785359ace4922f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16973: [SPARKR][EXAMPLES] update examples to stop spark session
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16973 **[Test build #73037 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73037/testReport)** for PR 16973 at commit [`4d0506b`](https://github.com/apache/spark/commit/4d0506bd20010bd1f1f61eaaa0cf664f7b02a499).
[GitHub] spark pull request #16973: [SPARKR][EXAMPLES] update examples to stop spark ...
GitHub user felixcheung opened a pull request: https://github.com/apache/spark/pull/16973 [SPARKR][EXAMPLES] update examples to stop spark session ## What changes were proposed in this pull request? stop session at end of example ## How was this patch tested? manual You can merge this pull request into a Git repository by running: $ git pull https://github.com/felixcheung/spark rexamples Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16973.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16973 commit 4d0506bd20010bd1f1f61eaaa0cf664f7b02a499 Author: Felix Cheung Date: 2017-02-17T04:29:53Z update