[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Add an API to create a DataFrame ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19339 @goldmedal, are you online now? How about fixing the PR title to say something like "Supports RDD of strings as input in spark.read.csv in PySpark"?
[GitHub] spark issue #19357: [SPARK-21322][SQL][WIP] support histogram in filter card...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19357 Jenkins, add to whitelist
[GitHub] spark issue #19357: [SPARK-21322][SQL][WIP] support histogram in filter card...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/19357 cc @wzhfy Please review the code first, before I ask the community to review it. Thanks.
[GitHub] spark pull request #19335: mapPartitions Api
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19335
[GitHub] spark pull request #19348: [BUILD] Close stale PRs
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19348
[GitHub] spark pull request #19347: Branch 2.2 sparkmlib's output of many algorithms ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19347
[GitHub] spark pull request #19244: SPARK-22021
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19244
[GitHub] spark pull request #18474: [SPARK-21235][TESTS] UTest should clear temp resu...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18474
[GitHub] spark pull request #18253: [SPARK-18838][CORE] Introduce multiple queues in ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18253
[GitHub] spark pull request #18897: [SPARK-21655][YARN] Support Kill CLI for Yarn mod...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18897
[GitHub] spark pull request #19334: Branch 1.6
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19334
[GitHub] spark pull request #15009: [SPARK-17443][SPARK-11035] Stop Spark Application...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/15009
[GitHub] spark pull request #19315: [MINOR][ML]Updated english.txt word ordering
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19315
[GitHub] spark pull request #18978: [SPARK-21737][YARN]Create communication channel b...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18978
[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19295
[GitHub] spark pull request #19356: Merge pull request #1 from apache/master
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19356
[GitHub] spark pull request #19152: [SPARK-21915][ML][PySpark] Model 1 and Model 2 Pa...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19152
[GitHub] spark pull request #19238: [SPARK-22016][SQL] Add HiveDialect for JDBC conne...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19238
[GitHub] spark pull request #19300: [SPARK-22082][SparkR]Spelling mistake: "choosen" ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19300
[GitHub] spark pull request #19236: SPARK-22015: Remove usage of non-used private fie...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19236
[GitHub] spark pull request #13794: [SPARK-15574][ML][PySpark] Python meta-algorithms...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/13794
[GitHub] spark issue #19357: support histogram in filter cardinality estimation
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19357 Can one of the admins verify this patch?
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19327 **[Test build #82212 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82212/testReport)** for PR 19327 at commit [`680c51a`](https://github.com/apache/spark/commit/680c51a9cedd1370dec9d249c2eba8cdfc61fbd8).
[GitHub] spark issue #19348: [BUILD] Close stale PRs
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19348 Merged to master.
[GitHub] spark issue #17503: [SPARK-3159][MLlib] Check for reducible DecisionTree
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/17503 Can you run some benchmarks to show how much improvement this change brings?
[GitHub] spark pull request #19357: support histogram in filter cardinality estimatio...
GitHub user ron8hu opened a pull request: https://github.com/apache/spark/pull/19357 support histogram in filter cardinality estimation
## What changes were proposed in this pull request?
Histograms are effective in dealing with skewed distributions. After we generate histogram information for column statistics, we need to adjust the filter estimation based on the histogram data structure.
## How was this patch tested?
We revised all the unit test cases to include the histogram data structure.
commit 46af54d9c86fa1e5322fdd92ed47fe3d419dd966 Author: Ron Hu Date: 2017-09-26T23:33:49Z support histogram in filter cardinality estimation
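For context on the idea (not code from this PR): with an equi-height histogram, each of the n bins covers roughly the same number of rows, so the selectivity of a range predicate can be read off the bins instead of assuming a uniform distribution between min and max. A minimal Scala sketch, where `Bin` and the uniform-within-bin interpolation are illustrative assumptions:
```scala
// Illustrative sketch only -- not the PR's actual implementation.
// Equi-height histogram: every bin holds ~1/n of the rows.
case class Bin(lo: Double, hi: Double)

// Estimate the selectivity of `col <= v` from the bins.
def estimateLessOrEqual(bins: Seq[Bin], v: Double): Double = {
  val full = bins.count(_.hi <= v)                     // bins entirely below v
  val partial = bins.find(b => b.lo <= v && v < b.hi)  // the bin straddling v
    .map(b => (v - b.lo) / (b.hi - b.lo))              // assume uniform inside the bin
    .getOrElse(0.0)
  (full + partial) / bins.length                       // each bin holds 1/n of the rows
}

// With skewed data binned as (0,10), (10,100), (100,1000),
// estimateLessOrEqual(bins, 55.0) == (1 + 0.5) / 3 = 0.5; the bin boundaries
// capture skew that a single min/max uniformity assumption would miss.
```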
[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Add an API to create a DataFrame ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19339 At a quick look, both test failures: ``` File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/readwriter.py", line 303, in parquet return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths))) ``` look unrelated to the current PR (my rough assumption is that it's Py4J instability).
[GitHub] spark pull request #19336: [SPARK-21947][SS] Check and report error when mon...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19336#discussion_r141215037
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---
```
@@ -516,6 +516,14 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
   testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)

+  // Unsupported expressions in streaming plan
+  assertNotSupportedInStreamingPlan(
+    "MonotonicallyIncreasingID",
+    streamRelation.select($"a", MonotonicallyIncreasingID()),
```
--- End diff --
nit. We can remove `$"a"` here.
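To make the user-facing effect of this suite change concrete: with the check in place, a streaming query that uses `monotonically_increasing_id()` should fail at analysis time instead of running with IDs that are not stable across restarts. A hedged sketch (the `rate` source and local session setup are illustrative, not from the PR):
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()

// A trivial streaming source; any streaming relation would do.
val stream = spark.readStream.format("rate").load()

// Expected to throw an AnalysisException once this check lands, because the
// expression cannot produce consistent results over a streaming plan.
stream.select(monotonically_increasing_id().as("id"))
  .writeStream.format("console").start()
```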
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19355 Merged build finished. Test PASSed.
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19355 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82210/ Test PASSed.
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19355 **[Test build #82210 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82210/testReport)** for PR 19355 at commit [`d39c648`](https://github.com/apache/spark/commit/d39c648fe8ed690e4aa309f4e58e8484792cfc6c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19348: [BUILD] Close stale PRs
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19348 Added.
[GitHub] spark issue #19348: [BUILD] Close stale PRs
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19348 Add: #15009 #18253
[GitHub] spark issue #19020: [SPARK-3181] [ML] Implement huber loss for LinearRegress...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19020
> We have two candidate name: epsilon or m
I see; that seems fine then, though I worry that we use "epsilon" in MLlib (tests) for "a very small positive number." Can we document it more clearly, including a comment that it matches sklearn and is "M" from the paper?
> provide the estimated scaling factor (sigma from the paper)
I'd say:
* Either we provide it as 1 for regular linear regression (since that is technically correct)
* Or we take this as an indication that @sethah's comment about separating the classes is better.
Re: @sethah's comment about separating classes, I'll comment on the JIRA since that's a bigger discussion.
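For reference on the naming discussion, the Huber loss as commonly defined (e.g., in sklearn's `HuberRegressor` documentation), where the threshold is the paper's $M$ and sklearn's `epsilon`; $\sigma$, the scale estimated jointly in the concomitant formulation, is the "estimated scaling factor" mentioned above:

$$
L_M(r) =
\begin{cases}
\frac{1}{2}\, r^2 & \text{if } |r| \le M, \\
M\,|r| - \frac{1}{2} M^2 & \text{if } |r| > M,
\end{cases}
$$

with $r$ the residual. As $M \to \infty$ this reduces to the squared loss of ordinary linear regression, where the scale plays no role, which is why reporting $\sigma = 1$ for plain linear regression is technically consistent.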
[GitHub] spark issue #19270: [SPARK-21809] : Change Stage Page to use datatables to s...
Github user pgandhi999 commented on the issue: https://github.com/apache/spark/pull/19270 OK, I will look into it. I am currently fixing UI bugs and unit tests, so I will commit those changes first and then look into the above issue. Thank you.
[GitHub] spark issue #19270: [SPARK-21809] : Change Stage Page to use datatables to s...
Github user ajbozarth commented on the issue: https://github.com/apache/spark/pull/19270 OK, so I'm still doing more testing, but I've narrowed down the above problem: the error occurs when using either local or standalone mode, and doesn't appear when using YARN. I'll continue my testing and review.
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19327 **[Test build #82211 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82211/testReport)** for PR 19327 at commit [`a8aa356`](https://github.com/apache/spark/commit/a8aa35691d7603c0ca283dc2aff295031bcc3b9e).
[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19218 Hi, @fjh100456. I left a few comments.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r141188386
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
```
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.parquet.compression.codec' taking effect on hive table writing") {
+    case class CompressionConf(name: String, codeC: String)
+
+    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
+        compressionConf: Option[CompressionConf]) {
+      def createTable(rootDir: File): Unit = {
+        val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'")
+        sql(
+          s"""
+            |CREATE TABLE $tableName(a int)
+            |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
+            |STORED AS $format
+            |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+            |${ if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else "" }
+          """.stripMargin)
+      }
+
+      def insertOverwriteTable(): Unit = {
+        sql(
+          s"""
+            |INSERT OVERWRITE TABLE $tableName
+            |${ if (isPartitioned) "partition (p=1)" else "" }
+            |SELECT * from table_source
+          """.stripMargin)
+      }
+
+      def getDirFiles(file: File): List[File] = {
+        if (!file.exists()) Nil
+        else if (file.isFile) List(file)
+        else {
+          file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+            .groupBy(_.isFile).flatMap {
+              case (isFile, files) if isFile => files.toList
+              case (_, dirs) => dirs.flatMap(getDirFiles)
+            }.toList
+        }
+      }
+
+      def getTableSize: Long = {
+        var totalSize = 0L
+        withTempDir { tmpDir =>
+          withTable(tableName) {
+            createTable(tmpDir)
+            insertOverwriteTable()
+            val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+            val dir = new File(path)
+            val files = getDirFiles(dir).filter(_.getName.startsWith("part-"))
+            totalSize = files.map(_.length()).sum
+          }
+        }
+        totalSize
+      }
+    }
+
+    def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: String,
+        sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+      val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, "parquet",
+        Some(CompressionConf("parquet.compression", tableCodec)))
+      val tableOrgSize = tableOrg.getTableSize
+
+      withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
+        // priority check, when table-level compression conf was set, expecting
+        // table-level compression conf is not affected by the session conf, and table-level
+        // compression conf takes precedence even the two conf of codec is different
+        val tableOrgSessionConfSize = tableOrg.getTableSize
+        assert(tableOrgSize == tableOrgSessionConfSize)
+
+        // check session conf of compression codec taking effect
+        val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", None)
+        assert(f(tableOrg.getTableSize, table.getTableSize))
+      }
+    }
+
+    def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: String,
+        sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+      val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, "orc",
+        Some(CompressionConf("orc.compress", tableCodec)))
+      val tableOrgSize = tableOrg.getTableSize
+
+      withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
+        // priority check, when table-level compression conf was set, expecting
+        // table-level compression conf is not affected by the session conf, and table-level
+        // compression conf takes precedence even the two conf of codec is different
+        val tableOrgSessionConfSize = tableOrg.getTableSize
+        assert(tableOrgSize == tableOrgSessionConfSize)
+
+        // check session conf of compression codec taking effect
+        val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+        assert(f(tableOrg.getTableSize, table.getTableSize))
+      }
+    }
+
+    withTempView("table_source") {
+      (0 until 10).toDF("a").crea
```
[message truncated in the archive]
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r141187890
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
```
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.parquet.compression.codec' taking effect on hive table writing") {
```
--- End diff --
- `[SPARK-21786]` -> `SPARK-21786`
- `spark.sql.parquet.compression.codec` -> `spark.sql.orc.compression.codec`?
[GitHub] spark issue #18098: [SPARK-16944][Mesos] Improve data locality when launchin...
Github user gpang commented on the issue: https://github.com/apache/spark/pull/18098 Hi @srowen @vanzin, would you be able to help me with this PR? Thanks!
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r141187555
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
(quotes the same SPARK-21786 test hunk shown above; the message is truncated in the archive before the review comment itself)
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r141186378
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
(quotes the same SPARK-21786 test hunk shown above, with the comment anchored at the final `assert(f(tableOrg.getTableSize, table.getTableSize))` in `checkOrcCompressionCodec`)
--- End diff --
You may want to check the codec explicitly like [HiveDDLSuite](https://github.com/
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user szhem commented on the issue: https://github.com/apache/spark/pull/19294 Hello guys, is there a chance for this patch to be merged to master?
[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...
Github user kalvinnchau commented on the issue: https://github.com/apache/spark/pull/19272 @ArtRand thanks! I've been testing a local version of doing that; I'll pull that change in and test it as well.
[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18887 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82206/ Test PASSed.
[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18887 Merged build finished. Test PASSed.
[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18887 **[Test build #82206 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82206/testReport)** for PR 18887 at commit [`5eff2c5`](https://github.com/apache/spark/commit/5eff2c5f0891c6ea3fcfbbc7dacbbaf56c1d1788). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19355 **[Test build #82210 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82210/testReport)** for PR 19355 at commit [`d39c648`](https://github.com/apache/spark/commit/d39c648fe8ed690e4aa309f4e58e8484792cfc6c).
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19355 Retest this please.
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19355 Merged build finished. Test FAILed.
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19355 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82209/ Test FAILed.
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19355 **[Test build #82209 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82209/testReport)** for PR 19355 at commit [`d39c648`](https://github.com/apache/spark/commit/d39c648fe8ed690e4aa309f4e58e8484792cfc6c). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...
Github user ArtRand commented on the issue: https://github.com/apache/spark/pull/19272 Hey @kalvinnchau, good catch on the first renewal time. I believe I addressed it. Have a look. Thanks again.
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19355 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82205/ Test PASSed.
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19355 Merged build finished. Test PASSed.
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19355 **[Test build #82205 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82205/testReport)** for PR 19355 at commit [`243f681`](https://github.com/apache/spark/commit/243f681af922fe414db5194f1a1765328247d9ce). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19343 OK, closing this and the JIRA.
[GitHub] spark pull request #19343: [SPARK-22121][SQL] Correct database location for ...
Github user squito closed the pull request at: https://github.com/apache/spark/pull/19343
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19327 Merged build finished. Test FAILed.
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19327 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82208/ Test FAILed.
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19327 **[Test build #82208 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82208/testReport)** for PR 19327 at commit [`55c590c`](https://github.com/apache/spark/commit/55c590c996e3504d6353b33dad7760c98608cb55). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19343 Thanks! Maybe we can close it now and revisit it when we have a better way to resolve the file-system-specific issues?
[GitHub] spark issue #19356: Merge pull request #1 from apache/master
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19356 Close this, @yaozhang2016
[GitHub] spark issue #19349: [SPARK-22125][PYSPARK][SQL] Enable Arrow Stream format f...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/19349 Nice job on refactoring `PythonRunner`! I think we should just replace the Arrow file format with the stream format for pandas UDFs instead of having a new conf to enable it, as long as all the issues are worked out. Along with being a little faster, it's also easier on memory usage. I'd like to do the same for `toPandas()` also, but that can be a follow-up. Is it possible to do away with the SQLConf and maybe rename some of these classes to be more general, e.g. `ArrowStreamPythonUDFRunner` -> `ArrowPythonRunner`?
[GitHub] spark issue #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerC...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19353 Merged build finished. Test PASSed.
[GitHub] spark issue #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerC...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19353 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82204/ Test PASSed.
[GitHub] spark issue #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerC...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19353 **[Test build #82204 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82204/testReport)** for PR 19353 at commit [`c0488fb`](https://github.com/apache/spark/commit/c0488fbbfa3abb441198aa931ada5d6001d5029f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19356: Merge pull request #1 from apache/master
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19356 Can one of the admins verify this patch?
[GitHub] spark pull request #19356: Merge pull request #1 from apache/master
GitHub user yaozhang2016 opened a pull request: https://github.com/apache/spark/pull/19356 Merge pull request #1 from apache/master update from origin
## What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
commit 82af7606cfa6bde3c7d35529a18db372af12230c Author: yaozhang2016 Date: 2017-07-17T15:22:11Z Merge pull request #1 from apache/master update from origin
[GitHub] spark pull request #19349: [SPARK-22125][PYSPARK][SQL] Enable Arrow Stream f...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19349#discussion_r141142944
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowStreamPythonUDFRunner.scala ---
```
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. ... (standard ASF license header)
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+import java.net._
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.stream.{ArrowStreamReader, ArrowStreamWriter}
+
+import org.apache.spark._
+import org.apache.spark.api.python._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowUtils, ArrowWriter}
+import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+/**
+ * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream.
+ */
+class ArrowStreamPythonUDFRunner(
+    funcs: Seq[ChainedPythonFunctions],
+    batchSize: Int,
+    bufferSize: Int,
+    reuseWorker: Boolean,
+    evalType: Int,
+    argOffsets: Array[Array[Int]],
+    schema: StructType)
+  extends BasePythonRunner[InternalRow, ColumnarBatch](
+    funcs, bufferSize, reuseWorker, evalType, argOffsets) {
+
+  protected override def newWriterThread(
+      env: SparkEnv,
+      worker: Socket,
+      inputIterator: Iterator[InternalRow],
+      partitionIndex: Int,
+      context: TaskContext): WriterThread = {
+    new WriterThread(env, worker, inputIterator, partitionIndex, context) {
+
+      override def writeCommand(dataOut: DataOutputStream): Unit = {
+        dataOut.writeInt(funcs.length)
+        funcs.zip(argOffsets).foreach { case (chained, offsets) =>
+          dataOut.writeInt(offsets.length)
+          offsets.foreach { offset =>
+            dataOut.writeInt(offset)
+          }
+          dataOut.writeInt(chained.funcs.length)
+          chained.funcs.foreach { f =>
+            dataOut.writeInt(f.command.length)
+            dataOut.write(f.command)
+          }
+        }
+      }
+
+      override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
+        val arrowSchema = ArrowUtils.toArrowSchema(schema)
+        val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+          s"stdout writer for $pythonExec", 0, Long.MaxValue)
+
+        val root = VectorSchemaRoot.create(arrowSchema, allocator)
+        val arrowWriter = ArrowWriter.create(root)
+
+        var closed = false
+
+        context.addTaskCompletionListener { _ =>
+          if (!closed) {
+            root.close()
+            allocator.close()
+          }
+        }
+
+        val writer = new ArrowStreamWriter(root, null, dataOut)
+        writer.start()
+
+        Utils.tryWithSafeFinally {
+          while (inputIterator.hasNext) {
+            var rowCount = 0
+            while (inputIterator.hasNext && (batchSize <= 0 || rowCount < batchSize)) {
+              val row = inputIterator.next()
+              arrowWriter.write(row)
+              rowCount += 1
+            }
+            arrowWriter.finish()
+            writer.writeBatch()
+            arrowWriter.reset()
+          }
+        } {
+          writer.end()
+          root.close()
+          allocator.close()
+          closed = true
+        }
+      }
+    }
+  }
+
+  protected override def newReaderIterator(
+      stream: DataInputStream,
+      writerThread: WriterThread,
+      startTime: Long,
+      env: SparkEnv,
+      worker: Socket,
+      released: AtomicBoolean,
```
[message truncated in the archive]
[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19355 **[Test build #82209 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82209/testReport)** for PR 19355 at commit [`d39c648`](https://github.com/apache/spark/commit/d39c648fe8ed690e4aa309f4e58e8484792cfc6c).
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19327 **[Test build #82208 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82208/testReport)** for PR 19327 at commit [`55c590c`](https://github.com/apache/spark/commit/55c590c996e3504d6353b33dad7760c98608cb55).
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19327 Merged build finished. Test FAILed.
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19327 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82207/ Test FAILed.
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19327 **[Test build #82207 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82207/testReport)** for PR 19327 at commit [`eb48277`](https://github.com/apache/spark/commit/eb48277e1d96d2771df620a491ec1dfe232eea7d). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19186 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82200/ Test PASSed.
[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19186 Merged build finished. Test PASSed.
[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19186 **[Test build #82200 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82200/testReport)** for PR 19186 at commit [`c168e7d`](https://github.com/apache/spark/commit/c168e7d0494da26acfc98080e619a2b6ce4f6a95). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19327 **[Test build #82207 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82207/testReport)** for PR 19327 at commit [`eb48277`](https://github.com/apache/spark/commit/eb48277e1d96d2771df620a491ec1dfe232eea7d).
[GitHub] spark pull request #19355: [SPARK-22130][Core] UTF8String.trim() scans " " t...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19355#discussion_r141131388
--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
```
@@ -501,14 +501,13 @@ public UTF8String trim() {
     int e = this.numBytes - 1;
     // skip all of the space (0x20) in the left side
     while (s < this.numBytes && getByte(s) == 0x20) s++;
-    // skip all of the space (0x20) in the right side
-    while (e >= 0 && getByte(e) == 0x20) e--;
-    if (s > e) {
+    if (s == this.numBytes) {
       // empty string
       return EMPTY_UTF8;
-    } else {
-      return copyUTF8String(s, e);
     }
+    // skip all of the space (0x20) in the right side
+    while (e >= 0 && getByte(e) == 0x20) e--;
```
--- End diff --
Nit: while you're optimizing this, you can move the declaration of `e` down here, since it won't be used unless there's a non-space char. I think this loop condition can start with `e > s` too: at the end, `s` and `e` point to the first and last non-space chars, and when the loop starts, `s` already points to a non-space char, so you can stop when `e == s`; that's the case of a single non-space char. Might be worth adding test cases for an empty string and a single-non-space-char string too.
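To make the suggestion above concrete, a minimal sketch of the suggested control flow; this is Scala over a plain byte array purely for illustration (the real method is Java in `UTF8String` and operates on unsafe memory):
```scala
// Illustrative sketch of the two-pointer trim suggested above.
def trimAscii(bytes: Array[Byte]): Array[Byte] = {
  var s = 0
  // skip spaces (0x20) on the left
  while (s < bytes.length && bytes(s) == 0x20) s += 1
  if (s == bytes.length) return Array.emptyByteArray // empty or all-spaces input

  // declare e only here: it is never needed in the all-spaces case, and the
  // loop can stop at e == s because bytes(s) is already known to be non-space
  var e = bytes.length - 1
  while (e > s && bytes(e) == 0x20) e -= 1
  java.util.Arrays.copyOfRange(bytes, s, e + 1)
}

// Edge cases worth unit-testing, per the comment: "" (empty), "   " (all
// spaces), and "x" / "  x  " (a single non-space char).
```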
[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18887 **[Test build #82206 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82206/testReport)** for PR 18887 at commit [`5eff2c5`](https://github.com/apache/spark/commit/5eff2c5f0891c6ea3fcfbbc7dacbbaf56c1d1788).
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141127050
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
```
@@ -424,208 +459,105 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
-    applications.get(appId) match {
-      case Some(appInfo) =>
-        try {
-          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
-          appInfo.attempts.filter { attempt =>
-            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
-          }.foreach { attempt =>
-            val logPath = new Path(logDir, attempt.logPath)
-            zipFileToStream(logPath, attempt.logPath, zipStream)
-          }
-        } finally {
-          zipStream.close()
-        }
-      case None => throw new SparkException(s"Logs for $appId not found.")
+    val app = try {
+      load(appId)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new SparkException(s"Logs for $appId not found.")
+    }
+
+    try {
+      // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+      attemptId
+        .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
+        .getOrElse(app.attempts)
+        .map(_.logPath)
+        .foreach { log =>
+          zipFileToStream(new Path(logDir, log), log, zipStream)
+        }
+    } finally {
+      zipStream.close()
     }
   }

   /**
-   * Replay the log files in the list and merge the list of old applications with new ones
+   * Replay the given log file, saving the application in the listing db.
    */
   protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-    val newAttempts = try {
-      val eventsFilter: ReplayEventsFilter = { eventString =>
-        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
-          eventString.startsWith(APPL_END_EVENT_PREFIX) ||
-          eventString.startsWith(LOG_START_EVENT_PREFIX)
-      }
-
-      val logPath = fileStatus.getPath()
-      val appCompleted = isApplicationCompleted(fileStatus)
-
-      // Use loading time as lastUpdated since some filesystems don't update modifiedTime
-      // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
-      // won't change whenever HistoryServer restarts and reloads the file.
-      val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
-
-      val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
-
-      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
-      // try to show their UI.
-      if (appListener.appId.isDefined) {
-        val attemptInfo = new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appListener.appId.getOrElse(logPath.getName()),
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          lastUpdated,
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted,
-          fileStatus.getLen(),
-          appListener.appSparkVersion.getOrElse("")
-        )
-        fileToAppInfo.put(logPath, attemptInfo)
-        logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
-        Some(attemptInfo)
-      } else {
-        logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
-          "The application may have not started.")
-        None
-      }
-    } catch {
-      case e: Exception =>
-        logError(
-          s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
-          e)
-        None
-    }
-
-    if (newAttempts.isEmpty) {
-      return
+    val eventsFilter: ReplayEventsFilter = { eventString =>
+      eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+        eventString.startsWith(APPL_END_EVENT_PREFIX) ||
+        eventString.startsWith(LOG_START_EVENT_PREFIX)
     }
-
-    // Build a map containing all apps that contain new attempts. The app information in this map
-    // contains both the new app attempt, and those that were already loaded in the existing apps
-    // map. If an attempt has been updated, it replaces the old attempt in the list.
-    val newAppMap = new mutable.HashMap[String, FsApplicationHistor
```
[message truncated in the archive]
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141125410 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -424,208 +459,105 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } -applications.get(appId) match { - case Some(appInfo) => -try { - // If no attempt is specified, or there is no attemptId for attempts, return all attempts - appInfo.attempts.filter { attempt => -attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get - }.foreach { attempt => -val logPath = new Path(logDir, attempt.logPath) -zipFileToStream(logPath, attempt.logPath, zipStream) - } -} finally { - zipStream.close() +val app = try { + load(appId) +} catch { + case _: NoSuchElementException => +throw new SparkException(s"Logs for $appId not found.") +} + +try { + // If no attempt is specified, or there is no attemptId for attempts, return all attempts + attemptId +.map { id => app.attempts.filter(_.info.attemptId == Some(id)) } +.getOrElse(app.attempts) +.map(_.logPath) +.foreach { log => + zipFileToStream(new Path(logDir, log), log, zipStream) } - case None => throw new SparkException(s"Logs for $appId not found.") +} finally { + zipStream.close() } } /** - * Replay the log files in the list and merge the list of old applications with new ones + * Replay the given log file, saving the application in the listing db. --- End diff -- No. If you look at the old code it did a "merge sort" kinda thing to create an updated listing. `KVStore` sorts things internally so there's no need for that code anymore - you just write something to it, and it's sorted magically. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
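To illustrate the point, a minimal sketch assuming the kvstore API from SPARK-20641 (InMemoryStore, KVIndex; the AppEntry class is made up for the example): writes need no merge pass, and the view hands records back in index order.

    import scala.annotation.meta.getter
    import scala.collection.JavaConverters._
    import org.apache.spark.kvstore.{InMemoryStore, KVIndex}

    class AppEntry(@(KVIndex @getter) val id: String) // `id` is the natural key

    val store = new InMemoryStore()
    Seq("app-2", "app-3", "app-1").foreach(id => store.write(new AppEntry(id)))
    // no manual "merge sort": iteration is already ordered by the natural index
    val ids = store.view(classOf[AppEntry]).iterator().asScala.map(_.id).toList
    // ids == List("app-1", "app-2", "app-3")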
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141124934 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -685,26 +618,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * @return a summary of the component state */ override def toString: String = { -val header = s""" - | FsHistoryProvider: logdir=$logDir, - | last scan time=$lastScanTime - | Cached application count =${applications.size}} -""".stripMargin -val sb = new StringBuilder(header) -applications.foreach(entry => sb.append(entry._2).append("\n")) -sb.toString - } - - /** - * Look up an application attempt - * @param appId application ID - * @param attemptId Attempt ID, if set - * @return the matching attempt, if found - */ - def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = { -applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId) -} +val count = listing.count(classOf[ApplicationInfoWrapper]) +s"""|FsHistoryProvider{logdir=$logDir, --- End diff -- That would make the string look weird. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
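For context on the margin characters: stripMargin drops everything up to and including the leading '|' on each line, which is what keeps the multi-line string aligned. A self-contained sketch:

    val logDir = "file:/tmp/spark-events"
    val lastScanTime = 0L
    val summary =
      s"""|FsHistoryProvider{logdir=$logDir,
          |  lastScanTime=$lastScanTime}""".stripMargin
    println(summary) // two lines; the indentation before each '|' is stripped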
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141124127 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -722,75 +640,215 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appId: String, attemptId: Option[String], prevFileSize: Long)(): Boolean = { -lookup(appId, attemptId) match { - case None => +try { + val attempt = getAttempt(appId, attemptId) + val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) + recordedFileSize(logPath) > prevFileSize +} catch { + case _: NoSuchElementException => logDebug(s"Application Attempt $appId/$attemptId not found") false - case Some(latest) => -prevFileSize < latest.fileSize } } -} -private[history] object FsHistoryProvider { - val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + /** + * Return the last known size of the given event log, recorded the last time the file + * system scanner detected a change in the file. + */ + private def recordedFileSize(log: Path): Long = { +try { + listing.read(classOf[LogInfo], log.toString()).fileSize +} catch { + case _: NoSuchElementException => 0L +} + } + + private def load(appId: String): ApplicationInfoWrapper = { +listing.read(classOf[ApplicationInfoWrapper], appId) + } + + /** + * Write the app's information to the given store. Serialized to avoid the (notedly rare) case + * where two threads are processing separate attempts of the same application. + */ + private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized { +val attempt = app.attempts.head + +val oldApp = try { + listing.read(classOf[ApplicationInfoWrapper], app.id) +} catch { + case _: NoSuchElementException => +app +} + +def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = { --- End diff -- This is only used here, I'd rather keep the logic local. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
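The addListing logic shown above merges the new attempt into whatever is already stored; a hedged sketch of that merge as a standalone function, with a simplified Attempt case class standing in for AttemptInfoWrapper (the shape follows the fuller diff later in this digest):

    case class Attempt(attemptId: Option[String], startTime: Long)

    // drop any stored attempt with the same id, append the new one, and keep
    // the list newest-first (the ordering compareAttemptInfo encodes)
    def mergeAttempt(old: List[Attempt], attempt: Attempt): List[Attempt] =
      (old.filterNot(_.attemptId == attempt.attemptId) :+ attempt)
        .sortWith(_.startTime > _.startTime)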
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141123714 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -243,42 +282,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { - applications.get(appId).flatMap { appInfo => -appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => + val appInfo = load(appId) + appInfo.attempts +.find { attempt => attempt.info.attemptId == attemptId } +.map { attempt => val replayBus = new ReplayListenerBus() val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) -SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, - HistoryServer.getAttemptURI(appId, attempt.attemptId), - Some(attempt.lastUpdated), attempt.startTime) +SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + Some(attempt.info.lastUpdated.getTime()), attempt.info.startTime.getTime()) // Do not call ui.bind() to avoid creating a new server for each application } val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - - if (appListener.appId.isDefined) { -ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") -ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) -// make sure to set admin acls before view acls so they are properly picked up -val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") -ui.getSecurityManager.setAdminAcls(adminAcls) -ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) -val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") -ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) -Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize))) - } else { -None - } - + ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") --- End diff -- (This code will also go away in subsequent changes.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141123619 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -243,42 +282,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { - applications.get(appId).flatMap { appInfo => -appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => + val appInfo = load(appId) + appInfo.attempts +.find { attempt => attempt.info.attemptId == attemptId } +.map { attempt => val replayBus = new ReplayListenerBus() val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) -SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, - HistoryServer.getAttemptURI(appId, attempt.attemptId), - Some(attempt.lastUpdated), attempt.startTime) +SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + Some(attempt.info.lastUpdated.getTime()), attempt.info.startTime.getTime()) // Do not call ui.bind() to avoid creating a new server for each application } val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - - if (appListener.appId.isDefined) { -ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") -ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) -// make sure to set admin acls before view acls so they are properly picked up -val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") -ui.getSecurityManager.setAdminAcls(adminAcls) -ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) -val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") -ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) -Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize))) - } else { -None - } - + ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") --- End diff -- If the listing exists it's unlikely that this would ever trigger, but sure. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141123050 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -243,42 +282,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { - applications.get(appId).flatMap { appInfo => -appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => + val appInfo = load(appId) + appInfo.attempts +.find { attempt => attempt.info.attemptId == attemptId } +.map { attempt => val replayBus = new ReplayListenerBus() val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) -SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, - HistoryServer.getAttemptURI(appId, attempt.attemptId), - Some(attempt.lastUpdated), attempt.startTime) +SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + Some(attempt.info.lastUpdated.getTime()), attempt.info.startTime.getTime()) --- End diff -- `AttemptInfoWrapper` stores information that is not available in the public API and is used by the provider. Later changes also add more fields to `ApplicationInfoWrapper` that are needed by the provider. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
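For readers following along, a rough shape of the wrapper being discussed, with the field set inferred from the diffs in this thread (the exact members are an assumption, not the PR's definition):

    import org.apache.spark.status.api.v1

    class AttemptInfoWrapper(
        val info: v1.ApplicationAttemptInfo, // the public REST API view
        val logPath: String,                 // provider-only: event log location
        val fileSize: Long)                  // provider-only: size at last scan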
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user squito commented on the issue: https://github.com/apache/spark/pull/19250 @HyukjinKwon you might be interested in this one also --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141122539 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -243,42 +282,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { - applications.get(appId).flatMap { appInfo => -appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => + val appInfo = load(appId) + appInfo.attempts +.find { attempt => attempt.info.attemptId == attemptId } +.map { attempt => val replayBus = new ReplayListenerBus() val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) -SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, - HistoryServer.getAttemptURI(appId, attempt.attemptId), - Some(attempt.lastUpdated), attempt.startTime) +SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + Some(attempt.info.lastUpdated.getTime()), attempt.info.startTime.getTime()) // Do not call ui.bind() to avoid creating a new server for each application } val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - - if (appListener.appId.isDefined) { -ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") -ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) -// make sure to set admin acls before view acls so they are properly picked up -val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") -ui.getSecurityManager.setAdminAcls(adminAcls) -ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) -val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") -ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) -Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize))) - } else { -None - } - + ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") + ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) + // make sure to set admin acls before view acls so they are properly picked up + val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") + ui.getSecurityManager.setAdminAcls(adminAcls) + ui.getSecurityManager.setViewAcls(attempt.info.sparkUser, +appListener.viewAcls.getOrElse("")) + val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + +appListener.adminAclsGroups.getOrElse("") + ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) + ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) + LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize)) } - } } catch { - case e: FileNotFoundException => None --- End diff -- I guess it still can until I remove the current replay code in a later change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141121472 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala --- @@ -496,7 +517,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc var provider: FsHistoryProvider = null try { -provider = new FsHistoryProvider(conf) +provider = newProvider(conf) --- End diff -- This was part of some code I only partially reverted. Will revert these. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141121349 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala --- @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.TimeUnit + +import scala.annotation.meta.getter + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.util.kvstore.KVIndex + +private[spark] object config { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") +.stringConf +.createWithDefault(DEFAULT_LOG_DIR) + + val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge") --- End diff -- Yes. This pattern is used in a bunch of other places to indicate the unit of time of the config. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
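The convention being defended, sketched as it appears inside Spark's own config objects (ConfigBuilder is private[spark], so this compiles only within Spark): the _S suffix tells call sites the value is surfaced in seconds.

    import java.util.concurrent.TimeUnit
    import org.apache.spark.internal.config.ConfigBuilder

    val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge")
      .timeConf(TimeUnit.SECONDS)       // value is converted to and read in seconds
      .createWithDefaultString("7d")    // users can still write "7d", "12h", etc.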
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r141120995 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -722,75 +640,215 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appId: String, attemptId: Option[String], prevFileSize: Long)(): Boolean = { -lookup(appId, attemptId) match { - case None => +try { + val attempt = getAttempt(appId, attemptId) + val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) + recordedFileSize(logPath) > prevFileSize +} catch { + case _: NoSuchElementException => logDebug(s"Application Attempt $appId/$attemptId not found") false - case Some(latest) => -prevFileSize < latest.fileSize } } -} -private[history] object FsHistoryProvider { - val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + /** + * Return the last known size of the given event log, recorded the last time the file + * system scanner detected a change in the file. + */ + private def recordedFileSize(log: Path): Long = { +try { + listing.read(classOf[LogInfo], log.toString()).fileSize +} catch { + case _: NoSuchElementException => 0L +} + } + + private def load(appId: String): ApplicationInfoWrapper = { +listing.read(classOf[ApplicationInfoWrapper], appId) + } + + /** + * Write the app's information to the given store. Serialized to avoid the (notedly rare) case + * where two threads are processing separate attempts of the same application. + */ + private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized { +val attempt = app.attempts.head + +val oldApp = try { + listing.read(classOf[ApplicationInfoWrapper], app.id) +} catch { + case _: NoSuchElementException => +app +} + +def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = { + a1.info.startTime.getTime() > a2.info.startTime.getTime() +} + +val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++ + List(attempt) + +val newAppInfo = new ApplicationInfoWrapper( + app.info, + attempts.sortWith(compareAttemptInfo)) +listing.write(newAppInfo) + } - private val NOT_STARTED = "" + /** For testing. Returns internal data about a single attempt. */ + private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = { +load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse( + throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId.")) + } +} + +private[history] object FsHistoryProvider { private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads" private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\"" private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + + /** + * Current version of the data written to the listing database. When opening an existing + * db, if the version does not match this value, the FsHistoryProvider will throw away + * all data and re-generate the listing data from the event logs. + */ + private[history] val CURRENT_LISTING_VERSION = 1L } /** - * Application attempt information. - * - * @param logPath path to the log file, or, for a legacy log, its directory - * @param name application name - * @param appId application ID - * @param attemptId optional attempt ID - * @param startTime start time (from playback) - * @param endTime end time (from playback). -1 if the application is incomplete. 
- * @param lastUpdated the modification time of the log file when this entry was built by replaying - *the history. - * @param sparkUser user running the application - * @param completed flag to indicate whether or not the application has completed. - * @param fileSize the size of the log file the last time the file was scanned for changes + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. */ -private class FsApplicationAttemptInfo( +private class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_N
[GitHub] spark pull request #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to ad...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19353 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r141120298 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java --- @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import org.iq80.leveldb.WriteBatch; + +/** + * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected + * via reflection, to make it cheaper to access it multiple times. + * + * + * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures + * that iteration over indices is easy, and that updating values in the store is not overly + * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping + * lists of pointers, which would be more expensive to update at runtime. + * + * + * + * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full + * key would be the concatenation of everything up to that point in the hierarchy, with each + * component separated by a NULL byte. + * + * + * + * +TYPE_NAME + * NATURAL_INDEX + * +NATURAL_KEY + * - + * -NATURAL_INDEX + * INDEX_NAME + * +INDEX_VALUE + * +NATURAL_KEY + * -INDEX_VALUE + * .INDEX_VALUE + * CHILD_INDEX_NAME + * +CHILD_INDEX_VALUE + * NATURAL_KEY_OR_DATA + * - + * -INDEX_NAME + * + * + * + * Entity data (either the entity's natural key or a copy of the data) is stored in all keys + * that end with "+". A count of all objects that match a particular top-level index + * value is kept at the end marker ("-"). A count is also kept at the natural index's end + * marker, to make it easy to retrieve the number of all elements of a particular type. 
+ * + * + * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd + * have these keys and values in the store for two instances, one with natural key "key1" and the + * other "key2", both with value "yes" for "bar": + * + * + * + * Foo __main__ +key1 [data for instance 1] + * Foo __main__ +key2 [data for instance 2] + * Foo __main__ - [count of all Foo] + * Foo bar +yes +key1 [instance 1 key or data, depending on index type] --- End diff -- Because otherwise, when reading from the index, it's not that easy to parse the object's key from the LevelDB key so that you can retrieve the object itself. You also have to store something; LevelDB doesn't allow you to store null. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
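To make the javadoc's layout concrete, a small illustration of how such keys compose, assuming only what the comment states (levelDbKey below is a hypothetical helper, not part of LevelDBTypeInfo): components joined by a NULL byte, '+' prefixes on index values, '-' for end-of-index count markers.

    import java.nio.charset.StandardCharsets.UTF_8

    def levelDbKey(parts: String*): Array[Byte] =
      parts.map(_.getBytes(UTF_8)).reduce((a, b) => a ++ Array[Byte](0) ++ b)

    val dataKey  = levelDbKey("Foo", "bar", "+yes", "+key1") // entity key or data
    val countKey = levelDbKey("Foo", "bar", "+yes", "-")     // count of bar=yes entries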
[GitHub] spark issue #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerC...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19353 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r141119638 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java --- @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import org.iq80.leveldb.WriteBatch; + +/** + * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected + * via reflection, to make it cheaper to access it multiple times. + * + * + * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures + * that iteration over indices is easy, and that updating values in the store is not overly + * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping + * lists of pointers, which would be more expensive to update at runtime. + * + * + * + * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full + * key would be the concatenation of everything up to that point in the hierarchy, with each + * component separated by a NULL byte. + * + * + * + * +TYPE_NAME + * NATURAL_INDEX + * +NATURAL_KEY + * - + * -NATURAL_INDEX + * INDEX_NAME + * +INDEX_VALUE + * +NATURAL_KEY + * -INDEX_VALUE + * .INDEX_VALUE + * CHILD_INDEX_NAME + * +CHILD_INDEX_VALUE + * NATURAL_KEY_OR_DATA + * - + * -INDEX_NAME + * + * + * + * Entity data (either the entity's natural key or a copy of the data) is stored in all keys + * that end with "+". A count of all objects that match a particular top-level index + * value is kept at the end marker ("-"). A count is also kept at the natural index's end + * marker, to make it easy to retrieve the number of all elements of a particular type. 
+ * + * + * + * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd + * have these keys and values in the store for two instances, one with natural key "key1" and the + * other "key2", both with value "yes" for "bar": + * + * + * + * Foo __main__ +key1 [data for instance 1] + * Foo __main__ +key2 [data for instance 2] + * Foo __main__ - [count of all Foo] + * Foo bar +yes +key1 [instance 1 key or data, depending on index type] + * Foo bar +yes +key2 [instance 2 key or data, depending on index type] + * Foo bar +yes - [count of all Foo with "bar=yes" ] + * + * + * + * Note that all indexed values are prepended with "+", even if the index itself does not have an + * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB + * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part + * of the full LevelDB key is generally referred to as the "index value" of the entity. + * + * + * + * Child indices are stored after their parent index. In the example above, let's assume there is + * a child index "child", whose parent is "bar". If both instances have value "no" for this field, + * the data in the store would look something like the following: + * + * + * + * ... + * Foo bar +yes - + * Foo bar .yes .child +no +key1 [instance 1 key or data, depending on index type] + * Foo bar .yes .child +no +key2 [instance 2 key or data, depending on index
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r141119266 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; + +/** + * Abstraction for a local key/value store for storing app data. + * + * + * There are two main features provided by the implementations of this interface: + * + * + * Serialization + * + * + * If the underlying data store requires serialization, data will be serialized to and deserialized + * using a {@link KVStoreSerializer}, which can be customized by the application. The serializer is + * based on Jackson, so it supports all the Jackson annotations for controlling the serialization of + * app-defined types. + * + * + * + * Data is also automatically compressed to save disk space. + * + * + * Automatic Key Management --- End diff -- First, you do realize this PR has been merged a long time ago, right? Automatic means you don't have to manually create keys. You're writing objects to the store, not calling something like `.put(key, object)`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
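What "automatic key management" looks like in practice, as a minimal hedged sketch against the same kvstore API (AppRecord and its values are hypothetical): the natural key comes from the annotated field, so there is no put(key, object) call anywhere.

    import scala.annotation.meta.getter
    import org.apache.spark.kvstore.{InMemoryStore, KVIndex}

    class AppRecord(@(KVIndex @getter) val id: String, val name: String)

    val store = new InMemoryStore()
    store.write(new AppRecord("app-1", "demo"))       // key derived from `id`
    val rec = store.read(classOf[AppRecord], "app-1") // lookup by natural key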
[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19343 I don't see much point in putting this in the docs ... it seems too fine-grained a detail to be useful there. I just don't see users who encounter this exception going to that spot in the docs to figure out exactly what is wrong. I put an example exception in the JIRA, so at least users can find it with a search. Sounds like you feel pretty strongly we should close this as "won't fix"? I'd still prefer to have this in, but will settle for just having the workaround be easily searchable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org