[jira] [Commented] (SPARK-24915) Calling SparkSession.createDataFrame with schema can throw exception

2020-01-13 Thread Joachim Hereth (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014870#comment-17014870
 ] 

Joachim Hereth commented on SPARK-24915:


[~bryanc] Thanks! The PR was against master and can probably be closed. If 
this will now be a bugfix for 2.4, should I rebase the PR or open a new one? I 
couldn't find information on how this is handled.

> Calling SparkSession.createDataFrame with schema can throw exception
> 
>
> Key: SPARK-24915
> URL: https://issues.apache.org/jira/browse/SPARK-24915
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.1
> Environment: Python 3.6.3
> PySpark 2.3.1 (installed via pip)
> OSX 10.12.6
>Reporter: Stephen Spencer
>Priority: Major
>
> There seems to be a bug in PySpark when using the PySparkSQL session to 
> create a dataframe with a pre-defined schema.
> Code to reproduce the error:
> {code:java}
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, StringType, Row
> conf = SparkConf().setMaster("local").setAppName("repro") 
> context = SparkContext(conf=conf) 
> session = SparkSession(context)
> # Construct schema (the order of fields is important)
> schema = StructType([
> StructField('field2', StructType([StructField('sub_field', StringType(), 
> False)]), False),
> StructField('field1', StringType(), False),
> ])
> # Create data to populate data frame
> data = [
> Row(field1="Hello", field2=Row(sub_field='world'))
> ]
> # Attempt to create the data frame supplying the schema
> # this will throw a ValueError
> df = session.createDataFrame(data, schema=schema)
> df.show(){code}
> Running this throws a ValueError
> {noformat}
> Traceback (most recent call last):
> File "schema_bug.py", line 18, in 
> df = session.createDataFrame(data, schema=schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 691, in createDataFrame
> rdd, schema = self._createFromLocal(map(prepare, data), schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in _createFromLocal
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in 
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in toInternal
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in 
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 439, in toInternal
> return self.dataType.toInternal(obj)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 619, in toInternal
> raise ValueError("Unexpected tuple %r with StructType" % obj)
> ValueError: Unexpected tuple 'Hello' with StructType{noformat}
> The problem seems to be here:
> https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/python/pyspark/sql/types.py#L603
> specifically the bit
> {code:java}
> zip(self.fields, obj, self._needConversion)
> {code}
> This zip statement seems to assume that obj and self.fields are ordered in 
> the same way, so that the elements of obj will correspond to the right fields 
> in the schema. However, this is not true: a Row orders its elements 
> alphabetically, but the fields in the schema are in whatever order they are 
> specified. In this example, field2 is being initialised with the field1 
> element 'Hello'. If you re-order the fields in the schema to (field1, 
> field2), the given example works without error.
> The schema in the repro is specifically designed to elicit the problem: the 
> fields are out of alphabetical order and one field is a StructType, making 
> schema._needSerializeAnyField==True. However, we encountered this in real use.
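
To illustrate the misalignment described above, a minimal sketch (plain 
pyspark.sql.Row, no SparkSession needed; the printed results assume the 2.x 
behaviour of sorting keyword arguments alphabetically):

{code:python}
from pyspark.sql import Row

row = Row(field1="Hello", field2=Row(sub_field="world"))

# Under the 2.x behaviour the kwargs are sorted alphabetically, so the row's
# fields come out as ('field1', 'field2') no matter what the schema says.
print(row.__fields__)   # ['field1', 'field2']
print(tuple(row))       # ('Hello', Row(sub_field='world'))

# StructType.toInternal then zips the schema fields with these values
# positionally, so with the repro's schema order the pairing goes wrong:
schema_field_order = ['field2', 'field1']
for name, value in zip(schema_field_order, row):
    print(name, '<-', repr(value))
# field2 <- 'Hello'                  (a plain string handed to a StructType
#                                     -> the ValueError in the traceback)
# field1 <- Row(sub_field='world')
{code}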



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24915) Calling SparkSession.createDataFrame with schema can throw exception

2020-01-11 Thread Joachim Hereth (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17013591#comment-17013591
 ] 

Joachim Hereth commented on SPARK-24915:


[~bryanc] Is there any chance that SPARK-29748 will be backported to 2.4?

If not, why not apply the bugfix from 
[https://github.com/apache/spark/pull/26118] to 2.4 instead of keeping this bug 
for 2.4 users?

> Calling SparkSession.createDataFrame with schema can throw exception
> 
>
> Key: SPARK-24915
> URL: https://issues.apache.org/jira/browse/SPARK-24915
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.1
> Environment: Python 3.6.3
> PySpark 2.3.1 (installed via pip)
> OSX 10.12.6
>Reporter: Stephen Spencer
>Priority: Major
>
> There seems to be a bug in PySpark when using the PySparkSQL session to 
> create a dataframe with a pre-defined schema.
> Code to reproduce the error:
> {code:java}
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, StringType, Row
> conf = SparkConf().setMaster("local").setAppName("repro") 
> context = SparkContext(conf=conf) 
> session = SparkSession(context)
> # Construct schema (the order of fields is important)
> schema = StructType([
> StructField('field2', StructType([StructField('sub_field', StringType(), 
> False)]), False),
> StructField('field1', StringType(), False),
> ])
> # Create data to populate data frame
> data = [
> Row(field1="Hello", field2=Row(sub_field='world'))
> ]
> # Attempt to create the data frame supplying the schema
> # this will throw a ValueError
> df = session.createDataFrame(data, schema=schema)
> df.show(){code}
> Running this throws a ValueError
> {noformat}
> Traceback (most recent call last):
> File "schema_bug.py", line 18, in 
> df = session.createDataFrame(data, schema=schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 691, in createDataFrame
> rdd, schema = self._createFromLocal(map(prepare, data), schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in _createFromLocal
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in 
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in toInternal
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in 
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 439, in toInternal
> return self.dataType.toInternal(obj)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 619, in toInternal
> raise ValueError("Unexpected tuple %r with StructType" % obj)
> ValueError: Unexpected tuple 'Hello' with StructType{noformat}
> The problem seems to be here:
> https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/python/pyspark/sql/types.py#L603
> specifically the bit
> {code:java}
> zip(self.fields, obj, self._needConversion)
> {code}
> This zip statement seems to assume that obj and self.fields are ordered in 
> the same way, so that the elements of obj will correspond to the right fields 
> in the schema. However, this is not true: a Row orders its elements 
> alphabetically, but the fields in the schema are in whatever order they are 
> specified. In this example, field2 is being initialised with the field1 
> element 'Hello'. If you re-order the fields in the schema to (field1, 
> field2), the given example works without error.
> The schema in the repro is specifically designed to elicit the problem: the 
> fields are out of alphabetical order and one field is a StructType, making 
> schema._needSerializeAnyField==True. However, we encountered this in real use.
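
One workaround, sketched below under the same repro setup (keeping the schema 
exactly as reported), is to supply the rows as plain tuples in schema order, 
which avoids Row's keyword sorting altogether:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.master("local").appName("repro-workaround").getOrCreate()

schema = StructType([
    StructField('field2', StructType([StructField('sub_field', StringType(), False)]), False),
    StructField('field1', StringType(), False),
])

# Plain tuples are interpreted positionally, so listing the values in schema
# order sidesteps any dependence on alphabetical field sorting.
data = [(("world",), "Hello")]   # (field2 struct, field1 string)

df = spark.createDataFrame(data, schema=schema)
df.show()
{code}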



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-29748) Remove sorting of fields in PySpark SQL Row creation

2019-11-14 Thread Joachim Hereth (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16974821#comment-16974821
 ] 

Joachim Hereth commented on SPARK-29748:


[~bryanc] By simply removing the sorting we change the semantics, e.g. `Row(a=1, 
b=2) != Row(b=2, a=1)` (as opposed to what we currently have).

Also, there might be problems if data was written with Spark pre-change and 
read after the change.

Adding workarounds (if possible) will make the code very complex.

I think [~zero323] was thinking about changes for the upcoming 3.0?
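
A small sketch of the equality point (the observed result depends on the Spark 
version and on any legacy fallback setting):

{code:python}
from pyspark.sql import Row

r1 = Row(a=1, b=2)
r2 = Row(b=2, a=1)

# With alphabetical sorting of kwargs (the pre-3.0 behaviour) both rows end up
# as Row(a=1, b=2), so r1 == r2 is True.
# If the sorting is removed, r2 keeps the order (b, a), its tuple value is
# (2, 1), and r1 == r2 becomes False -- the semantic change discussed above.
print(r1, r2, r1 == r2)
{code}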

> Remove sorting of fields in PySpark SQL Row creation
> 
>
> Key: SPARK-29748
> URL: https://issues.apache.org/jira/browse/SPARK-29748
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Bryan Cutler
>Priority: Major
>
> Currently, when a PySpark Row is created with keyword arguments, the fields 
> are sorted alphabetically. This has created a lot of confusion with users 
> because it is not obvious (although it is stated in the pydocs) that they 
> will be sorted alphabetically, and then an error can occur later when 
> applying a schema and the field order does not match.
> The original reason for sorting fields is because kwargs in python < 3.6 are 
> not guaranteed to be in the same order that they were entered. Sorting 
> alphabetically would ensure a consistent order.  Matters are further 
> complicated with the flag {{__from_dict__}} that allows the {{Row}} fields 
> to be referenced by name when made by kwargs, but this flag is not serialized 
> with the Row and leads to inconsistent behavior.
> This JIRA proposes that any sorting of the Fields is removed. Users with 
> Python 3.6+ creating Rows with kwargs can continue to do so since Python will 
> ensure the order is the same as entered. Users with Python < 3.6 will have to 
> create Rows with an OrderedDict or by using the Row class as a factory 
> (explained in the pydoc). If kwargs are used, an error will be raised or, 
> based on a conf setting, it can fall back to a LegacyRow that will sort the 
> fields as before. This LegacyRow will be immediately deprecated and removed 
> once support for Python < 3.6 is dropped.
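
For reference, a sketch of the order-preserving constructions the pydoc points 
to, which work regardless of the Python version:

{code:python}
from collections import OrderedDict
from pyspark.sql import Row

# Row used as a factory: the field order is fixed by the factory's arguments,
# not by keyword arguments, so it is stable on any Python version.
Person = Row("field1", "field2")
r = Person("Hello", "world")
print(r)   # Row(field1='Hello', field2='world')

# Rows can also be supplied to createDataFrame as OrderedDicts, which pin the
# field order explicitly (assumes an active SparkSession named `spark`):
data = [OrderedDict([("field1", "Hello"), ("field2", "world")])]
# spark.createDataFrame(data, schema=...)
{code}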



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24915) Calling SparkSession.createDataFrame with schema can throw exception

2019-10-15 Thread Joachim Hereth (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951417#comment-16951417
 ] 

Joachim Hereth edited comment on SPARK-24915 at 10/15/19 8:58 PM:
--

This is fixed by [https://github.com/apache/spark/pull/26118].

As in the RDD days, Row is considered a tuple, while all Rows coming from 
DataFrames have column names and behave more like dicts.

Maybe it's time to deprecate RDD-style Rows?


was (Author: jhereth):
this is fixed by 
[https://github.com/apache/spark/pull/26118|https://github.com/apache/spark/pull/26118.]

As in RDD days Row is considered a tuple while all Rows coming from dataframes 
have column names and behave more like dicts.

Maybe it's time to deprecate RDD-style Rows?

> Calling SparkSession.createDataFrame with schema can throw exception
> 
>
> Key: SPARK-24915
> URL: https://issues.apache.org/jira/browse/SPARK-24915
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.1
> Environment: Python 3.6.3
> PySpark 2.3.1 (installed via pip)
> OSX 10.12.6
>Reporter: Stephen Spencer
>Priority: Major
>
> There seems to be a bug in PySpark when using the PySparkSQL session to 
> create a dataframe with a pre-defined schema.
> Code to reproduce the error:
> {code:java}
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, StringType, Row
> conf = SparkConf().setMaster("local").setAppName("repro") 
> context = SparkContext(conf=conf) 
> session = SparkSession(context)
> # Construct schema (the order of fields is important)
> schema = StructType([
> StructField('field2', StructType([StructField('sub_field', StringType(), 
> False)]), False),
> StructField('field1', StringType(), False),
> ])
> # Create data to populate data frame
> data = [
> Row(field1="Hello", field2=Row(sub_field='world'))
> ]
> # Attempt to create the data frame supplying the schema
> # this will throw a ValueError
> df = session.createDataFrame(data, schema=schema)
> df.show(){code}
> Running this throws a ValueError
> {noformat}
> Traceback (most recent call last):
> File "schema_bug.py", line 18, in 
> df = session.createDataFrame(data, schema=schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 691, in createDataFrame
> rdd, schema = self._createFromLocal(map(prepare, data), schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in _createFromLocal
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in 
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in toInternal
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in 
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 439, in toInternal
> return self.dataType.toInternal(obj)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 619, in toInternal
> raise ValueError("Unexpected tuple %r with StructType" % obj)
> ValueError: Unexpected tuple 'Hello' with StructType{noformat}
> The problem seems to be here:
> https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/python/pyspark/sql/types.py#L603
> specifically the bit
> {code:java}
> zip(self.fields, obj, self._needConversion)
> {code}
> This zip statement seems to assume that obj and self.fields are ordered in 
> the same way, so that the elements of obj will correspond to the right fields 
> in the schema. However, this is not true: a Row orders its elements 
> alphabetically, but the fields in the schema are in whatever order they are 
> specified. In this example, field2 is being initialised with the field1 
> element 'Hello'. If you re-order the fields in the schema to (field1, 
> field2), the given example works without error.
> The schema in the repro is specifically designed to elicit the problem: the 
> fields are out of alphabetical order and one field is a StructType, making 
> schema._needSerializeAnyField==True. However, we encountered this in real use.
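
As the description notes, listing the schema fields in alphabetical order makes 
the original example pass; a minimal sketch of that variant, assuming the same 
repro setup:

{code:python}
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.master("local").appName("repro-reordered").getOrCreate()

# Same fields as the repro, but in alphabetical order, so the schema order
# matches the (sorted) field order of the keyword-constructed Row.
schema = StructType([
    StructField('field1', StringType(), False),
    StructField('field2', StructType([StructField('sub_field', StringType(), False)]), False),
])

data = [Row(field1="Hello", field2=Row(sub_field="world"))]
spark.createDataFrame(data, schema=schema).show()
{code}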



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (SPARK-24915) Calling SparkSession.createDataFrame with schema can throw exception

2019-10-15 Thread Joachim Hereth (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951417#comment-16951417
 ] 

Joachim Hereth edited comment on SPARK-24915 at 10/15/19 8:58 PM:
--

this is fixed by 
[https://github.com/apache/spark/pull/26118|https://github.com/apache/spark/pull/26118.]

As in RDD days Row is considered a tuple while all Rows coming from dataframes 
have column names and behave more like dicts.

Maybe it's time to deprecate RDD-style Rows?


was (Author: jhereth):
this is fixed by [https://github.com/apache/spark/pull/26118.]

As in RDD days Row is considered a tuple while all Rows coming from dataframes 
have column names and behave more like dicts.

Maybe it's time to deprecate RDD-style Rows?

> Calling SparkSession.createDataFrame with schema can throw exception
> 
>
> Key: SPARK-24915
> URL: https://issues.apache.org/jira/browse/SPARK-24915
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.1
> Environment: Python 3.6.3
> PySpark 2.3.1 (installed via pip)
> OSX 10.12.6
>Reporter: Stephen Spencer
>Priority: Major
>
> There seems to be a bug in PySpark when using the PySparkSQL session to 
> create a dataframe with a pre-defined schema.
> Code to reproduce the error:
> {code:java}
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, StringType, Row
> conf = SparkConf().setMaster("local").setAppName("repro") 
> context = SparkContext(conf=conf) 
> session = SparkSession(context)
> # Construct schema (the order of fields is important)
> schema = StructType([
> StructField('field2', StructType([StructField('sub_field', StringType(), 
> False)]), False),
> StructField('field1', StringType(), False),
> ])
> # Create data to populate data frame
> data = [
> Row(field1="Hello", field2=Row(sub_field='world'))
> ]
> # Attempt to create the data frame supplying the schema
> # this will throw a ValueError
> df = session.createDataFrame(data, schema=schema)
> df.show(){code}
> Running this throws a ValueError
> {noformat}
> Traceback (most recent call last):
> File "schema_bug.py", line 18, in 
> df = session.createDataFrame(data, schema=schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 691, in createDataFrame
> rdd, schema = self._createFromLocal(map(prepare, data), schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in _createFromLocal
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in 
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in toInternal
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in 
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 439, in toInternal
> return self.dataType.toInternal(obj)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 619, in toInternal
> raise ValueError("Unexpected tuple %r with StructType" % obj)
> ValueError: Unexpected tuple 'Hello' with StructType{noformat}
> The problem seems to be here:
> https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/python/pyspark/sql/types.py#L603
> specifically the bit
> {code:java}
> zip(self.fields, obj, self._needConversion)
> {code}
> This zip statement seems to assume that obj and self.fields are ordered in 
> the same way, so that the elements of obj will correspond to the right fields 
> in the schema. However, this is not true: a Row orders its elements 
> alphabetically, but the fields in the schema are in whatever order they are 
> specified. In this example, field2 is being initialised with the field1 
> element 'Hello'. If you re-order the fields in the schema to (field1, 
> field2), the given example works without error.
> The schema in the repro is specifically designed to elicit the problem: the 
> fields are out of alphabetical order and one field is a StructType, making 
> schema._needSerializeAnyField==True. However, we encountered this in real use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (SPARK-24915) Calling SparkSession.createDataFrame with schema can throw exception

2019-10-15 Thread Joachim Hereth (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951417#comment-16951417
 ] 

Joachim Hereth edited comment on SPARK-24915 at 10/15/19 8:25 PM:
--

this is fixed by [https://github.com/apache/spark/pull/26118.]

As in RDD days Row is considered a tuple while all Rows coming from dataframes 
have column names and behave more like dicts.

Maybe it's time to deprecate RDD-style Rows?


was (Author: jhereth):
this is fixed by [https://github.com/apache/spark/pull/26118.]

It's strange that Row is considered a tuple (it also causes the tests to look a 
bit strange).

However, changing the hierarchy seemed a bit too adventurous.

> Calling SparkSession.createDataFrame with schema can throw exception
> 
>
> Key: SPARK-24915
> URL: https://issues.apache.org/jira/browse/SPARK-24915
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.1
> Environment: Python 3.6.3
> PySpark 2.3.1 (installed via pip)
> OSX 10.12.6
>Reporter: Stephen Spencer
>Priority: Major
>
> There seems to be a bug in PySpark when using the PySparkSQL session to 
> create a dataframe with a pre-defined schema.
> Code to reproduce the error:
> {code:java}
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, StringType, Row
> conf = SparkConf().setMaster("local").setAppName("repro") 
> context = SparkContext(conf=conf) 
> session = SparkSession(context)
> # Construct schema (the order of fields is important)
> schema = StructType([
> StructField('field2', StructType([StructField('sub_field', StringType(), 
> False)]), False),
> StructField('field1', StringType(), False),
> ])
> # Create data to populate data frame
> data = [
> Row(field1="Hello", field2=Row(sub_field='world'))
> ]
> # Attempt to create the data frame supplying the schema
> # this will throw a ValueError
> df = session.createDataFrame(data, schema=schema)
> df.show(){code}
> Running this throws a ValueError
> {noformat}
> Traceback (most recent call last):
> File "schema_bug.py", line 18, in 
> df = session.createDataFrame(data, schema=schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 691, in createDataFrame
> rdd, schema = self._createFromLocal(map(prepare, data), schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in _createFromLocal
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in 
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in toInternal
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in 
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 439, in toInternal
> return self.dataType.toInternal(obj)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 619, in toInternal
> raise ValueError("Unexpected tuple %r with StructType" % obj)
> ValueError: Unexpected tuple 'Hello' with StructType{noformat}
> The problem seems to be here:
> https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/python/pyspark/sql/types.py#L603
> specifically the bit
> {code:java}
> zip(self.fields, obj, self._needConversion)
> {code}
> This zip statement seems to assume that obj and self.fields are ordered in 
> the same way, so that the elements of obj will correspond to the right fields 
> in the schema. However, this is not true: a Row orders its elements 
> alphabetically, but the fields in the schema are in whatever order they are 
> specified. In this example, field2 is being initialised with the field1 
> element 'Hello'. If you re-order the fields in the schema to (field1, 
> field2), the given example works without error.
> The schema in the repro is specifically designed to elicit the problem: the 
> fields are out of alphabetical order and one field is a StructType, making 
> schema._needSerializeAnyField==True. However, we encountered this in real use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, 

[jira] [Commented] (SPARK-24915) Calling SparkSession.createDataFrame with schema can throw exception

2019-10-14 Thread Joachim Hereth (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951417#comment-16951417
 ] 

Joachim Hereth commented on SPARK-24915:


this is fixed by [https://github.com/apache/spark/pull/26118.]

It's strange that Row is considered a tuple (it also causes the tests to look a 
bit strange).

However, changing the hierarchy seemed a bit too adventurous.

> Calling SparkSession.createDataFrame with schema can throw exception
> 
>
> Key: SPARK-24915
> URL: https://issues.apache.org/jira/browse/SPARK-24915
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.1
> Environment: Python 3.6.3
> PySpark 2.3.1 (installed via pip)
> OSX 10.12.6
>Reporter: Stephen Spencer
>Priority: Major
>
> There seems to be a bug in PySpark when using the PySparkSQL session to 
> create a dataframe with a pre-defined schema.
> Code to reproduce the error:
> {code:java}
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, StringType, Row
> conf = SparkConf().setMaster("local").setAppName("repro") 
> context = SparkContext(conf=conf) 
> session = SparkSession(context)
> # Construct schema (the order of fields is important)
> schema = StructType([
> StructField('field2', StructType([StructField('sub_field', StringType(), 
> False)]), False),
> StructField('field1', StringType(), False),
> ])
> # Create data to populate data frame
> data = [
> Row(field1="Hello", field2=Row(sub_field='world'))
> ]
> # Attempt to create the data frame supplying the schema
> # this will throw a ValueError
> df = session.createDataFrame(data, schema=schema)
> df.show(){code}
> Running this throws a ValueError
> {noformat}
> Traceback (most recent call last):
> File "schema_bug.py", line 18, in 
> df = session.createDataFrame(data, schema=schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 691, in createDataFrame
> rdd, schema = self._createFromLocal(map(prepare, data), schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in _createFromLocal
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in 
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in toInternal
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in 
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 439, in toInternal
> return self.dataType.toInternal(obj)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 619, in toInternal
> raise ValueError("Unexpected tuple %r with StructType" % obj)
> ValueError: Unexpected tuple 'Hello' with StructType{noformat}
> The problem seems to be here:
> https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/python/pyspark/sql/types.py#L603
> specifically the bit
> {code:java}
> zip(self.fields, obj, self._needConversion)
> {code}
> This zip statement seems to assume that obj and self.fields are ordered in 
> the same way, so that the elements of obj will correspond to the right fields 
> in the schema. However, this is not true: a Row orders its elements 
> alphabetically, but the fields in the schema are in whatever order they are 
> specified. In this example, field2 is being initialised with the field1 
> element 'Hello'. If you re-order the fields in the schema to (field1, 
> field2), the given example works without error.
> The schema in the repro is specifically designed to elicit the problem: the 
> fields are out of alphabetical order and one field is a StructType, making 
> schema._needSerializeAnyField==True. However, we encountered this in real use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))

2018-05-02 Thread Joachim Hereth (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460870#comment-16460870
 ] 

Joachim Hereth commented on SPARK-24067:


It would be great if this fix could go into 2.3.1. Any progress on that?

> Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle 
> Non-consecutive Offsets (i.e. Log Compaction))
> 
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.3.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
>
> SPARK-17147 fixes a problem with non-consecutive Kafka Offsets. The 
> [PR|https://github.com/apache/spark/pull/20572] was merged to {{master}}. This 
> should be backported to 2.3.
>  
> Original Description from SPARK-17147 :
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.
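
To make the offset-gap point concrete, a small language-neutral sketch (plain 
Python, no Kafka or Spark APIs) of why stepping by offset + 1 misbehaves on a 
compacted log while stepping from the returned record's offset does not:

{code:python}
# Simulated compacted log: offsets 3 and 4 were removed by log compaction.
records = [(1, "a"), (2, "b"), (5, "c"), (6, "d")]

def poll(log, offset):
    """Return the first record at or after `offset`, as a Kafka fetch would."""
    for rec in log:
        if rec[0] >= offset:
            return rec
    return None

# Broken assumption: the next offset is always the previous offset + 1.
offset, seen_naive = 1, []
for _ in range(len(records)):
    rec = poll(records, offset)
    seen_naive.append(rec)
    offset += 1                    # re-reads (5, "c") across the missing 3 and 4

# Fix described above: advance from the offset of the record actually returned.
offset, seen_fixed = 1, []
for _ in range(len(records)):
    rec = poll(records, offset)
    seen_fixed.append(rec)
    offset = rec[0] + 1            # skips the compacted gap cleanly

print(seen_naive)   # [(1, 'a'), (2, 'b'), (5, 'c'), (5, 'c')] -- duplicate read
print(seen_fixed)   # [(1, 'a'), (2, 'b'), (5, 'c'), (6, 'd')]
{code}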



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))

2018-04-24 Thread Joachim Hereth (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joachim Hereth updated SPARK-24067:
---
Description: 
SPARK-17147 fixes a problem with non-consecutive Kafka Offsets. The 
[PR|https://github.com/apache/spark/pull/20572] was merged to {{master}}. This 
should be backported to 2.3.

 

Original Description from SPARK-17147 :

 

When Kafka does log compaction, offsets often end up with gaps, meaning the next 
requested offset will frequently not be offset+1. The logic in KafkaRDD & 
CachedKafkaConsumer has a baked-in assumption that the next offset will always 
be just an increment of 1 above the previous offset.

I have worked around this problem by changing CachedKafkaConsumer to use the 
returned record's offset, from:
 {{nextOffset = offset + 1}}
 to:
 {{nextOffset = record.offset + 1}}

and changed KafkaRDD from:
 {{requestOffset += 1}}
 to:
 {{requestOffset = r.offset() + 1}}

(I also had to change some assert logic in CachedKafkaConsumer).

There's a strong possibility that I have misconstrued how to use the streaming 
kafka consumer, and I'm happy to close this out if that's the case. If, 
however, it is supposed to support non-consecutive offsets (e.g. due to log 
compaction) I am also happy to contribute a PR.

  was:
Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
(i.e. Log Compaction)

 

When Kafka does log compaction offsets often end up with gaps, meaning the next 
requested offset will be frequently not be offset+1. The logic in KafkaRDD & 
CachedKafkaConsumer has a baked in assumption that the next offset will always 
be just an increment of 1 above the previous offset.

I have worked around this problem by changing CachedKafkaConsumer to use the 
returned record's offset, from:
 {{nextOffset = offset + 1}}
 to:
 {{nextOffset = record.offset + 1}}

and changed KafkaRDD from:
 {{requestOffset += 1}}
 to:
 {{requestOffset = r.offset() + 1}}

(I also had to change some assert logic in CachedKafkaConsumer).

There's a strong possibility that I have misconstrued how to use the streaming 
kafka consumer, and I'm happy to close this out if that's the case. If, 
however, it is supposed to support non-consecutive offsets (e.g. due to log 
compaction) I am also happy to contribute a PR.


> Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle 
> Non-consecutive Offsets (i.e. Log Compaction))
> 
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.3.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
>
> SPARK-17147 fixes a problem with non-consecutive Kafka Offsets. The 
> [PR|https://github.com/apache/spark/pull/20572] was merged to {{master}}. This 
> should be backported to 2.3.
>  
> Original Description from SPARK-17147 :
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))

2018-04-24 Thread Joachim Hereth (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joachim Hereth updated SPARK-24067:
---
Affects Version/s: (was: 2.0.0)
   2.3.0

> Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle 
> Non-consecutive Offsets (i.e. Log Compaction))
> 
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.3.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
>
> Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))

2018-04-24 Thread Joachim Hereth (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joachim Hereth updated SPARK-24067:
---
Fix Version/s: (was: 2.4.0)

> Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle 
> Non-consecutive Offsets (i.e. Log Compaction))
> 
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.3.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
>
> Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))

2018-04-24 Thread Joachim Hereth (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joachim Hereth updated SPARK-24067:
---
Summary: Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer 
Can't Handle Non-consecutive Offsets (i.e. Log Compaction))  (was: Backport 
SPARK-17147 to 2.3)

> Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle 
> Non-consecutive Offsets (i.e. Log Compaction))
> 
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
> Fix For: 2.4.0
>
>
> Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24067) Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2018-04-24 Thread Joachim Hereth (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joachim Hereth updated SPARK-24067:
---
Description: 
Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
(i.e. Log Compaction)

 

When Kafka does log compaction, offsets often end up with gaps, meaning the next 
requested offset will frequently not be offset+1. The logic in KafkaRDD & 
CachedKafkaConsumer has a baked-in assumption that the next offset will always 
be just an increment of 1 above the previous offset.

I have worked around this problem by changing CachedKafkaConsumer to use the 
returned record's offset, from:
 {{nextOffset = offset + 1}}
 to:
 {{nextOffset = record.offset + 1}}

and changed KafkaRDD from:
 {{requestOffset += 1}}
 to:
 {{requestOffset = r.offset() + 1}}

(I also had to change some assert logic in CachedKafkaConsumer).

There's a strong possibility that I have misconstrued how to use the streaming 
kafka consumer, and I'm happy to close this out if that's the case. If, 
however, it is supposed to support non-consecutive offsets (e.g. due to log 
compaction) I am also happy to contribute a PR.

  was:
When Kafka does log compaction offsets often end up with gaps, meaning the next 
requested offset will be frequently not be offset+1. The logic in KafkaRDD & 
CachedKafkaConsumer has a baked in assumption that the next offset will always 
be just an increment of 1 above the previous offset. 

I have worked around this problem by changing CachedKafkaConsumer to use the 
returned record's offset, from:
{{nextOffset = offset + 1}}
to:
{{nextOffset = record.offset + 1}}

and changed KafkaRDD from:
{{requestOffset += 1}}
to:
{{requestOffset = r.offset() + 1}}

(I also had to change some assert logic in CachedKafkaConsumer).

There's a strong possibility that I have misconstrued how to use the streaming 
kafka consumer, and I'm happy to close this out if that's the case. If, 
however, it is supposed to support non-consecutive offsets (e.g. due to log 
compaction) I am also happy to contribute a PR.


> Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
> --
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
> Fix For: 2.4.0
>
>
> Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24067) Backport SPARK-17147 to 2.3

2018-04-24 Thread Joachim Hereth (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joachim Hereth updated SPARK-24067:
---
Summary: Backport SPARK-17147 to 2.3  (was: Spark 2.3 Streaming Kafka 0.10 
Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))

> Backport SPARK-17147 to 2.3
> ---
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
> Fix For: 2.4.0
>
>
> Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24067) Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2018-04-24 Thread Joachim Hereth (JIRA)
Joachim Hereth created SPARK-24067:
--

 Summary: Spark 2.3 Streaming Kafka 0.10 Consumer Can't Handle 
Non-consecutive Offsets (i.e. Log Compaction)
 Key: SPARK-24067
 URL: https://issues.apache.org/jira/browse/SPARK-24067
 Project: Spark
  Issue Type: Bug
  Components: DStreams
Affects Versions: 2.0.0
Reporter: Joachim Hereth
Assignee: Cody Koeninger
 Fix For: 2.4.0


When Kafka does log compaction, offsets often end up with gaps, meaning the next 
requested offset will frequently not be offset+1. The logic in KafkaRDD & 
CachedKafkaConsumer has a baked-in assumption that the next offset will always 
be just an increment of 1 above the previous offset.

I have worked around this problem by changing CachedKafkaConsumer to use the 
returned record's offset, from:
{{nextOffset = offset + 1}}
to:
{{nextOffset = record.offset + 1}}

and changed KafkaRDD from:
{{requestOffset += 1}}
to:
{{requestOffset = r.offset() + 1}}

(I also had to change some assert logic in CachedKafkaConsumer).

There's a strong possibility that I have misconstrued how to use the streaming 
kafka consumer, and I'm happy to close this out if that's the case. If, 
however, it is supposed to support non-consecutive offsets (e.g. due to log 
compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20808) External Table unnecessarily not created in Hive-compatible way

2017-05-22 Thread Joachim Hereth (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019667#comment-16019667
 ] 

Joachim Hereth commented on SPARK-20808:


Thanks! As you suggested, the bug was already fixed.

Testing against 2.2.0-rc2, the table is created correctly with 
`createExternalTable` and the deprecation warning is issued. With `createTable` 
the table is created correctly and without warning.

In both cases the table can be read via Hive without problems.

Testing against master (cfca01, 2.3.0-SNAPSHOT) the behaviour is the same.

The bug can be closed as resolved, with Fix Version 2.2.0.
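
For reference, a hedged PySpark sketch of the two calls being compared (an 
illustrative setup only; the original report used the Scala shell, and the 
table name and path here are placeholders):

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("catalog-check").getOrCreate()
path = "/tmp/table_name"   # hypothetical location for the Parquet data

spark.range(3).write.mode("overwrite").parquet(path)
spark.sql("DROP TABLE IF EXISTS default.table_name")

# Deprecated since 2.2 (emits the deprecation warning mentioned above):
# spark.catalog.createExternalTable("default.table_name", path)

# Its replacement; given a path it still registers an external table:
spark.catalog.createTable("default.table_name", path=path)
spark.sql("SELECT * FROM default.table_name").show()
{code}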

> External Table unnecessarily not created in Hive-compatible way
> ---
>
> Key: SPARK-20808
> URL: https://issues.apache.org/jira/browse/SPARK-20808
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Joachim Hereth
>Priority: Minor
>
> In Spark 2.1.0 and 2.1.1 {{spark.catalog.createExternalTable}} creates tables 
> unnecessarily in a hive-incompatible way.
> For instance executing in a spark shell
> {code}
> val database = "default"
> val table = "table_name"
> val path = "/user/daki/"  + database + "/" + table
> var data = Array(("Alice", 23), ("Laura", 33), ("Peter", 54))
> val df = sc.parallelize(data).toDF("name","age") 
> df.write.mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(path)
> spark.sql("DROP TABLE IF EXISTS " + database + "." + table)
> spark.catalog.createExternalTable(database + "."+ table, path)
> {code}
> issues the warning
> {code}
> Search Subject for Kerberos V5 INIT cred (<>, 
> sun.security.jgss.krb5.Krb5InitCredential)
> 17/05/19 11:01:17 WARN hive.HiveExternalCatalog: Could not persist 
> `default`.`table_name` in a Hive compatible way. Persisting it into Hive 
> metastore in Spark SQL specific format.
> org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:User 
> daki does not have privileges for CREATETABLE)
>   at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:720)
> ...
> {code}
> The Exception (user does not have privileges for CREATETABLE) is misleading 
> (I do have the CREATE TABLE privilege).
> Querying the table with Hive does not return any result. With Spark one can 
> access the data.
> The following code creates the table correctly (workaround):
> {code}
> def sqlStatement(df : org.apache.spark.sql.DataFrame, database : String, 
> table: String, path: String) : String = {
>   val rows = (for(col <- df.schema) 
> yield "`" + col.name + "` " + 
> col.dataType.simpleString).mkString(",\n")
>   val sqlStmnt = ("CREATE EXTERNAL TABLE `%s`.`%s` (%s) " +
> "STORED AS PARQUET " +
> "Location 'hdfs://nameservice1%s'").format(database, table, rows, path)
>   return sqlStmnt
> }
> spark.sql("DROP TABLE IF EXISTS " + database + "." + table)
> spark.sql(sqlStatement(df, database, table, path))
> {code}
> The code is executed via YARN against a Cloudera CDH 5.7.5 cluster with 
> Sentry enabled (in case this matters regarding the privilege warning). Spark 
> was built against the CDH libraries.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20808) External Table unnecessarily not created in Hive-compatible way

2017-05-19 Thread Joachim Hereth (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joachim Hereth updated SPARK-20808:
---
Summary: External Table unnecessarily not created in Hive-compatible way  
(was: External Table unnecessarily not create in Hive-compatible way)

> External Table unnecessarily not created in Hive-compatible way
> ---
>
> Key: SPARK-20808
> URL: https://issues.apache.org/jira/browse/SPARK-20808
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Joachim Hereth
>Priority: Minor
>
> In Spark 2.1.0 and 2.1.1 {{spark.catalog.createExternalTable}} creates tables 
> unnecessarily in a hive-incompatible way.
> For instance executing in a spark shell
> {code}
> val database = "default"
> val table = "table_name"
> val path = "/user/daki/"  + database + "/" + table
> var data = Array(("Alice", 23), ("Laura", 33), ("Peter", 54))
> val df = sc.parallelize(data).toDF("name","age") 
> df.write.mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(path)
> spark.sql("DROP TABLE IF EXISTS " + database + "." + table)
> spark.catalog.createExternalTable(database + "."+ table, path)
> {code}
> issues the warning
> {code}
> Search Subject for Kerberos V5 INIT cred (<>, 
> sun.security.jgss.krb5.Krb5InitCredential)
> 17/05/19 11:01:17 WARN hive.HiveExternalCatalog: Could not persist 
> `default`.`table_name` in a Hive compatible way. Persisting it into Hive 
> metastore in Spark SQL specific format.
> org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:User 
> daki does not have privileges for CREATETABLE)
>   at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:720)
> ...
> {code}
> The Exception (user does not have privileges for CREATETABLE) is misleading 
> (I do have the CREATE TABLE privilege).
> Querying the table with Hive does not return any result. With Spark one can 
> access the data.
> The following code creates the table correctly (workaround):
> {code}
> def sqlStatement(df : org.apache.spark.sql.DataFrame, database : String, 
> table: String, path: String) : String = {
>   val rows = (for(col <- df.schema) 
> yield "`" + col.name + "` " + 
> col.dataType.simpleString).mkString(",\n")
>   val sqlStmnt = ("CREATE EXTERNAL TABLE `%s`.`%s` (%s) " +
> "STORED AS PARQUET " +
> "Location 'hdfs://nameservice1%s'").format(database, table, rows, path)
>   return sqlStmnt
> }
> spark.sql("DROP TABLE IF EXISTS " + database + "." + table)
> spark.sql(sqlStatement(df, database, table, path))
> {code}
> The code is executed via YARN against a Cloudera CDH 5.7.5 cluster with 
> Sentry enabled (in case this matters regarding the privilege warning). Spark 
> was built against the CDH libraries.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20808) External Table unnecessarily not create in Hive-compatible way

2017-05-19 Thread Joachim Hereth (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017108#comment-16017108
 ] 

Joachim Hereth commented on SPARK-20808:


The warning is caused by an Exception raised by a call to [saveTableIntoHive()|
https://github.com/apache/spark/blob/ac1ab6b9db188ac54c745558d57dd0a031d0b162/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L369].

I was not able to debug what caused the misleading Exception about privileges.


> External Table unnecessarily not create in Hive-compatible way
> --
>
> Key: SPARK-20808
> URL: https://issues.apache.org/jira/browse/SPARK-20808
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Joachim Hereth
>Priority: Minor
>
> In Spark 2.1.0 and 2.1.1 {{spark.catalog.createExternalTable}} creates tables 
> unnecessarily in a hive-incompatible way.
> For instance executing in a spark shell
> {code}
> val database = "default"
> val table = "table_name"
> val path = "/user/daki/"  + database + "/" + table
> var data = Array(("Alice", 23), ("Laura", 33), ("Peter", 54))
> val df = sc.parallelize(data).toDF("name","age") 
> df.write.mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(path)
> spark.sql("DROP TABLE IF EXISTS " + database + "." + table)
> spark.catalog.createExternalTable(database + "."+ table, path)
> {code}
> issues the warning
> {code}
> Search Subject for Kerberos V5 INIT cred (<>, 
> sun.security.jgss.krb5.Krb5InitCredential)
> 17/05/19 11:01:17 WARN hive.HiveExternalCatalog: Could not persist 
> `default`.`table_name` in a Hive compatible way. Persisting it into Hive 
> metastore in Spark SQL specific format.
> org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:User 
> daki does not have privileges for CREATETABLE)
>   at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:720)
> ...
> {code}
> The Exception (user does not have privileges for CREATETABLE) is misleading 
> (I do have the CREATE TABLE privilege).
> Querying the table with Hive does not return any result. With Spark one can 
> access the data.
> The following code creates the table correctly (workaround):
> {code}
> def sqlStatement(df : org.apache.spark.sql.DataFrame, database : String, 
> table: String, path: String) : String = {
>   val rows = (for(col <- df.schema) 
> yield "`" + col.name + "` " + 
> col.dataType.simpleString).mkString(",\n")
>   val sqlStmnt = ("CREATE EXTERNAL TABLE `%s`.`%s` (%s) " +
> "STORED AS PARQUET " +
> "Location 'hdfs://nameservice1%s'").format(database, table, rows, path)
>   return sqlStmnt
> }
> spark.sql("DROP TABLE IF EXISTS " + database + "." + table)
> spark.sql(sqlStatement(df, database, table, path))
> {code}
> The code is executed via YARN against a Cloudera CDH 5.7.5 cluster with 
> Sentry enabled (in case this matters regarding the privilege warning). Spark 
> was built against the CDH libraries.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20808) External Table unnecessarily not create in Hive-compatible way

2017-05-19 Thread Joachim Hereth (JIRA)
Joachim Hereth created SPARK-20808:
--

 Summary: External Table unnecessarily not create in 
Hive-compatible way
 Key: SPARK-20808
 URL: https://issues.apache.org/jira/browse/SPARK-20808
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.1, 2.1.0
Reporter: Joachim Hereth
Priority: Minor


In Spark 2.1.0 and 2.1.1 {{spark.catalog.createExternalTable}} creates tables 
unnecessarily in a hive-incompatible way.

For instance executing in a spark shell

{code}
val database = "default"
val table = "table_name"
val path = "/user/daki/"  + database + "/" + table

var data = Array(("Alice", 23), ("Laura", 33), ("Peter", 54))
val df = sc.parallelize(data).toDF("name","age") 

df.write.mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(path)

spark.sql("DROP TABLE IF EXISTS " + database + "." + table)

spark.catalog.createExternalTable(database + "."+ table, path)
{code}

issues the warning

{code}
Search Subject for Kerberos V5 INIT cred (<>, 
sun.security.jgss.krb5.Krb5InitCredential)
17/05/19 11:01:17 WARN hive.HiveExternalCatalog: Could not persist 
`default`.`table_name` in a Hive compatible way. Persisting it into Hive 
metastore in Spark SQL specific format.
org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:User 
daki does not have privileges for CREATETABLE)
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:720)
...
{code}

The Exception (user does not have privileges for CREATETABLE) is misleading (I 
do have the CREATE TABLE privilege).

Querying the table with Hive does not return any result. With Spark one can 
access the data.

The following code creates the table correctly (workaround):
{code}
def sqlStatement(df : org.apache.spark.sql.DataFrame, database : String, table: 
String, path: String) : String = {
  val rows = (for(col <- df.schema) 
yield "`" + col.name + "` " + 
col.dataType.simpleString).mkString(",\n")
  val sqlStmnt = ("CREATE EXTERNAL TABLE `%s`.`%s` (%s) " +
"STORED AS PARQUET " +
"Location 'hdfs://nameservice1%s'").format(database, table, rows, path)
  return sqlStmnt
}

spark.sql("DROP TABLE IF EXISTS " + database + "." + table)
spark.sql(sqlStatement(df, database, table, path))
{code}

The code is executed via YARN against a Cloudera CDH 5.7.5 cluster with Sentry 
enabled (in case this matters regarding the privilege warning). Spark was built 
against the CDH libraries.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org