[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...

2015-03-25 Thread budde
GitHub user budde opened a pull request:

https://github.com/apache/spark/pull/5188

[SPARK-6538][SQL] Add missing nullable Metastore fields when merging a 
Parquet schema

When Spark SQL infers a schema for a DataFrame, it will take the union of 
all field types present in the structured source data (e.g. an RDD of JSON 
data). When the source data for a row doesn't define a particular field on the 
DataFrame's schema, a null value will simply be assumed for this field. This 
workflow makes it very easy to construct tables and query over a set of 
structured data with a nonuniform schema. However, this behavior is not 
consistent in some cases when dealing with Parquet files and an external table 
managed by an external Hive metastore.

In our particular use case, we use Spark Streaming to parse and transform 
our input data and then apply a window function to save an arbitrary-sized 
batch of data as a Parquet file, which itself will be added as a partition to 
an external Hive table via an *ALTER TABLE... ADD PARTITION...* statement. 
Since our input data is nonuniform, it is expected that not every partition 
batch will contain every field present in the table's schema obtained from the 
Hive metastore. As such, we expect that the schema of some of our Parquet files 
may not contain the same set of fields present in the full metastore schema.

In such cases, it seems natural that Spark SQL would simply assume null 
values for any missing fields in the partition's Parquet file, assuming these 
fields are specified as nullable by the metastore schema. This is not the case 
in the current implementation of ParquetRelation2. The 
**mergeMetastoreParquetSchema()** method used to reconcile differences between 
a Parquet file's schema and a schema retrieved from the Hive metastore will 
raise an exception if the Parquet file doesn't match the same set of fields 
specified by the metastore.

This pull request alters the behavior of **mergeMetastoreParquetSchema()** 
by having it first add any nullable fields from the metastore schema to the 
Parquet file schema if they aren't already present there. 
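
A minimal sketch of the proposed behavior, using plain Scala case classes instead of Spark's `StructType` and `StructField` so that it runs without Spark on the classpath. `Field`, `NullableMergeSketch` and `addMissingNullable` are names made up for this illustration and are not part of Spark; the case-insensitive name comparison is likewise an assumption of the sketch.

```scala
// Hypothetical stand-in for a schema field; not Spark's StructField.
final case class Field(name: String, dataType: String, nullable: Boolean)

object NullableMergeSketch {
  // Append every nullable metastore field that the Parquet schema lacks.
  // Field names are compared case-insensitively (an assumption of this sketch).
  def addMissingNullable(metastore: Seq[Field], parquet: Seq[Field]): Seq[Field] = {
    val present = parquet.map(_.name.toLowerCase).toSet
    val missing = metastore.filter(f => f.nullable && !present.contains(f.name.toLowerCase))
    parquet ++ missing
  }
}
```

In this sketch a non-nullable metastore field that is missing from the Parquet schema is left out, so a later strict comparison could still reject the file.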

Besides the usual code quality and correctness feedback, I'd appreciate any 
comments specifically on:
* should this be the assumed behavior of the 
**mergeMetastoreParquetSchema()** method or should I refactor this pull request 
to make this behavior dependent on a configuration option?
* am I correct in submitting a pull request to change newParquet.scala in 
branch-1.3 or should I have submitted it to the master branch?

Thanks for taking a look!

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/budde/spark merge-nullable-fields

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/5188.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5188


commit 13ae7bf5fa2fdde16b1c14713a16bdf2c59b28c0
Author: Adam Budde bu...@amazon.com
Date:   2015-03-25T19:59:34Z

Add missing nullable Metastore fields when merging a Parquet schema




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...

2015-03-26 Thread budde
GitHub user budde opened a pull request:

https://github.com/apache/spark/pull/5214

[SPARK-6538][SQL] Add missing nullable Metastore fields when merging a 
Parquet schema

Opening to replace #5188.

When Spark SQL infers a schema for a DataFrame, it will take the union of 
all field types present in the structured source data (e.g. an RDD of JSON 
data). When the source data for a row doesn't define a particular field on the 
DataFrame's schema, a null value will simply be assumed for this field. This 
workflow makes it very easy to construct tables and query over a set of 
structured data with a nonuniform schema. However, this behavior is not 
consistent in some cases when dealing with Parquet files and an external table 
managed by an external Hive metastore.

In our particular use case, we use Spark Streaming to parse and transform 
our input data and then apply a window function to save an arbitrary-sized 
batch of data as a Parquet file, which itself will be added as a partition to 
an external Hive table via an *ALTER TABLE... ADD PARTITION...* statement. 
Since our input data is nonuniform, it is expected that not every partition 
batch will contain every field present in the table's schema obtained from the 
Hive metastore. As such, we expect that the schema of some of our Parquet files 
may not contain the same set of fields present in the full metastore schema.

In such cases, it seems natural that Spark SQL would simply assume null 
values for any missing fields in the partition's Parquet file, assuming these 
fields are specified as nullable by the metastore schema. This is not the case 
in the current implementation of ParquetRelation2. The 
**mergeMetastoreParquetSchema()** method used to reconcile differences between 
a Parquet file's schema and a schema retrieved from the Hive metastore will 
raise an exception if the Parquet file doesn't match the same set of fields 
specified by the metastore.

This pull request alters the behavior of **mergeMetastoreParquetSchema()** 
by having it first add any nullable fields from the metastore schema to the 
Parquet file schema if they aren't already present there.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/budde/spark nullable-fields

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/5214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5214


commit 9041bfa3de565e3bafeaf87fc33a44925a0a0320
Author: Adam Budde bu...@amazon.com
Date:   2015-03-25T19:59:34Z

Add missing nullable Metastore fields when merging a Parquet schema







[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...

2015-03-26 Thread budde
Github user budde closed the pull request at:

https://github.com/apache/spark/pull/5188





[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...

2015-03-26 Thread budde
Github user budde commented on the pull request:

https://github.com/apache/spark/pull/5188#issuecomment-86625383
  
Thanks for the input, @marmbrus and @liancheng. I'll resolve the conflicts 
and open a new PR against master.





[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...

2015-03-27 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/5214#discussion_r27315420
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
@@ -775,6 +777,32 @@ private[sql] object ParquetRelation2 extends Logging {
 })
   }
 
+  /**
+   * Returns the original schema from the Parquet file with any missing nullable fields from the
+   * Hive Metastore schema merged in.
+   *
+   * When constructing a DataFrame from a collection of structured data, the resulting object has
+   * a schema corresponding to the union of the fields present in each element of the collection.
+   * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
+   * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
+   * contain a particular nullable field in its schema despite that field being present in the
+   * table schema obtained from the Hive Metastore. This method returns a schema representing the
+   * Parquet file schema along with any additional nullable fields from the Metastore schema
+   * merged in.
+   */
+  private[parquet] def mergeMissingNullableFields(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+    val missingFields = metastoreSchema
+      .map(_.name.toLowerCase)
+      .diff(parquetSchema.map(_.name.toLowerCase))
+      .map(fieldMap(_))
+      .filter(_.nullable)
+    StructType(parquetSchema ++ missingFields)
--- End diff --

How should we deal with potential ambiguities that may be introduced due to 
#5141?  For instance, say we are merging the following schemas:

Metastore schema | Parquet schema
---|---
Foo | Foo
Bar | Bar
*Baz* | *Bop*
Bat | Bat

The following options come to mind:
* Attempt to merge the orderings and accept any possibility when there are 
ambiguities (e.g. both `Foo Bar Baz Bop Bat` and `Foo Bar Bop Baz Bat` are 
acceptable).
* The fields defined in the metastore schema always come first, in metastore 
order, followed by any additional fields defined in the Parquet schema (e.g. 
`Foo Bar Baz Bat Bop` is the only accepted ordering).
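
The second option could be sketched as below, working on field names only. `FieldOrderSketch` and `metastoreFirst` are hypothetical names for illustration, and the case-insensitive comparison is an assumption.

```scala
object FieldOrderSketch {
  // Metastore fields keep their metastore order; fields that exist only in the
  // Parquet schema are appended afterwards, in Parquet order.
  def metastoreFirst(metastore: Seq[String], parquet: Seq[String]): Seq[String] = {
    val known = metastore.map(_.toLowerCase).toSet
    metastore ++ parquet.filterNot(n => known.contains(n.toLowerCase))
  }
}
```

For the two schemas in the table above this yields `Foo Bar Baz Bat Bop`, so the result is deterministic regardless of where `Bop` sits in the Parquet file.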





[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...

2015-03-27 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/5214#discussion_r27311560
  

What is the expected order of fields in a schema? Is it lexicographic?





[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...

2015-03-27 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/5214#discussion_r27332712
  

I see. Based on the change made in #5141, it looks like the schema returned 
by **mergeMissingNullableFields()** will still contain any additional fields 
defined in `parquetSchema` (lines 766-767). How would you feel about simply 
removing the additional `parquetSchema` fields in the 
**mergeMissingNullableFields()** method?

Execution would look something like this:

* remove additional `parquetSchema` fields
* call **mergeMissingNullableFields()** on the schema with the additional 
fields removed
* proceed with executing **mergeMetastoreParquetSchema()** with the additions 
made in #5141 removed (they should be unnecessary if we prune additional fields first)
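
A rough sketch of the first two steps, modelling a schema as a sequence of simple case classes rather than a Spark `StructType`. `Col`, `PruneThenMergeSketch` and its methods are hypothetical names; the final reconciliation done by **mergeMetastoreParquetSchema()** is not modelled here.

```scala
// Hypothetical stand-in for a schema field; not Spark's StructField.
final case class Col(name: String, nullable: Boolean)

object PruneThenMergeSketch {
  private def key(c: Col): String = c.name.toLowerCase

  // Step 1: drop Parquet fields that the metastore schema does not define.
  def pruneExtra(metastore: Seq[Col], parquet: Seq[Col]): Seq[Col] = {
    val known = metastore.map(key).toSet
    parquet.filter(c => known.contains(key(c)))
  }

  // Step 2: append nullable metastore fields that are absent from the Parquet schema.
  def addMissingNullable(metastore: Seq[Col], parquet: Seq[Col]): Seq[Col] = {
    val present = parquet.map(key).toSet
    parquet ++ metastore.filter(c => c.nullable && !present.contains(key(c)))
  }

  // Steps 1 and 2 combined; the result has the same field names as the metastore
  // schema whenever every missing field is nullable.
  def reconcile(metastore: Seq[Col], parquet: Seq[Col]): Seq[Col] =
    addMissingNullable(metastore, pruneExtra(metastore, parquet))
}
```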







[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...

2015-03-27 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/5214#discussion_r27332969
  

Actually, now that I consider it, I'm not convinced that having the 
**mergeMissingNullableFields()** method return the fields in non-metastore order is a 
problem here. Lines 766-767 of **mergeMetastoreParquetSchema()** should handle 
putting them in the proper order.

Removing the additional fields is still an option to consider, however.





[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...

2015-03-26 Thread budde
Github user budde commented on the pull request:

https://github.com/apache/spark/pull/5214#issuecomment-86699105
  
Taking a look at why these tests failed.





[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...

2015-03-26 Thread budde
Github user budde commented on the pull request:

https://github.com/apache/spark/pull/5214#issuecomment-86721204
  
I must've accidentally run the tests on an old build artifact before 
opening this PR. It turns out that tests included in #5141 expect failure in 
scenarios now permitted by this PR, while the tests originally included in this 
PR also expect failure in scenarios now permitted by #5141. I've cleared this 
up and the tests should pass now.





[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...

2016-02-01 Thread budde
Github user budde commented on the pull request:

https://github.com/apache/spark/pull/11012#issuecomment-178314141
  
Pinging @andrewor14, the original implementer of unrollSafely(), for any 
potential feedback.





[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...

2016-02-01 Thread budde
GitHub user budde opened a pull request:

https://github.com/apache/spark/pull/11012

[SPARK-13122] Fix race condition in MemoryStore.unrollSafely()

https://issues.apache.org/jira/browse/SPARK-13122

A race condition can occur in MemoryStore's unrollSafely() method if two threads that
return the same value for currentTaskAttemptId() execute this method concurrently. This
change makes the operation of reading the initial amount of unroll memory used, performing
the unroll, and updating the associated memory maps atomic in order to avoid this race
condition.

Initial proposed fix wraps all of unrollSafely() in a 
memoryManager.synchronized { } block. A cleaner approach might be to introduce a 
mechanism that synchronizes based on task attempt ID. An alternative option 
might be to track unroll/pending unroll memory based on block ID rather than 
task attempt ID.
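
A simplified model of the lock-based approach, not the actual MemoryStore code: `UnrollTracker`, `UnrollLockSketch` and their methods are hypothetical names. It only shows that putting the read of the previous value and the write of the new value under one lock keeps the per-task total consistent when several threads share a task attempt ID.

```scala
import scala.collection.mutable

// Hypothetical stand-in for per-task unroll bookkeeping; not Spark's MemoryStore.
final class UnrollTracker {
  private val unrollMemoryMap = mutable.HashMap.empty[Long, Long]

  // The read of the old value and the write of the new one share one lock, so two
  // threads reporting the same task attempt ID cannot interleave between them.
  def reserve(taskAttemptId: Long, bytes: Long): Unit = synchronized {
    val previous = unrollMemoryMap.getOrElse(taskAttemptId, 0L)
    unrollMemoryMap(taskAttemptId) = previous + bytes
  }

  def reserved(taskAttemptId: Long): Long = synchronized {
    unrollMemoryMap.getOrElse(taskAttemptId, 0L)
  }
}

object UnrollLockSketch {
  // Start `threads` threads that each reserve one byte `perThread` times for task 42.
  def demo(threads: Int, perThread: Int): Long = {
    val tracker = new UnrollTracker
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = (1 to perThread).foreach(_ => tracker.reserve(42L, 1L))
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    tracker.reserved(42L)
  }
}
```

Without the `synchronized` block in `reserve`, concurrent updates could be lost; the trade-off is that every caller serializes on the same lock.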

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/budde/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/11012.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #11012


commit 6e0156c6e3c2fd137005ce3d55cecdf7070795da
Author: Adam Budde <bu...@amazon.com>
Date:   2016-02-02T01:04:45Z

[SPARK-13122] Fix race condition in MemoryStore.unrollSafely()

https://issues.apache.org/jira/browse/SPARK-13122

A race condition can occur in MemoryStore's unrollSafely() method if two threads that
return the same value for currentTaskAttemptId() execute this method concurrently. This
change makes the operation of reading the initial amount of unroll memory used, performing
the unroll, and updating the associated memory maps atomic in order to avoid this race
condition.







[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...

2016-02-02 Thread budde
Github user budde commented on the pull request:

https://github.com/apache/spark/pull/11012#issuecomment-178853877
  
From Jenkins output:

>Fetching upstream changes from https://github.com/apache/spark.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/spark.git 
+refs/pull/11012/*:refs/remotes/origin/pr/11012/* # timeout=15
ERROR: Timeout after 15 minutes
ERROR: Error fetching remote repo 'origin'





[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...

2016-02-02 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/11012#discussion_r51645315
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -304,10 +309,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
   // release the unroll memory yet. Instead, we transfer it to pending unroll memory
   // so `tryToPut` can further transfer it to normal storage memory later.
   // TODO: we can probably express this without pending unroll memory (SPARK-10907)
-  val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
--- End diff --

@nongli – the problem is that the original implementation assumes that 
previousMemoryReserved is an invariant representing the number of unroll bytes 
allocated for the process besides the pending bytes allocated during the 
unroll, but no synchronization exists to enforce this invariant.





[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...

2016-02-02 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/11012#discussion_r51643796
  

Per my earlier comment, I updated the PR to use a var named 
previousMemoryReserved to manually track the number of unroll bytes allocated 
during a given invocation of unrollSafely rather than relying on 
unrollMemoryMap(taskAttemptId) not being modified outside of the given thread 
between the assignment to previousMemoryReserved and the memory maps being 
updated in the finally { } block. This should remove the need to make the whole 
method synchronized.
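
A simplified sketch of the counter idea, assuming a much-reduced model of the two memory maps. `UnrollAccounting`, `UnrollCounterSketch` and the local `reservedHere` are hypothetical names for illustration; this is not the code in the PR.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the unroll and pending-unroll memory maps.
final class UnrollAccounting {
  private val unrollMemoryMap = mutable.HashMap.empty[Long, Long]
  private val pendingUnrollMemoryMap = mutable.HashMap.empty[Long, Long]

  private def add(m: mutable.HashMap[Long, Long], id: Long, delta: Long): Unit =
    m(id) = m.getOrElse(id, 0L) + delta

  def reserve(id: Long, bytes: Long): Unit = synchronized { add(unrollMemoryMap, id, bytes) }

  // Move exactly `bytes` from unroll memory to pending unroll memory for this task.
  def transferToPending(id: Long, bytes: Long): Unit = synchronized {
    add(unrollMemoryMap, id, -bytes)
    add(pendingUnrollMemoryMap, id, bytes)
  }

  def unrollBytes(id: Long): Long = synchronized { unrollMemoryMap.getOrElse(id, 0L) }
  def pendingBytes(id: Long): Long = synchronized { pendingUnrollMemoryMap.getOrElse(id, 0L) }
}

object UnrollCounterSketch {
  // One simplified unroll: reserve chunk by chunk and count the bytes locally, so the
  // amount transferred never depends on what other threads did to the shared map.
  def unrollOnce(acct: UnrollAccounting, id: Long, chunks: Seq[Long]): Long = {
    var reservedHere = 0L // bytes reserved by this invocation only
    try {
      chunks.foreach { c =>
        acct.reserve(id, c)
        reservedHere += c
      }
    } finally {
      acct.transferToPending(id, reservedHere)
    }
    reservedHere
  }

  // Returns (bytes reserved by the call, remaining unroll bytes, pending bytes).
  def demo(): (Long, Long, Long) = {
    val acct = new UnrollAccounting
    acct.reserve(7L, 100L) // memory held by a different invocation for the same task
    val mine = unrollOnce(acct, 7L, Seq(10L, 20L))
    (mine, acct.unrollBytes(7L), acct.pendingBytes(7L))
  }
}
```

Only the individual map updates are locked here; the invocation as a whole stays unsynchronized because the transfer amount comes from the local counter rather than from a snapshot of the shared map.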





[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...

2016-02-02 Thread budde
Github user budde commented on the pull request:

https://github.com/apache/spark/pull/11012#issuecomment-178940544
  
Looks like a bunch of Spark SQL/Hive tests are failing due to this error:

>Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Job 
aborted due to stage failure: Task 19 in stage 4892.0 failed 1 times, most 
recent failure: Lost task 19.0 in stage 4892.0 (TID 69335, localhost): 
java.lang.RuntimeException: Stream '/jars/TestUDTF.jar' was not found.





[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...

2016-02-02 Thread budde
Github user budde commented on the pull request:

https://github.com/apache/spark/pull/11012#issuecomment-178929153
  
Latest change is looking good on my end. No unroll memory is being leaked.





[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...

2016-02-02 Thread budde
Github user budde commented on the pull request:

https://github.com/apache/spark/pull/11012#issuecomment-178766741
  
Updated PR with new implementation that uses a counter variable instead of 
requiring the whole method to be atomic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-01 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Pinging @brkyvz as well, who also appears to have reviewed kinesis-asl 
changes in the past





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-02 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99217534
  
--- Diff: pom.xml ---
@@ -146,6 +146,8 @@
 hadoop2
 0.7.1
 1.6.2
+
+1.10.61
--- End diff --

I believe there was previously a direct dependency on the AWS SDK but it is 
currently getting pulled in as a transitive dependency of the Kinesis Client 
Library. The KCL dependencies don't include the aws-java-sdk-sts Maven artifact 
so we must add it as an explicit dependency in the pom.xml for kinesis-asl.

The AWS SDK is licensed under [Apache 
2.0](https://github.com/aws/aws-sdk-java/blob/master/LICENSE.txt)





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-03 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
Relevant part of [Jenkins 
output](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72326/console)
 for SparkR tests:

```
Error: processing vignette 'sparkr-vignettes.Rmd' failed with diagnostics:
error in evaluating the argument 'object' in selecting a method for 
function 'summary': Error: object 'kmeansModel' not found
```

Doesn't appear to be related to this change. I'll investigate and see if I 
can reproduce it locally.





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-03 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
Pinging @ericl, @cloud-fan and @davies, committers who have all reviewed or 
submitted changes related to this.





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-03 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
Looks like SparkR unit tests have been failing for all or most PRs after 
[this 
commit.](https://github.com/apache/spark/commit/48aafeda7db879491ed36fff89d59ca7ec3136fa)





[GitHub] spark pull request #16797: [SPARK-19455][SQL] Add option for case-insensitiv...

2017-02-03 Thread budde
GitHub user budde opened a pull request:

https://github.com/apache/spark/pull/16797

[SPARK-19455][SQL] Add option for case-insensitive Parquet field resolution

## What changes were proposed in this pull request?

**Summary**
- Add spark.sql.parquet.caseInsensitiveResolution config option
- Add caseInsensitive option to ParquetReadSupport.clipParquetType
- Add ParquetIOSuite test
- Disable Parquet filter push-down when using case-insensitive field 
resolution


**Details**

[*Copied from 
SPARK-19455*](https://issues.apache.org/jira/browse/SPARK-19455)

[SPARK-16980](https://issues.apache.org/jira/browse/SPARK-16980) removed 
the schema inference from the HiveMetastoreCatalog class when converting a 
MetastoreRelation to a LogicalRelation (HadoopFsRelation, in this case) in 
favor of simply using the schema returned by the metastore. This results in an 
optimization as the underlying file statuses no longer need to be resolved 
until after the partition pruning step, reducing the number of files to be 
touched significantly in some cases. The downside is that the data schema used 
may no longer match the underlying file schema for case-sensitive formats such 
as Parquet.

This change initially included a [patch to 
ParquetReadSupport](https://github.com/apache/spark/blob/6ce1b675ee9fc9a6034439c3ca00441f9f172f84/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala#L270-L284)
 that attempted to remedy this conflict by using a case-insensitive fallback 
mapping when resolving field names during the schema clipping step. 
[SPARK-18333](https://issues.apache.org/jira/browse/SPARK-18333) later removed 
this patch after 
[SPARK-17183](https://issues.apache.org/jira/browse/SPARK-17183) added support 
for embedding a case-sensitive schema as a Hive Metastore table property. AFAIK 
the assumption here was that the data schema obtained from the Metastore table 
property will be case sensitive and should match the Parquet schema exactly.

The problem arises when dealing with Parquet-backed tables for which this 
schema has not been embedded as a table property and for which the underlying 
files contain case-sensitive field names. This will happen for any Hive table 
that was not created by Spark or was created by a Spark version prior to 2.1.0. 
We've seen Spark SQL return no results for any query containing a 
case-sensitive field name for such tables.

The change we're proposing is to introduce a configuration parameter that 
will re-enable case-insensitive field name resolution in ParquetReadSupport. 
This option will also disable filter push-down for Parquet, as the filter 
predicate constructed by Spark SQL contains the case-insensitive field names, 
for which Parquet will return 0 records when filtering against a case-sensitive 
column name. I was hoping to find a way to construct the filter on-the-fly in 
ParquetReadSupport but Parquet doesn't propagate the Configuration object 
passed to this class to the underlying InternalParquetRecordReader class.
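
To illustrate the mismatch, here is a minimal sketch in plain Scala of an 
exact-match lookup with a case-insensitive fallback. It is not the code from 
the patch: `FallbackResolution` and `resolve` are hypothetical names, and field 
names are modeled as plain strings rather than Parquet `Type` objects.

```scala
object FallbackResolution {
  // Resolve a requested (metastore) field name against the field names
  // actually present in a file. Hypothetical helper, for illustration only.
  def resolve(requested: String, fileFields: Seq[String]): Option[String] = {
    // Prefer an exact, case-sensitive match when one exists.
    val exact = fileFields.find(_ == requested)
    // Otherwise fall back to the first field that matches ignoring case.
    exact.orElse(fileFields.find(_.equalsIgnoreCase(requested)))
  }
}
```

With a lower-cased metastore name such as `userid` and a file field `userId`, 
the exact lookup alone finds nothing, which corresponds to the "no results" 
symptom; the fallback recovers `userId`.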

## How was this patch tested?

This patch re-introduces a unit test to ParquetSchemaSuite.scala to test 
that case-insensitive schema clipping behaves as expected. It also introduces a 
ParquetIOSuite unit test that constructs a case-insensitive catalog table and 
ensures case-sensitive Parquet data can still be queried against.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/budde/spark SPARK-19455

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16797.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16797


commit 5426271946419a9defb59bb84575501bc8296578
Author: Budde <bu...@amazon.com>
Date:   2017-02-02T07:34:15Z

[SPARK-19455][SQL] Add option for case-insensitive Parquet field resolution

- Add spark.sql.parquet.caseInsensitiveResolution config option
- Add caseInsensitive option to ParquetReadSupport.clipParquetType
- Add ParquetIOSuite test
- Disable Parquet filter push-down when using case-insensitive field 
resolution







[GitHub] spark pull request #16797: [SPARK-19455][SQL] Add option for case-insensitiv...

2017-02-03 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16797#discussion_r99456138
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -268,13 +292,23 @@ private[parquet] object ParquetReadSupport {
* @return A list of clipped [[GroupType]] fields, which can be empty.
*/
   private def clipParquetGroupFields(
-  parquetRecord: GroupType, structType: StructType): Seq[Type] = {
-val parquetFieldMap = parquetRecord.getFields.asScala.map(f => 
f.getName -> f).toMap
+  parquetRecord: GroupType,
+  structType: StructType,
+  caseInsensitive: Boolean): Seq[Type] = {
+val parquetFieldMap = {
+  val pairs = parquetRecord.getFields.asScala.map(f => f.getName -> f)
+  implicit val ordering = if (caseInsensitive) {
+Ordering.by[String, String](_.toLowerCase)
+  } else {
+Ordering.String
+  }
+  TreeMap(pairs: _*)
--- End diff --

The implicit ordering val determines if the TreeMap behaves in a 
case-sensitive manner or not. I can rework this into using standard Maps if you 
feel that's appropriate or clearer.
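
For reference, a standalone sketch of the same idea outside of Spark. 
`FieldLookup` and `build` are hypothetical names, and this variant passes the 
`Ordering` explicitly instead of through an implicit val, which is a small 
departure from the diff above:

```scala
import scala.collection.immutable.TreeMap

object FieldLookup {
  // Build a name -> value map whose key comparison is optionally
  // case-insensitive. The Ordering alone decides which keys count as equal.
  def build[V](pairs: Seq[(String, V)], caseInsensitive: Boolean): TreeMap[String, V] = {
    val ordering: Ordering[String] =
      if (caseInsensitive) Ordering.by[String, String](_.toLowerCase)(Ordering.String)
      else Ordering.String
    // Pass the ordering explicitly so it is obvious which one the map uses.
    TreeMap(pairs: _*)(ordering)
  }
}
```

With `caseInsensitive = true`, a lookup for `USERID` finds an entry stored 
under `userId`; with `caseInsensitive = false` the same lookup misses. Two keys 
that differ only by case collapse into a single entry under the 
case-insensitive ordering.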





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-01-31 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Pinging @tdas on this-- looks like you're the committer who has contributed 
the most to kinesis-asl.





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-06 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99718950
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -35,10 +36,65 @@ import 
org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListen
 import org.apache.spark.util.Utils
 
 private[kinesis]
-case class SerializableAWSCredentials(accessKeyId: String, secretKey: 
String)
-  extends AWSCredentials {
-  override def getAWSAccessKeyId: String = accessKeyId
-  override def getAWSSecretKey: String = secretKey
+case class SerializableKCLAuthProvider(
--- End diff --

Was hoping for some feedback here. I think making this an interface with 
split basic/STS implementations should work well. I'll give it a shot.





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-06 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
PR has been amended to reflect feedback. Thanks for taking a look, @brkyvz.





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-06 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
> Should we roll these behaviors into one flag? e.g. 
```spark.sql.hive.mixedCaseSchemaSupport```

That sounds reasonable to me. The only thing I wonder about is if there's 
any use case where we want to infer the schema but not attempt to write it back 
as a property, say if the external metastore doesn't permit table property 
updates from the user. We can always just log the failure, but this could be 
noisy for users expecting this behavior by default. This could be solved by 
adding an INFER_WITHOUT_SAVING mode.
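
A rough sketch of how such modes could be modeled, purely for discussion. Only 
the INFER_WITHOUT_SAVING name comes from this thread; `InferAndSave`, 
`NeverInfer`, `Outcome` and `plan` are hypothetical names and not an existing 
Spark API.

```scala
object InferenceModeSketch {
  sealed trait Mode
  case object InferAndSave extends Mode       // hypothetical: infer, then try to persist
  case object InferWithoutSaving extends Mode // the INFER_WITHOUT_SAVING idea above
  case object NeverInfer extends Mode         // hypothetical: always trust the metastore

  // What the catalog would do for one table under a given mode.
  final case class Outcome(inferred: Boolean, saved: Boolean)

  def plan(mode: Mode, hasStoredSchema: Boolean, metastoreWritable: Boolean): Outcome =
    mode match {
      // A case-sensitive schema is already stored: nothing to infer.
      case _ if hasStoredSchema => Outcome(inferred = false, saved = false)
      case NeverInfer           => Outcome(inferred = false, saved = false)
      case InferWithoutSaving   => Outcome(inferred = true, saved = false)
      // Only write back when the metastore accepts property updates.
      case InferAndSave         => Outcome(inferred = true, saved = metastoreWritable)
    }
}
```

Under this sketch a read-only metastore still works with `InferAndSave`, since 
the write-back is simply skipped, while `InferWithoutSaving` never attempts it 
and so never logs a failure.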

I'll leave the PR open for now so we can hear and discuss @mallman's input 
but if we're all on board with this approach I'll eventually close this out 
in favor of a new PR adding configurable schema inference behavior.





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-06 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99718545
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisExampleUtils.scala
 ---
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+private[streaming] object KinesisExampleUtils {
+  def getRegionNameByEndpoint(endpoint: String): String = 
endpoint.split("\\.")(1)
--- End diff --

Sounds good. Went for a quick fix but this is much nicer.





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-06 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
> how about we add a new SQL command to refresh the table schema in 
metastore by inferring schema with data files? This is a compatibility issue 
and we should have provided a way for users to migrate, before the 2.1 release. 
I think this approach is much simpler than adding a flag.

While I think introducing a command for inferring and storing the table's 
case-sensitive schema as a property would be a welcome addition, I think 
requiring this property to be there in order for Spark SQL to function properly 
with case-sensitive data files could really restrict the settings Spark SQL can 
be used in.

If a user wanted to use Spark SQL to query over an existing warehouse 
containing hundreds or even thousands of tables, under the suggested approach a 
Spark job would have to be run to infer the schema of each and every table. 
While file formats such as Parquet store their schemas as metadata, there still 
could potentially be millions of files to inspect for the warehouse. A less 
amenable format like JSON might require scanning all the data in the warehouse.

This also doesn't cover the use case @viirya pointed out where the user may 
not have write access to the metastore they are querying against. In this case, 
the user would have to rely on the warehouse administrator to create the Spark 
schema property for every table they wish to query.

> For tables created by hive, as hive is a case-insensitive system, will 
the parquet files have mixed-case schema?

I think the Hive Metastore has become a bit of an open standard for 
maintaining a data warehouse catalog since so many tools integrate with it. I 
wouldn't assume that the underlying data pointed to by an external metastore 
was created or managed by Hive itself. For example, we maintain a Hive 
Metastore that catalogs case-sensitive files written by our Spark-based ETL 
pipeline, which parses case classes from string data and writes them as Parquet.





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-06 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
> BTW, what behavior do we expect if a parquet file has two columns whose 
lower-cased names are identical?

I can take a look at how Spark handled this prior to 2.1, although I'm not 
sure if the behavior we'll see there was the result of a conscious decision or 
"undefined" behavior.





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-04 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
Bringing back schema inference is certainly a much cleaner option, although 
I imagine doing this in the old manner would negate the performance 
improvements brought by #14690 for any non-Spark 2.1 dataset.

Ideally, I think we would infer the schema only from the pruned partition 
list for tables we can't read a case-sensitive schema for. Unless I'm mistaken, 
this would have to happen during optimization of the logical plan, after the 
PruneFileSourcePartitions rule has been applied. My thought is that we could 
write a rule that passes the pruned file list to the file format's 
inferSchema() method to replace the HadoopFsRelation's dataSchema with the 
result. I'm not very familiar with Catalyst though, so I'm not sure if changing 
the relation's schema during optimization will cause problems.

There is [an open PR to add support for case-insensitive schemas to 
Parquet](https://github.com/apache/parquet-mr/pull/210) which would be helpful 
here since it would provide a way to avoid schema inference when your Parquet 
files have case-sensitive fields but you don't care about case sensitivity when 
querying. Unfortunately the PR seems to be more or less abandoned though.

Pinging @mallman, the author of #14690, to see if he has any input on this.





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-07 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
> Can we write such schema (conflicting columns after lower-casing) into 
metastore?

I think the scenario here would be that the metastore contains a single 
lower-case column name that could resolve to multiple case-sensitive column 
names in the underlying Parquet file. This could've happened via the user 
manually executing a ```CREATE TABLE...``` query with an explicit schema. Since 
the metastore itself isn't really defining expected behavior in this case I 
think we can just consider this undefined behavior and return the first field 
that matches alphabetically.

I don't think this is very likely to be a legitimate use case, but it's good 
to point out edge cases.
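
A minimal sketch of the "first field that matches alphabetically" rule, 
assuming "alphabetically" means the natural `String` ordering (so upper-case 
letters sort before lower-case ones). `AmbiguousFields` and `resolve` are 
hypothetical names, not code from the PR:

```scala
object AmbiguousFields {
  // Map each lower-cased name to the one concrete Parquet field name that
  // would be picked when several fields differ only by case.
  def resolve(parquetFieldNames: Seq[String]): Map[String, String] =
    parquetFieldNames
      .groupBy(_.toLowerCase)
      // Among the colliding names, keep the smallest under String ordering.
      .map { case (lower, candidates) => lower -> candidates.min }
}
```

For a file containing both `userId` and `UserID`, the metastore column `userid` 
would resolve to `UserID` under this rule, independent of the order in which 
the fields appear in the file.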





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-07 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99908125
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -449,22 +935,48 @@ private class KinesisUtilsPythonHelper {
   checkpointInterval: Duration,
   storageLevel: StorageLevel,
   awsAccessKeyId: String,
-  awsSecretKey: String
-  ): JavaReceiverInputDStream[Array[Byte]] = {
+  awsSecretKey: String,
+  stsAssumeRoleArn: String,
+  stsSessionName: String,
+  stsExternalId: String): JavaReceiverInputDStream[Array[Byte]] = {
+// scalastyle:on
+if (!(stsAssumeRoleArn != null && stsSessionName != null && 
stsExternalId != null)
+&& !(stsAssumeRoleArn == null && stsSessionName == null && 
stsExternalId == null)) {
+  throw new IllegalArgumentException("stsAssumeRoleArn, 
stsSessionName, and stsExtenalId " +
+"must all be defined or all be null")
+}
+
+if (stsAssumeRoleArn != null && stsSessionName != null && 
stsExternalId != null) {
+  validateAwsCreds(awsAccessKeyId, awsSecretKey)
+  if (awsAccessKeyId == null && awsSecretKey == null) {
+KinesisUtils.createStream(jssc, kinesisAppName, streamName, 
endpointUrl, regionName,
--- End diff --

This is actually in the ```KinesisUtilsPythonHelper``` object so we'll 
either need to qualify ```KinesisUtils``` or do an ```import KinesisUtils._```
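
The two options look roughly like this in a self-contained form. 
`StreamFactory`, `QualifiedHelper` and `ImportingHelper` are stand-in names 
used only to show the scoping; they are not the kinesis-asl classes.

```scala
object StreamFactory {
  // Stand-in for a method defined on a companion-style utility object.
  def createStream(name: String): String = s"stream:$name"
}

// Option 1: qualify the call with the object name.
class QualifiedHelper {
  def create(name: String): String = StreamFactory.createStream(name)
}

// Option 2: import the object's members into the class scope.
class ImportingHelper {
  import StreamFactory._
  def create(name: String): String = createStream(name)
}
```

Qualifying keeps the call site explicit, while the wildcard import is shorter 
but can be shadowed if the helper defines a member with the same name.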





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-07 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99908239
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -34,11 +35,56 @@ import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.{BlockGenerator, 
BlockGeneratorListener, Receiver}
 import org.apache.spark.util.Utils
 
-private[kinesis]
-case class SerializableAWSCredentials(accessKeyId: String, secretKey: 
String)
-  extends AWSCredentials {
-  override def getAWSAccessKeyId: String = accessKeyId
-  override def getAWSSecretKey: String = secretKey
+/**
+ * Serializable interface providing a method executors can call to obtain 
an
+ * AWSCredentialsProvider instance for authenticating to AWS services.
+ */
+private[kinesis] trait SerializableCredentialsProvider extends 
Serializable {
+  /**
+   * Return an AWSCredentialProvider instance that can be used by the 
Kinesis Client
+   * Library to authenticate to AWS services (Kinesis, CloudWatch and 
DynamoDB).
+   */
+  def provider: AWSCredentialsProvider
+}
+
+/** Returns DefaultAWSCredentialsProviderChain for authentication. */
+private[kinesis] case object DefaultCredentialsProvider
+  extends SerializableCredentialsProvider {
+
+  def provider: AWSCredentialsProvider = new 
DefaultAWSCredentialsProviderChain
+}
+
+/** Returns AWSStaticCredentialsProvider constructed using basic AWS 
keypair. */
+private[kinesis] case class BasicCredentialsProvider(
+awsAccessKeyId: String,
+awsSecretKey: String) extends SerializableCredentialsProvider {
+
+  def provider: AWSCredentialsProvider
+= new AWSStaticCredentialsProvider(new 
BasicAWSCredentials(awsAccessKeyId, awsSecretKey))
--- End diff --

Will do





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-07 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Amending PR per review feedback. Issue around using optional stsExternalId 
argument in ```KinesisUtils.createStream()``` remains open.





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-07 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99907831
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -449,22 +935,48 @@ private class KinesisUtilsPythonHelper {
   checkpointInterval: Duration,
   storageLevel: StorageLevel,
   awsAccessKeyId: String,
-  awsSecretKey: String
-  ): JavaReceiverInputDStream[Array[Byte]] = {
+  awsSecretKey: String,
+  stsAssumeRoleArn: String,
+  stsSessionName: String,
+  stsExternalId: String): JavaReceiverInputDStream[Array[Byte]] = {
+// scalastyle:on
+if (!(stsAssumeRoleArn != null && stsSessionName != null && 
stsExternalId != null)
+&& !(stsAssumeRoleArn == null && stsSessionName == null && 
stsExternalId == null)) {
+  throw new IllegalArgumentException("stsAssumeRoleArn, 
stsSessionName, and stsExtenalId " +
--- End diff --

See previous comment in KinesisUtils. Obviously it's easier to deal with 
optional arguments for the Python API but I'd favor keeping this consistent 
with the definitions of ```createStream()``` for Java/Scala
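
As a side note, the all-or-nothing check in the diff can be written more 
compactly by counting the non-null arguments. This is only a sketch of an 
equivalent condition; `StsArgs` and `validate` are hypothetical names.

```scala
object StsArgs {
  // Accept either all three STS arguments or none of them.
  def validate(stsAssumeRoleArn: String, stsSessionName: String, stsExternalId: String): Unit = {
    val defined = Seq(stsAssumeRoleArn, stsSessionName, stsExternalId).count(_ != null)
    if (defined != 0 && defined != 3) {
      throw new IllegalArgumentException(
        "stsAssumeRoleArn, stsSessionName, and stsExternalId " +
          "must all be defined or all be null")
    }
  }
}
```

The condition rejects exactly the same inputs as the nested negations in the 
diff: any call where one or two of the three values are null.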





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-07 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99906733
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -123,9 +123,143 @@ object KinesisUtils {
 // scalastyle:on
 val cleanedHandler = ssc.sc.clean(messageHandler)
 ssc.withNamedScope("kinesis stream") {
+  val kinesisCredsProvider = BasicCredentialsProvider(
+awsAccessKeyId = awsAccessKeyId,
+awsSecretKey = awsSecretKey)
   new KinesisInputDStream[T](ssc, streamName, endpointUrl, 
validateRegion(regionName),
 initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
-cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
+cleanedHandler, kinesisCredsProvider)
+}
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * @param ssc StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   * (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   * worker's initial starting position in the stream.
+   * The values are either the beginning of the stream
+   * per Kinesis' limit of 24 hours
+   * (InitialPositionInStream.TRIM_HORIZON) or
+   * the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   * See the Kinesis Spark Streaming documentation for more
+   * details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param messageHandler A custom message handler that can generate a generic output from a
+   *   Kinesis `Record`, which contains both message data, and metadata.
+   * @param stsAssumeRoleArn ARN of IAM role to assume when using STS sessions to read from
+   * Kinesis stream.
+   * @param stsSessionName Name to uniquely identify STS sessions if multiple princples assume
+   *   the same role.
+   * @param stsExternalId External ID that can be used to validate against the assumed IAM role's
+   *  trust policy.
+   *
+   * @note The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+   * gets the AWS credentials.
+   */
+  // scalastyle:off
+  def createStream[T: ClassTag](
+  ssc: StreamingContext,
+  kinesisAppName: String,
+  streamName: String,
+  endpointUrl: String,
+  regionName: String,
+  initialPositionInStream: InitialPositionInStream,
+  checkpointInterval: Duration,
+  storageLevel: StorageLevel,
+  messageHandler: Record => T,
+  stsAssumeRoleArn: String,
+  stsSessionName: String,
+  stsExternalId: String): ReceiverInputDStream[T] = {
--- End diff --

The external ID is optional, but I'm making it required in 
```KinesisUtils``` since otherwise we'd need to double the number of overloads 
of ```createStream()``` (i.e. one set for STS params with stsExternalId and 
another for STS params without it). I think the API was constructed in 
this fashion in order to have consistent method declarations between Scala and 
Java. I think the better long-term solution here is to deprecate 
```createStream()``` in favor of a builder class for constructing Kinesis 
DStreams.

If the external ID isn't specified in the trust policy of the IAM role 
being assumed, it will simply be ignored.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, p

[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-07 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99909144
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -123,9 +123,143 @@ object KinesisUtils {
 // scalastyle:on
 val cleanedHandler = ssc.sc.clean(messageHandler)
 ssc.withNamedScope("kinesis stream") {
+  val kinesisCredsProvider = BasicCredentialsProvider(
+awsAccessKeyId = awsAccessKeyId,
+awsSecretKey = awsSecretKey)
   new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
 initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
-cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
+cleanedHandler, kinesisCredsProvider)
+}
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * @param ssc StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   * (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   * worker's initial starting position in the stream.
+   * The values are either the beginning of the stream
+   * per Kinesis' limit of 24 hours
+   * (InitialPositionInStream.TRIM_HORIZON) or
+   * the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   * See the Kinesis Spark Streaming documentation for more
+   * details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param messageHandler A custom message handler that can generate a generic output from a
+   *   Kinesis `Record`, which contains both message data, and metadata.
+   * @param stsAssumeRoleArn ARN of IAM role to assume when using STS sessions to read from
+   * Kinesis stream.
+   * @param stsSessionName Name to uniquely identify STS sessions if multiple princples assume
+   *   the same role.
+   * @param stsExternalId External ID that can be used to validate against the assumed IAM role's
+   *  trust policy.
+   *
+   * @note The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+   * gets the AWS credentials.
+   */
+  // scalastyle:off
+  def createStream[T: ClassTag](
+  ssc: StreamingContext,
+  kinesisAppName: String,
+  streamName: String,
+  endpointUrl: String,
+  regionName: String,
+  initialPositionInStream: InitialPositionInStream,
+  checkpointInterval: Duration,
+  storageLevel: StorageLevel,
+  messageHandler: Record => T,
+  stsAssumeRoleArn: String,
+  stsSessionName: String,
+  stsExternalId: String): ReceiverInputDStream[T] = {
--- End diff --

To expand on this a little bit, using a builder pattern will also make it 
feasible to specify a different SerializableCredentialsProvider for each of 
the three AWS services the KCL uses (Kinesis, DynamoDB and CloudWatch). This 
really won't be feasible under the current approach of extending the single 
```KinesisUtils.createStream()``` API.
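
To make the builder idea concrete, here is a minimal sketch in plain Scala. Everything in it (```KinesisBuilderSketch```, ```CredsProvider```, ```StreamConfig``` and the ```with...``` methods) is hypothetical and not part of kinesis-asl; it only illustrates how optional settings such as the STS external ID and per-service credentials could be expressed without adding more ```createStream()``` overloads. It assumes DynamoDB and CloudWatch fall back to the Kinesis credentials when none are given.

```scala
object KinesisBuilderSketch {
  // Hypothetical stand-ins for serializable credential providers.
  sealed trait CredsProvider
  case object DefaultCreds extends CredsProvider
  final case class BasicCreds(accessKeyId: String, secretKey: String) extends CredsProvider
  final case class StsCreds(
      roleArn: String,
      sessionName: String,
      externalId: Option[String] = None) extends CredsProvider

  // Immutable result: one provider per AWS service the KCL talks to.
  final case class StreamConfig(
      streamName: String,
      kinesisCreds: CredsProvider,
      dynamoDBCreds: CredsProvider,
      cloudWatchCreds: CredsProvider)

  // Each setter returns a new Builder, so optional settings never
  // require an additional method overload.
  final case class Builder(
      streamName: Option[String] = None,
      kinesisCreds: CredsProvider = DefaultCreds,
      dynamoDBCreds: Option[CredsProvider] = None,
      cloudWatchCreds: Option[CredsProvider] = None) {

    def withStreamName(name: String): Builder = copy(streamName = Some(name))
    def withKinesisCreds(creds: CredsProvider): Builder = copy(kinesisCreds = creds)
    def withDynamoDBCreds(creds: CredsProvider): Builder = copy(dynamoDBCreds = Some(creds))
    def withCloudWatchCreds(creds: CredsProvider): Builder = copy(cloudWatchCreds = Some(creds))

    def build(): StreamConfig = {
      val name = streamName.getOrElse(
        throw new IllegalArgumentException("streamName is required"))
      // Assumption: services without explicit credentials reuse the Kinesis ones.
      StreamConfig(
        name,
        kinesisCreds,
        dynamoDBCreds.getOrElse(kinesisCreds),
        cloudWatchCreds.getOrElse(kinesisCreds))
    }
  }
}
```

A caller would chain only the settings it needs, e.g. ```Builder().withStreamName("events").withKinesisCreds(StsCreds("arn", "session")).build()```, leaving the external ID unset rather than passing a placeholder value.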


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-08 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Amending the PR again to fix a new dependency conflict in spark/pom.xml. 
Thanks again for taking the time to review this, @brkyvz and @srowen. Please 
let me know if you feel any additional changes are needed before this is ready 
to merge. Since this doesn't break any existing APIs I think it would make some 
people happy if we could get this in the 2.1.1 release.





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-08 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Looks like Jenkins is failing to build any recent PR due to the following 
error:

```[error] Could not find hadoop2.3 in the list. Valid options  are 
['hadoop2.6', 'hadoop2.7']```

I would guess this is related to [this 
commit](https://github.com/apache/spark/commit/e8d3fca4502d5f5b8f38525b5fdabe80ccf9a8ec).





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-08 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
> For better user experience, we should automatically infer the schema and 
write it back to metastore, if there is no case-sensitive table schema in 
metastore. This has the cost of detection the need of schema inference, and 
complicating table read code path.

Totally agree. I think the default behavior should be to infer and backfill 
a case-sensitive schema into the table properties if one isn't already there. 
An option should also be provided to disable all inference and just fall back 
to the case-insensitive metastore schema if none is found (i.e. the current 
behavior in 2.1.0).

> If this is only a compatibility issue, I think it's fair to ask the 
cluster maintainers to run some commands after upgrade Spark cluster. Even 
there are a lot of tables, it's easy to write a script to automate it.

I don't think this is fair. For one, as I've mentioned, in some cases Spark 
may not be the tool being used to maintain the metastore. This will now require 
the warehouse admins to set up a Spark cluster and run these migration commands 
on every table with case-sensitive underlying data if they'd like them to be 
accessible from Spark. As a second point, while writing an automation script 
may be trivial, the execution costs aren't, especially if the data is stored in 
a format like JSON where each and every record in the table must be read in 
order to infer the schema.

> If there is no Spark specific table properties, we assume this table is 
created by hive(not by external systems like Presto), so the schema of parquet 
files should be all lowercased.

This isn't an assumption made by Spark prior to 2.1.0, whether this was an 
explicit decision or not. All I'm asking for is a way to configure Spark to 
continue supporting a use case it has supported for years.

Also, in our case, the table was created by Spark, not Presto. Presto is 
just an example of another execution engine we've put in front of our warehouse 
that hasn't had a problem with the underlying Parquet data being 
case-sensitive. We just used an older version of Spark to create the tables. I 
would think long and hard about whether requiring warehouse admins to run 
potentially-costly migrations between Spark versions to update table metadata 
is a preferable option to offering a way to remain backwards-compatible with 
the old behavior.

Again, I think introducing a mechanism to migrate the table properties is a 
good idea. I just don't think it should be the only option.

> Another proposal is to make parquet reader case-insensitive, so that we 
can solve this problem without schema inference. But the problem is, Spark can 
be configured to be case-sensitive, so that it's possible to write such a 
schema (conflicting columns after lower-casing) into metastore. I think this 
proposal is the best if we can totally make Spark case-insensitive.

I don't think this would be a bad option if this could be enabled at the 
Parquet level, but it seems as though their work towards enabling 
case-insensitive file access has stalled. As @ericl pointed out above, moving 
this to the ParquetReadSupport level may make the situation better for Parquet 
but the behavior won't be consistent across file formats like ORC or JSON.





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-06 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Amending this PR to upgrade the KCL/AWS SDK dependencies to more-current 
versions (1.7.3 and 1.11.76, respectively). The 
```RegionUtils.getRegionByEndpoint()``` API was removed from the SDK, so I've 
had to replace it with a simple string split method for the examples and test 
suites that were utilizing it.
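
For illustration, a string-split replacement along these lines might look like the sketch below. The object and method names are hypothetical, not the code in the PR, and it assumes endpoints of the exact form ```https://kinesis.<region>.amazonaws.com```; any other host shape (for example one with a different domain suffix) yields ```None``` rather than a guess.

```scala
object RegionFromEndpointSketch {
  // Hypothetical helper: extracts <region> from an endpoint URL shaped like
  // https://kinesis.<region>.amazonaws.com. Returns None for any other shape.
  // Note: java.net.URI throws URISyntaxException on malformed input.
  def regionNameFromEndpoint(endpointUrl: String): Option[String] = {
    // getHost returns null when the URL has no scheme/authority part.
    val host = Option(new java.net.URI(endpointUrl).getHost)
    host.map(_.split("\\.")).collect {
      case Array("kinesis", region, "amazonaws", "com") => region
    }
  }
}
```

Returning an ```Option``` keeps the failure case explicit for callers such as examples and test suites, which can then fall back to a user-supplied region name.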





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-06 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
I'll double check, but I don't think 
```spark.sql.hive.manageFilesourcePartitions=false``` would solve this issue 
since we're still deriving the file relation's dataSchema parameter from the 
schema of MetastoreRelation. The call to ```fileFormat.inferSchema()``` has 
been removed entirely.

If Spark SQL is set on using a table property to store the case-sensitive 
schema then I think having a way to backfill this property for existing pre-2.1 
tables as well as tables not created or managed by Spark will be a necessity. 
If the cleanest way to deal with this case sensitivity problem is to bring back 
schema inference then I think a good option would be to introduce a 
configuration param to indicate whether or not an inferred schema should be 
written back to the table as a property.

We could also introduce another config param that allows a user to bypass 
schema inference even if a case-sensitive schema can't be read from the table 
properties. This could be helpful for users who would like to query external 
Hive tables that aren't managed by Spark and that they know aren't backed by 
files containing case-sensitive field names.

This would basically allow us to support the following use cases:

1) The MetastoreRelation is able to read a case-sensitive schema from the 
table properties. No inference is necessary.
2) The MetastoreRelation can't read a case-sensitive schema from the table 
properties. A case-sensitive schema is inferred and, if configured, written 
back as a table property.
3) The MetastoreRelation can't read a case-sensitive schema from the table 
properties. The user knows the underlying data files don't contain 
case-sensitive field names and has explicitly set a config param to skip the 
inference step.
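
The three cases above can be sketched as a small decision function. This is plain Scala with hypothetical names throughout: ```Schema``` is simplified to a list of field names, and ```inferenceEnabled``` and ```writeBackInferred``` stand in for the two proposed config params, whose real names are not decided here.

```scala
object SchemaResolutionSketch {
  // Simplified, hypothetical schema: just an ordered list of field names.
  type Schema = Seq[String]

  sealed trait Resolution
  final case class FromTableProperties(schema: Schema) extends Resolution
  final case class Inferred(schema: Schema, writeBack: Boolean) extends Resolution
  final case class MetastoreFallback(schema: Schema) extends Resolution

  def resolve(
      metastoreSchema: Schema,
      tablePropertySchema: Option[Schema],
      inferFromFiles: () => Schema,
      inferenceEnabled: Boolean,
      writeBackInferred: Boolean): Resolution =
    tablePropertySchema match {
      // Case 1: a case-sensitive schema is stored in the table properties.
      case Some(schema) => FromTableProperties(schema)
      // Case 2: nothing stored, so infer from the data files and optionally
      // mark the result to be written back as a table property.
      case None if inferenceEnabled => Inferred(inferFromFiles(), writeBackInferred)
      // Case 3: the user has disabled inference; use the metastore schema as-is.
      case None => MetastoreFallback(metastoreSchema)
    }
}
```

Passing ```inferFromFiles``` as a function means the potentially expensive file scan only runs in case 2.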





[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...

2017-02-07 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16797
  
> is it a completely compatibility issue? Seems like the only problem is, 
when we write out mixed-case-schema parquet files directly, and create an 
external table pointing to these files with Spark prior to 2.1, then read this 
table with Spark 2.1+.

Fundamentally, I wouldn't make the assumption that Spark is being used to 
create and maintain the tables in the Hive Metastore that Spark is querying 
against. We're currently using Spark to add and update metastore tables in our 
use case, but I don't think Spark should make any assumptions about how the 
table was created or what properties may be set.

In regard to the underlying issue, we've been using Spark in production for 
over two years and have several petabytes of case-sensitive Parquet data we've 
both written and queried using Spark. As of Spark 2.1, we are no longer able to 
use Spark to query any of this data as any query containing a case-sensitive 
field name will return 0 results. I would argue this is a compatibility 
regression.

> For tables in hive, as long as long hive can read it, Spark should be 
able to read it too.

In our case, other Hive-compatible query engines like Presto don't have a 
problem with case-sensitive Parquet files. I haven't tried Hive itself in a 
long time, but as far as I remember we didn't have a problem there either.





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-07 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99905577
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
 ---
@@ -62,9 +62,20 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
 checkpointerMock = mock[IRecordProcessorCheckpointer]
   }
 
-  test("check serializability of SerializableAWSCredentials") {
-Utils.deserialize[SerializableAWSCredentials](
-  Utils.serialize(new SerializableAWSCredentials("x", "y")))
+  test("check serializability of credential provider classes") {
+Utils.deserialize[BasicCredentialsProvider](
+  Utils.serialize(BasicCredentialsProvider(
+awsAccessKeyId = "x",
+awsSecretKey = "y")))
+
+Utils.deserialize[STSCredentialsProvider](
+  Utils.serialize(STSCredentialsProvider(
+stsRoleArn = "fakeArn",
+stsSessionName = "fakeSessionName",
+stsExternalId = Some("fakeExternalId"),
+longLivedCredsProvider = BasicCredentialsProvider(
--- End diff --

I'll add another test ```longLivedCredentialsProvider.``` I ran into errors 
making a similar ser/de test for ```DefaultCredentialsProvider``` since it's a 
case class.





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-07 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99905664
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -123,9 +123,143 @@ object KinesisUtils {
 // scalastyle:on
 val cleanedHandler = ssc.sc.clean(messageHandler)
 ssc.withNamedScope("kinesis stream") {
+  val kinesisCredsProvider = BasicCredentialsProvider(
+awsAccessKeyId = awsAccessKeyId,
+awsSecretKey = awsSecretKey)
   new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
 initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
-cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
+cleanedHandler, kinesisCredsProvider)
+}
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * @param ssc StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   * (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   * worker's initial starting position in the stream.
+   * The values are either the beginning of the stream
+   * per Kinesis' limit of 24 hours
+   * (InitialPositionInStream.TRIM_HORIZON) or
+   * the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   * See the Kinesis Spark Streaming documentation for more
+   * details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param messageHandler A custom message handler that can generate a generic output from a
+   *   Kinesis `Record`, which contains both message data, and metadata.
+   * @param stsAssumeRoleArn ARN of IAM role to assume when using STS sessions to read from
+   * Kinesis stream.
+   * @param stsSessionName Name to uniquely identify STS sessions if multiple princples assume
+   *   the same role.
+   * @param stsExternalId External ID that can be used to validate against the assumed IAM role's
+   *  trust policy.
+   *
+   * @note The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+   * gets the AWS credentials.
+   */
+  // scalastyle:off
+  def createStream[T: ClassTag](
+  ssc: StreamingContext,
+  kinesisAppName: String,
+  streamName: String,
+  endpointUrl: String,
+  regionName: String,
+  initialPositionInStream: InitialPositionInStream,
+  checkpointInterval: Duration,
+  storageLevel: StorageLevel,
+  messageHandler: Record => T,
+  stsAssumeRoleArn: String,
+  stsSessionName: String,
+  stsExternalId: String): ReceiverInputDStream[T] = {
+// scalastyle:on
+val cleanedHandler = ssc.sc.clean(messageHandler)
+// Setting scope to override receiver stream's scope of "receiver stream"
+ssc.withNamedScope("kinesis stream") {
+  val kinesisCredsProvider = STSCredentialsProvider(
+stsRoleArn = stsAssumeRoleArn,
+stsSessionName = stsSessionName,
+stsExternalId = Some(stsExternalId))
--- End diff --

Will do



[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-07 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99905600
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -23,7 +23,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
-import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, AWSStaticCredentialsProvider,
--- End diff --

Will do





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-07 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r99905835
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -34,11 +35,56 @@ import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
 import org.apache.spark.util.Utils
 
-private[kinesis]
-case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
-  extends AWSCredentials {
-  override def getAWSAccessKeyId: String = accessKeyId
-  override def getAWSSecretKey: String = secretKey
+/**
+ * Serializable interface providing a method executors can call to obtain an
+ * AWSCredentialsProvider instance for authenticating to AWS services.
+ */
+private[kinesis] trait SerializableCredentialsProvider extends Serializable {
--- End diff --

Will do





[GitHub] spark pull request #16797: [SPARK-19455][SQL] Add option for case-insensitiv...

2017-02-03 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16797#discussion_r99455967
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -249,10 +249,18 @@ object SQLConf {
 
   val PARQUET_VECTORIZED_READER_ENABLED =
 SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
-  .doc("Enables vectorized parquet decoding.")
+  .doc("Ingnores case sensitivity differences in field names while resolving Parquet columns " +
--- End diff --

My bad. Don't know how that got in. Fixing.





[GitHub] spark pull request #16797: [SPARK-19455][SQL] Add option for case-insensitiv...

2017-02-03 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16797#discussion_r99458106
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -268,13 +292,23 @@ private[parquet] object ParquetReadSupport {
* @return A list of clipped [[GroupType]] fields, which can be empty.
*/
   private def clipParquetGroupFields(
-  parquetRecord: GroupType, structType: StructType): Seq[Type] = {
-val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+  parquetRecord: GroupType,
+  structType: StructType,
+  caseInsensitive: Boolean): Seq[Type] = {
+val parquetFieldMap = {
+  val pairs = parquetRecord.getFields.asScala.map(f => f.getName -> f)
+  implicit val ordering = if (caseInsensitive) {
+Ordering.by[String, String](_.toLowerCase)
+  } else {
+Ordering.String
+  }
+  TreeMap(pairs: _*)
--- End diff --

In the current implementation, we exploit the sorted map keyspace to define 
an ordering where, if configured, lookups are case-insensitive due to sorting 
by the downcased key.

We only use the values in the map at one point though, so I'll just roll it 
back to using a standard Map and add an if/else for downcasing the key when 
creating/accessing the map.
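
A minimal sketch of the two approaches in plain Scala, with field values reduced to a generic ```V``` instead of Parquet types. ```FieldLookupSketch```, ```sortedFieldMap``` and ```FieldMap``` are hypothetical names for illustration, not the code in this PR. In both variants, field names that differ only by case collide when case-insensitive matching is on, and the later entry wins.

```scala
import java.util.Locale
import scala.collection.immutable.TreeMap

object FieldLookupSketch {
  // Approach 1: a sorted map whose ordering compares lower-cased keys,
  // so a lookup ignores case when caseInsensitive is true.
  def sortedFieldMap[V](
      pairs: Seq[(String, V)],
      caseInsensitive: Boolean): TreeMap[String, V] = {
    val ordering: Ordering[String] =
      if (caseInsensitive) Ordering.by[String, String](_.toLowerCase(Locale.ROOT))
      else Ordering.String
    TreeMap(pairs: _*)(ordering)
  }

  // Approach 2: a plain Map; keys are lower-cased both when the map is
  // built and when it is queried, guarded by a simple if/else.
  final class FieldMap[V](pairs: Seq[(String, V)], caseInsensitive: Boolean) {
    private def normalize(name: String): String =
      if (caseInsensitive) name.toLowerCase(Locale.ROOT) else name

    private val underlying: Map[String, V] =
      pairs.map { case (name, value) => normalize(name) -> value }.toMap

    def get(name: String): Option[V] = underlying.get(normalize(name))
  }
}
```

The second form keeps hash-based lookups and makes the normalization step explicit at the two places the map is touched, at the cost of a small wrapper.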





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-01 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Pinging @zsxwing and @srowen, additional committers who have previously 
reviewed kinesis-asl changes.





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-01 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
There shouldn't be any change to behavior or compatibility when using the 
existing implementations of ```KinesisUtils.createStream()```. The only 
drawback I can think of is that this makes the ```createStream()``` API more 
complex by introducing an additional set of optional config values, which in 
turn necessitates an additional set of overloaded method implementations. I 
think the longer-term solution here is to introduce a builder-style API for 
generating Kinesis streams and eventually put the existing 
```KinesisUtils.createStream()``` on the deprecation path, but I've chosen to 
bite the bullet and just extend ```createStream()``` further in the interest of 
making this a minimal change.





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-01-30 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
The JIRA I opened for this issue contains further details and background. 
Linking to it here for good measure:

* https://issues.apache.org/jira/browse/SPARK-19405





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-01-30 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Missed the code in python/streaming that this touches. Will update PR.





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-01-30 Thread budde
GitHub user budde opened a pull request:

https://github.com/apache/spark/pull/16744

[SPARK-19405][STREAMING] Support for cross-account Kinesis reads via STS


- Add dependency on aws-java-sdk-sts
- Replace SerializableAWSCredentials with new SerializableKCLAuthProvider 
class
- Make KinesisReceiver take SerializableKCLAuthProvider as argument and
  pass credential provider to KCL
- Add new implementations of KinesisUtils.createStream() that take STS
  arguments
- Make JavaKinesisStreamSuite test the entire KinesisUtils Java API

## What changes were proposed in this pull request?

* Replace SerializableAWSCredentials with new SerializableKCLAuthProvider 
class that takes 5 optional config params for configuring AWS auth and returns 
the appropriate credential provider object
* Add new public createStream() APIs for specifying these parameters in 
KinesisUtils
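
As a rough sketch of the selection described above (in Python, with all names assumed): the five optional parameters below are modeled on names that appear elsewhere in this thread, and the precedence rules are an assumption for illustration, not the actual logic of the patch.

```python
from enum import Enum
from typing import Optional


class ProviderKind(Enum):
    """Hypothetical labels for the kinds of credential provider."""
    DEFAULT_CHAIN = "default-chain"
    STATIC_KEYPAIR = "static-keypair"
    STS_ASSUME_ROLE = "sts-assume-role"


def choose_provider_kind(
    aws_access_key_id: Optional[str] = None,
    aws_secret_key: Optional[str] = None,
    sts_role_arn: Optional[str] = None,
    sts_session_name: Optional[str] = None,
    sts_external_id: Optional[str] = None,
) -> ProviderKind:
    """Pick a provider kind from five optional settings (assumed rules)."""
    # Assumption: STS is used only when both the role ARN and the session
    # name are given. The external ID is optional and does not affect the
    # choice; it would only be forwarded to the STS provider.
    if sts_role_arn is not None and sts_session_name is not None:
        return ProviderKind.STS_ASSUME_ROLE
    # Assumption: a complete keypair selects static credentials.
    if aws_access_key_id is not None and aws_secret_key is not None:
        return ProviderKind.STATIC_KEYPAIR
    # Otherwise fall back to the default provider chain.
    return ProviderKind.DEFAULT_CHAIN
```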

## How was this patch tested?

* Manually tested using explicit keypair and instance profile to read data 
from Kinesis stream in separate account (difficult to write a test 
orchestrating creation and assumption of IAM roles across separate accounts)
* Expanded JavaKinesisStreamSuite to test the entire Java API in 
KinesisUtils

## License acknowledgement
This contribution is my original work and that I license the work to the 
project under the project’s open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/budde/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16744.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16744


commit 4786cdec136b717b1a933e67b61fe79ce3e3ce1e
Author: Budde <bu...@amazon.com>
Date:   2017-01-17T18:21:06Z

[SPARK-19405][STREAMING] Add support to KinesisUtils for cross-account 
Kinesis reads via STS

- Add dependency on aws-java-sdk-sts
- Replace SerializableAWSCredentials with new SerializableKCLAuthProvider 
class
- Make KinesisReceiver take SerializableKCLAuthProvider as argument and
  pass credential provider to KCL
- Add new implementations of KinesisUtils.createStream() that take STS
  arguments
- Make JavaKinesisStreamSuite test the entire KinesisUtils Java API







[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-01-30 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Also, on another note, the ```SerializableKCLAuthProvider``` class that 
**SparkQA** is identifying as a new public class is actually package private 
and replaced another package private class (```SerializableAWSCredentials```).





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-15 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r101460842
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -161,23 +161,49 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   bucketSpec,
   Some(partitionSchema))
 
+val catalogTable = metastoreRelation.catalogTable
 val logicalRelation = cached.getOrElse {
   val sizeInBytes =
 
metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
   val fileIndex = {
-val index = new CatalogFileIndex(
-  sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+val index = new CatalogFileIndex(sparkSession, catalogTable, 
sizeInBytes)
 if (lazyPruningEnabled) {
   index
 } else {
   index.filterPartitions(Nil)  // materialize all the 
partitions in memory
 }
   }
   val partitionSchemaColumnNames = 
partitionSchema.map(_.name.toLowerCase).toSet
-  val dataSchema =
-StructType(metastoreSchema
+  val filteredMetastoreSchema = StructType(metastoreSchema
   .filterNot(field => 
partitionSchemaColumnNames.contains(field.name.toLowerCase)))
--- End diff --

I'll fix both of these





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-15 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r101461535
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -161,23 +161,49 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   bucketSpec,
   Some(partitionSchema))
 
+val catalogTable = metastoreRelation.catalogTable
 val logicalRelation = cached.getOrElse {
   val sizeInBytes =
 
metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
   val fileIndex = {
-val index = new CatalogFileIndex(
-  sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+val index = new CatalogFileIndex(sparkSession, catalogTable, 
sizeInBytes)
 if (lazyPruningEnabled) {
   index
 } else {
   index.filterPartitions(Nil)  // materialize all the 
partitions in memory
 }
   }
   val partitionSchemaColumnNames = 
partitionSchema.map(_.name.toLowerCase).toSet
-  val dataSchema =
-StructType(metastoreSchema
+  val filteredMetastoreSchema = StructType(metastoreSchema
   .filterNot(field => 
partitionSchemaColumnNames.contains(field.name.toLowerCase)))
 
+  val inferenceMode = sparkSession.sessionState.conf.schemaInferenceMode
+  val dataSchema = if (inferenceMode != "NEVER_INFER" &&
+  !catalogTable.schemaFromTableProps) {
+val fileStatuses = fileIndex.listFiles(Nil).flatMap(_.files)
+val inferred = defaultSource.inferSchema(sparkSession, options, fileStatuses)
--- End diff --

I'll add an info log here.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-15 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r101461155
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+
+class HiveSchemaInferenceSuite
+  extends QueryTest with TestHiveSingleton with SQLTestUtils with 
BeforeAndAfterEach {
+
+  import HiveSchemaInferenceSuite._
+
+  // Create a CatalogTable instance modeling an external Hive table in a 
metastore that isn't
+  // controlled by Spark (i.e. has no Spark-specific table properties set).
--- End diff --

I wrote the method to take arbitrary properties but for the purposes of 
this test only an empty map is supplied. I'll make the comment more applicable 
to the method though and describe the usage of it elsewhere.





[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...

2017-02-15 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16944
  
Looks like I missed a Catalyst test. Updating the PR.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-15 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r101461357
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+
+class HiveSchemaInferenceSuite
+  extends QueryTest with TestHiveSingleton with SQLTestUtils with 
BeforeAndAfterEach {
+
+  import HiveSchemaInferenceSuite._
+
+  // Create a CatalogTable instance modeling an external Hive table in a 
metastore that isn't
+  // controlled by Spark (i.e. has no Spark-specific table properties set).
+  private def hiveExternalCatalogTable(
+  tableName: String,
+  location: String,
+  schema: StructType,
+  partitionColumns: Seq[String],
+  properties: Map[String, String] = Map.empty): CatalogTable = {
+CatalogTable(
+  identifier = TableIdentifier(table = tableName, database = 
Option("default")),
+  tableType = CatalogTableType.EXTERNAL,
+  storage = CatalogStorageFormat(
+locationUri = Option(location),
+inputFormat = 
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+outputFormat = 
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+serde = 
Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
+compressed = false,
+properties = Map("serialization.format" -> "1")),
+  schema = schema,
+  provider = Option("hive"),
+  partitionColumnNames = partitionColumns,
+  properties = properties)
+  }
+
+  // Creates CatalogTablePartition instances for adding partitions of data 
to our test table.
+  private def hiveCatalogPartition(location: String, index: Int): 
CatalogTablePartition
+= CatalogTablePartition(
+  spec = Map("partcol1" -> index.toString, "partcol2" -> 
index.toString),
+  storage = CatalogStorageFormat(
+locationUri = 
Option(s"${location}/partCol1=$index/partCol2=$index/"),
+inputFormat = 
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+outputFormat = 
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+serde = 
Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
+compressed = false,
+properties = Map("serialization.format" -> "1")))
+
+  // Creates a case-sensitive external Hive table for testing schema 
inference options. Table
+  // will not have Spark-specific table properties set.
+  private def setupCaseSensitiveTable(
+  tableName: String,
+  dir: File): Unit = {
+spark.range(NUM_RECORDS)
+  .selectExpr("id as fieldOne", "id as partCol1", "id as partCol2")
+  .write
+  .partitionBy("partCol1", "partCol2")
+  .mode("overwrite")
+  .parquet(dir.getAbsolutePath)
+
+val lowercaseSchema = StructType(Seq(
+  StructField("fieldone&

[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-15 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r101460565
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -296,6 +296,17 @@ object SQLConf {
   .longConf
   .createWithDefault(250 * 1024 * 1024)
 
+  val HIVE_SCHEMA_INFERENCE_MODE = buildConf("spark.sql.hive.schemaInferenceMode")
+.doc("Configures the action to take when a case-sensitive schema cannot be read from a Hive " +
+  "table's properties. Valid options include INFER_AND_SAVE (infer the case-sensitive " +
+  "schema from the underlying data files and write it back to the table properties), " +
+  "INFER_ONLY (infer the schema but don't attempt to write it to the table properties) and " +
+  "NEVER_INFER (fallback to using the case-insensitive metastore schema instead of inferring).")
+.stringConf
+.transform(_.toUpperCase())
+.checkValues(Set("INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER"))
+.createWithDefault("INFER_AND_SAVE")
--- End diff --

This was proposed in #16797 but I'd like to open this for discussion.
- ```INFER_ONLY``` would mimic the pre-2.1.0 behavior.
- ```INFER_AND_SAVE``` would attempt to prevent future inferences but may 
fail if the Hive client doesn't have write permissions on the metastore. 
- ```NEVER_INFER``` is the current behavior in 2.1.0, which breaks support 
for the tables affected by 
[SPARK-19611](https://issues.apache.org/jira/browse/SPARK-19611). Users may 
wish to enable this mode for tables that lack the table properties schema but 
are known to be case-insensitive.
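
The three modes can be sketched as follows. This is a self-contained Python model, not the Scala in the PR: `normalize_mode`, `resolve_schema` and the `infer`/`save` callbacks are hypothetical names, and swallowing a failed write-back is an assumption made for illustration.

```python
from typing import Callable, List

VALID_MODES = {"INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER"}


def normalize_mode(value: str) -> str:
    """Upper-case the setting and reject unknown values."""
    mode = value.upper()
    if mode not in VALID_MODES:
        raise ValueError(f"invalid schema inference mode: {value!r}")
    return mode


def resolve_schema(
    mode: str,
    metastore_schema: List[str],
    has_table_props_schema: bool,
    infer: Callable[[], List[str]],
    save: Callable[[List[str]], None],
) -> List[str]:
    """Return the schema to use for a table under the given mode."""
    mode = normalize_mode(mode)
    # No inference when the table properties already hold a case-sensitive
    # schema, or when inference is disabled.
    if has_table_props_schema or mode == "NEVER_INFER":
        return metastore_schema
    inferred = infer()
    if mode == "INFER_AND_SAVE":
        try:
            save(inferred)  # write back so later reads can skip inference
        except PermissionError:
            # Assumption: a failed write-back (e.g. no metastore write
            # permission) is tolerated and the inferred schema is still used.
            pass
    return inferred
```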





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r102356422
  
--- Diff: python/pyspark/streaming/kinesis.py ---
@@ -67,6 +68,12 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
 :param awsSecretKey:  AWS SecretKey (default is None. If None, will use
   DefaultAWSCredentialsProviderChain)
 :param decoder:  A function used to decode value (default is utf8_decoder)
+:param stsAssumeRoleArn: ARN of IAM role to assume when using STS sessions to read from
+ the Kinesis stream (default is None).
+:param stsSessionName: Name to uniquely identify STS sessions used to read from Kinesis
+   stream, if STS is being used (default is None).
+:param stsExternalId: Extenral ID that can be used to validate against the assumed IAM
--- End diff --

Ugh. Fixed. Thanks!





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-21 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Updated the PR. Thanks for the work you've done on this! Hopefully I can 
have a PR for the builder interface up later this week.





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r102366429
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/SerializableCredentialsProvider.scala
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import scala.collection.JavaConverters._
+
+import com.amazonaws.auth._
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Serializable interface providing a method executors can call to obtain 
an
+ * AWSCredentialsProvider instance for authenticating to AWS services.
+ */
+private[kinesis] sealed trait SerializableCredentialsProvider extends 
Serializable {
+  /**
+   * Return an AWSCredentialProvider instance that can be used by the 
Kinesis Client
+   * Library to authenticate to AWS services (Kinesis, CloudWatch and 
DynamoDB).
+   */
+  def provider: AWSCredentialsProvider
+}
+
+/** Returns DefaultAWSCredentialsProviderChain for authentication. */
+private[kinesis] final case object DefaultCredentialsProvider
+  extends SerializableCredentialsProvider {
+
+  def provider: AWSCredentialsProvider = new 
DefaultAWSCredentialsProviderChain
+}
+
+/**
+ * Returns AWSStaticCredentialsProvider constructed using basic AWS 
keypair. Falls back to using
+ * DefaultAWSCredentialsProviderChain if unable to construct a 
AWSCredentialsProviderChain
+ * instance with the provided arguments (e.g. if they are null).
+ */
+private[kinesis] final case class BasicCredentialsProvider(
+awsAccessKeyId: String,
+awsSecretKey: String) extends SerializableCredentialsProvider with 
Logging {
+
+  def provider: AWSCredentialsProvider = try {
+new AWSStaticCredentialsProvider(new 
BasicAWSCredentials(awsAccessKeyId, awsSecretKey))
+  } catch {
+case e: IllegalArgumentException =>
+  logWarning("Unable to construct AWSStaticCredentialsProvider with 
provided keypair; " +
+s"falling back to DefaultAWSCredentialsProviderChain: $e")
--- End diff --

I went ahead and updated it.





[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...

2017-02-21 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16944
  
Pinging participants from #16797 once more to get any feedback on the new 
proposal: @gatorsmile, @viirya, @ericl, @mallman and @cloud-fan





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-21 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
So, if these values are ```null``` we'll still be passing them to construct 
a ```BasicCredentialsProvider``` to pass as 
```STSCredentialsProvider.longLivedCredentialsProvider```. I could add a check 
to use ```DefaultCredentialsProvider``` if these params are ```null```. It 
wouldn't be very good Scala style but perhaps this isn't much of a concern if 
we aren't expecting this to really be used much.
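
A minimal sketch of that null check, modeled in Python with dataclasses standing in for the Scala case classes. The class names mirror the ones mentioned in this thread, but the fields and the `long_lived_provider` helper are assumptions for illustration only.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class DefaultCredentialsProvider:
    """Stand-in for the provider that defers to the default chain."""


@dataclass(frozen=True)
class BasicCredentialsProvider:
    """Stand-in for the provider built from an explicit keypair."""
    aws_access_key_id: str
    aws_secret_key: str


LongLived = Union[DefaultCredentialsProvider, BasicCredentialsProvider]


@dataclass(frozen=True)
class STSCredentialsProvider:
    """Stand-in for the STS provider; field names are assumed."""
    sts_role_arn: str
    sts_session_name: str
    long_lived_credentials_provider: LongLived


def long_lived_provider(
    aws_access_key_id: Optional[str],
    aws_secret_key: Optional[str],
) -> LongLived:
    """Hypothetical helper: use the default chain when either key is missing."""
    if aws_access_key_id is None or aws_secret_key is None:
        return DefaultCredentialsProvider()
    return BasicCredentialsProvider(aws_access_key_id, aws_secret_key)
```

The check happens once, when the STS provider is constructed, so a keypair provider is never built from null values.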





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-21 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
@brkyvz I actually think that Scaladoc may be outdated. I double-checked 
the current master branch and it looks like ```KinesisUtils.createStream()``` 
will still provide ```Some(SerializableAWSCredentials(null, null))``` when null 
values are passed. The [helper method returning the 
AWSCredentialsProvider](https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L312-L324)
 passed to the KCL doesn't inspect the values to make sure they are non-null, 
so we'd be relying on the AWS SDK implicitly falling back to 
```DefaultAWSCredentialsProviderChain()``` when given null credentials, which I 
don't believe it does.

Regardless, the check you've suggested would restore this behavior. I'll go 
that route.





[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...

2017-02-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16744#discussion_r102351716
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/SerializableCredentialsProvider.scala
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import scala.collection.JavaConverters._
+
+import com.amazonaws.auth._
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Serializable interface providing a method executors can call to obtain 
an
+ * AWSCredentialsProvider instance for authenticating to AWS services.
+ */
+private[kinesis] sealed trait SerializableCredentialsProvider extends 
Serializable {
+  /**
+   * Return an AWSCredentialProvider instance that can be used by the 
Kinesis Client
+   * Library to authenticate to AWS services (Kinesis, CloudWatch and 
DynamoDB).
+   */
+  def provider: AWSCredentialsProvider
+}
+
+/** Returns DefaultAWSCredentialsProviderChain for authentication. */
+private[kinesis] final case object DefaultCredentialsProvider
+  extends SerializableCredentialsProvider {
+
+  def provider: AWSCredentialsProvider = new 
DefaultAWSCredentialsProviderChain
+}
+
+/**
+ * Returns AWSStaticCredentialsProvider constructed using basic AWS 
keypair. Falls back to using
+ * DefaultAWSCredentialsProviderChain if unable to construct a 
AWSCredentialsProviderChain
+ * instance with the provided arguments (e.g. if they are null).
+ */
+private[kinesis] final case class BasicCredentialsProvider(
+awsAccessKeyId: String,
+awsSecretKey: String) extends SerializableCredentialsProvider with 
Logging {
+
+  def provider: AWSCredentialsProvider = try {
+new AWSStaticCredentialsProvider(new 
BasicAWSCredentials(awsAccessKeyId, awsSecretKey))
+  } catch {
+case _: IllegalArgumentException =>
--- End diff --

I can add the exception to the log message if you think it's appropriate.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r101908105
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+
+class HiveSchemaInferenceSuite
+  extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
+
+  import HiveSchemaInferenceSuite._
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    FileStatusCache.resetForTesting()
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    FileStatusCache.resetForTesting()
+  }
+
+  private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+  private val lowercaseSchema = StructType(Seq(
+    StructField("fieldone", LongType),
+    StructField("partcol1", IntegerType),
+    StructField("partcol2", IntegerType)))
+  private val caseSensitiveSchema = StructType(Seq(
+    StructField("fieldOne", LongType),
+    // Partition columns remain case-insensitive
+    StructField("partcol1", IntegerType),
+    StructField("partcol2", IntegerType)))
+
+  // Create a CatalogTable instance modeling an external Hive Metastore table backed by
+  // Parquet data files.
+  private def hiveExternalCatalogTable(
+      tableName: String,
+      location: String,
+      schema: StructType,
+      partitionColumns: Seq[String],
+      properties: Map[String, String] = Map.empty): CatalogTable = {
+    CatalogTable(
+      identifier = TableIdentifier(table = tableName, database = Option(DATABASE)),
+      tableType = CatalogTableType.EXTERNAL,
+      storage = CatalogStorageFormat(
+        locationUri = Option(location),
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
+        compressed = false,
+        properties = Map("serialization.format" -> "1")),
+      schema = schema,
+      provider = Option("hive"),
+      partitionColumnNames = partitionColumns,
+      properties = properties)
+  }
+
+  // Creates CatalogTablePartition instances for adding partitions of data to our test table.
+  private def hiveCatalogPartition(location: String, index: Int): CatalogTablePartition
+    = CatalogTablePartition(
+      spec = Map("partcol1" -> index.toString, "partcol2" -> index.toString),
+      storage = CatalogStorageFormat(
+        locationUri = Option(s"${location}/partCol1=$index/partCol2=$index/"),
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+

[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-18 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r101908155
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
 ---

[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-20 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
@brkyvz Just for clarification, can this PR be merged as-is with a separate 
Jira/PR for adding a builder interface or is the builder interface a 
prerequisite for merging this?





[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...

2017-02-19 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16944
  
retest this please





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-17 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
@brkyvz Fair enough. Let me know if there's anything I can do to help get 
this merged. I can also take a look at adding a builder class for Kinesis 
streams as a separate PR before the code freeze.





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-02-16 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
@brkyvz, @zsxwing – Any update here? Worried that this PR is starting to 
languish.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r102548681
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -690,10 +696,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
             "different from the schema when this table was created by Spark SQL" +
             s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
             "from Hive metastore which is not case preserving.")
-        hiveTable
+        hiveTable.copy(schemaPreservesCase = false)
--- End diff --

In this case we are discarding the schema obtained from the table 
properties and explicitly falling back to using the case-insensitive schema 
obtained from the metastore. ```schemaPreservesCase``` needs to be set to 
```false``` here for the same reason it does at [line 
702](https://github.com/apache/spark/pull/16944/files/15c25e0fc23ec48ecd0fad7ca7c60d3a82d32a73#diff-159191585e10542f013cb3a714f26075R702).
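
As a rough illustration of the fallback described above, here is a minimal sketch that does not depend on Spark. ```TableSketch``` and ```restoreSchema``` are hypothetical stand-ins, not the actual ```CatalogTable``` or ```HiveExternalCatalog``` code, and the matching branch is an assumption about the surrounding logic, since the diff only shows the fallback.

```scala
// Hypothetical stand-in for CatalogTable; only the two fields relevant here.
final case class TableSketch(fieldNames: Seq[String], schemaPreservesCase: Boolean = true)

object SchemaFallbackSketch {
  // Use the schema stored in the table properties only when it matches the
  // metastore schema up to case (assumed behavior). Otherwise keep the
  // metastore schema and record that its field names are not case-preserving.
  def restoreSchema(hiveTable: TableSketch, fromTableProps: Seq[String]): TableSketch = {
    val sameUpToCase =
      hiveTable.fieldNames.map(_.toLowerCase) == fromTableProps.map(_.toLowerCase)
    if (sameUpToCase) hiveTable.copy(fieldNames = fromTableProps)
    else hiveTable.copy(schemaPreservesCase = false)
  }
}
```

Without the ```copy(schemaPreservesCase = false)``` in the fallback branch, a caller would have no way to tell that the returned field names came from the case-insensitive metastore.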





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-23 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r102859179
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -161,22 +164,45 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           bucketSpec,
           Some(partitionSchema))
 
+      val catalogTable = metastoreRelation.catalogTable
       val logicalRelation = cached.getOrElse {
         val sizeInBytes =
           metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
         val fileIndex = {
-          val index = new CatalogFileIndex(
-            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+          val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             index
           } else {
             index.filterPartitions(Nil)  // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
-        val dataSchema =
-          StructType(metastoreSchema
-            .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+        val filteredMetastoreSchema = StructType(metastoreSchema
+          .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+
+        // Infer a case-sensitive schema when the metastore doesn't return one, if configured.
+        val inferredSchema = inferSchema(
+          catalogTable,
+          metastoreSchema,
+          options,
+          defaultSource,
+          fileType,
+          fileIndex)
+
+        // If configured, save the inferred case-sensitive schema to the table properties and
+        // fetch the updated CatalogTable record for use in the LogicalRelation.
+        val updatedCatalogTable = updateCatalogTable(catalogTable, inferredSchema)
+
+        val dataSchema = inferenceMode match {
+          case (INFER_AND_SAVE | INFER_ONLY) if (!catalogTable.schemaPreservesCase) =>
+            inferredSchema.getOrElse {
+              logWarning(s"Unable to infer schema for table $tableIdentifier from file format " +
+                s"$defaultSource (inference mode: $inferenceMode); using metastore schema.")
+              filteredMetastoreSchema
+            }
+          case _ =>
+            filteredMetastoreSchema
+        }
--- End diff --

I was thinking a tuple could be a possibility but as you've pointed out 
this can get a bit cumbersome as well. I'll put some more thought into it and 
see what I can do here.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-23 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r102850496
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -296,6 +296,25 @@ object SQLConf {
       .longConf
       .createWithDefault(250 * 1024 * 1024)
 
+  object HiveCaseSensitiveInferenceMode extends Enumeration {
+    type HiveCaseSensitiveInferenceMode = Value
+    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
+  }
+
+  // Although Spark SQL itself is not case-sensitive, some of the underlying data storage formats
+  // it supports such as Parquet are. Spark must use the correct schema when querying against data
+  // files containing case-sensitive names or field values will not be resolved properly.
--- End diff --

Gotcha. Will fix.
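
For context on the code comment quoted in the diff above, the following is a minimal sketch of why a lowercased metastore name can fail to resolve against a case-sensitive data file. It models the file as a plain ```Map``` with exact-match keys; ```fileRecord``` and ```resolve``` are hypothetical names and no Parquet or Spark API is involved.

```scala
object CaseSensitiveLookupSketch {
  // Field values keyed by the exact names stored in a (hypothetical) data file.
  val fileRecord: Map[String, Long] = Map("fieldOne" -> 42L)

  // Exact-match lookup, as a case-sensitive storage format would perform it.
  def resolve(name: String): Option[Long] = fileRecord.get(name)
}
```

Here ```resolve("fieldOne")``` finds the value while ```resolve("fieldone")``` yields ```None```, which is the situation the inference mode setting is meant to avoid.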





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-23 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r102850475
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -161,22 +164,45 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           bucketSpec,
           Some(partitionSchema))
 
+      val catalogTable = metastoreRelation.catalogTable
       val logicalRelation = cached.getOrElse {
         val sizeInBytes =
           metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
         val fileIndex = {
-          val index = new CatalogFileIndex(
-            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+          val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             index
           } else {
             index.filterPartitions(Nil)  // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
-        val dataSchema =
-          StructType(metastoreSchema
-            .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+        val filteredMetastoreSchema = StructType(metastoreSchema
+          .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+
+        // Infer a case-sensitive schema when the metastore doesn't return one, if configured.
+        val inferredSchema = inferSchema(
+          catalogTable,
+          metastoreSchema,
+          options,
+          defaultSource,
+          fileType,
+          fileIndex)
+
+        // If configured, save the inferred case-sensitive schema to the table properties and
+        // fetch the updated CatalogTable record for use in the LogicalRelation.
+        val updatedCatalogTable = updateCatalogTable(catalogTable, inferredSchema)
+
+        val dataSchema = inferenceMode match {
+          case (INFER_AND_SAVE | INFER_ONLY) if (!catalogTable.schemaPreservesCase) =>
+            inferredSchema.getOrElse {
+              logWarning(s"Unable to infer schema for table $tableIdentifier from file format " +
+                s"$defaultSource (inference mode: $inferenceMode); using metastore schema.")
+              filteredMetastoreSchema
+            }
+          case _ =>
+            filteredMetastoreSchema
+        }
--- End diff --

I'll see if I can clean this up some more. It gets a bit messy since we'll 
want to replace ```catalogTable``` in the ```LogicalRelation``` returned by 
this method with the updated catalog table returned by 
```updateCatalogTable()``` if the schema is successfully written to the 
metastore. Essentially, we need to return an ```Option[StructType]``` 
indicating whether the schema was inferred and an 
```Option[CatalogTable]``` indicating whether the catalog table was updated.
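
One possible shape for carrying both optional results, sketched without Spark types. ```SchemaSketch```, ```CatalogTableSketch```, ```InferenceResult``` and ```resolve``` are all hypothetical names used for illustration; a small case class is shown here only as one alternative to returning a bare pair of ```Option```s.

```scala
// Hypothetical stand-ins for StructType and CatalogTable.
final case class SchemaSketch(fieldNames: Seq[String])
final case class CatalogTableSketch(name: String, schema: SchemaSketch)

// Bundles the two optional outcomes: whether a schema was inferred, and
// whether the catalog table record was updated as a result.
final case class InferenceResult(
    inferredSchema: Option[SchemaSketch],
    updatedTable: Option[CatalogTableSketch])

object InferenceResultSketch {
  // Pick the table and schema to use, falling back to the originals
  // whenever the corresponding Option is empty.
  def resolve(
      table: CatalogTableSketch,
      metastoreSchema: SchemaSketch,
      result: InferenceResult): (CatalogTableSketch, SchemaSketch) =
    (result.updatedTable.getOrElse(table), result.inferredSchema.getOrElse(metastoreSchema))
}
```

Named fields keep the two ```Option```s from being confused at the call site, at the cost of one extra type.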





[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...

2017-02-23 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16944
  
Updated per feedback from @ericl:

- Added comment with additional context to 
```HIVE_CASE_SENSITIVE_INFERENCE``` in SQLConf.scala
- Removed default value test for ```HIVE_CASE_SENSITIVE_INFERENCE``` from 
SQLConfSuite.scala (was primarily there for dev purposes)
- Refactored changes to ```convertToLogicalRelation()``` into helper functions 
to simplify this function a little
- Added the configured inference mode to several log lines

I also reworked a lot of the comparisons involving the 
```HiveCaseSensitiveInferenceMode``` enumerations to be ```match { }``` 
blocks as I think this is more consistent with typical Scala conventions. I can 
change these back to ```if { }``` blocks though if the consensus is that 
they're clearer.
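
To make the two styles concrete, here is a small self-contained comparison. ```InferenceModeSketch``` mirrors the three values of the enumeration in the diff but is a hypothetical stand-in, and ```shouldInferIf``` / ```shouldInferMatch``` are illustrative helpers rather than code from this PR.

```scala
// Hypothetical copy of the enumeration's three values.
object InferenceModeSketch extends Enumeration {
  val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
}

object MatchVsIfSketch {
  import InferenceModeSketch._

  // Comparison-based version, in the style of an if block.
  def shouldInferIf(mode: Value, schemaPreservesCase: Boolean): Boolean =
    (mode == INFER_AND_SAVE || mode == INFER_ONLY) && !schemaPreservesCase

  // Pattern-matching version with an alternative pattern and a guard.
  def shouldInferMatch(mode: Value, schemaPreservesCase: Boolean): Boolean = mode match {
    case INFER_AND_SAVE | INFER_ONLY if !schemaPreservesCase => true
    case _ => false
  }
}
```

Both functions return the same result for every input, so the choice is purely one of readability.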





[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...

2017-02-24 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16944
  
@ericl: fixed the param doc string and tried to clean up 
```convertToLogicalRelation()``` as you suggested.





[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...

2017-02-24 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16944
  
Thanks, @ericl. Is there anybody else you'd suggest pinging to take a look 
at this and ultimately get it merged?

Re-pinging @viirya to review latest updates addressing his previous 
feedback.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-24 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r103027907
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
 ---

[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...

2017-02-24 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16944
  
The ```assert()``` statements added to ```setupCaseSensitiveTable()``` in 
**HiveSchemaInferenceSuite** per earlier feedback got squashed somewhere in the 
course of updating this PR. I've added them back in the latest update.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-24 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r103050080
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -161,22 +164,51 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           bucketSpec,
           Some(partitionSchema))
 
+      val catalogTable = metastoreRelation.catalogTable
       val logicalRelation = cached.getOrElse {
         val sizeInBytes =
           metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
         val fileIndex = {
-          val index = new CatalogFileIndex(
-            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+          val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             index
           } else {
             index.filterPartitions(Nil)  // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
-        val dataSchema =
-          StructType(metastoreSchema
-            .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+        val filteredMetastoreSchema = StructType(metastoreSchema
--- End diff --

We use the ```dataSchema``` name below when we resolve the final 
```StructType``` passed as the schema for the ```HadoopFsRelation``` (which 
falls back to ```filteredMetastoreSchema``` if it shouldn't/can't infer a 
schema from the file format).
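
The relationship between the two names can be sketched as follows, using plain collections in place of ```StructType```. ```FieldSketch``` and the two helper functions are hypothetical, and the ```shouldInfer``` flag stands in for the inference-mode check shown in the diff.

```scala
// Hypothetical stand-in for a StructField; only the name matters here.
final case class FieldSketch(name: String)

object DataSchemaSketch {
  // Drop partition columns from the metastore schema, comparing names
  // case-insensitively as the diff does with toLowerCase.
  def filteredMetastoreSchema(
      metastoreSchema: Seq[FieldSketch],
      partitionColumns: Seq[String]): Seq[FieldSketch] = {
    val lowered = partitionColumns.map(_.toLowerCase).toSet
    metastoreSchema.filterNot(field => lowered.contains(field.name.toLowerCase))
  }

  // Final data schema: the inferred schema when inference is wanted and
  // succeeded, otherwise the filtered metastore schema.
  def dataSchema(
      shouldInfer: Boolean,
      inferred: Option[Seq[FieldSketch]],
      fallback: Seq[FieldSketch]): Seq[FieldSketch] =
    if (shouldInfer) inferred.getOrElse(fallback) else fallback
}
```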





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-24 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r103051801
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -226,6 +258,41 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
 
+  private def inferSchema(
+      metastoreSchema: StructType,
+      options: Map[String, String],
+      fileFormat: FileFormat,
+      fileType: String,
+      fileIndex: FileIndex): Option[StructType] = {
+    val inferred = fileFormat.inferSchema(
+      sparkSession,
+      options,
+      fileIndex.listFiles(Nil).flatMap(_.files))
+    if (fileType.equals("parquet")) {
+      inferred.map(ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, _))
+    } else {
+      inferred
+    }
+  }
+
+  private def updateCatalogTable(
+      catalogTable: CatalogTable,
+      inferredSchema: Option[StructType]): Option[CatalogTable] = try {
+    inferredSchema.flatMap { schema =>

We should be able to just make this a ```map``` though





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-24 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r103049838
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -296,6 +296,25 @@ object SQLConf {
       .longConf
       .createWithDefault(250 * 1024 * 1024)
 
+  object HiveCaseSensitiveInferenceMode extends Enumeration {
+    type HiveCaseSensitiveInferenceMode = Value
--- End diff --

Not in the current version of the code. I'll remove it.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-24 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r103050192
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -226,6 +258,41 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
 
+  private def inferSchema(
+      metastoreSchema: StructType,
+      options: Map[String, String],
+      fileFormat: FileFormat,
+      fileType: String,
+      fileIndex: FileIndex): Option[StructType] = {
+    val inferred = fileFormat.inferSchema(
+      sparkSession,
+      options,
+      fileIndex.listFiles(Nil).flatMap(_.files))
+    if (fileType.equals("parquet")) {
+      inferred.map(ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, _))
+    } else {
+      inferred
+    }
+  }
+
+  private def updateCatalogTable(
+      catalogTable: CatalogTable,
+      inferredSchema: Option[StructType]): Option[CatalogTable] = try {
+    inferredSchema.flatMap { schema =>
--- End diff --

We want the return value of this operation. It'll be the updated 
```CatalogTable``` record if the update is successful.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-24 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r103049359
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala ---
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.WholeStageCodegenExec
+import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode
--- End diff --

Forgot to remove this when I removed the test in this file. Will fix.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-24 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r103049381
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -296,6 +296,25 @@ object SQLConf {
       .longConf
       .createWithDefault(250 * 1024 * 1024)
 
+  object HiveCaseSensitiveInferenceMode extends Enumeration {
+    type HiveCaseSensitiveInferenceMode = Value
+    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
+  }
+
+  val HIVE_CASE_SENSITIVE_INFERENCE = buildConf("spark.sql.hive.caseSensitiveInferenceMode")
+    .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
+      "table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file " +
+      "formats such as Parquet are. Spark SQL must use a case-preserving schema when querying " +
+      "any table backed by files containing case-sensitive field names or queries may not return " +
+      "accurate results. Valid options include INFER_AND_SAVE (infer the case-sensitive " +
--- End diff --

Will fix





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-24 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r103051158
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -226,6 +258,41 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
 
+  private def inferSchema(
+      metastoreSchema: StructType,
+      options: Map[String, String],
+      fileFormat: FileFormat,
+      fileType: String,
+      fileIndex: FileIndex): Option[StructType] = {
+    val inferred = fileFormat.inferSchema(
+      sparkSession,
+      options,
+      fileIndex.listFiles(Nil).flatMap(_.files))
+    if (fileType.equals("parquet")) {
+      inferred.map(ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, _))
+    } else {
+      inferred
+    }
+  }
+
+  private def updateCatalogTable(
+      catalogTable: CatalogTable,
+      inferredSchema: Option[StructType]): Option[CatalogTable] = try {
+    inferredSchema.flatMap { schema =>
+      logInfo(s"Saving case-sensitive schema for table ${catalogTable.identifier.table}")
+      val updatedTable = catalogTable.copy(schema = schema)
+      val catalog = sparkSession.sharedState.externalCatalog
+      catalog.alterTable(updatedTable)
+      Option(catalog.getTable(
--- End diff --

I think that should be fine. I had some concerns about the way 
```HiveExternalCatalog``` mutates the raw ```CatalogTable``` returned by the 
metastore, which pushed me towards fetching the table again. I don't think 
that should matter, though, since the original ```catalogTable``` was 
retrieved from ```HiveExternalCatalog``` as well.

I'll just use ```updatedTable``` here.





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-24 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r103050652
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -510,8 +510,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireTableExists(db, tableDefinition.identifier.table)
     verifyTableProperties(tableDefinition)
 
+    // Add table metadata such as table schema, partition columns, etc. if they aren't already
+    // present.
+    val withMetaProps = tableDefinition.copy(
--- End diff --

Sounds reasonable to me. This would be a new public method only for 
```HiveExternalCatalog```, correct (i.e. it shouldn't be added to the 
```ExternalCatalog``` interface)?





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r102554012
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -161,22 +164,70 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         bucketSpec,
         Some(partitionSchema))
 
+      val catalogTable = metastoreRelation.catalogTable
       val logicalRelation = cached.getOrElse {
         val sizeInBytes =
           metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
         val fileIndex = {
-          val index = new CatalogFileIndex(
-            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+          val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             index
           } else {
             index.filterPartitions(Nil)  // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
-        val dataSchema =
-          StructType(metastoreSchema
-            .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+        val filteredMetastoreSchema = StructType(metastoreSchema
+          .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+
+        // Infer a case-sensitive schema when the metastore doesn't return one, if configured.
+        val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode
+        val inferredSchema = if (!catalogTable.schemaPreservesCase &&
+            inferenceMode != HiveCaseSensitiveInferenceMode.NEVER_INFER) {
+          logInfo(s"Inferring case-sensitive schema for table $tableIdentifier")
+          val fileStatuses = fileIndex.listFiles(Nil).flatMap(_.files)
+          val inferred = defaultSource.inferSchema(sparkSession, options, fileStatuses)
+          if (fileType.equals("parquet")) {
+            inferred.map(ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, _))
+          } else {
+            inferred
+          }
+        } else {
+          None
+        }
+
+        // If configured, save the inferred case-sensitive schema to the table properties and
+        // fetch the updated CatalogTable record for use in the LogicalRelation.
+        val updatedCatalogTable = if (!catalogTable.schemaPreservesCase &&
+            inferenceMode == HiveCaseSensitiveInferenceMode.INFER_AND_SAVE) {
+          inferredSchema.flatMap { schema =>
+            logInfo(s"Saving case-sensitive schema for table $tableIdentifier to table " +
+              "properties")
+            val updatedTable = catalogTable.copy(schema = schema)
+            try {
+              val catalog = sparkSession.sharedState.externalCatalog
+              catalog.alterTable(updatedTable)
+              Option(catalog.getTable(updatedTable.identifier.database.get,
+                updatedTable.identifier.table))
+            } catch {
+              case NonFatal(ex) =>
+                logError(s"Error saving case-sensitive schema for table $tableIdentifier: $ex")
--- End diff --

I copied this from the old schema inference mode. I'll update it to be a 
warning and include the inference mode.
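
A rough sketch of what that could look like, with the catalog update and the 
logger abstracted away. The names ```trySave```, ```save``` and ```warn``` are 
hypothetical stand-ins (for the ```alterTable``` call and ```logWarning```) and 
are not taken from the PR:

```scala
import scala.util.control.NonFatal

object SaveSchemaSketch {
  // Hypothetical helper: `save` stands in for the catalog update and `warn`
  // for logWarning. Non-fatal failures are reported as a warning that names
  // the inference mode, and the caller gets None instead of an exception.
  def trySave[T](table: String, mode: String, warn: String => Unit)(save: => T): Option[T] =
    try {
      Some(save)
    } catch {
      case NonFatal(ex) =>
        warn(s"Unable to save case-sensitive schema for table $table " +
          s"(inference mode: $mode): ${ex.getMessage}")
        None
    }
}
```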





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r102554021
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -161,22 +164,70 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         bucketSpec,
         Some(partitionSchema))
 
+      val catalogTable = metastoreRelation.catalogTable
       val logicalRelation = cached.getOrElse {
         val sizeInBytes =
           metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
         val fileIndex = {
-          val index = new CatalogFileIndex(
-            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+          val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             index
           } else {
             index.filterPartitions(Nil)  // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
-        val dataSchema =
-          StructType(metastoreSchema
-            .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+        val filteredMetastoreSchema = StructType(metastoreSchema
+          .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+
+        // Infer a case-sensitive schema when the metastore doesn't return one, if configured.
+        val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode
+        val inferredSchema = if (!catalogTable.schemaPreservesCase &&
+            inferenceMode != HiveCaseSensitiveInferenceMode.NEVER_INFER) {
+          logInfo(s"Inferring case-sensitive schema for table $tableIdentifier")
--- End diff --

Will do





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r102547965
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -296,6 +296,21 @@ object SQLConf {
       .longConf
       .createWithDefault(250 * 1024 * 1024)
 
+  object HiveCaseSensitiveInferenceMode extends Enumeration {
--- End diff --

I'm trying to avoid using string literals. If we want to change the 
possible values for this param we would need to find each and every place the 
literal value is used and update it. I think this is too flaky and runs the 
risk of introducing bugs that will only be apparent at runtime. Expressing this 
as an enumeration gives us some level of type safety and at the very least will 
cause a compiler error if the possible values are changed and comparisons 
elsewhere in the code aren't updated.

I'm willing to remove the enumeration if it isn't consistent with Spark 
code practices but at the very least the possible values should be expressed as 
constants rather than literals.
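
To illustrate the point, here is a small self-contained sketch. The enumeration 
mirrors the one in the diff; ```parseMode```, ```shouldInfer``` and 
```shouldSave``` are hypothetical helpers invented for this example and are not 
part of the PR:

```scala
import java.util.Locale

object InferenceModeSketch {
  // Same shape as the enumeration in the diff above.
  object HiveCaseSensitiveInferenceMode extends Enumeration {
    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
  }
  import HiveCaseSensitiveInferenceMode._

  // Hypothetical helper: convert the raw config string once, at the boundary.
  // withName throws NoSuchElementException for an unknown name.
  def parseMode(raw: String): HiveCaseSensitiveInferenceMode.Value =
    HiveCaseSensitiveInferenceMode.withName(raw.toUpperCase(Locale.ROOT))

  // Comparisons reference the members, so renaming or removing NEVER_INFER
  // breaks compilation here instead of silently never matching at runtime.
  def shouldInfer(mode: HiveCaseSensitiveInferenceMode.Value): Boolean =
    mode != NEVER_INFER

  def shouldSave(mode: HiveCaseSensitiveInferenceMode.Value): Boolean =
    mode == INFER_AND_SAVE
}
```

With string literals, a comparison such as ```mode != "NEVER_INFER"``` would 
keep compiling after the value is renamed and would simply start returning 
```true``` for every mode.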





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r102554155
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -296,6 +296,21 @@ object SQLConf {
       .longConf
       .createWithDefault(250 * 1024 * 1024)
 
+  object HiveCaseSensitiveInferenceMode extends Enumeration {
+    type HiveCaseSensitiveInferenceMode = Value
+    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
+  }
+  val HIVE_CASE_SENSITIVE_INFERENCE = buildConf("spark.sql.hive.caseSensitiveInferenceMode")
+    .doc("Configures the action to take when a case-sensitive schema cannot be read from a Hive " +
+      "table's properties. Valid options include INFER_AND_SAVE (infer the case-sensitive " +
--- End diff --

Would you prefer this as a code comment or embedded in the doc string 
itself?





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r102553568
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
@@ -181,7 +186,8 @@ case class CatalogTable(
     viewText: Option[String] = None,
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
-    tracksPartitionsInCatalog: Boolean = false) {
+    tracksPartitionsInCatalog: Boolean = false,
+    schemaPreservesCase: Boolean = true) {
--- End diff --

I considered taking this approach but I think adding this as a parameter is 
more explicit and less flaky. I share your concern that adding more and more 
parameters to CatalogTable could make it less usable, especially since params 
like ```schemaPreservesCase``` really only matter when dealing with Hive tables.

However, I don't think dumping more and more parameters into 
```properties``` is a great solution either. As you've pointed out, we would 
need to filter out the properties only used internally by Spark before writing 
them to the catalog. HiveExternalCatalog already filters out Spark SQL-specific 
properties from the CatalogTable returned by HiveClient. Adding additional 
internal properties would put us in a place where properties contains:

- Actual properties key/value pairs returned from the Hive metastore table.
- Spark SQL-specific properties that are stored in the Hive metastore table 
but filtered out by HiveExternalCatalog when used by Spark internally. These 
properties must be restored before writing back.
- Spark SQL internal-only properties that are added after reading the table 
from the metastore and must be removed before writing it.

Not to mention that we would have to serialize and deserialize this value to 
and from a (String, String) pair just to pass information between 
```HiveExternalCatalog``` and ```HiveMetastoreCatalog```.

I think that if CatalogTable ends up with too many datasource-specific 
internal parameters then maybe it makes more sense to introduce a new Map 
element, e.g. ```internalProperties```, so these don't get mixed in with the 
table properties.
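
A simplified sketch of the two approaches being compared. The case classes 
below are hypothetical stand-ins rather than Spark's ```CatalogTable```, and the 
key name ```spark.sql.internal.schemaPreservesCase``` is invented for the 
example:

```scala
object CatalogPropsSketch {
  // Hypothetical key name; invented for this illustration.
  val InternalKey = "spark.sql.internal.schemaPreservesCase"

  // Approach A: the flag travels inside the table properties map.
  final case class TableWithProps(name: String, properties: Map[String, String])

  // It must be parsed from a String on every read (true when absent)...
  def preservesCase(t: TableWithProps): Boolean =
    t.properties.get(InternalKey).forall(_.toBoolean)

  // ...and stripped before the properties are written back to the metastore.
  def propsForMetastore(t: TableWithProps): Map[String, String] =
    t.properties - InternalKey

  // Approach B: the flag is a typed field, so the properties stay untouched.
  final case class TableWithField(
      name: String,
      properties: Map[String, String],
      schemaPreservesCase: Boolean = true)
}
```

With the typed field there is nothing to parse and nothing to filter on the 
write path, at the cost of one more constructor parameter.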





[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...

2017-02-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/16944#discussion_r102554381
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -161,22 +164,70 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         bucketSpec,
         Some(partitionSchema))
 
+      val catalogTable = metastoreRelation.catalogTable
       val logicalRelation = cached.getOrElse {
         val sizeInBytes =
           metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
         val fileIndex = {
-          val index = new CatalogFileIndex(
-            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+          val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             index
           } else {
             index.filterPartitions(Nil)  // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
-        val dataSchema =
-          StructType(metastoreSchema
-            .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+        val filteredMetastoreSchema = StructType(metastoreSchema
+          .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+
+        // Infer a case-sensitive schema when the metastore doesn't return one, if configured.
+        val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode
+        val inferredSchema = if (!catalogTable.schemaPreservesCase &&
+            inferenceMode != HiveCaseSensitiveInferenceMode.NEVER_INFER) {
+          logInfo(s"Inferring case-sensitive schema for table $tableIdentifier")
+          val fileStatuses = fileIndex.listFiles(Nil).flatMap(_.files)
+          val inferred = defaultSource.inferSchema(sparkSession, options, fileStatuses)
+          if (fileType.equals("parquet")) {
+            inferred.map(ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, _))
+          } else {
+            inferred
+          }
+        } else {
+          None
+        }
+
+        // If configured, save the inferred case-sensitive schema to the table properties and
+        // fetch the updated CatalogTable record for use in the LogicalRelation.
+        val updatedCatalogTable = if (!catalogTable.schemaPreservesCase &&
+            inferenceMode == HiveCaseSensitiveInferenceMode.INFER_AND_SAVE) {
+          inferredSchema.flatMap { schema =>
+            logInfo(s"Saving case-sensitive schema for table $tableIdentifier to table " +
+              "properties")
+            val updatedTable = catalogTable.copy(schema = schema)
+            try {
+              val catalog = sparkSession.sharedState.externalCatalog
+              catalog.alterTable(updatedTable)
+              Option(catalog.getTable(updatedTable.identifier.database.get,
+                updatedTable.identifier.table))
+            } catch {
+              case NonFatal(ex) =>
+                logError(s"Error saving case-sensitive schema for table $tableIdentifier: $ex")
+                None
+            }
+          }
+        } else {
+          None
+        }
+
+        val dataSchema = if (!catalogTable.schemaPreservesCase) {
+          inferredSchema.getOrElse {
+            logWarning(s"Unable to infer schema for table $tableIdentifier from file format " +
+              s"$defaultSource; using metastore schema.")
+            filteredMetastoreSchema
+          }
+        } else {
+          filteredMetastoreSchema
+        }
--- End diff --

I started by trying to keep this as consistent as possible with the 
previous schema inference code but I agree that this function is a bit 
unwieldy. I'll refactor some of this code into helper functions.




