[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Add an API to create a DataFrame ...

2017-09-26 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19339
  
@goldmedal, are you online now? How about fixing the PR title to say 
something like "Supports RDD of strings as input in spark.read.csv in 
PySpark"?
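
A minimal sketch of the usage such a title describes, assuming the new 
overload mirrors the Scala `csv(Dataset[String])` variant (the call below is 
illustrative only, not taken from the PR):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# An RDD of CSV-formatted strings, e.g. produced by an upstream job.
lines = spark.sparkContext.parallelize(["Alice,1", "Bob,2"])

# With the change under review, the reader accepts the RDD directly and
# parses each element as one CSV record.
df = spark.read.csv(lines)
df.show()
```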


---



[GitHub] spark issue #19357: [SPARK-21322][SQL][WIP] support histogram in filter card...

2017-09-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19357
  
Jenkins, add to whitelist


---



[GitHub] spark issue #19357: [SPARK-21322][SQL][WIP] support histogram in filter card...

2017-09-26 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/19357
  
cc @wzhfy Please review the code first before I request the community to 
review it. Thanks.


---



[GitHub] spark pull request #19335: mapPartitions Api

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19335


---



[GitHub] spark pull request #19348: [BUILD] Close stale PRs

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19348


---



[GitHub] spark pull request #19347: Branch 2.2 sparkmlib's output of many algorithms ...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19347


---



[GitHub] spark pull request #19244: SPARK-22021

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19244


---



[GitHub] spark pull request #18474: [SPARK-21235][TESTS] UTest should clear temp resu...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18474


---



[GitHub] spark pull request #18253: [SPARK-18838][CORE] Introduce multiple queues in ...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18253


---



[GitHub] spark pull request #18897: [SPARK-21655][YARN] Support Kill CLI for Yarn mod...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18897


---



[GitHub] spark pull request #19334: Branch 1.6

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19334


---



[GitHub] spark pull request #15009: [SPARK-17443][SPARK-11035] Stop Spark Application...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15009


---



[GitHub] spark pull request #19315: [MINOR][ML]Updated english.txt word ordering

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19315


---



[GitHub] spark pull request #18978: [SPARK-21737][YARN]Create communication channel b...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18978


---



[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19295


---



[GitHub] spark pull request #19356: Merge pull request #1 from apache/master

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19356


---



[GitHub] spark pull request #19152: [SPARK-21915][ML][PySpark] Model 1 and Model 2 Pa...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19152


---



[GitHub] spark pull request #19238: [SPARK-22016][SQL] Add HiveDialect for JDBC conne...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19238


---



[GitHub] spark pull request #19300: [SPARK-22082][SparkR]Spelling mistake: "choosen" ...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19300


---



[GitHub] spark pull request #19236: SPARK-22015: Remove usage of non-used private fie...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19236


---



[GitHub] spark pull request #13794: [SPARK-15574][ML][PySpark] Python meta-algorithms...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13794


---



[GitHub] spark issue #19357: support histogram in filter cardinality estimation

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19357
  
Can one of the admins verify this patch?


---



[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19327
  
**[Test build #82212 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82212/testReport)**
 for PR 19327 at commit 
[`680c51a`](https://github.com/apache/spark/commit/680c51a9cedd1370dec9d249c2eba8cdfc61fbd8).


---



[GitHub] spark issue #19348: [BUILD] Close stale PRs

2017-09-26 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19348
  
Merged to master.


---



[GitHub] spark issue #17503: [SPARK-3159][MLlib] Check for reducible DecisionTree

2017-09-26 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/17503
  
Can you run some benchmarks to show how much improvement this change will 
bring?


---



[GitHub] spark pull request #19357: support histogram in filter cardinality estimatio...

2017-09-26 Thread ron8hu
GitHub user ron8hu opened a pull request:

https://github.com/apache/spark/pull/19357

support histogram in filter cardinality estimation

## What changes were proposed in this pull request?

Histograms are effective in dealing with skewed distributions. After we 
generate histogram information for column statistics, we need to adjust the 
filter estimation to make use of the histogram data structure.
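
As a rough illustration of the general idea (a toy sketch, not the code in 
this PR): with an equi-height histogram, a predicate such as `col <= v` can be 
estimated from the buckets it covers instead of assuming a uniform 
distribution over the whole [min, max] range.

```
# Toy equi-height histogram: each bucket covers [lo, hi] and holds roughly
# the same fraction (1 / num_buckets) of the rows.
def selectivity_less_equal(buckets, v):
    covered = 0.0
    for lo, hi in buckets:
        if v >= hi:
            covered += 1.0                   # the whole bucket qualifies
        elif v > lo:
            covered += (v - lo) / (hi - lo)  # partial bucket, uniform inside
    return covered / len(buckets)

# Skewed column: two thirds of the rows are below 20, but the max is 1000.
buckets = [(0, 10), (10, 20), (20, 1000)]
print(selectivity_less_equal(buckets, 15))   # ~0.5, vs. 15/1000 = 0.015
                                             # under a plain min/max estimate
```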

## How was this patch tested?

We revised all the unit test cases to include the histogram data structure.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ron8hu/spark createhistogram

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19357.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19357


commit 46af54d9c86fa1e5322fdd92ed47fe3d419dd966
Author: Ron Hu 
Date:   2017-09-26T23:33:49Z

support histogram in filter cardinality estimation




---



[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Add an API to create a DataFrame ...

2017-09-26 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19339
  
At a quick look, both test failures are:

```
  File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/readwriter.py",
 line 303, in parquet
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
```

They sound unrelated to the current PR (my rough assumption is that it's 
Py4J instability).


---



[GitHub] spark pull request #19336: [SPARK-21947][SS] Check and report error when mon...

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19336#discussion_r141215037
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 ---
@@ -516,6 +516,14 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
{
   testOutputMode(Update, shouldSupportAggregation = true, 
shouldSupportNonAggregation = true)
   testOutputMode(Complete, shouldSupportAggregation = true, 
shouldSupportNonAggregation = false)
 
+  // Unsupported expressions in streaming plan
+  assertNotSupportedInStreamingPlan(
+"MonotonicallyIncreasingID",
+streamRelation.select($"a", MonotonicallyIncreasingID()),
--- End diff --

nit. We can remove `$"a"` here.


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19355
  
Merged build finished. Test PASSed.


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19355
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82210/
Test PASSed.


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19355
  
**[Test build #82210 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82210/testReport)**
 for PR 19355 at commit 
[`d39c648`](https://github.com/apache/spark/commit/d39c648fe8ed690e4aa309f4e58e8484792cfc6c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---



[GitHub] spark issue #19348: [BUILD] Close stale PRs

2017-09-26 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19348
  
Added.


---



[GitHub] spark issue #19348: [BUILD] Close stale PRs

2017-09-26 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/19348
  
Add:
#15009
#18253



---



[GitHub] spark issue #19020: [SPARK-3181] [ML] Implement huber loss for LinearRegress...

2017-09-26 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/19020
  
> We have two candidate name: epsilon or m

I see; that seems fine then, though I worry that we use "epsilon" in MLlib 
(tests) for "a very small positive number."  Can we document it more clearly, 
including the comment that it matches sklearn and is "M" from the paper?
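
For reference, this is the parameterization I believe sklearn's HuberRegressor 
documents (the PR defines Spark's exact form); here M, sklearn's epsilon, is 
the threshold between the quadratic and linear regimes of the loss rather than 
a small tolerance:

    H_M(z) = \begin{cases} z^2 & |z| \le M \\ 2M|z| - M^2 & |z| > M \end{cases}

    \min_{\beta,\sigma} \sum_i \left( \sigma + H_M\!\left(\tfrac{y_i - x_i^\top \beta}{\sigma}\right) \sigma \right) + \lambda \lVert \beta \rVert_2^2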

> provide the estimated scaling factor (sigma from the paper)

I'd say:
* Either we provide it as 1 for regular linear regression (since that is 
technically correct)
* Or we take this as indication that @sethah 's comment about separating 
the classes is better.

Re: @sethah 's comment about separating classes, I'll comment in the JIRA 
since that's a bigger discussion.


---



[GitHub] spark issue #19270: [SPARK-21809] : Change Stage Page to use datatables to s...

2017-09-26 Thread pgandhi999
Github user pgandhi999 commented on the issue:

https://github.com/apache/spark/pull/19270
  
Ok, I will look into it. I am currently fixing UI bugs and unit tests, so I 
will commit those changes first and then look into the above issue. Thank you.


---



[GitHub] spark issue #19270: [SPARK-21809] : Change Stage Page to use datatables to s...

2017-09-26 Thread ajbozarth
Github user ajbozarth commented on the issue:

https://github.com/apache/spark/pull/19270
  
Ok, I'm still doing more testing, but I've narrowed down the above problem. 
The error occurs when using either local or standalone mode; it doesn't appear 
when using yarn. I'll continue my testing and review.


---



[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19327
  
**[Test build #82211 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82211/testReport)**
 for PR 19327 at commit 
[`a8aa356`](https://github.com/apache/spark/commit/a8aa35691d7603c0ca283dc2aff295031bcc3b9e).


---



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19218
  
Hi, @fjh100456. I left a few comments.


---



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r141188386
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.parquet.compression.codec' taking effect on hive table 
writing") {
+case class CompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
+  compressionConf: Option[CompressionConf]) {
+  def createTable(rootDir: File): Unit = {
+val compression = compressionConf.map(cf => 
s"'${cf.name}'='${cf.codeC}'")
+sql(
+  s"""
+  |CREATE TABLE $tableName(a int)
+  |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
+  |STORED AS $format
+  |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+  |${ if (compressionConf.nonEmpty) 
s"TBLPROPERTIES(${compression.get})" else "" }
+""".stripMargin)
+  }
+
+  def insertOverwriteTable(): Unit = {
+sql(
+  s"""
+  |INSERT OVERWRITE TABLE $tableName
+  |${ if (isPartitioned) "partition (p=1)" else "" }
+  |SELECT * from table_source
+""".stripMargin)
+  }
+
+  def getDirFiles(file: File): List[File] = {
+if (!file.exists()) Nil
+else if (file.isFile) List(file)
+else {
+  file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+.groupBy(_.isFile).flatMap {
+case (isFile, files) if isFile => files.toList
+case (_, dirs) => dirs.flatMap(getDirFiles)
+  }.toList
+}
+  }
+
+  def getTableSize: Long = {
+var totalSize = 0L
+withTempDir { tmpDir =>
+  withTable(tableName) {
+createTable(tmpDir)
+insertOverwriteTable()
+val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+val dir = new File(path)
+val files = 
getDirFiles(dir).filter(_.getName.startsWith("part-"))
+totalSize = files.map(_.length()).sum
+  }
+}
+totalSize
+  }
+}
+
+def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, 
"parquet",
+Some(CompressionConf("parquet.compression", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", 
None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
+  }
+}
+
+def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, 
"orc",
+Some(CompressionConf("orc.compress", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
+  }
+}
+
+withTempView("table_source") {
+  (0 until 10).toDF("a").crea

[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r141187890
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.parquet.compression.codec' taking effect on hive table 
writing") {
--- End diff --

- `[SPARK-21786]` -> `SPARK-21786`
- `spark.sql.parquet.compression.codec` -> 
`spark.sql.orc.compression.codec`?


---



[GitHub] spark issue #18098: [SPARK-16944][Mesos] Improve data locality when launchin...

2017-09-26 Thread gpang
Github user gpang commented on the issue:

https://github.com/apache/spark/pull/18098
  
Hi @srowen @vanzin, would you be able to help me with this PR? Thanks!


---



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r141187555
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.parquet.compression.codec' taking effect on hive table 
writing") {
+case class CompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
+  compressionConf: Option[CompressionConf]) {
+  def createTable(rootDir: File): Unit = {
+val compression = compressionConf.map(cf => 
s"'${cf.name}'='${cf.codeC}'")
+sql(
+  s"""
+  |CREATE TABLE $tableName(a int)
+  |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
+  |STORED AS $format
+  |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+  |${ if (compressionConf.nonEmpty) 
s"TBLPROPERTIES(${compression.get})" else "" }
+""".stripMargin)
+  }
+
+  def insertOverwriteTable(): Unit = {
+sql(
+  s"""
+  |INSERT OVERWRITE TABLE $tableName
+  |${ if (isPartitioned) "partition (p=1)" else "" }
+  |SELECT * from table_source
+""".stripMargin)
+  }
+
+  def getDirFiles(file: File): List[File] = {
+if (!file.exists()) Nil
+else if (file.isFile) List(file)
+else {
+  file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+.groupBy(_.isFile).flatMap {
+case (isFile, files) if isFile => files.toList
+case (_, dirs) => dirs.flatMap(getDirFiles)
+  }.toList
+}
+  }
+
+  def getTableSize: Long = {
+var totalSize = 0L
+withTempDir { tmpDir =>
+  withTable(tableName) {
+createTable(tmpDir)
+insertOverwriteTable()
+val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+val dir = new File(path)
+val files = 
getDirFiles(dir).filter(_.getName.startsWith("part-"))
+totalSize = files.map(_.length()).sum
+  }
+}
+totalSize
+  }
+}
+
+def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, 
"parquet",
+Some(CompressionConf("parquet.compression", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", 
None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
+  }
+}
+
+def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, 
"orc",
+Some(CompressionConf("orc.compress", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
+  }
+}
+
+withTempView("table_source") {
+  (0 until 10).toDF("a").crea

[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r141186378
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.parquet.compression.codec' taking effect on hive table 
writing") {
+case class CompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
+  compressionConf: Option[CompressionConf]) {
+  def createTable(rootDir: File): Unit = {
+val compression = compressionConf.map(cf => 
s"'${cf.name}'='${cf.codeC}'")
+sql(
+  s"""
+  |CREATE TABLE $tableName(a int)
+  |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
+  |STORED AS $format
+  |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+  |${ if (compressionConf.nonEmpty) 
s"TBLPROPERTIES(${compression.get})" else "" }
+""".stripMargin)
+  }
+
+  def insertOverwriteTable(): Unit = {
+sql(
+  s"""
+  |INSERT OVERWRITE TABLE $tableName
+  |${ if (isPartitioned) "partition (p=1)" else "" }
+  |SELECT * from table_source
+""".stripMargin)
+  }
+
+  def getDirFiles(file: File): List[File] = {
+if (!file.exists()) Nil
+else if (file.isFile) List(file)
+else {
+  file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+.groupBy(_.isFile).flatMap {
+case (isFile, files) if isFile => files.toList
+case (_, dirs) => dirs.flatMap(getDirFiles)
+  }.toList
+}
+  }
+
+  def getTableSize: Long = {
+var totalSize = 0L
+withTempDir { tmpDir =>
+  withTable(tableName) {
+createTable(tmpDir)
+insertOverwriteTable()
+val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+val dir = new File(path)
+val files = 
getDirFiles(dir).filter(_.getName.startsWith("part-"))
+totalSize = files.map(_.length()).sum
+  }
+}
+totalSize
+  }
+}
+
+def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, 
"parquet",
+Some(CompressionConf("parquet.compression", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", 
None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
+  }
+}
+
+def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, 
"orc",
+Some(CompressionConf("orc.compress", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
--- End diff --

You may want to check the codec explicitly like 
[HiveDDLSuite](https://github.com/

[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-09-26 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19294
  
Hello guys, is there a chance for this patch to be merged to master?


---



[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-09-26 Thread kalvinnchau
Github user kalvinnchau commented on the issue:

https://github.com/apache/spark/pull/19272
  
@ArtRand thanks! I've been testing a local version of doing that; I'll pull 
that change in and test it as well.


---



[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18887
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82206/
Test PASSed.


---



[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18887
  
Merged build finished. Test PASSed.


---



[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18887
  
**[Test build #82206 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82206/testReport)**
 for PR 18887 at commit 
[`5eff2c5`](https://github.com/apache/spark/commit/5eff2c5f0891c6ea3fcfbbc7dacbbaf56c1d1788).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19355
  
**[Test build #82210 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82210/testReport)**
 for PR 19355 at commit 
[`d39c648`](https://github.com/apache/spark/commit/d39c648fe8ed690e4aa309f4e58e8484792cfc6c).


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19355
  
Retest this please.


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19355
  
Merged build finished. Test FAILed.


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19355
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82209/
Test FAILed.


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19355
  
**[Test build #82209 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82209/testReport)**
 for PR 19355 at commit 
[`d39c648`](https://github.com/apache/spark/commit/d39c648fe8ed690e4aa309f4e58e8484792cfc6c).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---



[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-09-26 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19272
  
Hey @kalvinnchau, good catch on the first renewal time. I believe I 
addressed it. Have a look. Thanks again.


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19355
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82205/
Test PASSed.


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19355
  
Merged build finished. Test PASSed.


---



[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19355
  
**[Test build #82205 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82205/testReport)**
 for PR 19355 at commit 
[`243f681`](https://github.com/apache/spark/commit/243f681af922fe414db5194f1a1765328247d9ce).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---



[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...

2017-09-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19343
  
OK, closing this and the jira


---



[GitHub] spark pull request #19343: [SPARK-22121][SQL] Correct database location for ...

2017-09-26 Thread squito
Github user squito closed the pull request at:

https://github.com/apache/spark/pull/19343


---



[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19327
  
Merged build finished. Test FAILed.


---



[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19327
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82208/
Test FAILed.


---



[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19327
  
**[Test build #82208 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82208/testReport)**
 for PR 19327 at commit 
[`55c590c`](https://github.com/apache/spark/commit/55c590c996e3504d6353b33dad7760c98608cb55).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---



[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...

2017-09-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19343
  
Thanks! Maybe we can close it now and revisit it when we have a better way 
to resolve the file system specific issues? 


---



[GitHub] spark issue #19356: Merge pull request #1 from apache/master

2017-09-26 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19356
  
Close this @yaozhang2016 


---



[GitHub] spark issue #19349: [SPARK-22125][PYSPARK][SQL] Enable Arrow Stream format f...

2017-09-26 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/19349
  
Nice job on refactoring `PythonRunner`! I think we should just replace the 
Arrow file format with the stream format for pandas UDFs instead of having a 
new conf to enable it, as long as all the issues are worked out. Along with 
being a little faster, it's also easier on memory usage. I'd like to do the 
same for `toPandas()` as well, but that can be a follow-up. Is it possible to 
do away with the SQLConf and maybe rename some of these classes to be more 
general, e.g. `ArrowStreamPythonUDFRunner` -> `ArrowPythonRunner`?


---



[GitHub] spark issue #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerC...

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19353
  
Merged build finished. Test PASSed.


---



[GitHub] spark issue #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerC...

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19353
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82204/
Test PASSed.


---



[GitHub] spark issue #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerC...

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19353
  
**[Test build #82204 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82204/testReport)**
 for PR 19353 at commit 
[`c0488fb`](https://github.com/apache/spark/commit/c0488fbbfa3abb441198aa931ada5d6001d5029f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---



[GitHub] spark issue #19356: Merge pull request #1 from apache/master

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19356
  
Can one of the admins verify this patch?


---



[GitHub] spark pull request #19356: Merge pull request #1 from apache/master

2017-09-26 Thread yaozhang2016
GitHub user yaozhang2016 opened a pull request:

https://github.com/apache/spark/pull/19356

Merge pull request #1 from apache/master

update from origin

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yaozhang2016/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19356.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19356


commit 82af7606cfa6bde3c7d35529a18db372af12230c
Author: yaozhang2016 
Date:   2017-07-17T15:22:11Z

Merge pull request #1 from apache/master

update from origin




---



[GitHub] spark pull request #19349: [SPARK-22125][PYSPARK][SQL] Enable Arrow Stream f...

2017-09-26 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19349#discussion_r141142944
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowStreamPythonUDFRunner.scala
 ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+import java.net._
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.stream.{ArrowStreamReader, 
ArrowStreamWriter}
+
+import org.apache.spark._
+import org.apache.spark.api.python._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowUtils, ArrowWriter}
+import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, 
ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+/**
+ * Similar to `PythonUDFRunner`, but exchange data with Python worker via 
Arrow stream.
+ */
+class ArrowStreamPythonUDFRunner(
+funcs: Seq[ChainedPythonFunctions],
+batchSize: Int,
+bufferSize: Int,
+reuseWorker: Boolean,
+evalType: Int,
+argOffsets: Array[Array[Int]],
+schema: StructType)
+  extends BasePythonRunner[InternalRow, ColumnarBatch](
+funcs, bufferSize, reuseWorker, evalType, argOffsets) {
+
+  protected override def newWriterThread(
+  env: SparkEnv,
+  worker: Socket,
+  inputIterator: Iterator[InternalRow],
+  partitionIndex: Int,
+  context: TaskContext): WriterThread = {
+new WriterThread(env, worker, inputIterator, partitionIndex, context) {
+
+  override def writeCommand(dataOut: DataOutputStream): Unit = {
+dataOut.writeInt(funcs.length)
+funcs.zip(argOffsets).foreach { case (chained, offsets) =>
+  dataOut.writeInt(offsets.length)
+  offsets.foreach { offset =>
+dataOut.writeInt(offset)
+  }
+  dataOut.writeInt(chained.funcs.length)
+  chained.funcs.foreach { f =>
+dataOut.writeInt(f.command.length)
+dataOut.write(f.command)
+  }
+}
+  }
+
+  override def writeIteratorToStream(dataOut: DataOutputStream): Unit 
= {
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+  s"stdout writer for $pythonExec", 0, Long.MaxValue)
+
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
+
+var closed = false
+
+context.addTaskCompletionListener { _ =>
+  if (!closed) {
+root.close()
+allocator.close()
+  }
+}
+
+val writer = new ArrowStreamWriter(root, null, dataOut)
+writer.start()
+
+Utils.tryWithSafeFinally {
+  while (inputIterator.hasNext) {
+var rowCount = 0
+while (inputIterator.hasNext && (batchSize <= 0 || rowCount < 
batchSize)) {
+  val row = inputIterator.next()
+  arrowWriter.write(row)
+  rowCount += 1
+}
+arrowWriter.finish()
+writer.writeBatch()
+arrowWriter.reset()
+  }
+} {
+  writer.end()
+  root.close()
+  allocator.close()
+  closed = true
+}
+  }
+}
+  }
+
+  protected override def newReaderIterator(
+  stream: DataInputStream,
+  writerThread: WriterThread,
+  startTime: Long,
+  env: SparkEnv,
+  worker: Socket,
+  released: AtomicBoolean,
+

[GitHub] spark issue #19355: [SPARK-22130][Core] UTF8String.trim() scans " " twice

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19355
  
**[Test build #82209 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82209/testReport)**
 for PR 19355 at commit 
[`d39c648`](https://github.com/apache/spark/commit/d39c648fe8ed690e4aa309f4e58e8484792cfc6c).


---



[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19327
  
**[Test build #82208 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82208/testReport)**
 for PR 19327 at commit 
[`55c590c`](https://github.com/apache/spark/commit/55c590c996e3504d6353b33dad7760c98608cb55).


---



[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19327
  
Merged build finished. Test FAILed.


---



[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19327
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82207/
Test FAILed.


---



[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19327
  
**[Test build #82207 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82207/testReport)**
 for PR 19327 at commit 
[`eb48277`](https://github.com/apache/spark/commit/eb48277e1d96d2771df620a491ec1dfe232eea7d).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---



[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19186
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82200/
Test PASSed.


---



[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19186
  
Merged build finished. Test PASSed.


---



[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19186
  
**[Test build #82200 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82200/testReport)**
 for PR 19186 at commit 
[`c168e7d`](https://github.com/apache/spark/commit/c168e7d0494da26acfc98080e619a2b6ce4f6a95).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---



[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19327
  
**[Test build #82207 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82207/testReport)**
 for PR 19327 at commit 
[`eb48277`](https://github.com/apache/spark/commit/eb48277e1d96d2771df620a491ec1dfe232eea7d).


---



[GitHub] spark pull request #19355: [SPARK-22130][Core] UTF8String.trim() scans " " t...

2017-09-26 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19355#discussion_r141131388
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -501,14 +501,13 @@ public UTF8String trim() {
 int e = this.numBytes - 1;
 // skip all of the space (0x20) in the left side
 while (s < this.numBytes && getByte(s) == 0x20) s++;
-// skip all of the space (0x20) in the right side
-while (e >= 0 && getByte(e) == 0x20) e--;
-if (s > e) {
+if (s == this.numBytes) {
   // empty string
   return EMPTY_UTF8;
-} else {
-  return copyUTF8String(s, e);
 }
+// skip all of the space (0x20) in the right side
+while (e >= 0 && getByte(e) == 0x20) e--;
--- End diff --

Nit: while you're optimizing this, you can bring the declaration of e down 
here, as it won't be used unless there's a non-space char.

I think this condition can start with `e > s` too. At the end, s and e 
point to the first and last non-space char. When the loop starts, s points to a 
non-space char. So you can stop when e == s; this is the case of one non-space 
char.

Might be worth adding test cases for an empty string and a 
single-non-space-char string too.
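
To make the suggested shape concrete, here is a sketch of the scan logic only 
(in Python, not the actual UTF8String Java code), with the early return and 
the `e > s` stopping condition:

```
def trim_ascii_spaces(b: bytes) -> bytes:
    s = 0
    while s < len(b) and b[s] == 0x20:   # skip spaces on the left
        s += 1
    if s == len(b):                      # all spaces -> empty result
        return b""
    e = len(b) - 1                       # declared only once it is needed
    while e > s and b[e] == 0x20:        # right scan can stop at s
        e -= 1
    return b[s:e + 1]

assert trim_ascii_spaces(b"  hi  ") == b"hi"
assert trim_ascii_spaces(b"   ") == b""    # empty-after-trim case
assert trim_ascii_spaces(b" x ") == b"x"   # single non-space char
```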


---



[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...

2017-09-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18887
  
**[Test build #82206 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82206/testReport)**
 for PR 18887 at commit 
[`5eff2c5`](https://github.com/apache/spark/commit/5eff2c5f0891c6ea3fcfbbc7dacbbaf56c1d1788).


---



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141127050
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -424,208 +459,105 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   }
 }
 
-applications.get(appId) match {
-  case Some(appInfo) =>
-try {
-  // If no attempt is specified, or there is no attemptId for 
attempts, return all attempts
-  appInfo.attempts.filter { attempt =>
-attempt.attemptId.isEmpty || attemptId.isEmpty || 
attempt.attemptId.get == attemptId.get
-  }.foreach { attempt =>
-val logPath = new Path(logDir, attempt.logPath)
-zipFileToStream(logPath, attempt.logPath, zipStream)
-  }
-} finally {
-  zipStream.close()
+val app = try {
+  load(appId)
+} catch {
+  case _: NoSuchElementException =>
+throw new SparkException(s"Logs for $appId not found.")
+}
+
+try {
+  // If no attempt is specified, or there is no attemptId for 
attempts, return all attempts
+  attemptId
+.map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
+.getOrElse(app.attempts)
+.map(_.logPath)
+.foreach { log =>
+  zipFileToStream(new Path(logDir, log), log, zipStream)
 }
-  case None => throw new SparkException(s"Logs for $appId not found.")
+} finally {
+  zipStream.close()
 }
   }
 
   /**
-   * Replay the log files in the list and merge the list of old 
applications with new ones
+   * Replay the given log file, saving the application in the listing db.
*/
   protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-val newAttempts = try {
-  val eventsFilter: ReplayEventsFilter = { eventString =>
-eventString.startsWith(APPL_START_EVENT_PREFIX) ||
-  eventString.startsWith(APPL_END_EVENT_PREFIX) ||
-  eventString.startsWith(LOG_START_EVENT_PREFIX)
-  }
-
-  val logPath = fileStatus.getPath()
-  val appCompleted = isApplicationCompleted(fileStatus)
-
-  // Use loading time as lastUpdated since some filesystems don't 
update modifiedTime
-  // each time file is updated. However use modifiedTime for completed 
jobs so lastUpdated
-  // won't change whenever HistoryServer restarts and reloads the file.
-  val lastUpdated = if (appCompleted) fileStatus.getModificationTime 
else clock.getTimeMillis()
-
-  val appListener = replay(fileStatus, appCompleted, new 
ReplayListenerBus(), eventsFilter)
-
-  // Without an app ID, new logs will render incorrectly in the 
listing page, so do not list or
-  // try to show their UI.
-  if (appListener.appId.isDefined) {
-val attemptInfo = new FsApplicationAttemptInfo(
-  logPath.getName(),
-  appListener.appName.getOrElse(NOT_STARTED),
-  appListener.appId.getOrElse(logPath.getName()),
-  appListener.appAttemptId,
-  appListener.startTime.getOrElse(-1L),
-  appListener.endTime.getOrElse(-1L),
-  lastUpdated,
-  appListener.sparkUser.getOrElse(NOT_STARTED),
-  appCompleted,
-  fileStatus.getLen(),
-  appListener.appSparkVersion.getOrElse("")
-)
-fileToAppInfo.put(logPath, attemptInfo)
-logDebug(s"Application log ${attemptInfo.logPath} loaded 
successfully: $attemptInfo")
-Some(attemptInfo)
-  } else {
-logWarning(s"Failed to load application log ${fileStatus.getPath}. 
" +
-  "The application may have not started.")
-None
-  }
-
-} catch {
-  case e: Exception =>
-logError(
-  s"Exception encountered when attempting to load application log 
${fileStatus.getPath}",
-  e)
-None
-}
-
-if (newAttempts.isEmpty) {
-  return
+val eventsFilter: ReplayEventsFilter = { eventString =>
+  eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+eventString.startsWith(APPL_END_EVENT_PREFIX) ||
+eventString.startsWith(LOG_START_EVENT_PREFIX)
 }
 
-// Build a map containing all apps that contain new attempts. The app 
information in this map
-// contains both the new app attempt, and those that were already 
loaded in the existing apps
-// map. If an attempt has been updated, it replaces the old attempt in 
the list.
-val newAppMap = new mutable.HashMap[String, FsApplicationHistor

[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141125410
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -424,208 +459,105 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   }
 }
 
-applications.get(appId) match {
-  case Some(appInfo) =>
-try {
-  // If no attempt is specified, or there is no attemptId for 
attempts, return all attempts
-  appInfo.attempts.filter { attempt =>
-attempt.attemptId.isEmpty || attemptId.isEmpty || 
attempt.attemptId.get == attemptId.get
-  }.foreach { attempt =>
-val logPath = new Path(logDir, attempt.logPath)
-zipFileToStream(logPath, attempt.logPath, zipStream)
-  }
-} finally {
-  zipStream.close()
+val app = try {
+  load(appId)
+} catch {
+  case _: NoSuchElementException =>
+throw new SparkException(s"Logs for $appId not found.")
+}
+
+try {
+  // If no attempt is specified, or there is no attemptId for 
attempts, return all attempts
+  attemptId
+.map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
+.getOrElse(app.attempts)
+.map(_.logPath)
+.foreach { log =>
+  zipFileToStream(new Path(logDir, log), log, zipStream)
 }
-  case None => throw new SparkException(s"Logs for $appId not found.")
+} finally {
+  zipStream.close()
 }
   }
 
   /**
-   * Replay the log files in the list and merge the list of old 
applications with new ones
+   * Replay the given log file, saving the application in the listing db.
--- End diff --

No. If you look at the old code it did a "merge sort" kinda thing to create 
an updated listing. `KVStore` sorts things internally so there's no need for 
that code anymore - you just write something to it, and it's sorted magically.
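
For anyone reading along later, a tiny sketch of what "write it and it's sorted" 
looks like against the kvstore API (package path as in the #17902 diffs; the 
`AppEntry` type is made up for illustration):

```java
import org.apache.spark.kvstore.InMemoryStore;
import org.apache.spark.kvstore.KVIndex;
import org.apache.spark.kvstore.KVStore;

public class SortedListingSketch {

  // Toy entity; the @KVIndex field is the natural key the store orders by.
  public static class AppEntry {
    @KVIndex public String id;
    public String name;

    public AppEntry() { }

    AppEntry(String id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  public static void main(String[] args) throws Exception {
    KVStore store = new InMemoryStore();

    // Written out of order on purpose; there is no explicit sort step anywhere.
    store.write(new AppEntry("app-002", "second"));
    store.write(new AppEntry("app-001", "first"));
    store.write(new AppEntry("app-003", "third"));

    // Iterating the view yields app-001, app-002, app-003: the store keeps
    // entries ordered by the natural key, so no merge-sort pass is needed.
    for (AppEntry entry : store.view(AppEntry.class)) {
      System.out.println(entry.id + " -> " + entry.name);
    }
  }
}
```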


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141124934
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -685,26 +618,11 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
* @return a summary of the component state
*/
   override def toString: String = {
-val header = s"""
-  | FsHistoryProvider: logdir=$logDir,
-  | last scan time=$lastScanTime
-  | Cached application count =${applications.size}}
-""".stripMargin
-val sb = new StringBuilder(header)
-applications.foreach(entry => sb.append(entry._2).append("\n"))
-sb.toString
-  }
-
-  /**
-   * Look up an application attempt
-   * @param appId application ID
-   * @param attemptId Attempt ID, if set
-   * @return the matching attempt, if found
-   */
-  def lookup(appId: String, attemptId: Option[String]): 
Option[FsApplicationAttemptInfo] = {
-applications.get(appId).flatMap { appInfo =>
-  appInfo.attempts.find(_.attemptId == attemptId)
-}
+val count = listing.count(classOf[ApplicationInfoWrapper])
+s"""|FsHistoryProvider{logdir=$logDir,
--- End diff --

That would make the string look weird.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141124127
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -722,75 +640,215 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   appId: String,
   attemptId: Option[String],
   prevFileSize: Long)(): Boolean = {
-lookup(appId, attemptId) match {
-  case None =>
+try {
+  val attempt = getAttempt(appId, attemptId)
+  val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
+  recordedFileSize(logPath) > prevFileSize
+} catch {
+  case _: NoSuchElementException =>
 logDebug(s"Application Attempt $appId/$attemptId not found")
 false
-  case Some(latest) =>
-prevFileSize < latest.fileSize
 }
   }
-}
 
-private[history] object FsHistoryProvider {
-  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+  /**
+   * Return the last known size of the given event log, recorded the last 
time the file
+   * system scanner detected a change in the file.
+   */
+  private def recordedFileSize(log: Path): Long = {
+try {
+  listing.read(classOf[LogInfo], log.toString()).fileSize
+} catch {
+  case _: NoSuchElementException => 0L
+}
+  }
+
+  private def load(appId: String): ApplicationInfoWrapper = {
+listing.read(classOf[ApplicationInfoWrapper], appId)
+  }
+
+  /**
+   * Write the app's information to the given store. Serialized to avoid 
the (notedly rare) case
+   * where two threads are processing separate attempts of the same 
application.
+   */
+  private def addListing(app: ApplicationInfoWrapper): Unit = 
listing.synchronized {
+val attempt = app.attempts.head
+
+val oldApp = try {
+  listing.read(classOf[ApplicationInfoWrapper], app.id)
+} catch {
+  case _: NoSuchElementException =>
+app
+}
+
+def compareAttemptInfo(a1: AttemptInfoWrapper, a2: 
AttemptInfoWrapper): Boolean = {
--- End diff --

This is only used here, I'd rather keep the logic local.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141123714
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -243,42 +282,38 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 
   override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
 try {
-  applications.get(appId).flatMap { appInfo =>
-appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt 
=>
+  val appInfo = load(appId)
+  appInfo.attempts
+.find { attempt => attempt.info.attemptId == attemptId }
+.map { attempt =>
   val replayBus = new ReplayListenerBus()
   val ui = {
 val conf = this.conf.clone()
 val appSecManager = new SecurityManager(conf)
-SparkUI.createHistoryUI(conf, replayBus, appSecManager, 
appInfo.name,
-  HistoryServer.getAttemptURI(appId, attempt.attemptId),
-  Some(attempt.lastUpdated), attempt.startTime)
+SparkUI.createHistoryUI(conf, replayBus, appSecManager, 
appInfo.info.name,
+  HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
+  Some(attempt.info.lastUpdated.getTime()), 
attempt.info.startTime.getTime())
 // Do not call ui.bind() to avoid creating a new server for 
each application
   }
 
   val fileStatus = fs.getFileStatus(new Path(logDir, 
attempt.logPath))
 
   val appListener = replay(fileStatus, 
isApplicationCompleted(fileStatus), replayBus)
-
-  if (appListener.appId.isDefined) {
-ui.appSparkVersion = appListener.appSparkVersion.getOrElse("")
-ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE)
-// make sure to set admin acls before view acls so they are 
properly picked up
-val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + 
appListener.adminAcls.getOrElse("")
-ui.getSecurityManager.setAdminAcls(adminAcls)
-ui.getSecurityManager.setViewAcls(attempt.sparkUser, 
appListener.viewAcls.getOrElse(""))
-val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," +
-  appListener.adminAclsGroups.getOrElse("")
-ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
-
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
-Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, 
attempt.fileSize)))
-  } else {
-None
-  }
-
+  ui.appSparkVersion = appListener.appSparkVersion.getOrElse("")
--- End diff --

(This code will also go away in subsequent changes.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141123619
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -243,42 +282,38 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 
   override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
 try {
-  applications.get(appId).flatMap { appInfo =>
-appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt 
=>
+  val appInfo = load(appId)
+  appInfo.attempts
+.find { attempt => attempt.info.attemptId == attemptId }
+.map { attempt =>
   val replayBus = new ReplayListenerBus()
   val ui = {
 val conf = this.conf.clone()
 val appSecManager = new SecurityManager(conf)
-SparkUI.createHistoryUI(conf, replayBus, appSecManager, 
appInfo.name,
-  HistoryServer.getAttemptURI(appId, attempt.attemptId),
-  Some(attempt.lastUpdated), attempt.startTime)
+SparkUI.createHistoryUI(conf, replayBus, appSecManager, 
appInfo.info.name,
+  HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
+  Some(attempt.info.lastUpdated.getTime()), 
attempt.info.startTime.getTime())
 // Do not call ui.bind() to avoid creating a new server for 
each application
   }
 
   val fileStatus = fs.getFileStatus(new Path(logDir, 
attempt.logPath))
 
   val appListener = replay(fileStatus, 
isApplicationCompleted(fileStatus), replayBus)
-
-  if (appListener.appId.isDefined) {
-ui.appSparkVersion = appListener.appSparkVersion.getOrElse("")
-ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE)
-// make sure to set admin acls before view acls so they are 
properly picked up
-val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + 
appListener.adminAcls.getOrElse("")
-ui.getSecurityManager.setAdminAcls(adminAcls)
-ui.getSecurityManager.setViewAcls(attempt.sparkUser, 
appListener.viewAcls.getOrElse(""))
-val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," +
-  appListener.adminAclsGroups.getOrElse("")
-ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
-
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
-Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, 
attempt.fileSize)))
-  } else {
-None
-  }
-
+  ui.appSparkVersion = appListener.appSparkVersion.getOrElse("")
--- End diff --

If the listing exists it's unlikely that this would ever trigger, but sure.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141123050
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -243,42 +282,38 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 
   override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
 try {
-  applications.get(appId).flatMap { appInfo =>
-appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt 
=>
+  val appInfo = load(appId)
+  appInfo.attempts
+.find { attempt => attempt.info.attemptId == attemptId }
+.map { attempt =>
   val replayBus = new ReplayListenerBus()
   val ui = {
 val conf = this.conf.clone()
 val appSecManager = new SecurityManager(conf)
-SparkUI.createHistoryUI(conf, replayBus, appSecManager, 
appInfo.name,
-  HistoryServer.getAttemptURI(appId, attempt.attemptId),
-  Some(attempt.lastUpdated), attempt.startTime)
+SparkUI.createHistoryUI(conf, replayBus, appSecManager, 
appInfo.info.name,
+  HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
+  Some(attempt.info.lastUpdated.getTime()), 
attempt.info.startTime.getTime())
--- End diff --

`AttemptInfoWrapper` stores information that is not available in the public 
API and is used by the provider. Later changes also add more fields to 
`ApplicationInfoWrapper` that are needed by the provider.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-09-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19250
  
@HyukjinKwon you might be interested in this one also


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141122539
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -243,42 +282,38 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 
   override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
 try {
-  applications.get(appId).flatMap { appInfo =>
-appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt 
=>
+  val appInfo = load(appId)
+  appInfo.attempts
+.find { attempt => attempt.info.attemptId == attemptId }
+.map { attempt =>
   val replayBus = new ReplayListenerBus()
   val ui = {
 val conf = this.conf.clone()
 val appSecManager = new SecurityManager(conf)
-SparkUI.createHistoryUI(conf, replayBus, appSecManager, 
appInfo.name,
-  HistoryServer.getAttemptURI(appId, attempt.attemptId),
-  Some(attempt.lastUpdated), attempt.startTime)
+SparkUI.createHistoryUI(conf, replayBus, appSecManager, 
appInfo.info.name,
+  HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
+  Some(attempt.info.lastUpdated.getTime()), 
attempt.info.startTime.getTime())
 // Do not call ui.bind() to avoid creating a new server for 
each application
   }
 
   val fileStatus = fs.getFileStatus(new Path(logDir, 
attempt.logPath))
 
   val appListener = replay(fileStatus, 
isApplicationCompleted(fileStatus), replayBus)
-
-  if (appListener.appId.isDefined) {
-ui.appSparkVersion = appListener.appSparkVersion.getOrElse("")
-ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE)
-// make sure to set admin acls before view acls so they are 
properly picked up
-val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + 
appListener.adminAcls.getOrElse("")
-ui.getSecurityManager.setAdminAcls(adminAcls)
-ui.getSecurityManager.setViewAcls(attempt.sparkUser, 
appListener.viewAcls.getOrElse(""))
-val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," +
-  appListener.adminAclsGroups.getOrElse("")
-ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
-
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
-Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, 
attempt.fileSize)))
-  } else {
-None
-  }
-
+  ui.appSparkVersion = appListener.appSparkVersion.getOrElse("")
+  ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE)
+  // make sure to set admin acls before view acls so they are 
properly picked up
+  val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + 
appListener.adminAcls.getOrElse("")
+  ui.getSecurityManager.setAdminAcls(adminAcls)
+  ui.getSecurityManager.setViewAcls(attempt.info.sparkUser,
+appListener.viewAcls.getOrElse(""))
+  val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," +
+appListener.adminAclsGroups.getOrElse("")
+  ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
+  
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
+  LoadedAppUI(ui, () => updateProbe(appId, attemptId, 
attempt.fileSize))
 }
-  }
 } catch {
-  case e: FileNotFoundException => None
--- End diff --

I guess it still can until I remove the current replay code in a later 
change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141121472
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -496,7 +517,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
 
   var provider: FsHistoryProvider = null
   try {
-provider = new FsHistoryProvider(conf)
+provider = newProvider(conf)
--- End diff --

This was part of some code I only partially reverted. Will revert these.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141121349
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala 
---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.TimeUnit
+
+import scala.annotation.meta.getter
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.util.kvstore.KVIndex
+
+private[spark] object config {
+
+  /** Use this to annotate constructor params to be used as KVStore 
indices. */
+  type KVIndexParam = KVIndex @getter
+
+  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+.stringConf
+.createWithDefault(DEFAULT_LOG_DIR)
+
+  val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge")
--- End diff --

Yes. This pattern is used in a bunch of other places to indicate the unit 
of time of the config.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r141120995
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -722,75 +640,215 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   appId: String,
   attemptId: Option[String],
   prevFileSize: Long)(): Boolean = {
-lookup(appId, attemptId) match {
-  case None =>
+try {
+  val attempt = getAttempt(appId, attemptId)
+  val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
+  recordedFileSize(logPath) > prevFileSize
+} catch {
+  case _: NoSuchElementException =>
 logDebug(s"Application Attempt $appId/$attemptId not found")
 false
-  case Some(latest) =>
-prevFileSize < latest.fileSize
 }
   }
-}
 
-private[history] object FsHistoryProvider {
-  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+  /**
+   * Return the last known size of the given event log, recorded the last 
time the file
+   * system scanner detected a change in the file.
+   */
+  private def recordedFileSize(log: Path): Long = {
+try {
+  listing.read(classOf[LogInfo], log.toString()).fileSize
+} catch {
+  case _: NoSuchElementException => 0L
+}
+  }
+
+  private def load(appId: String): ApplicationInfoWrapper = {
+listing.read(classOf[ApplicationInfoWrapper], appId)
+  }
+
+  /**
+   * Write the app's information to the given store. Serialized to avoid 
the (notedly rare) case
+   * where two threads are processing separate attempts of the same 
application.
+   */
+  private def addListing(app: ApplicationInfoWrapper): Unit = 
listing.synchronized {
+val attempt = app.attempts.head
+
+val oldApp = try {
+  listing.read(classOf[ApplicationInfoWrapper], app.id)
+} catch {
+  case _: NoSuchElementException =>
+app
+}
+
+def compareAttemptInfo(a1: AttemptInfoWrapper, a2: 
AttemptInfoWrapper): Boolean = {
+  a1.info.startTime.getTime() > a2.info.startTime.getTime()
+}
+
+val attempts = oldApp.attempts.filter(_.info.attemptId != 
attempt.info.attemptId) ++
+  List(attempt)
+
+val newAppInfo = new ApplicationInfoWrapper(
+  app.info,
+  attempts.sortWith(compareAttemptInfo))
+listing.write(newAppInfo)
+  }
 
-  private val NOT_STARTED = ""
+  /** For testing. Returns internal data about a single attempt. */
+  private[history] def getAttempt(appId: String, attemptId: 
Option[String]): AttemptInfoWrapper = {
+load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse(
+  throw new NoSuchElementException(s"Cannot find attempt $attemptId of 
$appId."))
+  }
 
+}
+
+private[history] object FsHistoryProvider {
   private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = 
"spark.history.fs.numReplayThreads"
 
   private val APPL_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationStart\""
 
   private val APPL_END_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationEnd\""
 
   private val LOG_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerLogStart\""
+
+  /**
+   * Current version of the data written to the listing database. When 
opening an existing
+   * db, if the version does not match this value, the FsHistoryProvider 
will throw away
+   * all data and re-generate the listing data from the event logs.
+   */
+  private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
 /**
- * Application attempt information.
- *
- * @param logPath path to the log file, or, for a legacy log, its directory
- * @param name application name
- * @param appId application ID
- * @param attemptId optional attempt ID
- * @param startTime start time (from playback)
- * @param endTime end time (from playback). -1 if the application is 
incomplete.
- * @param lastUpdated the modification time of the log file when this 
entry was built by replaying
- *the history.
- * @param sparkUser user running the application
- * @param completed flag to indicate whether or not the application has 
completed.
- * @param fileSize the size of the log file the last time the file was 
scanned for changes
+ * A KVStoreSerializer that provides Scala types serialization too, and 
uses the same options as
+ * the API serializer.
  */
-private class FsApplicationAttemptInfo(
+private class KVStoreScalaSerializer extends KVStoreSerializer {
+
+  mapper.registerModule(DefaultScalaModule)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_N

[GitHub] spark pull request #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to ad...

2017-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19353


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r141120298
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a 
cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the 
following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the 
store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per 
key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In 
LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the 
hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ * +NATURAL_KEY
+ * -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ * +INDEX_VALUE
+ *   +NATURAL_KEY
+ * -INDEX_VALUE
+ * .INDEX_VALUE
+ *   CHILD_INDEX_NAME
+ * +CHILD_INDEX_VALUE
+ *   NATURAL_KEY_OR_DATA
+ * -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is 
stored in all keys
+ * that end with "+". A count of all objects that match a 
particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept 
at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a 
particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second 
index called "bar", you'd
+ * have these keys and values in the store for two instances, one with 
natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -   [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
--- End diff --

Because otherwise, when reading from the index, it's not that easy to parse 
the object's key from the leveldb key so that you can retrieve the object 
itself.

You also have to store something; LevelDB doesn't allow you to store null values.
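
As a toy illustration of why the index entry's value is the natural key (this just 
mimics the layout with a sorted `TreeMap`; it is not the real `LevelDBTypeInfo` 
encoding):

```java
import java.util.Map;
import java.util.TreeMap;

public class IndexScanSketch {

  public static void main(String[] args) {
    // Toy stand-in for LevelDB: a sorted map from key to value. The keys below
    // only loosely mimic the hierarchy described in the javadoc.
    TreeMap<String, String> db = new TreeMap<>();
    db.put("Foo\u0000__main__\u0000+key1", "{data for instance 1}");
    db.put("Foo\u0000__main__\u0000+key2", "{data for instance 2}");
    // Index entries store the entity's natural key as their value, so an index
    // scan can recover it without having to parse the LevelDB key itself.
    db.put("Foo\u0000bar\u0000+yes\u0000+key1", "key1");
    db.put("Foo\u0000bar\u0000+yes\u0000+key2", "key2");

    String prefix = "Foo\u0000bar\u0000+yes\u0000";
    for (Map.Entry<String, String> e : db.tailMap(prefix).entrySet()) {
      if (!e.getKey().startsWith(prefix)) {
        break;
      }
      String naturalKey = e.getValue();  // no key parsing needed
      System.out.println(db.get("Foo\u0000__main__\u0000+" + naturalKey));
    }
  }
}
```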


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerC...

2017-09-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19353
  
Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r141119638
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a 
cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the 
following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the 
store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per 
key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In 
LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the 
hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ * +NATURAL_KEY
+ * -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ * +INDEX_VALUE
+ *   +NATURAL_KEY
+ * -INDEX_VALUE
+ * .INDEX_VALUE
+ *   CHILD_INDEX_NAME
+ * +CHILD_INDEX_VALUE
+ *   NATURAL_KEY_OR_DATA
+ * -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is 
stored in all keys
+ * that end with "+". A count of all objects that match a 
particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept 
at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a 
particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second 
index called "bar", you'd
+ * have these keys and values in the store for two instances, one with 
natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -   [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
+ * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
+ * Foo bar +yes -   [count of all Foo with "bar=yes" ]
+ * 
+ *
+ * 
+ * Note that all indexed values are prepended with "+", even if the index 
itself does not have an
+ * explicit end marker. This allows for easily skipping to the end of an 
index by telling LevelDB
+ * to seek to the "phantom" end marker of the index. Throughout the code 
and comments, this part
+ * of the full LevelDB key is generally referred to as the "index value" 
of the entity.
+ * 
+ *
+ * 
+ * Child indices are stored after their parent index. In the example 
above, let's assume there is
+ * a child index "child", whose parent is "bar". If both instances have 
value "no" for this field,
+ * the data in the store would look something like the following:
+ * 
+ *
+ * 
+ * ...
+ * Foo bar +yes -
+ * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on 
index type]
+ * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on 
index

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r141119266
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Abstraction for a local key/value store for storing app data.
+ *
+ * 
+ * There are two main features provided by the implementations of this 
interface:
+ * 
+ *
+ * Serialization
+ *
+ * 
+ * If the underlying data store requires serialization, data will be 
serialized to and deserialized
+ * using a {@link KVStoreSerializer}, which can be customized by the 
application. The serializer is
+ * based on Jackson, so it supports all the Jackson annotations for 
controlling the serialization of
+ * app-defined types.
+ * 
+ *
+ * 
+ * Data is also automatically compressed to save disk space.
+ * 
+ *
+ * Automatic Key Management
--- End diff --

First, you do realize this PR was merged a long time ago, right?

Automatic means you don't have to manually create keys. You're writing 
objects to the store, not calling something like `.put(key, object)`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...

2017-09-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19343
  
I don't see much point in putting this in the docs ... it seems too 
fine-grained a detail to be useful there.  I just don't see users who 
encounter this exception going to that spot in the docs to figure 
out exactly what is wrong.  I put an example exception in the JIRA, so at least 
users can find it with a search.  

Sounds like you feel pretty strongly we should close this as "won't fix"?  
I'd still prefer to have this in, but I'll settle for just having the 
workaround be easily searchable.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


