Re: [Structured Streaming] Deserializing avro messages from kafka source using schema registry

2018-02-09 Thread Michael Armbrust
This isn't supported yet, but there is ongoing work in spark-avro to enable this
use case.  Stay tuned.
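
Until that lands, a common interim workaround is to decode the Confluent wire
format yourself in a Python UDF: fetch the writer schema from the Schema Registry
REST API and parse the payload with fastavro. A minimal sketch, not tested here;
the registry URL, broker, and topic are placeholders, it assumes messages carry
the standard 5-byte Confluent header (magic byte plus 4-byte schema id), and it
requires requests and fastavro to be available on the executors:

import json
import struct
from io import BytesIO

import fastavro
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

SCHEMA_REGISTRY = "http://schema-registry:8081"  # placeholder URL
_schema_cache = {}

def decode_confluent_avro(raw_bytes):
    """Decode one Confluent-framed Avro message into a JSON string."""
    if raw_bytes is None:
        return None
    magic, schema_id = struct.unpack(">bI", bytes(raw_bytes[:5]))
    if magic != 0:
        return None  # not Confluent-framed
    if schema_id not in _schema_cache:
        resp = requests.get("%s/schemas/ids/%d" % (SCHEMA_REGISTRY, schema_id))
        _schema_cache[schema_id] = fastavro.parse_schema(json.loads(resp.json()["schema"]))
    record = fastavro.schemaless_reader(BytesIO(bytes(raw_bytes[5:])), _schema_cache[schema_id])
    return json.dumps(record, default=str)

decode_udf = udf(decode_confluent_avro, StringType())

spark = SparkSession.builder.getOrCreate()
parsed = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
          .option("subscribe", "my_topic")                   # placeholder
          .load()
          .select(decode_udf("value").alias("json_value")))

The UDF returns each record as a JSON string; from_json with a matching struct
schema can then turn it into columns.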

On Fri, Feb 9, 2018 at 3:07 PM, Bram  wrote:

> Hi,
>
> I couldn't find any documentation about Avro message deserialization using
> pyspark structured streaming. My aim is to use the Confluent schema registry to
> get the per-topic schema and then parse the Avro messages with it.
>
> I found one example, but it uses the DirectStream approach:
> https://stackoverflow.com/questions/30339636/spark-python-avro-kafka-deserialiser
>
> Can anyone show me how?
>
> Thanks
>
> Regards,
>
> Abraham
>


Re: [Structured Streaming] Commit protocol to move temp files to dest path only when complete, with code

2018-02-09 Thread Michael Armbrust
We didn't go this way initially because it doesn't work on storage systems
that have weaker guarantees than HDFS with respect to rename.  That said,
I'm happy to look at other options if we want to make this configurable.



On Fri, Feb 9, 2018 at 2:53 PM, Dave Cameron wrote:

> Hi
>
>
> I have a Spark structured streaming job that reads from Kafka and writes
> parquet files to Hive/HDFS. The files are not very large, but the Kafka
> source is noisy, so each Spark job takes a long time to complete. There is a
> significant window during which the parquet files are incomplete and other
> tools, including PrestoDB, encounter errors while trying to read them.
>
> I wrote to this list and Stack Overflow about the problem last summer:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Structured-Streaming-truncated-Parquet-after-driver-crash-or-kill-tt29043.html
> https://stackoverflow.com/questions/47337030/not-a-parquet-file-too-small-from-presto-during-spark-structured-streaming-r/47339805#47339805
>
> After hesitating for a while, I wrote a custom commit protocol to solve
> the problem. It combines HadoopMapReduceCommitProtocol's behavior of
> writing to a temp file first, with ManifestFileCommitProtocol. From what
> I can tell ManifestFileCommitProtocol is required for the normal Structured
> Streaming behavior of being able to resume streams from a known point.
>
> I think this commit protocol could be generally useful. Writing to a temp
> file and moving it to the final location is low cost on HDFS and is the
> standard behavior for non-streaming jobs, as implemented in
> HadoopMapReduceCommitProtocol. At the same time ManifestFileCommitProtocol
> is needed for structured streaming jobs. We have been running this for a
> few months in production without problems.
>
> Here is the code (at the moment not up to Spark standards, admittedly):
> https://github.com/davcamer/spark/commit/361f1c69851f0f94cfd974ce720c694407f9340b
>
> Did I miss a better approach? Does anyone else think this would be useful?
>
> Thanks for reading,
> Dave
>
>
>


[Structured Streaming] Deserializing avro messages from kafka source using schema registry

2018-02-09 Thread Bram
Hi,

I couldn't find any documentation about Avro message deserialization using
pyspark structured streaming. My aim is to use the Confluent schema registry to
get the per-topic schema and then parse the Avro messages with it.

I found one example, but it uses the DirectStream approach:
https://stackoverflow.com/questions/30339636/spark-python-avro-kafka-deserialiser

Can anyone show me how?

Thanks

Regards,

Abraham


[Structured Streaming] Commit protocol to move temp files to dest path only when complete, with code

2018-02-09 Thread Dave Cameron
Hi


I have a Spark structured streaming job that reads from Kafka and writes
parquet files to Hive/HDFS. The files are not very large, but the Kafka
source is noisy, so each Spark job takes a long time to complete. There is a
significant window during which the parquet files are incomplete and other
tools, including PrestoDB, encounter errors while trying to read them.

I wrote to this list and Stack Overflow about the problem last summer:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Structured-Streaming-truncated-Parquet-after-driver-crash-or-kill-tt29043.html
https://stackoverflow.com/questions/47337030/not-a-parquet-file-too-small-from-presto-during-spark-structured-streaming-r/47339805#47339805

After hesitating for a while, I wrote a custom commit protocol to solve the
problem. It combines HadoopMapReduceCommitProtocol's behavior of writing to
a temp file first, with ManifestFileCommitProtocol. From what I can tell
ManifestFileCommitProtocol is required for the normal Structured Streaming
behavior of being able to resume streams from a known point.

I think this commit protocol could be generally useful. Writing to a temp
file and moving it to the final location is low cost on HDFS and is the
standard behavior for non-streaming jobs, as implemented in
HadoopMapReduceCommitProtocol. At the same time ManifestFileCommitProtocol
is needed for structured streaming jobs. We have been running this for a
few months in production without problems.
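
For what it's worth, the streaming file sink's commit protocol also appears to be
pluggable through an internal Spark SQL setting, so a protocol like this could in
principle be swapped in without patching Spark itself. A hedged sketch; the config
key is internal to Spark 2.x and the class name below is hypothetical:

from pyspark.sql import SparkSession

# Assumption: FileStreamSink instantiates whatever FileCommitProtocol class the
# internal setting below names (it defaults to ManifestFileCommitProtocol); the
# class name here is a stand-in for a protocol like the one described above.
spark = (SparkSession.builder
         .config("spark.sql.streaming.commitProtocolClass",
                 "com.example.streaming.TempFileManifestCommitProtocol")
         .getOrCreate())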

Here is the code (at the moment not up to Spark standards, admittedly):
https://github.com/davcamer/spark/commit/361f1c69851f0f94cfd974ce720c694407f9340b

Did I miss a better approach? Does anyone else think this would be useful?

Thanks for reading,
Dave


Re: ML:One vs Rest with crossValidator for multinomial in logistic regression

2018-02-09 Thread Nicolas Paris
Bryan,

That is exactly the problem.

Good to hear it will be fixed in the 2.3 release.


On Feb 9, 2018 at 02:17, Bryan Cutler wrote:
> Nicolas, are you referring to printing the model params in that example with
> "print(model1.extractParamMap())"?  There was a problem with pyspark models not
> having params after being fit, which causes this example to show nothing for
> model paramMaps.  This was fixed in
> https://issues.apache.org/jira/browse/SPARK-10931 and the example now shows
> all model params.  The fix will be in the Spark 2.3 release.
> 
> Bryan
> 
> On Wed, Jan 31, 2018 at 10:20 PM, Nicolas Paris  wrote:
> 
> Hey
> 
> I am also interested in how to get those parameters.
> For example, the demo code
> spark-2.2.1-bin-hadoop2.7/examples/src/main/python/ml/estimator_transformer_param_example.py
> returns empty parameters when printing "lr.extractParamMap()"
> 
> That's weird
> 
> Thanks
> 
> On Jan 30, 2018 at 23:10, Bryan Cutler wrote:
> > Hi Michelle,
> >
> > Your original usage of ParamGridBuilder was not quite right; `addGrid`
> > expects (some parameter, array of values for that parameter).  If you want
> > to do a grid search with different regularization values, you would do the
> > following:
> >
> > val paramMaps = new ParamGridBuilder().addGrid(logist.regParam, Array(0.1, 0.3)).build()
> >
> > * don't forget to build the grid after adding values
> >
> > On Tue, Jan 30, 2018 at 6:55 AM, michelleyang  wrote:
> >
> >     I tried to use One vs Rest in Spark ML with a pipeline and crossValidator
> >     for multinomial logistic regression.
> >
> >     It came out with empty coefficients. I figured out it was the setting of
> >     ParamGridBuilder. Can anyone help me understand how the parameter
> >     setting affects the crossValidator process?
> >
> >     The original code: // outputs empty coefficients
> >
> >     val logist = new LogisticRegression
> >
> >     val ova = new OneVsRest().setClassifier(logist)
> >
> >     val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
> >     Array(logist.getRegParam))
> >
> >     New code: // outputs multi-class coefficients
> >
> >     val logist = new LogisticRegression
> >
> >     val ova = new OneVsRest().setClassifier(logist)
> >
> >     val classifier1 = new LogisticRegression().setRegParam(2.0)
> >
> >     val classifier2 = new LogisticRegression().setRegParam(3.0)
> >
> >     val paramMaps = new ParamGridBuilder().addGrid(ova.classifier,
> >     Array(classifier1, classifier2))
> >
> >     Please help. Thanks.
> >
> >     --
> >     Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
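
For PySpark users following this thread, a rough translation of the corrected grid
above; untested here, and it assumes the pyspark.ml OneVsRest/CrossValidator APIs
of Spark 2.x with illustrative column names and a pre-existing train_df:

from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

lr = LogisticRegression(featuresCol="features", labelCol="label")
ovr = OneVsRest(classifier=lr)

# Grid over fully configured classifier instances, mirroring the working Scala
# snippet above: addGrid takes (a Param, a list of values) and build() is required.
param_grid = (ParamGridBuilder()
              .addGrid(ovr.classifier,
                       [LogisticRegression(regParam=0.1),
                        LogisticRegression(regParam=0.3)])
              .build())

cv = CrossValidator(estimator=ovr,
                    estimatorParamMaps=param_grid,
                    evaluator=MulticlassClassificationEvaluator(),
                    numFolds=3)
# cv_model = cv.fit(train_df)  # train_df is assumed to exist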




NullPointerException issue in LDA.train()

2018-02-09 Thread Kevin Lam
Hi,

We're encountering an issue with training an LDA model in PySpark. The
issue is as follows:

- Running LDA on some large set of documents (12M, ~2-5kB each)
- Works fine for small subset of full set (100K - 1M)
- Hit a NullPointerException for full data set
- Running the workload on Google Cloud Dataproc

The following two issues I was able to find online appear relevant:
https://issues.apache.org/jira/browse/SPARK-299 (which may not have been
addressed as far as I can tell)
http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-LDA-throws-NullPointerException-td26686.html

Also I've heavily followed the code outlined here:
http://sean.lane.sh/blog/2016/PySpark_and_LDA

Any ideas or help is appreciated!!

Thanks in advance,
Kevin

Example trace of output:

16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 8.0 in stage 42.0 (TID 16163, royallda-20180209-152710-w-1.c.fathom-containers.internal, executor 4): java.lang.NullPointerException
    at org.apache.spark.mllib.clustering.LDA$.computePTopic(LDA.scala:432)
    at org.apache.spark.mllib.clustering.EMLDAOptimizer$$anonfun$5.apply(LDAOptimizer.scala:190)
    at org.apache.spark.mllib.clustering.EMLDAOptimizer$$anonfun$5.apply(LDAOptimizer.scala:184)
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237)
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

[Stage 42:> (0 + 24) / 1000][Stage 43:=>   (104 + 0) / 1000]18/02/09 16:22:55 ERROR org.apache.spark.scheduler.TaskSetManager: Task 8 in stage 42.0 failed 4 times; aborting job

Traceback (most recent call last):
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/pipeline_launcher.py", line 258, in 
    fire.Fire({'run_pipeline': run_pipeline})
  File "/usr/local/lib/python3.6/dist-packages/fire/core.py", line 120, in Fire
    component_trace = _Fire(component, args, context, name)
  File "/usr/local/lib/python3.6/dist-packages/fire/core.py", line 358, in _Fire
    component, remaining_args)
  File "/usr/local/lib/python3.6/dist-packages/fire/core.py", line 561, in _CallCallable
    result = fn(*varargs, **kwargs)
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/pipeline_launcher.py", line 77, in run_pipeline
    run_pipeline_local(pipeline_id, **kwargs)
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/pipeline_launcher.py", line 94, in run_pipeline_local
    pipeline.run(**kwargs)
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/diseaseTools.zip/spark/pipelines/royal_lda.py", line 142, in run
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/clustering.py", line 1039, in train
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 130, in callMLlibFunc
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 123, in callJavaFunc
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o161.trainLDAModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 42.0 failed 4 times, most recent failure: Lost
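
While the NullPointerException itself is being tracked down, one thing that may be
worth trying is the online variational optimizer, which avoids the GraphX-based EM
code path shown in the trace above. A minimal sketch against the pyspark.mllib API
(assumes an existing SparkContext sc and the usual (doc id, term-count vector)
corpus; whether it sidesteps this particular NPE on the full data set is not
verified here):

from pyspark.mllib.clustering import LDA
from pyspark.mllib.linalg import Vectors

# Tiny illustrative corpus: RDD of [doc_id, term_count_vector]
corpus = sc.parallelize([
    [0, Vectors.dense([1.0, 2.0, 0.0, 6.0])],
    [1, Vectors.dense([0.0, 3.0, 1.0, 0.0])],
    [2, Vectors.dense([4.0, 0.0, 2.0, 1.0])],
])

# optimizer="online" uses OnlineLDAOptimizer instead of the EM optimizer that
# appears in the stack trace above.
model = LDA.train(corpus, k=2, maxIterations=20, optimizer="online")
print(model.topicsMatrix())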

Re: PySpark Tweedie GLM

2018-02-09 Thread Bryan Cutler
Can you provide some code/data to reproduce the problem?
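
In the meantime, a minimal hedged sketch of the call pattern a reproducer would
build on (synthetic data and illustrative column names; this tiny example fits
cleanly, so the actual data and any weight column would need to be substituted to
trigger the assertion):

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GeneralizedLinearRegression

spark = SparkSession.builder.getOrCreate()

# Tiny synthetic frame; "label", "x1", "x2" are illustrative column names.
df = spark.createDataFrame(
    [(0.0, 1.0, 2.0), (1.5, 2.0, 1.0), (3.2, 3.0, 0.5), (0.0, 0.5, 4.0)],
    ["label", "x1", "x2"])
assembled = VectorAssembler(inputCols=["x1", "x2"], outputCol="features").transform(df)

# Spark 2.2 Tweedie GLM: family="tweedie" with variancePower between 1 and 2 for a
# compound Poisson-gamma model; no weightCol is set, matching the report below.
glm = GeneralizedLinearRegression(family="tweedie", variancePower=1.5,
                                  labelCol="label", featuresCol="features")
model = glm.fit(assembled)
print(model.coefficients, model.intercept)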

On Fri, Feb 9, 2018 at 9:42 AM, nhamwey wrote:

> I am using Spark 2.2.0 through Python.
>
> I am repeatedly getting a "Sum of weights cannot be zero" assertion error when
> trying to fit a model. This happens even when I do not specify a weightCol.
>
> Py4JJavaError: An error occurred while calling o1295.fit.
> : java.lang.AssertionError: assertion failed: Sum of weights cannot be zero.
> at scala.Predef$.assert(Predef.scala:170)
> at org.apache.spark.ml.optim.WeightedLeastSquares$Aggregator.validate(WeightedLeastSquares.scala:418)
> at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:101)
> at org.apache.spark.ml.optim.IterativelyReweightedLeastSquares.fit(IterativelyReweightedLeastSquares.scala:86)
> at org.apache.spark.ml.regression.GeneralizedLinearRegression.train(GeneralizedLinearRegression.scala:369)
> at org.apache.spark.ml.regression.GeneralizedLinearRegression.train(GeneralizedLinearRegression.scala:203)
> at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:745)
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>


Re: Spark Dataframe and HIVE

2018-02-09 Thread Nicholas Hakobian
It's possible that the format of your table is not compatible with your version
of Hive, so Spark saved it in a way that only Spark can read. When this happens,
Spark prints a very visible warning letting you know.

We've seen it most frequently when trying to save a Parquet file with a
date-typed column into a Hive table. Older versions of Hive's Parquet
reader/writer did not support date formats (among a couple of others).
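
One quick way to confirm this from the Spark side is to look at how the table was
registered in the metastore and, if it does turn out to be a Spark-only datasource
table, to create a Hive-format table explicitly. A short sketch; the table and
view names come from the thread, passion_df is assumed to already exist, and the
new names are illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# 1) Inspect how the table was registered. A Spark datasource table typically shows
#    a "USING orc"-style definition plus Spark-specific table properties rather than
#    a Hive "STORED AS ORC" SerDe definition that Hive itself can read.
spark.sql("SHOW CREATE TABLE sampledb.passion").show(truncate=False)
spark.sql("DESCRIBE FORMATTED sampledb.passion").show(100, truncate=False)

# 2) If so, one workaround is a Hive-style CTAS so the new table carries a Hive ORC
#    SerDe that both engines understand. passion_df is assumed to exist already.
passion_df.createOrReplaceTempView("passion_view")
spark.sql("""
    CREATE TABLE sampledb.passion_hive
    STORED AS ORC
    AS SELECT * FROM passion_view
""")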

Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Fri, Feb 9, 2018 at 9:59 AM, Prakash Joshi wrote:

> Ravi,
>
> Can you send the result of
> Show create table your_table_name
>
> Thanks
> Prakash
>
> On Feb 9, 2018 8:20 PM, "☼ R Nair (रविशंकर नायर)" <
> ravishankar.n...@gmail.com> wrote:
>
>> All,
>>
>> I have been on this issue for three days continuously and am not getting
>> any clue.
>>
>> Environment: Spark 2.2.x, all configurations are correct, hive-site.xml
>> is in Spark's conf.
>>
>> 1) Step 1: I created a data frame DF1 by reading a CSV file.
>>
>> 2) Did manipulations on DF1. The resulting frame is passion_df.
>>
>> 3) passion_df.write.format("orc").saveAsTable("sampledb.passion")
>>
>> 4) The metastore shows the Hive table; when I do "show tables" in Hive,
>> I can see the table name.
>>
>> 5) I can't select from it in Hive, though I can select from Spark as
>> spark.sql("select * from sampledb.passion").
>>
>> What's going on here? Please help. Why am I not seeing the data from the
>> Hive prompt?
>> The "describe formatted" command on the table in Hive shows the data is
>> in the default warehouse location (/user/hive/warehouse), which is what I set.
>>
>> I am not getting any definite answer anywhere. Many suggestions and
>> answers are given on Stack Overflow et al. Nothing really works.
>>
>> So asking experts here for some light on this, thanks
>>
>> Best,
>> Ravion
>>
>>
>>


Re: Spark Dataframe and HIVE

2018-02-09 Thread Prakash Joshi
Ravi,

Can you send the result of
Show create table your_table_name

Thanks
Prakash

On Feb 9, 2018 8:20 PM, "☼ R Nair (रविशंकर नायर)" <
ravishankar.n...@gmail.com> wrote:

> All,
>
> I have been on this issue for three days continuously and am not getting any
> clue.
>
> Environment: Spark 2.2.x, all configurations are correct, hive-site.xml is
> in Spark's conf.
>
> 1) Step 1: I created a data frame DF1 by reading a CSV file.
>
> 2) Did manipulations on DF1. The resulting frame is passion_df.
>
> 3) passion_df.write.format("orc").saveAsTable("sampledb.passion")
>
> 4) The metastore shows the Hive table; when I do "show tables" in Hive, I
> can see the table name.
>
> 5) I can't select from it in Hive, though I can select from Spark as
> spark.sql("select * from sampledb.passion").
>
> What's going on here? Please help. Why am I not seeing the data from the
> Hive prompt?
> The "describe formatted" command on the table in Hive shows the data is
> in the default warehouse location (/user/hive/warehouse), which is what I set.
>
> I am not getting any definite answer anywhere. Many suggestions and
> answers are given on Stack Overflow et al. Nothing really works.
>
> So asking experts here for some light on this, thanks
>
> Best,
> Ravion
>
>
>


PySpark Tweedie GLM

2018-02-09 Thread nhamwey
I am using Spark 2.2.0 through Python.

I am repeatedly getting a "Sum of weights cannot be zero" assertion error when
trying to fit a model. This happens even when I do not specify a weightCol.

Py4JJavaError: An error occurred while calling o1295.fit.
: java.lang.AssertionError: assertion failed: Sum of weights cannot be zero.
at scala.Predef$.assert(Predef.scala:170)
at
org.apache.spark.ml.optim.WeightedLeastSquares$Aggregator.validate(WeightedLeastSquares.scala:418)
at
org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:101)
at
org.apache.spark.ml.optim.IterativelyReweightedLeastSquares.fit(IterativelyReweightedLeastSquares.scala:86)
at
org.apache.spark.ml.regression.GeneralizedLinearRegression.train(GeneralizedLinearRegression.scala:369)
at
org.apache.spark.ml.regression.GeneralizedLinearRegression.train(GeneralizedLinearRegression.scala:203)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Spark Dataframe and HIVE

2018-02-09 Thread Gourav Sengupta
Hi Ravi,

can you please post the entire code?

Regards,
Gourav

On Fri, Feb 9, 2018 at 3:39 PM, Patrick Alwell wrote:

> Might sound silly, but are you using a Hive context?
>
> What errors do the Hive query results return?
>
>
>
> spark = SparkSession.builder.enableHiveSupport().getOrCreate()
>
>
>
> The second part of your question: you are creating a temp view and then
> subsequently creating another table from that temp view. It doesn't seem like
> you are reading the table from the Spark or Hive warehouse.
>
>
>
> This works fine for me; albeit I was using spark thrift to communicate
> with my directory of choice.
>
>
>
> from pyspark import SparkContext
> from pyspark.sql import SparkSession, Row, types
> from pyspark.sql.types import *
> from pyspark.sql import functions as f
> from decimal import *
> from datetime import datetime
>
> # instantiate our sparkSession and context
> spark = SparkSession.builder.enableHiveSupport().getOrCreate()
> sc = spark.sparkContext
>
> # Generating customer orc table files
> # load raw data as an RDD
> customer_data = sc.textFile("/data/tpch/customer.tbl")
> # map the data into an RDD split with pipe delimitations
> customer_split = customer_data.map(lambda l: l.split("|"))
> # map the split data with a row method; this is where we specify column names and types
> # default type is string- UTF8
> # there are issues with converting string to date and these issues have been addressed
> # in those tables with dates: See notes below
> customer_row = customer_split.map(lambda r: Row(
>     custkey=long(r[0]),
>     name=r[1],
>     address=r[2],
>     nationkey=long(r[3]),
>     phone=r[4],
>     acctbal=Decimal(r[5]),
>     mktsegment=r[6],
>     comment=r[7]
> ))
>
> # we can have Spark infer the schema, or apply a strict schema and identify whether or not we want null values
> # in this case we don't want null values for keys; and we want explicit data types to support the TPCH tables/ data model
> customer_schema = types.StructType([
>    types.StructField('custkey',types.LongType(),False)
>    ,types.StructField('name',types.StringType())
>    ,types.StructField('address',types.StringType())
>    ,types.StructField('nationkey',types.LongType(),False)
>    ,types.StructField('phone',types.StringType())
>    ,types.StructField('acctbal',types.DecimalType())
>    ,types.StructField('mktsegment',types.StringType())
>    ,types.StructField('comment',types.StringType())])
>
> # we create a dataframe object by referencing our sparkSession class and the createDataFrame method
> # this method takes two arguments by default (row, schema)
> customer_df = spark.createDataFrame(customer_row,customer_schema)
>
> # we can now write a file of type orc by referencing our dataframe object we created
> customer_df.write.orc("/data/tpch/customer.orc")
>
> # read that same file we created but create a separate dataframe object
> customer_df_orc = spark.read.orc("/data/tpch/customer.orc")
>
> # reference the newly created dataframe object and create a tempView for QA purposes
> customer_df_orc.createOrReplaceTempView("customer")
>
> # reference the sparkSession class and SQL method in order to issue SQL statements to the materialized view
> spark.sql("SELECT * FROM customer LIMIT 10").show()
>
>
>
> From: "☼ R Nair (रविशंकर नायर)"
> Date: Friday, February 9, 2018 at 7:03 AM
> To: "user @spark/'user @spark'/spark users/user@spark" 
> Subject: Re: Spark Dataframe and HIVE
>
>
>
> An update: (Sorry I missed)
>
>
>
> When I do
>
>
>
> passion_df.createOrReplaceTempView("sampleview")
>
>
>
> spark.sql("create table sample table as select * from sample view")
>
>
>
> Now, I can see the table and can query it as well.
>
>
>
> So why does this work from Spark, while the other method discussed below does not?
>
>
>
> Thanks
>
>
>
>
>
>
>
> On Fri, Feb 9, 2018 at 9:49 AM, ☼ R Nair (रविशंकर नायर) <
> ravishankar.n...@gmail.com> wrote:
>
> All,
>
>
>
> I have been on this issue for three days continuously and am not getting any
> clue.
>
> Environment: Spark 2.2.x, all configurations are correct, hive-site.xml is
> in Spark's conf.
>
> 1) Step 1: I created a data frame DF1 by reading a CSV file.
>
> 2) Did manipulations on DF1. The resulting frame is passion_df.
>
> 3) passion_df.write.format("orc").saveAsTable("sampledb.passion")
>
> 4) The metastore shows the Hive table; when I do "show tables" in Hive, I
> can see the table name.
>
> 5) I can't select from it in Hive, though I can select from Spark as
> spark.sql("select * from sampledb.passion").
>
> What's going on here? Please help. Why am I not seeing the data from the Hive
> prompt?
>
> The "describe formatted " 

Re: Spark Dataframe and HIVE

2018-02-09 Thread Patrick Alwell
Might sound silly, but are you using a Hive context?
What errors do the Hive query results return?

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

The second part of your question: you are creating a temp view and then
subsequently creating another table from that temp view. It doesn't seem like you
are reading the table from the Spark or Hive warehouse.

This works fine for me; albeit I was using spark thrift to communicate with my 
directory of choice.

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row, types
from pyspark.sql.types import *
from pyspark.sql import functions as f
from decimal import *
from datetime import datetime

# instantiate our sparkSession and context
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext

# Generating customer orc table files
# load raw data as an RDD
customer_data = sc.textFile("/data/tpch/customer.tbl")
# map the data into an RDD split with pipe delimitations
customer_split = customer_data.map(lambda l: l.split("|"))
# map the split data with a row method; this is where we specify column names and types
# default type is string- UTF8
# there are issues with converting string to date and these issues have been addressed
# in those tables with dates: See notes below
customer_row = customer_split.map(lambda r: Row(
    custkey=long(r[0]),
    name=r[1],
    address=r[2],
    nationkey=long(r[3]),
    phone=r[4],
    acctbal=Decimal(r[5]),
    mktsegment=r[6],
    comment=r[7]
))

# we can have Spark infer the schema, or apply a strict schema and identify whether or not we want null values
# in this case we don't want null values for keys; and we want explicit data types to support the TPCH tables/ data model
customer_schema = types.StructType([
   types.StructField('custkey',types.LongType(),False)
   ,types.StructField('name',types.StringType())
   ,types.StructField('address',types.StringType())
   ,types.StructField('nationkey',types.LongType(),False)
   ,types.StructField('phone',types.StringType())
   ,types.StructField('acctbal',types.DecimalType())
   ,types.StructField('mktsegment',types.StringType())
   ,types.StructField('comment',types.StringType())])

# we create a dataframe object by referencing our sparkSession class and the createDataFrame method
# this method takes two arguments by default (row, schema)
customer_df = spark.createDataFrame(customer_row,customer_schema)

# we can now write a file of type orc by referencing our dataframe object we created
customer_df.write.orc("/data/tpch/customer.orc")

# read that same file we created but create a separate dataframe object
customer_df_orc = spark.read.orc("/data/tpch/customer.orc")

# reference the newly created dataframe object and create a tempView for QA purposes
customer_df_orc.createOrReplaceTempView("customer")

# reference the sparkSession class and SQL method in order to issue SQL statements to the materialized view
spark.sql("SELECT * FROM customer LIMIT 10").show()

From: "☼ R Nair (रविशंकर नायर)" 
Date: Friday, February 9, 2018 at 7:03 AM
To: "user @spark/'user @spark'/spark users/user@spark" 
Subject: Re: Spark Dataframe and HIVE

An update: (Sorry I missed)

When I do

passion_df.createOrReplaceTempView("sampleview")

spark.sql("create table sample table as select * from sample view")

Now, I can see the table and can query it as well.

So why does this work from Spark, while the other method discussed below does not?

Thanks



On Fri, Feb 9, 2018 at 9:49 AM, ☼ R Nair (रविशंकर नायर) wrote:
All,

I have been on this issue for three days continuously and am not getting any clue.

Environment: Spark 2.2.x, all configurations are correct, hive-site.xml is in
Spark's conf.

1) Step 1: I created a data frame DF1 by reading a CSV file.

2) Did manipulations on DF1. The resulting frame is passion_df.

3) passion_df.write.format("orc").saveAsTable("sampledb.passion")

4) The metastore shows the Hive table; when I do "show tables" in Hive, I can
see the table name.

5) I can't select from it in Hive, though I can select from Spark as
spark.sql("select * from sampledb.passion").

What's going on here? Please help. Why am I not seeing the data from the Hive prompt?
The "describe formatted" command on the table in Hive shows the data is in the
default warehouse location (/user/hive/warehouse), which is what I set.

I am not getting any definite answer anywhere. Many suggestions and answers
are given on Stack Overflow et al. Nothing really works.

So asking experts here for some light on this, thanks

Best,
Ravion







Re: Spark Dataframe and HIVE

2018-02-09 Thread रविशंकर नायर
An update: (Sorry I missed)

When I do

passion_df.createOrReplaceTempView("sampleview")

spark.sql("create table sample table as select * from sample view")

Now, I can see the table and can query it as well.

So why does this work from Spark, while the other method discussed below does not?

Thanks



On Fri, Feb 9, 2018 at 9:49 AM, ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com> wrote:

> All,
>
> I have been on this issue for three days continuously and am not getting any
> clue.
>
> Environment: Spark 2.2.x, all configurations are correct, hive-site.xml is
> in Spark's conf.
>
> 1) Step 1: I created a data frame DF1 by reading a CSV file.
>
> 2) Did manipulations on DF1. The resulting frame is passion_df.
>
> 3) passion_df.write.format("orc").saveAsTable("sampledb.passion")
>
> 4) The metastore shows the Hive table; when I do "show tables" in Hive, I
> can see the table name.
>
> 5) I can't select from it in Hive, though I can select from Spark as
> spark.sql("select * from sampledb.passion").
>
> What's going on here? Please help. Why am I not seeing the data from the
> Hive prompt?
> The "describe formatted" command on the table in Hive shows the data is
> in the default warehouse location (/user/hive/warehouse), which is what I set.
>
> I am not getting any definite answer anywhere. Many suggestions and
> answers are given on Stack Overflow et al. Nothing really works.
>
> So asking experts here for some light on this, thanks
>
> Best,
> Ravion
>
>
>


--


Spark Dataframe and HIVE

2018-02-09 Thread रविशंकर नायर
All,

I have been on this issue for three days continuously and am not getting any
clue.

Environment: Spark 2.2.x, all configurations are correct, hive-site.xml is
in Spark's conf.

1) Step 1: I created a data frame DF1 by reading a CSV file.

2) Did manipulations on DF1. The resulting frame is passion_df.

3) passion_df.write.format("orc").saveAsTable("sampledb.passion")

4) The metastore shows the Hive table; when I do "show tables" in Hive, I
can see the table name.

5) I can't select from it in Hive, though I can select from Spark as
spark.sql("select * from sampledb.passion").

What's going on here? Please help. Why am I not seeing the data from the Hive prompt?
The "describe formatted" command on the table in Hive shows the data is
in the default warehouse location (/user/hive/warehouse), which is what I set.

I am not getting any definite answer anywhere. Many suggestions and answers
are given on Stack Overflow et al. Nothing really works.

So asking experts here for some light on this, thanks

Best,
Ravion


H2O ML use

2018-02-09 Thread Mich Talebzadeh
Hi,

Has anyone had experience of using the enterprise version of H2O by any
chance?

How does it compare with other tools like Cloudera Data Science Workbench
please?

thanks


Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Unsubscribe

2018-02-09 Thread wangsan

