[jira] [Commented] (SPARK-21392) Unable to infer schema when loading large Parquet file
[ https://issues.apache.org/jira/browse/SPARK-21392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091854#comment-16091854 ]

Stuart Reynolds commented on SPARK-21392:
------------------------------------------

Okie dokey: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-save-parquet-file-td28874.html

I think there's still a bug here. (I suspect a filename given to the cluster can't be saved on the cluster -- but then the write should fail, not the read, and the error should be different.)

> Unable to infer schema when loading large Parquet file
> -------------------------------------------------------
>
>                 Key: SPARK-21392
>                 URL: https://issues.apache.org/jira/browse/SPARK-21392
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.1.1, 2.2.0
>         Environment: Spark 2.1.1, Python 2.7.6
>            Reporter: Stuart Reynolds
>              Labels: parquet, pyspark
>
> The following boring code works up until I read the parquet file back in.
> {code:none}
> import numpy as np
> import pandas as pd
> import pyspark
> from pyspark import SQLContext, SparkContext, SparkConf
>
> print pyspark.__version__
> sc = SparkContext(conf=SparkConf().setMaster('local'))
>
> df = pd.DataFrame({"mi": np.arange(100), "eid": np.arange(100)})
> print df
>
> sqlc = SQLContext(sc)
> df = sqlc.createDataFrame(df)
> df.createOrReplaceTempView("outcomes")  # returns None; don't reassign df
>
> rdd = sqlc.sql("SELECT eid, mi FROM outcomes LIMIT 5")
> print rdd.schema
> rdd.show()
>
> rdd.write.parquet("mi", mode="overwrite")
> rdd2 = sqlc.read.parquet("mi")  # FAIL!
> {code}
> {code:none}
> # print pyspark.__version__
> 2.2.0
>
> # print df
>     eid  mi
> 0     0   0
> 1     1   1
> 2     2   2
> 3     3   3
> ...
> [100 rows x 2 columns]
>
> # print rdd.schema
> StructType(List(StructField(eid,LongType,true),StructField(mi,LongType,true)))
>
> # rdd.show()
> +---+---+
> |eid| mi|
> +---+---+
> |  0|  0|
> |  1|  1|
> |  2|  2|
> |  3|  3|
> |  4|  4|
> +---+---+
> {code}
> The read fails with:
> {code:none}
> rdd2 = sqlc.read.parquet("mixx")
>   File "/usr/local/lib/python2.7/dist-packages/pyspark/sql/readwriter.py", line 291, in parquet
>     return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
>   File "/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.py", line 1133, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/usr/local/lib/python2.7/dist-packages/pyspark/sql/utils.py", line 69, in deco
>     raise AnalysisException(s.split(': ', 1)[1], stackTrace)
> pyspark.sql.utils.AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'
> {code}
> The documentation for Parquet says the format is self-describing, and the full schema was available when the parquet file was saved. What gives?
> Works with master='local', but fails when my cluster is specified.
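To make the path hypothesis concrete: a relative output path is resolved against the default filesystem of whichever process writes it, so on a cluster the executors may write their part files to their own local disks while the driver reads from somewhere else. A minimal sketch of the workaround, assuming a hypothetical shared location (the URIs below are placeholders, not from this report):

{code:none}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100).selectExpr("id AS eid", "id AS mi")

# An explicit, fully qualified URI removes any ambiguity about which
# filesystem the path lives on; "file:///network/folder/mi" or an
# hdfs:// path visible to all nodes would serve the same purpose.
shared = "hdfs:///user/someuser/mi"  # hypothetical shared location
df.write.parquet(shared, mode="overwrite")
spark.read.parquet(shared).show()
{code}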
[jira] [Commented] (SPARK-21392) Unable to infer schema when loading large Parquet file
[ https://issues.apache.org/jira/browse/SPARK-21392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091086#comment-16091086 ]

Hyukjin Kwon commented on SPARK-21392:
------------------------------------------

[~stuartreynolds], are you running a cluster but using a local file system to write out and read in the data?
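For reference, a quick way to answer this from the PySpark shell is to print the master and the default filesystem that unqualified paths resolve against. This is a diagnostic sketch only; it reaches through the internal {{_jsc}} handle to the JVM-side Hadoop configuration:

{code:none}
# Which master is this session actually using?
print(sc.master)  # e.g. 'local[*]' vs. 'spark://host:7077'

# Which filesystem will a bare path like "mi" resolve to?
print(sc._jsc.hadoopConfiguration().get("fs.defaultFS"))  # e.g. 'file:///' or 'hdfs://namenode:8020'
{code}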
[jira] [Commented] (SPARK-21392) Unable to infer schema when loading large Parquet file
[ https://issues.apache.org/jira/browse/SPARK-21392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090159#comment-16090159 ]

Stuart Reynolds commented on SPARK-21392:
------------------------------------------

Trying to look at the CSV was helpful:

{code:none}
#root = "/network/folder"  # succeeds
root = ""                  # fails
rdd.write.parquet(root + "mi", mode="overwrite")
rdd.write.csv(root + "minn.csv", mode="overwrite")
rdd2 = sqlc.read.parquet(root + "mi")
{code}

With root = "", the write creates a folder on my local machine, but no data:

{code:none}
% ls -la mi minn.csv
mi:
total 12
drwxrwxr-x 2 builder builder 4096 Jul 17 10:42 .
drwxrwxr-x 5 builder builder 4096 Jul 17 10:42 ..
-rw-r--r-- 1 builder builder    0 Jul 17 10:42 _SUCCESS
-rw-r--r-- 1 builder builder    8 Jul 17 10:42 ._SUCCESS.crc

minn.csv:
total 12
drwxrwxr-x 2 builder builder 4096 Jul 17 10:42 .
drwxrwxr-x 5 builder builder 4096 Jul 17 10:42 ..
-rw-r--r-- 1 builder builder    0 Jul 17 10:42 _SUCCESS
-rw-r--r-- 1 builder builder    8 Jul 17 10:42 ._SUCCESS.crc
{code}

Prepending the paths with a network folder that's available to Spark succeeds. So, is this just a "file not found" error with a terrible error message?
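The listing above supports that theory: {{_SUCCESS}} only marks that the job committed; the data itself lives in {{part-*}} files, which here presumably landed on the executors' local disks. A small sanity check one could run before reading back (a hypothetical helper for local filesystem paths only, not part of Spark):

{code:none}
import os

def has_part_files(path):
    # _SUCCESS alone means the job committed; real data is in part-* files.
    return any(name.startswith("part-") for name in os.listdir(path))

print(has_part_files("mi"))        # False for the directory listed above
print(has_part_files("minn.csv"))  # False as well
{code}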
[jira] [Commented] (SPARK-21392) Unable to infer schema when loading large Parquet file
[ https://issues.apache.org/jira/browse/SPARK-21392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090134#comment-16090134 ]

Stuart Reynolds commented on SPARK-21392:
------------------------------------------

I've made the example self-contained and sourced it from a pandas DataFrame. It succeeds with master=local and fails on the cluster (the cluster's dashboard says it's also Spark 2.2.0).
[jira] [Commented] (SPARK-21392) Unable to infer schema when loading large Parquet file
[ https://issues.apache.org/jira/browse/SPARK-21392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088921#comment-16088921 ]

Hyukjin Kwon commented on SPARK-21392:
------------------------------------------

Thanks for the investigation and details. I tried to reproduce this as below:

{code}
response = "mi_or_chd_5"
data = [[226, None], [442, None], [978, 0], [851, 0], [428, 0]]
spark.createDataFrame(data, "eid: int, mi_or_chd_5: short").createOrReplaceTempView("outcomes")
df = spark.sql("SELECT eid, mi_or_chd_5 FROM outcomes")
df.write.parquet(response, mode="overwrite")
spark.read.parquet(response).show()
{code}

but I couldn't. Would you mind sharing the output files from {{.write.parquet}}, and checking the output files after also writing via {{.write.csv}} with {{cat}}? It would also be helpful if you removed the custom code parts.
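In case it helps, the CSV part files can also be dumped from Python instead of the shell. A sketch assuming the reproduction above was run against the local filesystem ({{glob}} won't see HDFS paths):

{code}
import glob

df.write.csv("minn.csv", mode="overwrite")

# Print each part file's contents, equivalent to `cat minn.csv/part-*`.
for part in sorted(glob.glob("minn.csv/part-*")):
    with open(part) as f:
        print(part, f.read())
{code}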
[jira] [Commented] (SPARK-21392) Unable to infer schema when loading large Parquet file
[ https://issues.apache.org/jira/browse/SPARK-21392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085987#comment-16085987 ]

Stuart Reynolds commented on SPARK-21392:
------------------------------------------

My bad -- I may have missed the error message. I upgraded to Spark 2.2.0 today and re-ran this. I get the error regardless of the number of rows.
[jira] [Commented] (SPARK-21392) Unable to infer schema when loading large Parquet file
[ https://issues.apache.org/jira/browse/SPARK-21392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084837#comment-16084837 ]

Stuart Reynolds commented on SPARK-21392:
------------------------------------------

I've simplified the example a little more, and also found that limiting the query to 100 rows succeeds, whereas selecting all 500k rows x 2 columns fails.

> Unable to infer schema when loading large Parquet file
> -------------------------------------------------------
>
>                 Key: SPARK-21392
>                 URL: https://issues.apache.org/jira/browse/SPARK-21392
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.1.1
>         Environment: Spark 2.1.1, Python 2.7.6
>            Reporter: Stuart Reynolds
>              Labels: parquet, pyspark
>
> The following boring code works:
> {code:none}
> response = "mi_or_chd_5"
>
> sc = get_spark_context()  # custom
> sqlc = get_sparkSQLContextWithTables(sc, tables=["outcomes"])  # custom
>
> rdd = sqlc.sql("SELECT eid, mi_or_chd_5 FROM outcomes")
> print rdd.schema
> #>> StructType(List(StructField(eid,IntegerType,true),StructField(mi_or_chd_5,ShortType,true)))
>
> rdd.show()
> #+---+-----------+
> #|eid|mi_or_chd_5|
> #+---+-----------+
> #|216|       null|
> #|431|       null|
> #|978|          0|
> #|852|          0|
> #|418|          0|
>
> rdd.write.parquet(response, mode="overwrite")  # success!
> rdd2 = sqlc.read.parquet(response)  # fail
> {code}
> It fails with:
> {code:none}
> AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'
> {code}
> in
> {code:none}
> /usr/local/lib/python2.7/dist-packages/pyspark-2.1.0+hadoop2.7-py2.7.egg/pyspark/sql/utils.pyc in deco(*a, **kw)
> {code}
> The documentation for Parquet says the format is self-describing, and the full schema was available when the parquet file was saved. What gives?
> The error doesn't happen if I add "limit 10" to the SQL query. The whole selected table is 500k rows with an int and a short column.
> Seems related to https://issues.apache.org/jira/browse/SPARK-16975, which claims it was fixed in 2.0.1 and 2.1.0. (The current bug is in 2.1.1.)