[GitHub] spark pull request: [Spark 2060][SQL] Querying JSON Datasets with ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/999#discussion_r13518342 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/json/JsonTable.scala --- @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.json + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.SchemaRDD +import org.apache.spark.sql.Logging +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetField} + +import com.fasterxml.jackson.databind.ObjectMapper + +import scala.collection.JavaConversions._ +import scala.math.BigDecimal +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.types.StructField +import org.apache.spark.sql.catalyst.types.StructType +import org.apache.spark.sql.catalyst.types.ArrayType +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias + +sealed trait SchemaResolutionMode + +case object EAGER_SCHEMA_RESOLUTION extends SchemaResolutionMode +case class EAGER_SCHEMA_RESOLUTION_WITH_SAMPLING(val fraction: Double) extends SchemaResolutionMode +case object LAZY_SCHEMA_RESOLUTION extends SchemaResolutionMode + +/** + * :: Experimental :: + * Converts a JSON file to a SparkSQL logical query plan. This implementation is only designed to + * work on JSON files that have mostly uniform schema. The conversion suffers from the following + * limitation: + * - The data is optionally sampled to determine all of the possible fields. Any fields that do + *not appear in this sample will not be included in the final output. + */ +@Experimental +object JsonTable extends Serializable with Logging { + def inferSchema( + json: RDD[String], sampleSchema: Option[Double] = None): LogicalPlan = { --- End diff -- Actually - it might make sense to just get rid of the Option. If sampleRatio == 1.0, then it doesn't need to do sample. 
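A minimal sketch of the simplification suggested above, assuming a plain `samplingRatio: Double` replaces the `Option[Double]` parameter (hypothetical helper name and signature, not the PR's final code); when the ratio is 1.0 no `sample()` call is made at all:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper: choose the data used for schema inference. The sample() call
// mirrors the one in the diff above (without replacement, fixed seed) and is skipped
// entirely when samplingRatio is 1.0.
def schemaData(json: RDD[String], samplingRatio: Double = 1.0): RDD[String] =
  if (samplingRatio < 1.0) json.sample(false, samplingRatio, 1) else json
```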
[GitHub] spark pull request: [Spark 2060][SQL] Querying JSON Datasets with ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/999#discussion_r13518348 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/json/JsonTable.scala --- @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.json + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.SchemaRDD +import org.apache.spark.sql.Logging +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetField} + +import com.fasterxml.jackson.databind.ObjectMapper + +import scala.collection.JavaConversions._ +import scala.math.BigDecimal +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.types.StructField +import org.apache.spark.sql.catalyst.types.StructType +import org.apache.spark.sql.catalyst.types.ArrayType +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias + +sealed trait SchemaResolutionMode + +case object EAGER_SCHEMA_RESOLUTION extends SchemaResolutionMode +case class EAGER_SCHEMA_RESOLUTION_WITH_SAMPLING(val fraction: Double) extends SchemaResolutionMode +case object LAZY_SCHEMA_RESOLUTION extends SchemaResolutionMode + +/** + * :: Experimental :: + * Converts a JSON file to a SparkSQL logical query plan. This implementation is only designed to + * work on JSON files that have mostly uniform schema. The conversion suffers from the following + * limitation: + * - The data is optionally sampled to determine all of the possible fields. Any fields that do + *not appear in this sample will not be included in the final output. 
+ */ +@Experimental +object JsonTable extends Serializable with Logging { + def inferSchema( + json: RDD[String], sampleSchema: Option[Double] = None): LogicalPlan = { +val schemaData = sampleSchema.map(json.sample(false, _, 1)).getOrElse(json) +val allKeys = parseJson(schemaData).map(getAllKeysWithValueTypes).reduce(_ ++ _) + +// Resolve type conflicts +val resolved = allKeys.groupBy { + case (key, dataType) => key +}.map { + // Now, keys and types are organized in the format of + // key -> Set(type1, type2, ...). + case (key, typeSet) => { +val fieldName = key.substring(1, key.length - 1).split("`.`").toSeq +val dataType = typeSet.map { + case (_, dataType) => dataType +}.reduce((type1: DataType, type2: DataType) => getCompatibleType(type1, type2)) + +// Finally, we replace all NullType to StringType. We do not need to take care +// StructType because all fields with a StructType are represented by a placeholder +// StructType(Nil). +dataType match { + case NullType => (fieldName, StringType) + case ArrayType(NullType) => (fieldName, ArrayType(StringType)) + case other => (fieldName, other) +} + } +} + +def m
[GitHub] spark pull request: [SPARK-1704][SQL] Fully support EXPLAIN comman...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1003#issuecomment-45403070 This might not be your problem, but when I tried the following, I got ... ``` scala> c.hql("explain select key, count(value) from src group by key").collect() 14/06/06 23:58:05 INFO parse.ParseDriver: Parsing command: explain select key, count(value) from src group by key 14/06/06 23:58:05 INFO parse.ParseDriver: Parse Completed 14/06/06 23:58:05 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations 14/06/06 23:58:05 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences 14/06/06 23:58:05 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations 14/06/06 23:58:05 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences 14/06/06 23:58:05 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=src 14/06/06 23:58:05 INFO HiveMetaStore.audit: ugi=rxin ip=unknown-ip-addr cmd=get_table : db=default tbl=src 14/06/06 23:58:05 INFO storage.MemoryStore: ensureFreeSpace(147699) called with curMem=737503, maxMem=1145674137 14/06/06 23:58:05 INFO storage.MemoryStore: Block broadcast_5 stored as values to memory (estimated size 144.2 KB, free 1091.8 MB) 14/06/06 23:58:05 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange 14/06/06 23:58:05 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree: ExplainCommandPhysical Aggregate false, [key#12], [key#12,SUM(PartialCount#14L) AS c_1#10L] Exchange (HashPartitioning [key#12:0], 150) Aggregate true, [key#12], [key#12,COUNT(value#13) AS PartialCount#14L] HiveTableScan [key#12,value#13], (MetastoreRelation default, src, None), None at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47) at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:265) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenUp(TreeNode.scala:249) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:215) at org.apache.spark.sql.execution.AddExchange$.apply(Exchange.scala:93) at org.apache.spark.sql.execution.AddExchange$.apply(Exchange.scala:89) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:62) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:60) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:60) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:52) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:52) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:275) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:275) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:260) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:248) at 
org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:85) at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90) at $i9579e5b89ab1eb428704b684e2e341c$.(:70) at $i9579e5b89ab1eb428704b684e2e341c$.() at .(:7) at .() at $print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734) at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983) at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573) at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604) at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568) at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760) at scala.tools.nsc.interpreter.ILoop.interpre
[GitHub] spark pull request: [SPARK-1704][SQL] Fully support EXPLAIN comman...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1003#issuecomment-45417101 Can you add a test case?
[GitHub] spark pull request: Workaround in Spark for ConcurrentModification...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1000#issuecomment-45417279 Thanks. Can we synchronize only the JobConf creation, i.e.

```scala
val newJobConf = new JobConf(broadcastedConf.value.value)
```

and add an inline comment linking to the Spark/Hadoop JIRA issue?
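A sketch of the narrower locking being asked for, under the assumption that the broadcast wraps a Hadoop Configuration as in HadoopRDD; the helper name and the choice of lock object are illustrative only, not the PR's actual code:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Hypothetical helper: only the JobConf copy-construction is synchronized, because
// Hadoop's Configuration copy constructor is not thread-safe (the
// ConcurrentModificationException this PR works around). The shared Configuration
// instance serves as the monitor here; the inline comment pointing at the relevant
// Spark/Hadoop JIRA, as requested above, would sit next to the constructor call.
def cloneJobConf(conf: Configuration): JobConf = conf.synchronized {
  new JobConf(conf)
}
```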
[GitHub] spark pull request: [Spark 2060][SQL] Querying JSON Datasets with ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/999#issuecomment-45417535 Yea, definitely a very exciting feature. Also, should we consider getting rid of EAGER_SCHEMA_RESOLUTION and the case classes, and just have a schemaResolutionRatio? If it is 0.0, then it is purely lazy; if it is greater than 0.0, then ...
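One possible reading of that single-parameter alternative, sketched purely for illustration (the parameter name comes from the comment above; the mode mapping is an assumption, not the PR's behavior):

```scala
// ratio == 0.0        -> purely lazy schema resolution
// 0.0 < ratio < 1.0   -> eager resolution over a sample of the records
// ratio >= 1.0        -> eager resolution over the full dataset
def describeResolution(schemaResolutionRatio: Double): String =
  if (schemaResolutionRatio == 0.0) "lazy"
  else if (schemaResolutionRatio < 1.0) s"eager, sampling ${schemaResolutionRatio * 100}% of records"
  else "eager, scanning all records"
```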
[GitHub] spark pull request: [SQL] Simple framework for debugging query exe...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1005#issuecomment-45417569 Jenkins, test this please.
[GitHub] spark pull request: [SPARK-1994][SQL] Weird data corruption bug wh...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1004#issuecomment-45421226 How much does the closure size increase by?
[GitHub] spark pull request: [SPARK-1508][SQL] Add SQLConf to SQLContext.
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/956#issuecomment-45421436 I think it could go both ways for the setters. However, we should not add all the accessors (e.g. numPartitions, codeGen) to SQLContext, since that is just polluting the namespace and will make the scaladoc/javadoc very hard to read.
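A minimal sketch of the alternative implied here: a generic key/value conf keeps per-setting accessors off SQLContext, so its scaladoc surface stays small. Everything below (class and method names) is hypothetical and not the PR's API:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical generic conf: settings live behind string keys instead of dedicated
// accessors like numPartitions or codeGen on SQLContext itself.
class SQLConfSketch {
  private val settings = new ConcurrentHashMap[String, String]()
  def set(key: String, value: String): Unit = settings.put(key, value)
  def get(key: String, default: String): String =
    Option(settings.get(key)).getOrElse(default)
}
```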
[GitHub] spark pull request: [SPARK-1994][SQL] Weird data corruption bug wh...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1004#issuecomment-45421520 Merged in master & branch-1.0.
[GitHub] spark pull request: [SPARK-1994][SQL] Weird data corruption bug wh...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1004#issuecomment-45421512 I'm going to merge this. You can test this easily by looking at the log. Spark tells you the size of the task closure and how long it takes to serialize each of them in the info log.
[GitHub] spark pull request: [SPARK-1994][SQL] Weird data corruption bug wh...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1004#issuecomment-45421535 One reason we had to add @transient lazy val is the lack of an init method on each partition for operators. I think there are benefits to adding that - it makes object initialization clear and explicit, and then you can probably avoid this problem.
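To illustrate the pattern being discussed, here is a generic sketch (not any specific Spark operator) of why @transient lazy val is used in the absence of a per-partition init hook: the field is dropped from the serialized task closure and rebuilt lazily on each executor JVM.

```scala
class ExampleOperator extends Serializable {
  // Not worth (or not safe) shipping with the task closure, so this field is excluded
  // from serialization and re-created on first use in each JVM.
  @transient lazy val formatter = new java.text.SimpleDateFormat("yyyy-MM-dd")

  def format(millis: Long): String = formatter.format(new java.util.Date(millis))
}
```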
[GitHub] spark pull request: HOTFIX: Support empty body in merge script
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1007#issuecomment-45422011 lgtm
[GitHub] spark pull request: [Spark 2060][SQL] Querying JSON Datasets with ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/999#discussion_r13523010 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/json/JsonTable.scala --- @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.json + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.SchemaRDD +import org.apache.spark.sql.Logging +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetField} + +import com.fasterxml.jackson.databind.ObjectMapper + +import scala.collection.JavaConversions._ +import scala.math.BigDecimal +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.types.StructField +import org.apache.spark.sql.catalyst.types.StructType +import org.apache.spark.sql.catalyst.types.ArrayType +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias + +sealed trait SchemaResolutionMode + +case object EAGER_SCHEMA_RESOLUTION extends SchemaResolutionMode +case class EAGER_SCHEMA_RESOLUTION_WITH_SAMPLING(val fraction: Double) extends SchemaResolutionMode +case object LAZY_SCHEMA_RESOLUTION extends SchemaResolutionMode + +/** + * :: Experimental :: + * Converts a JSON file to a SparkSQL logical query plan. This implementation is only designed to + * work on JSON files that have mostly uniform schema. The conversion suffers from the following + * limitation: + * - The data is optionally sampled to determine all of the possible fields. Any fields that do + *not appear in this sample will not be included in the final output. + */ +@Experimental +object JsonTable extends Serializable with Logging { + def inferSchema( + json: RDD[String], sampleSchema: Option[Double] = None): LogicalPlan = { +val schemaData = sampleSchema.map(json.sample(false, _, 1)).getOrElse(json) --- End diff -- Yup this shouldn't block on that. 
[GitHub] spark pull request: SPARK-1628: Add missing hashCode methods in Pa...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/549#issuecomment-45430500 Jenkins, test this please.
[GitHub] spark pull request: SPARK-1628: Add missing hashCode methods in Pa...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/549#issuecomment-45430517 This looks good to me pending test passes.
[GitHub] spark pull request: Update RoutingTable.scala
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/647#issuecomment-45430659 @ArcherShao do you mind closing this? This has been fixed in https://github.com/apache/spark/commit/b1feb60209174433262de2a26d39616ba00edcc8
[GitHub] spark pull request: SPARK-938 - Openstack Swift object storage sup...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1010#discussion_r13526195 --- Diff: docs/openstack-integration.md --- @@ -0,0 +1,83 @@ +--- +layout: global +title: Accessing Openstack Swift storage from Spark +--- + +# Accessing Openstack Swift storage from Spark + +Spark's file interface allows it to process data in Openstack Swift using the same URI formats that are supported for Hadoop. You can specify a path in Swift as input through a URI of the form `swift:///path`. You will also need to set your Swift security credentials, through `SparkContext.hadoopConfiguration`. + +#Configuring Hadoop to use Openstack Swift +Openstack Swift driver was merged in Hadoop verion 2.3.0 ([Swift driver](https://issues.apache.org/jira/browse/HADOOP-8545)) Users that wish to use previous Hadoop versions will need to configure Swift driver manually. +Hadoop 2.3.0 and above. +An Openstack Swift driver was merged into Haddop 2.3.0 . Current Hadoop driver requieres Swift to use Keystone authentication. There are additional efforts to support temp auth for Hadoop [Hadoop-10420](https://issues.apache.org/jira/browse/HADOOP-10420). +To configure Hadoop to work with Swift one need to modify core-sites.xml of Hadoop and setup Swift FS. + + + + fs.swift.impl + org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem + + + + +Configuring Spark - stand alone cluster +You need to configure the compute-classpath.sh and add Hadoop classpath for --- End diff -- Is the swift jar not included in hadoop-client? Is there a way to specify this through Maven dependencies rather than manually including the path? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Update run-example
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1011#issuecomment-45448882 @maji2014 do you mind editing the title of this pull request to say: [SPARK-2057] run-example can only be ran under spark_home Then we can merge this.
[GitHub] spark pull request: SPARK-1628: Add missing hashCode methods in Pa...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/549#issuecomment-45448899 Thanks. I'm merging this in master.
[GitHub] spark pull request: SPARK-1628 follow up: Improve RangePartitioner...
GitHub user rxin opened a pull request: https://github.com/apache/spark/pull/1012 SPARK-1628 follow up: Improve RangePartitioner's documentation. Adding a paragraph clarifying a weird behavior in RangePartitioner. See also #549. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rxin/spark partitioner-doc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1012.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1012 commit 6f0109e5e918d88365056111f9c2fb1f9ec5df84 Author: Reynold Xin Date: 2014-06-08T21:22:42Z SPARK-1628 follow up: Improve RangePartitioner's documentation.
[GitHub] spark pull request: SPARK-1628 follow up: Improve RangePartitioner...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1012#issuecomment-45455000 Ok merging this in master.
[GitHub] spark pull request: [SPARK-2044] Pluggable interface for shuffles
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1009#discussion_r13529575 --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.hash + +import org.apache.spark._ +import org.apache.spark.shuffle._ + +/** + * A ShuffleManager using the hash-based implementation available up to and including Spark 1.0. --- End diff -- May want to change the wording. "available up to and including" seems to suggest that we are going to remove this post 1.0. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
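One possible rewording along the lines suggested, offered only as a sketch (not necessarily the wording the PR ultimately adopted); the description of hash-based shuffle writing one output file per reduce partition per map task is background, not taken from the diff:

```scala
/**
 * A ShuffleManager using hashing: the shuffle implementation Spark has shipped with
 * through 1.0, writing one output file per reduce partition on each map task. The
 * wording deliberately avoids implying that this implementation will be removed later.
 */
private[spark] class HashShuffleManager
```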
[GitHub] spark pull request: [SPARK-2052] [SQL] Add optimization for CaseCo...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/990#discussion_r13529691 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -288,3 +293,18 @@ object SimplifyCasts extends Rule[LogicalPlan] { case Cast(e, dataType) if e.dataType == dataType => e } } + +/** + * Removes the inner [[catalyst.expressions.CaseConversionExpression]] that are unnecessary because + * the inner conversion is overwritten by the outer one. + */ +object SimplifyCaseConversionExpressions extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case q: LogicalPlan => q transformExpressionsUp { --- End diff -- Just curious - do you have a use case for this rule? I wonder how often it happens in practice. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1308] Add getNumPartitions to pyspark R...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/995#issuecomment-45462116 Jenkins, test this please.
[GitHub] spark pull request: Add unit test to spark_ec2 script
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/134#issuecomment-45462687 @pwendell what do we do with respect to binary files included with this?
[GitHub] spark pull request: Add a function that can build an EdgePartition...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/740#issuecomment-45462702 @ArcherShao if you are sending / have sent a new pull request, do you mind closing this one? Thanks!
[GitHub] spark pull request: [SPARK-2052] [SQL] Add optimization for CaseCo...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/990#discussion_r13530208 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -288,3 +293,18 @@ object SimplifyCasts extends Rule[LogicalPlan] { case Cast(e, dataType) if e.dataType == dataType => e } } + +/** + * Removes the inner [[catalyst.expressions.CaseConversionExpression]] that are unnecessary because + * the inner conversion is overwritten by the outer one. + */ +object SimplifyCaseConversionExpressions extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case q: LogicalPlan => q transformExpressionsUp { --- End diff -- Do you actually have queries that trigger this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2052] [SQL] Add optimization for CaseCo...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/990#issuecomment-45463867 This looks good to me. Can you add some unit tests for collapsing case statements?
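A hedged sketch of the kind of test being requested, written in the style of the existing catalyst optimizer suites (helper names such as testRelation, Optimize, and comparePlans are assumed from those suites rather than verified here); the point is that the inner case conversion is dead once an outer one overwrites it:

```scala
test("collapse nested case conversions") {
  // Upper(Lower('a)) should simplify to Upper('a): the inner Lower has no effect.
  val originalQuery = testRelation.select(Upper(Lower('a)) as 'u)
  val optimized = Optimize(originalQuery.analyze)
  val correctAnswer = testRelation.select(Upper('a) as 'u).analyze
  comparePlans(optimized, correctAnswer)
}
```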
[GitHub] spark pull request: Grammar: read -> reads
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1016#issuecomment-45463990 Thanks. Merged this in master.
[GitHub] spark pull request: [SPARK-1308] Add getNumPartitions to pyspark R...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/995#issuecomment-45466522 Thanks. I've merged this in master.
[GitHub] spark pull request: SPARK-2077 Log serializer that actually ends u...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1017#issuecomment-45466827 Isn't it obvious when you turn the web ui on and look at the environments tab?
[GitHub] spark pull request: SPARK-1944 Document --verbose in spark-shell -...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1020#issuecomment-45517595 LGTM. Merging this in master & branch-1.0.
[GitHub] spark pull request: [SQL] Simple framework for debugging query exe...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1005#issuecomment-45546214 Thanks. I've merged this in master & branch-1.0.
[GitHub] spark pull request: [SPARK-2082] stratified sampling in PairRDDFun...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1025#discussion_r13566500

--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -27,8 +27,12 @@ import scala.collection.Map
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+import scala.util.control.Breaks._
--- End diff --

remove this
[GitHub] spark pull request: [SPARK-2082] stratified sampling in PairRDDFun...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1025#discussion_r13566493

--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -46,7 +50,8 @@ import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.SerializableHyperLogLog
+import org.apache.spark.util.{Utils, SerializableHyperLogLog}
--- End diff --

SerializableHyperLogLog no longer exists
[GitHub] spark pull request: Moved hiveOperators.scala to the right package...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1029#issuecomment-45579331 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-2065] give launched instances names
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1043#discussion_r13630905

--- Diff: ec2/spark_ec2.py ---
@@ -418,6 +418,12 @@ def launch_cluster(conn, opts, cluster_name):
     master_nodes = master_res.instances
     print "Launched master in %s, regid = %s" % (zone, master_res.id)
+    # Give the instances descriptive names
+    for master in master_nodes:
+        master.add_tag(key='Name', value='spark-{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
--- End diff --

Do you mind wrapping this to under 100 characters wide? We don't yet enforce it, but we will soon enforce line length in Python as well. Also for line 425. Thanks!
[GitHub] spark pull request: [SPARK-2065] give launched instances names
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1043#issuecomment-45695922 Do you mind running pep8 on this? Let's make sure that doesn't report any style violations. We will get pep8 integrated with Jenkins in the future.
[GitHub] spark pull request: [SPARK-2065] give launched instances names
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1043#issuecomment-45701517 Thanks. I am merging this in master.
[GitHub] spark pull request: SPARK-2107: FilterPushdownSuite doesn't need J...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1046#issuecomment-45703476 Jenkins, test this please.
[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r13634545

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala ---
@@ -72,7 +72,9 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
       }
       iteration += 1
       if (iteration > batch.strategy.maxIterations) {
-        logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")
+        if (iteration != 2) {
--- End diff --

maybe add that to the comment :)
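A sketch of how the requested comment could read next to that check (wording assumed, not the PR's final text); iteration == 2 is the case where a batch run with the Once strategy has simply finished its single allowed pass:

```scala
if (iteration > batch.strategy.maxIterations) {
  // Only log when the batch is allowed more than one iteration. A batch run with the
  // Once strategy (maxIterations == 1) always reaches this point at iteration == 2,
  // which is expected and not worth reporting.
  if (iteration != 2) {
    logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")
  }
}
```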
[GitHub] spark pull request: [SPARK-2091][MLLIB] use numpy.dot instead of n...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1035#issuecomment-45703960 Jenkins, test this please.
[GitHub] spark pull request: SPARK-938 - Openstack Swift object storage sup...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1010#discussion_r13635058 --- Diff: docs/openstack-integration.md --- @@ -0,0 +1,110 @@ +yout: global +title: Accessing Openstack Swift storage from Spark +--- + +# Accessing Openstack Swift storage from Spark + +Spark's file interface allows it to process data in Openstack Swift using the same URI +formats that are supported for Hadoop. You can specify a path in Swift as input through a +URI of the form `swift:///path`. You will also need to set your +Swift security credentials, through `SparkContext.hadoopConfiguration`. + +#Configuring Hadoop to use Openstack Swift +Openstack Swift driver was merged in Hadoop verion 2.3.0 ([Swift driver](https://issues.apache.org/jira/browse/HADOOP-8545)). Users that wish to use previous Hadoop versions will need to configure Swift driver manually. Current Swift driver +requieres Swift to use Keystone authentication method. There are recent efforts to support +also temp auth [Hadoop-10420](https://issues.apache.org/jira/browse/HADOOP-10420). +To configure Hadoop to work with Swift one need to modify core-sites.xml of Hadoop and +setup Swift FS. + + --- End diff -- Is this needed? Can we just put this in core-site.xml under conf? (Basically removing the configuring Hadoop section) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-938 - Openstack Swift object storage sup...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1010#discussion_r13635099 --- Diff: core/pom.xml --- @@ -35,7 +35,11 @@ org.apache.hadoop hadoop-client - + --- End diff -- While it might make sense for this to eventually get into Spark, we need to look more carefully at the dependency that this brings. Since Spark runs different from Hadoop (Spark is really just a user level library), users can always include openstack support in their project dependencies (with the documentation you provide). For the time being, let's first update the documentation so it is obvious & clear & easy for users to add openstack support, and then we can discuss more about whether/when we should push the openstack dependency in Spark. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-938 - Openstack Swift object storage sup...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1010#discussion_r13635108 --- Diff: docs/openstack-integration.md --- @@ -0,0 +1,110 @@ +yout: global +title: Accessing Openstack Swift storage from Spark +--- + +# Accessing Openstack Swift storage from Spark + +Spark's file interface allows it to process data in Openstack Swift using the same URI +formats that are supported for Hadoop. You can specify a path in Swift as input through a +URI of the form `swift:///path`. You will also need to set your +Swift security credentials, through `SparkContext.hadoopConfiguration`. + +#Configuring Hadoop to use Openstack Swift +Openstack Swift driver was merged in Hadoop verion 2.3.0 ([Swift driver](https://issues.apache.org/jira/browse/HADOOP-8545)). Users that wish to use previous Hadoop versions will need to configure Swift driver manually. Current Swift driver +requieres Swift to use Keystone authentication method. There are recent efforts to support +also temp auth [Hadoop-10420](https://issues.apache.org/jira/browse/HADOOP-10420). +To configure Hadoop to work with Swift one need to modify core-sites.xml of Hadoop and +setup Swift FS. + + + + fs.swift.impl + org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem + + + +#Configuring Swift +Proxy server of Swift should include `list_endpoints` middleware. More information +available [here] (https://github.com/openstack/swift/blob/master/swift/common/middleware/list_endpoints.py) + +#Configuring Spark +To use Swift driver, Spark need to be compiled with `hadoop-openstack-2.3.0.jar` +distributted with Hadoop 2.3.0. For the Maven builds, Spark's main pom.xml should include + + 2.3.0 + + + + org.apache.hadoop + hadoop-openstack + ${swift.version} + + +in addition, pom.xml of the `core` and `yarn` projects should include + + + org.apache.hadoop + hadoop-openstack + + + +Additional parameters has to be provided to the Swift driver. Swift driver will use those +parameters to perform authentication in Keystone prior accessing Swift. List of mandatory +parameters is : `fs.swift.service..auth.url`, --- End diff -- Might make sense to make a table or a bullet list for this, instead of just comma separate lists. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-938 - Openstack Swift object storage sup...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1010#discussion_r13635121 --- Diff: docs/openstack-integration.md --- @@ -0,0 +1,110 @@ +yout: global +title: Accessing Openstack Swift storage from Spark +--- + +# Accessing Openstack Swift storage from Spark + +Spark's file interface allows it to process data in Openstack Swift using the same URI +formats that are supported for Hadoop. You can specify a path in Swift as input through a +URI of the form `swift:///path`. You will also need to set your +Swift security credentials, through `SparkContext.hadoopConfiguration`. + +#Configuring Hadoop to use Openstack Swift +Openstack Swift driver was merged in Hadoop verion 2.3.0 ([Swift driver](https://issues.apache.org/jira/browse/HADOOP-8545)). Users that wish to use previous Hadoop versions will need to configure Swift driver manually. Current Swift driver +requieres Swift to use Keystone authentication method. There are recent efforts to support +also temp auth [Hadoop-10420](https://issues.apache.org/jira/browse/HADOOP-10420). +To configure Hadoop to work with Swift one need to modify core-sites.xml of Hadoop and +setup Swift FS. + + + + fs.swift.impl + org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem + + + +#Configuring Swift +Proxy server of Swift should include `list_endpoints` middleware. More information +available [here] (https://github.com/openstack/swift/blob/master/swift/common/middleware/list_endpoints.py) + +#Configuring Spark +To use Swift driver, Spark need to be compiled with `hadoop-openstack-2.3.0.jar` +distributted with Hadoop 2.3.0. For the Maven builds, Spark's main pom.xml should include + + 2.3.0 + + + + org.apache.hadoop + hadoop-openstack + ${swift.version} + + +in addition, pom.xml of the `core` and `yarn` projects should include + + + org.apache.hadoop + hadoop-openstack + + + +Additional parameters has to be provided to the Swift driver. Swift driver will use those +parameters to perform authentication in Keystone prior accessing Swift. List of mandatory +parameters is : `fs.swift.service..auth.url`, +`fs.swift.service..auth.endpoint.prefix`, `fs.swift.service..tenant`, +`fs.swift.service..username`, +`fs.swift.service..password`, `fs.swift.service..http.port`, +`fs.swift.service..http.port`, `fs.swift.service..public`, where +`PROVIDER` is any name. `fs.swift.service..auth.url` should point to the Keystone +authentication URL. + +Create core-sites.xml with the mandatory parameters and place it under /spark/conf +directory. For example: + + + + fs.swift.service..auth.url + http://127.0.0.1:5000/v2.0/tokens + + + fs.swift.service..auth.endpoint.prefix + endpoints + + fs.swift.service..http.port + 8080 + + + fs.swift.service..region + RegionOne + + + fs.swift.service..public + true + + +We left with `fs.swift.service..tenant`, `fs.swift.service..username`, +`fs.swift.service..password`. The best way to provide those parameters to --- End diff -- This is still doable at runtime by setting the job conf, isn't it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
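For the runtime option mentioned here, a minimal sketch: `sc` is an existing SparkContext, "example" stands in for the provider name, and only a few of the fs.swift.service.* keys from the doc are shown for illustration.

```scala
// Set the Swift credentials programmatically instead of (or in addition to) core-site.xml.
sc.hadoopConfiguration.set("fs.swift.service.example.tenant", "my-tenant")
sc.hadoopConfiguration.set("fs.swift.service.example.username", "my-user")
sc.hadoopConfiguration.set("fs.swift.service.example.password", "my-password")
```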
[GitHub] spark pull request: [SPARK-2042] Prevent unnecessary shuffle trigg...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1048#discussion_r13636450 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.dsl.expressions._ + +class CombiningLimitsSuite extends OptimizerTest { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Combine Limit", FixedPoint(2), +CombineLimits) :: + Batch("Constant Folding", FixedPoint(3), --- End diff -- do we need the constant folding stuff here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2042] Prevent unnecessary shuffle trigg...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1048#discussion_r13636504

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala ---
@@ -374,6 +374,9 @@ class SchemaRDD(
   override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
+  override def take(num: Int): Array[Row] =
+    new SchemaRDD(sqlContext, Limit(Literal(num), logicalPlan)).collect()
--- End diff --

maybe you can delegate to limit? btw it is strange to me that limit takes an Expression. I can understand why, but I can't see why on earth somebody would want to build a spark sql expression to give a limit, if he could just give it an integer (or some scala expression).
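What "delegate to limit" could look like, assuming SchemaRDD's existing limit method can be reused here (if it only accepts an Expression, the Literal(num) wrapping from the diff would still be needed inside limit rather than here):

```scala
// Inside SchemaRDD: reuse the limit operator instead of building the Limit plan inline.
override def take(num: Int): Array[Row] = limit(num).collect()
```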
[GitHub] spark pull request: [SPARK-2042] Prevent unnecessary shuffle trigg...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1048#discussion_r13636646 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.dsl.expressions._ + +class CombiningLimitsSuite extends OptimizerTest { + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Combine Limit", FixedPoint(2), +CombineLimits) :: + Batch("Constant Folding", FixedPoint(3), --- End diff -- ic makes sense --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2091][MLLIB] use numpy.dot instead of n...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1035#issuecomment-45709770 merging this in master --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2044] Pluggable interface for shuffles
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1009#issuecomment-45827229 Ok merging this in master. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1056#issuecomment-45827315 I haven't looked into your implementation yet, but the block manager also sends heartbeats back to the driver. Perhaps we can consolidate heartbeats. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: 'killFuture' is never used
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1052#issuecomment-45827794 I've merged this in master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1051#issuecomment-45827838 Now that I think about it, perhaps compression should be a setting for individual RDDs? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2044] Pluggable interface for shuffles
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1009#discussion_r13688732 --- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala --- @@ -42,10 +42,11 @@ class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag]( part: Partitioner) extends RDD[P](prev.context, Nil) { - private var serializer: Serializer = null + private var serializer: Option[Serializer] = None + /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */ def setSerializer(serializer: Serializer): ShuffledRDD[K, V, P] = { -this.serializer = serializer +this.serializer = Option(serializer) --- End diff -- Not if serializer is null --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Cleanup on Connection and ConnectionManager
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1060#issuecomment-45833767 LGTM. Merging this in master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Made splits deprecated in JavaRDDLike
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1062#discussion_r13689356 --- Diff: core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala --- @@ -43,8 +43,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def rdd: RDD[T] - /** Set of partitions in this RDD. */ + @deprecated(message: String = "Use partitions instead.", since: String = "0.9.2") --- End diff -- I'm not sure how well this shows up in JavaDoc. Can you test it out? We might need to add the java @Deprecated, and then explain it has been deprecated since 1.0.1. (it is not 0.9.2). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
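(Roughly what the doubled-up annotation could look like, as a hedged fragment of JavaRDDLike rather than the merged code: the Java-level @Deprecated surfaces in javadoc, Scala's @deprecated drives compiler warnings, and the version string is the 1.0.1 suggested above. The delegation to partitions is an assumption.)

```scala
// Illustrative fragment of JavaRDDLike (JList is the trait's alias for java.util.List),
// assuming a partitions() method exists to delegate to.
@Deprecated
@deprecated("Use partitions() instead.", "1.0.1")
def splits: JList[Partition] = partitions
```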
[GitHub] spark pull request: [SPARK-2044] Pluggable interface for shuffles
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1009#discussion_r13690317 --- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala --- @@ -42,10 +42,11 @@ class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag]( part: Partitioner) extends RDD[P](prev.context, Nil) { - private var serializer: Serializer = null + private var serializer: Option[Serializer] = None + /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */ def setSerializer(serializer: Serializer): ShuffledRDD[K, V, P] = { -this.serializer = serializer +this.serializer = Option(serializer) --- End diff -- ``` scala> var serializer: Option[java.lang.Integer] = null serializer: Option[Integer] = null scala> serializer = Some(1) serializer: Option[Integer] = Some(1) scala> serializer.get res0: Integer = 1 scala> serializer = Some(null) serializer: Option[Integer] = Some(null) scala> serializer.get res1: Integer = null scala> serializer.isDefined res2: Boolean = true scala> serializer = Option(null) serializer: Option[Integer] = None scala> serializer.isDefined res3: Boolean = false ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: fixed typo in docstring for min()
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1065#issuecomment-45838746 Jenkins, test this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: document laziness of parallelize
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1070#issuecomment-45957720 Jenkins, add to whitelist. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: document laziness of parallelize
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1070#issuecomment-45965270 Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: document laziness of parallelize
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1070#issuecomment-45965267 Merged in master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Workaround in Spark for ConcurrentModification...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1000#issuecomment-45983980 LGTM. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Workaround in Spark for ConcurrentModification...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1000#issuecomment-45983988 Jenkins, test this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Workaround in Spark for ConcurrentModification...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1000#issuecomment-46040589 Merging this in master & branch-1.0. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Small correction in Streaming Programming Guid...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1079#issuecomment-46067411 Thanks. Merging this in master & branch-1.0. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SQL] Update SparkSQL in branch-1.0 to match m...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1078#issuecomment-46067459 There were some compilation errors. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SQL] Update SparkSQL and ScalaTest in branch-...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1078#issuecomment-46069659 This looks good. I merged it. Note that the only thing that is remotely scary is that we are bumping the version of ScalaTest. However, that only affects development and doesn't change the binary build at all. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2141] Adding getPersistentRddIds and un...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1082#issuecomment-46079903 Jenkins, retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [Spark-2137][SQL] Timestamp UDFs broken
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1081#issuecomment-46079937 Ok I'm merging this in master & branch-1.0. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1201] Do not fully materialize partitio...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1083#discussion_r13779767 --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala --- @@ -128,4 +76,89 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } } } + + /** + * Acquire a loading lock for the partition identified by the given block ID. + * + * If the lock is free, just acquire it and return None. Otherwise, another thread is already + * loading the partition, so we wait for it to finish and return the values loaded by the thread. + */ + private def acquireLockForPartition(id: RDDBlockId): Option[Iterator[Any]] = { +loading.synchronized { + if (!loading.contains(id)) { +// If the partition is free, acquire its lock and begin computing its value +loading.add(id) +None + } else { +// Otherwise, wait for another thread to finish and return its result +logInfo(s"Another thread is loading $id, waiting for it to finish...") +while (loading.contains(id)) { + try { +loading.wait() + } catch { +case e: Exception => + logWarning(s"Exception while waiting for another thread to load $id", e) + } +} +logInfo(s"Finished waiting for $id") +/* See whether someone else has successfully loaded it. The main way this would fail + * is for the RDD-level cache eviction policy if someone else has loaded the same RDD + * partition but we didn't want to make space for it. However, that case is unlikely + * because it's unlikely that two threads would work on the same RDD partition. One + * downside of the current code is that threads wait serially if this does happen. */ +val values = blockManager.get(id) +if (!values.isDefined) { + logInfo(s"Whoever was loading $id failed; we'll try it ourselves") + loading.add(id) +} +values + } +} + } + + /** + * Cache the values of a partition, keeping track of any updates in the storage statuses + * of other blocks along the way. + */ + private def cacheValues[T]( + key: BlockId, + value: Iterator[T], + storageLevel: StorageLevel, + updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = { + +if (!storageLevel.useMemory) { + /* This RDD is not to be cached in memory, so we can just pass the computed values + * as an iterator directly to the BlockManager, rather than first fully unrolling + * it in memory. The latter option potentially uses much more memory and risks OOM + * exceptions that can be avoided. */ + assume(storageLevel.useDisk || storageLevel.useOffHeap, s"Empty storage level for $key!") --- End diff -- Might make sense to remove this assume; in case we add a new storage level in the future, this won't hold any more and because this code is so far away from the storage level code, we will likely forget to update this location. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1201] Do not fully materialize partitio...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1083#discussion_r13779775 --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala --- @@ -128,4 +76,89 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } } } + + /** + * Acquire a loading lock for the partition identified by the given block ID. + * + * If the lock is free, just acquire it and return None. Otherwise, another thread is already + * loading the partition, so we wait for it to finish and return the values loaded by the thread. + */ + private def acquireLockForPartition(id: RDDBlockId): Option[Iterator[Any]] = { +loading.synchronized { + if (!loading.contains(id)) { +// If the partition is free, acquire its lock and begin computing its value +loading.add(id) +None + } else { +// Otherwise, wait for another thread to finish and return its result +logInfo(s"Another thread is loading $id, waiting for it to finish...") +while (loading.contains(id)) { + try { +loading.wait() + } catch { +case e: Exception => + logWarning(s"Exception while waiting for another thread to load $id", e) + } +} +logInfo(s"Finished waiting for $id") +/* See whether someone else has successfully loaded it. The main way this would fail + * is for the RDD-level cache eviction policy if someone else has loaded the same RDD + * partition but we didn't want to make space for it. However, that case is unlikely + * because it's unlikely that two threads would work on the same RDD partition. One --- End diff -- This paragraph doesn't make a lot of sense to me. In general it is just unlikely for two threads to work on the same rdd partition. However, if we ever pass the first if (where it returns None), it already means we are in the "unlikely" case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1201] Do not fully materialize partitio...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1083#discussion_r13779776 --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala --- @@ -128,4 +76,89 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } } } + + /** + * Acquire a loading lock for the partition identified by the given block ID. + * + * If the lock is free, just acquire it and return None. Otherwise, another thread is already + * loading the partition, so we wait for it to finish and return the values loaded by the thread. + */ + private def acquireLockForPartition(id: RDDBlockId): Option[Iterator[Any]] = { +loading.synchronized { + if (!loading.contains(id)) { +// If the partition is free, acquire its lock and begin computing its value +loading.add(id) +None + } else { +// Otherwise, wait for another thread to finish and return its result +logInfo(s"Another thread is loading $id, waiting for it to finish...") +while (loading.contains(id)) { + try { +loading.wait() + } catch { +case e: Exception => + logWarning(s"Exception while waiting for another thread to load $id", e) + } +} +logInfo(s"Finished waiting for $id") +/* See whether someone else has successfully loaded it. The main way this would fail + * is for the RDD-level cache eviction policy if someone else has loaded the same RDD + * partition but we didn't want to make space for it. However, that case is unlikely + * because it's unlikely that two threads would work on the same RDD partition. One + * downside of the current code is that threads wait serially if this does happen. */ +val values = blockManager.get(id) +if (!values.isDefined) { + logInfo(s"Whoever was loading $id failed; we'll try it ourselves") + loading.add(id) +} +values + } +} + } + + /** + * Cache the values of a partition, keeping track of any updates in the storage statuses + * of other blocks along the way. + */ + private def cacheValues[T]( --- End diff -- instead of cacheValues, how about storeInBlockmanager? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1201] Do not fully materialize partitio...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1083#discussion_r13779778 --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala --- @@ -46,79 +46,27 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => -// Mark the split as loading (unless someone else marks it first) -loading.synchronized { - if (loading.contains(key)) { -logInfo(s"Another thread is loading $key, waiting for it to finish...") -while (loading.contains(key)) { - try { -loading.wait() - } catch { -case e: Exception => - logWarning(s"Got an exception while waiting for another thread to load $key", e) - } -} -logInfo(s"Finished waiting for $key") -/* See whether someone else has successfully loaded it. The main way this would fail - * is for the RDD-level cache eviction policy if someone else has loaded the same RDD - * partition but we didn't want to make space for it. However, that case is unlikely - * because it's unlikely that two threads would work on the same RDD partition. One - * downside of the current code is that threads wait serially if this does happen. */ -blockManager.get(key) match { - case Some(values) => -return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) - case None => -logInfo(s"Whoever was loading $key failed; we'll try it ourselves") -loading.add(key) -} - } else { -loading.add(key) - } +// Acquire a lock for loading this partition +// If another thread already holds the lock, wait for it to finish return its results +acquireLockForPartition(key).foreach { values => --- End diff -- Maybe better to avoid the foreach closure here because return within a closure is implemented as a try-catch, and it is very error prone in the future when we wrap this whole block with a try-catch that catches general Exception ... It has happened multiple times in Spark already, and those problems are really hard to find/debug. Best to just avoid it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
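(A small self-contained sketch, not from the PR, of the pitfall described above: a return inside a closure compiles to throwing scala.runtime.NonLocalReturnControl, which the enclosing method normally catches at its outer boundary. An intermediate catch-all handler, for example one matching Throwable, intercepts it first and silently discards the early return; a plain `case e: Exception` would not, since the control throwable is not an Exception.)

```scala
object NonLocalReturnDemo {
  // Hypothetical helper, purely for illustration.
  def firstEven(xs: Seq[Int]): Option[Int] = {
    try {
      xs.foreach { x =>
        if (x % 2 == 0) {
          return Some(x) // compiles to: throw new NonLocalReturnControl(..., Some(x))
        }
      }
      None
    } catch {
      // The overly broad handler intercepts the control-flow exception,
      // so the early return above silently disappears.
      case t: Throwable =>
        println(s"swallowed ${t.getClass.getSimpleName}")
        None
    }
  }

  def main(args: Array[String]): Unit = {
    println(firstEven(Seq(1, 2, 3))) // prints "swallowed NonLocalReturnControl", then None
  }
}
```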
[GitHub] spark pull request: [SPARK-1970] Update unit test in XORShiftRando...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1073#issuecomment-46080456 Jenkins, retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1201] Do not fully materialize partitio...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1083#issuecomment-46080474 BTW, can you construct a unit test for this in CacheManagerSuite? It would also be good to add a unit test for the lock (which existed earlier but had no test). Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-937] adding EXITED executor state and n...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/306#issuecomment-46098404 @aarondav can you take a look again at this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Update BasicOperationsSuite.scala
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1084#discussion_r13782625 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala --- @@ -22,7 +22,7 @@ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.SparkContext._ -import util.ManualClock +import org.apache.spark.streaming.util.ManualClock --- End diff -- Do you mind sorting the imports according to the coding style? https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
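(For reference, roughly what the sorted block would look like under the linked style guide, shown as a hedged illustration rather than the file's final contents: these imports all fall in the org.apache.spark group, ordered alphabetically.)

```scala
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.util.ManualClock
```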
[GitHub] spark pull request: [SPARK-2079] Support batching when serializing...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1023#issuecomment-46098515 Thanks. I'm merging this in master & branch-1.0. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2079] Support batching when serializing...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1023#issuecomment-46098532 Oops sorry I meant to merge another PR. I'm going to take a look at this and revert it if there are any problems. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2079] Support batching when serializing...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1023#discussion_r13782641 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala --- @@ -343,16 +343,11 @@ class SchemaRDD( val pickle = new Pickler iter.map { row => val map: JMap[String, Any] = new java.util.HashMap -// TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict]. -// Ideally we should be able to pickle an object directly into a Python collection so we -// don't have to create an ArrayList every time. -val arr: java.util.ArrayList[Any] = new java.util.ArrayList row.zip(fieldNames).foreach { case (obj, name) => map.put(name, obj) } -arr.add(map) -pickle.dumps(arr) - } +map + }.grouped(10).map(batched => pickle.dumps(batched.toArray)) --- End diff -- Do you know if grouped is stream-based, or does it materialize the whole thing in memory? If it is the latter, we should rewrite it to be stream-based. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
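(For what it's worth, grouped on a plain Scala Iterator is stream-based: it buffers only one batch at a time rather than materializing the whole iterator. A quick REPL-style check, not from the PR; with an infinite iterator this would hang if grouped were eager.)

```scala
val it = Iterator.from(1).map { i => println(s"producing $i"); i }
val batches = it.grouped(3)   // nothing is produced yet
println(batches.next())       // prints "producing 1" .. "producing 3", then List(1, 2, 3)
println(batches.next())       // prints "producing 4" .. "producing 6", then List(4, 5, 6)
```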
[GitHub] spark pull request: [SPARK-2079] Support batching when serializing...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1023#issuecomment-46098611 Also tagging @ahirreddy to see if this makes sense. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2013] Documentation for saveAsPickleFil...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/983#issuecomment-46098635 Merging this in master & branch-1.0. (This is what I meant to merge, not #1023 ...) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2013] Documentation for saveAsPickleFil...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/983#issuecomment-46098841 This didn't merge cleanly in branch-1.0. I had to apply a manual merge. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2010] Support for nested data in PySpar...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1041#discussion_r13782687 --- Diff: python/pyspark/sql.py --- @@ -82,6 +82,19 @@ def inferSchema(self, rdd): >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, ...{"field1" : 3, "field2": "row3"}] True + +Nested collections are supported, which include array, dict, list, set, and tuple. --- End diff -- can we move this line before the previous unit test block? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2010] Support for nested data in PySpar...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1041#discussion_r13782689 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala --- @@ -298,19 +298,27 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * Peek at the first row of the RDD and infer its schema. - * TODO: We only support primitive types, add support for nested types. */ private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = { +import scala.collection.JavaConversions._ +def typeFor(obj: Any): DataType = obj match { --- End diff -- We must already have a few similar implementations of this. I think @yhuai was consolidating them. @yhuai, can you comment on whether this would still be necessary after your consolidation? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
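(A rough, hedged sketch of the kind of runtime type inference being discussed, written against the Catalyst types of that era; this is neither the PR's code nor the consolidated version mentioned, and it infers nested element types from the first element only.)

```scala
import org.apache.spark.sql.catalyst.types._

// Illustrative only: map a runtime value to a Catalyst DataType, recursing into collections.
// Empty or mixed-type collections are not handled here.
def typeFor(obj: Any): DataType = obj match {
  case _: String  => StringType
  case _: Int     => IntegerType
  case _: Long    => LongType
  case _: Double  => DoubleType
  case _: Boolean => BooleanType
  case seq: Seq[_]    => ArrayType(typeFor(seq.head))
  case map: Map[_, _] => MapType(typeFor(map.keys.head), typeFor(map.values.head))
  case other => sys.error(s"Unsupported runtime type: ${other.getClass.getName}")
}
```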
[GitHub] spark pull request: [WIP][SPARK-1477]: Add the lifecycle interface...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/991#discussion_r13782743 --- Diff: core/src/main/java/org/apache/spark/Service.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark; + +import java.io.Closeable; +import java.io.IOException; + +// copy from hadoop +public interface Service extends Closeable { --- End diff -- shouldn't we declare this in scala? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2010] Support for nested data in PySpar...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1041#discussion_r13782766 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala --- @@ -298,19 +298,27 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * Peek at the first row of the RDD and infer its schema. - * TODO: We only support primitive types, add support for nested types. */ private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = { +import scala.collection.JavaConversions._ +def typeFor(obj: Any): DataType = obj match { --- End diff -- @kanzhang do you mind adding a todo here to move this once @yhuai's type system is done? This PR looks good other than adding a todo here and moving the comment above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2010] Support for nested data in PySpar...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1041#issuecomment-46213668 Thanks. I've merged this in master & branch-1.0. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SparkSQL] allow UDF on struct
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/796#issuecomment-46250174 Jenkins, retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Minor fix: made "EXPLAIN" output to play well ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1097#issuecomment-46251700 Thanks. I'm merging this one. The test that failed was a flume test that is sometimes flaky. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Follow up of PR #1071 for Java API
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1085#issuecomment-46252146 FYI This didn't get merged into branch-1.0. I did a manual cherry pick. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-1063 Add .sortBy(f) method on RDD
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/369#issuecomment-46343296 I will test this today. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-1063 Add .sortBy(f) method on RDD
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/369#issuecomment-46345169 This looks good to me. I will merge it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-1293 [SQL] [WIP] Parquet support for nes...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/360#discussion_r13876519 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala --- @@ -0,0 +1,667 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap} + +import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter} +import parquet.schema.MessageType + +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, Attribute} +import org.apache.spark.sql.parquet.CatalystConverter.FieldType + +/** + * Collection of converters of Parquet types (group and primitive types) that + * model arrays and maps. The conversions are partly based on the AvroParquet + * converters that are part of Parquet in order to be able to process these + * types. + * + * There are several types of converters: + * + * [[org.apache.spark.sql.parquet.CatalystPrimitiveConverter]] for primitive + * (numeric, boolean and String) types + * [[org.apache.spark.sql.parquet.CatalystNativeArrayConverter]] for arrays + * of native JVM element types; note: currently null values are not supported! + * [[org.apache.spark.sql.parquet.CatalystArrayConverter]] for arrays of + * arbitrary element types (including nested element types); note: currently + * null values are not supported! + * [[org.apache.spark.sql.parquet.CatalystStructConverter]] for structs + * [[org.apache.spark.sql.parquet.CatalystMapConverter]] for maps; note: + * currently null values are not supported! + * [[org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter]] for rows + * of only primitive element types + * [[org.apache.spark.sql.parquet.CatalystGroupConverter]] for other nested + * records, including the top-level row record + * + */ + +private[sql] object CatalystConverter { + // The type internally used for fields + type FieldType = StructField + + // This is mostly Parquet convention (see, e.g., `ConversionPatterns`). + // Note that "array" for the array elements is chosen by ParquetAvro. + // Using a different value will result in Parquet silently dropping columns. 
+ val ARRAY_ELEMENTS_SCHEMA_NAME = "array" + val MAP_KEY_SCHEMA_NAME = "key" + val MAP_VALUE_SCHEMA_NAME = "value" + val MAP_SCHEMA_NAME = "map" + + // TODO: consider using Array[T] for arrays to avoid boxing of primitive types + type ArrayScalaType[T] = Seq[T] + type StructScalaType[T] = Seq[T] + type MapScalaType[K, V] = Map[K, V] + + protected[parquet] def createConverter( + field: FieldType, + fieldIndex: Int, + parent: CatalystConverter): Converter = { +val fieldType: DataType = field.dataType +fieldType match { + // For native JVM types we use a converter with native arrays + case ArrayType(elementType: NativeType) => { +new CatalystNativeArrayConverter(elementType, fieldIndex, parent) + } + // This is for other types of arrays, including those with nested fields + case ArrayType(elementType: DataType) => { +new CatalystArrayConverter(elementType, fieldIndex, parent) + } + case StructType(fields: Seq[StructField]) => { +new CatalystStructConverter(fields, fieldIndex, parent) + } + case MapType(keyType: DataType, valueType: DataType) => { +new CatalystMapConverter( + Seq( +new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false), +new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, true)), +fieldIndex, +parent) + } + // Strings, Shorts and Bytes do not have
[GitHub] spark pull request: SPARK-1063 Add .sortBy(f) method on RDD
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/369#issuecomment-46348842 There was a conflict that I had to merge manually. Take a look at master to make sure everything is OK. I did compile it and run a couple of things. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: spark-submit: add exec at the end of the scrip...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/858#issuecomment-46353884 Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [Spark 2060][SQL] Querying JSON Datasets with ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/999#issuecomment-46363656 Jenkins, retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [Spark 2060][SQL] Querying JSON Datasets with ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/999#discussion_r13891473 --- Diff: docs/sql-programming-guide.md --- @@ -91,14 +91,33 @@ of its descendants. To create a basic SQLContext, all you need is a SparkContext. {% highlight python %} from pyspark.sql import SQLContext -sqlCtx = SQLContext(sc) +sqlContext = SQLContext(sc) {% endhighlight %} -## Running SQL on RDDs +# Data Sources + + + +Spark SQL supports operating on a variety of data sources through the SchemaRDD interface. --- End diff -- Best to put ... around SchemaRDD --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---