Re: [Structured Streaming] More than 1 streaming in a code

2018-04-13 Thread spark receiver
Hi Panagiotis ,

Wondering whether you solved the problem or not? I ran into the same issue today. I'd
appreciate it so much if you could paste the code snippet if it's working.

Thanks.


> On Apr 6, 2018, at 7:40 AM, Aakash Basu wrote:
> 
> Hi Panagiotis,
> 
> I did that, but it still prints the result of the first query and waits for 
> new data; it doesn't even go to the next one.
> 
> Data -
> 
> $ nc -lk 9998
> 
> 1,2
> 3,4
> 5,6
> 7,8
> 
> Result -
> 
> ---
> Batch: 0
> ---
> ++
> |aver|
> ++
> | 3.0|
> ++
> 
> ---
> Batch: 1
> ---
> ++
> |aver|
> ++
> | 4.0|
> ++
> 
> 
> Updated Code -
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split
> 
> spark = SparkSession \
> .builder \
> .appName("StructuredNetworkWordCount") \
> .getOrCreate()
> 
> data = spark \
> .readStream \
> .format("socket") \
> .option("header","true") \
> .option("host", "localhost") \
> .option("port", 9998) \
> .load("csv")
> 
> 
> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
> split(data.value, ",").getItem(1).alias("col2"))
> 
> id_DF.createOrReplaceTempView("ds")
> 
> df = spark.sql("select avg(col1) as aver from ds")
> 
> df.createOrReplaceTempView("abcd")
> 
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
> from ds")  # (select aver from abcd)
> 
> query2 = df \
> .writeStream \
> .format("console") \
> .outputMode("complete") \
> .trigger(processingTime='5 seconds') \
> .start()
> 
> query = wordCounts \
> .writeStream \
> .format("console") \
> .trigger(processingTime='5 seconds') \
> .start()
> 
> spark.streams.awaitAnyTermination()
> 
> 
> Thanks,
> Aakash.
> 
> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis wrote:
> Hello Aakash,
> 
> When you use query.awaitTermination you are pretty much blocking there 
> waiting for the current query to stop or throw an exception. In your case the 
> second query will not even start.
> What you could do instead is remove all the blocking calls and use 
> spark.streams.awaitAnyTermination instead (waiting for either query1 or 
> query2 to terminate). Make sure you do that after the query2.start call.
> 
> I hope this helps.
> 
> Cheers,
> Panagiotis
> 
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu wrote:
> Any help?
> 
> Need urgent help. Could someone please clarify this doubt?
> 
> -- Forwarded message --
> From: Aakash Basu
> Date: Thu, Apr 5, 2018 at 3:18 PM
> Subject: [Structured Streaming] More than 1 streaming in a code
> To: user
> 
> 
> Hi,
> 
> If I have more than one writeStream in a code, which operates on the same 
> readStream data, why does it produce output only for the first writeStream? I want 
> the second one to be printed on the console as well.
> 
> How to do that?
> 
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
> 
> class test:
> 
> 
> spark = SparkSession.builder \
> .appName("Stream_Col_Oper_Spark") \
> .getOrCreate()
> 
> data = spark.readStream.format("kafka") \
> .option("startingOffsets", "latest") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("subscribe", "test1") \
> .load()
> 
> ID = data.select('value') \
> .withColumn('value', data.value.cast("string")) \
> .withColumn("Col1", split(col("value"), ",").getItem(0)) \
> .withColumn("Col2", split(col("value"), ",").getItem(1)) \
> .drop('value')
> 
> ID.createOrReplaceTempView("transformed_Stream_DF")
> 
> df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
> 
> df.createOrReplaceTempView("abcd")
> 
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) 
> col3 from transformed_Stream_DF")
> 
> 
> # ---#
> 
> query1 = df \
> .writeStream \
> .format("console") \
> .outputMode("complete") \
> .trigger(processingTime='3 seconds') \
> .start()
> 
> query1.awaitTermination()
> # ---#
> 
> query2 = wordCounts \
> .writeStream \
> .format("console") \
> .trigger(processingTime='3 seconds') \
> .start()
> 
> query2.awaitTermination()
> 
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit 
> --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3
>  
> 
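
For reference, a minimal sketch of the pattern Panagiotis describes above: start every query first, then block only once at the end with awaitAnyTermination. The socket host/port and the aggregation are assumptions for illustration, not a verified fix for the behaviour Aakash reports.

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession.builder.appName("TwoStreamingQueries").getOrCreate()

# Socket source, same shape as the example in this thread (host/port assumed).
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load()

cols = lines.select(
    split(lines.value, ",").getItem(0).cast("double").alias("col1"),
    split(lines.value, ",").getItem(1).cast("double").alias("col2"))

aver = cols.groupBy().avg("col1")  # running average, needs complete output mode

# Start both sinks without blocking in between ...
query1 = aver.writeStream.format("console").outputMode("complete").start()
query2 = cols.writeStream.format("console").outputMode("append").start()

# ... and block exactly once, after the last start() call.
spark.streams.awaitAnyTermination()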

avoid duplicate records when appending new data to a parquet

2018-04-13 Thread Lian Jiang
I have a parquet file with an id field that is the hash of the composite
key fields. Is it possible to maintain the uniqueness of the id field when
appending new data that may duplicate existing records in the
parquet? Thanks!


Re: Structured Streaming on Kubernetes

2018-04-13 Thread Anirudh Ramanathan
+ozzieba, who was experimenting with streaming workloads recently. +1 to
what Matt said. Checkpointing and driver recovery are future work.
Structured streaming is important, and it would be good to get some
production experience here and try to target improving the feature's
support on K8s for 2.4/3.0.


On Fri, Apr 13, 2018 at 11:55 AM Matt Cheah  wrote:

> We don’t provide any Kubernetes-specific mechanisms for streaming, such as
> checkpointing to persistent volumes. But as long as streaming doesn’t
> require persisting to the executor’s local disk, streaming ought to work
> out of the box. E.g. you can checkpoint to HDFS, but not to the pod’s local
> directories.
>
>
>
> However, I’m unaware of any specific use of streaming with the Spark on
> Kubernetes integration right now. Would be curious to get feedback on the
> failover behavior right now.
>
>
>
> -Matt Cheah
>
>
>
> *From: *Tathagata Das 
> *Date: *Friday, April 13, 2018 at 1:27 AM
> *To: *Krishna Kalyan 
> *Cc: *user 
> *Subject: *Re: Structured Streaming on Kubernetes
>
>
>
> Structured streaming is stable in production! At Databricks, we and our
> customers collectively process almost 100s of billions of records per day
> using SS. However, we are not using kubernetes :)
>
>
>
> Though I don't think it will matter too much as long as kubes are
> correctly provisioned+configured and you are checkpointing to HDFS (for
> fault-tolerance guarantees).
>
>
>
> TD
>
>
>
> On Fri, Apr 13, 2018, 12:28 AM Krishna Kalyan 
> wrote:
>
> Hello All,
>
> We were evaluating Spark Structured Streaming on Kubernetes (Running on
> GCP). It would be awesome if the spark community could share their
> experience around this. I would like to know more about your production
> experience and the monitoring tools you are using.
>
>
>
> Since spark on kubernetes is a relatively new addition to spark, I was
> wondering if structured streaming is stable in production. We were also
> evaluating Apache Beam with Flink.
>
>
>
> Regards,
>
> Krishna
>
>
>
>
>
>

-- 
Anirudh Ramanathan
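
A minimal sketch of the checkpointing point made in this thread: the checkpoint location (and the file sink path) should live on durable storage such as HDFS, not on a pod's local disk. Broker, topic, and paths below are assumptions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredStreamingOnK8s").getOrCreate()

events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load()

query = events.selectExpr("CAST(value AS STRING) AS value") \
    .writeStream \
    .format("parquet") \
    .option("path", "hdfs://namenode:8020/data/events") \
    .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/events") \
    .start()

query.awaitTermination()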


Re: Spark parse fixed length file [Java]

2018-04-13 Thread Georg Heiler
I am not 100% sure whether Spark is smart enough to achieve this in a single
pass over the data. If not, you could create a Java UDF for this that
parses all the columns at once.


Otherwise you could enable Tungsten off heap memory which might speed
things up.
lsn24 wrote on Fri, Apr 13, 2018 at 19:02:

> Hello,
>
>  We are running into issues while trying to process fixed length files
> using
> spark.
>
> The approach we took is as follows:
>
> 1. Read the .bz2 file  into a dataset from hdfs using
> spark.read().textFile() API. Create a temporary view.
>
>  Dataset<String> rawDataset = sparkSession.read().textFile(filePath);
>  rawDataset.createOrReplaceTempView(tempView);
>
> 2. Run a sql query on the view, to slice and dice the data the way we need
> it (using substring).
>
>  (SELECT
>  TRIM(SUBSTRING(value,1 ,16)) AS record1 ,
>  TRIM(SUBSTRING(value,17 ,8)) AS record2 ,
>  TRIM(SUBSTRING(value,25 ,5)) AS record3 ,
>  TRIM(SUBSTRING(value,30 ,16)) AS record4 ,
>  CAST(SUBSTRING(value,46 ,8) AS BIGINT) AS record5 ,
>  CAST(SUBSTRING(value,54 ,6) AS BIGINT) AS record6 ,
>  CAST(SUBSTRING(value,60 ,3) AS BIGINT) AS record7 ,
>  CAST(SUBSTRING(value,63 ,6) AS BIGINT) AS record8 ,
>  TRIM(SUBSTRING(value,69 ,20)) AS record9 ,
>  TRIM(SUBSTRING(value,89 ,40)) AS record10 ,
>  TRIM(SUBSTRING(value,129 ,32)) AS record11 ,
>  TRIM(SUBSTRING(value,161 ,19)) AS record12,
>  TRIM(SUBSTRING(value,180 ,1)) AS record13 ,
>  TRIM(SUBSTRING(value,181 ,9)) AS record14 ,
>  TRIM(SUBSTRING(value,190 ,3)) AS record15 ,
>  CAST(SUBSTRING(value,193 ,8) AS BIGINT) AS record16 ,
>  CAST(SUBSTRING(value,201 ,8) AS BIGINT) AS record17
>  FROM tempView)
>
> 3.Write the output of sql query to a parquet file.
>  loadDataset.write().mode(SaveMode.Append).parquet(outputDirectory);
>
> Problem :
>
>   Step #2 takes longer if the length of a line is ~2000
> characters. If each line in the file is only 1000 characters, it takes
> only 4 minutes to process 20 million lines. If we increase the line length
> to 2000 characters, it takes 20 minutes to process 20 million lines.
>
>
> Is there a better way in spark to parse fixed length lines?
>
>
> *Note:* The Spark version we use is 2.2.0 and we are using Spark with Java.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
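
For what it's worth, a rough pyspark sketch of the single-pass idea Georg suggests (the original poster is on Java, where the same shape is possible with a registered Java UDF). The field offsets below mirror only the first few columns of the query above, the paths are assumptions, and this has not been benchmarked against the SQL substring approach.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.appName("FixedWidthSinglePass").getOrCreate()

# (name, 0-based start, length, python type) per field; only the first few columns shown.
LAYOUT = [("record1", 0, 16, str), ("record2", 16, 8, str),
          ("record3", 24, 5, str), ("record4", 29, 16, str),
          ("record5", 45, 8, int)]

schema = StructType([StructField(name, LongType() if typ is int else StringType())
                     for name, _, _, typ in LAYOUT])

def parse_line(value):
    # One pass over the line: slice, trim, and cast each field.
    # (Real data would need guards for empty or malformed numeric fields.)
    fields = []
    for _, start, length, typ in LAYOUT:
        raw = value[start:start + length].strip()
        fields.append(int(raw) if typ is int else raw)
    return fields

parse_udf = udf(parse_line, schema)

raw = spark.read.text("/path/to/input.bz2")                    # assumed input path
parsed = raw.select(parse_udf(col("value")).alias("r")).select("r.*")
parsed.write.mode("append").parquet("/path/to/output")         # assumed output path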


Re: Structured Streaming on Kubernetes

2018-04-13 Thread Matt Cheah
We don’t provide any Kubernetes-specific mechanisms for streaming, such as 
checkpointing to persistent volumes. But as long as streaming doesn’t require 
persisting to the executor’s local disk, streaming ought to work out of the 
box. E.g. you can checkpoint to HDFS, but not to the pod’s local directories.

 

However, I’m unaware of any specific use of streaming with the Spark on 
Kubernetes integration right now. Would be curious to get feedback on the 
failover behavior right now.

 

-Matt Cheah

 

From: Tathagata Das 
Date: Friday, April 13, 2018 at 1:27 AM
To: Krishna Kalyan 
Cc: user 
Subject: Re: Structured Streaming on Kubernetes

 

Structured streaming is stable in production! At Databricks, we and our 
customers collectively process almost 100s of billions of records per day using 
SS. However, we are not using kubernetes :) 

 

Though I don't think it will matter too much as long as kubes are correctly 
provisioned+configured and you are checkpointing to HDFS (for fault-tolerance 
guarantees).

 

TD

 

On Fri, Apr 13, 2018, 12:28 AM Krishna Kalyan  wrote:

Hello All, 

We were evaluating Spark Structured Streaming on Kubernetes (Running on GCP). 
It would be awesome if the spark community could share their experience around 
> this. I would like to know more about your production experience and the
monitoring tools you are using.

 

Since spark on kubernetes is a relatively new addition to spark, I was 
wondering if structured streaming is stable in production. We were also 
evaluating Apache Beam with Flink.

 

Regards,

Krishna

 

 





Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Jason Boorn
Hi Gene - 

Are you saying that I just need to figure out how to get the Alluxio jar into 
the classpath of my parent application?  If it shows up in the classpath then 
Spark will automatically know that it needs to use it when communicating with 
Alluxio?

Apologies for going back-and-forth on this - I feel like my particular use case 
is clouding what is already a tricky issue.

> On Apr 13, 2018, at 2:26 PM, Gene Pang  wrote:
> 
> Hi Jason,
> 
> Alluxio does work with Spark in master=local mode. This is because both 
> spark-submit and spark-shell have command-line options to set the classpath 
> for the JVM that is being started.
> 
> If you are not using spark-submit or spark-shell, you will have to figure out 
> how to configure that JVM instance with the proper properties.
> 
> Thanks,
> Gene
> 
> On Fri, Apr 13, 2018 at 10:47 AM, Jason Boorn wrote:
> Ok thanks - I was basing my design on this:
> 
> https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
>  
> 
> 
> Wherein it says:
> Once the SparkSession is instantiated, you can configure Spark’s runtime 
> config properties. 
> Apparently the suite of runtime configs you can change does not include 
> classpath.  
> 
> So the answer to my original question is basically this:
> 
> When using local (pseudo-cluster) mode, there is no way to add external jars 
> to the spark instance.  This means that Alluxio will not work with Spark when 
> Spark is run in master=local mode.
> 
> Thanks again - often getting a definitive “no” is almost as good as a yes.  
> Almost ;)
> 
>> On Apr 13, 2018, at 1:21 PM, Marcelo Vanzin wrote:
>> 
>> There are two things you're doing wrong here:
>> 
>> On Thu, Apr 12, 2018 at 6:32 PM, jb44 wrote:
>>> Then I can add the alluxio client library like so:
>>> sparkSession.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)
>> 
>> First one, you can't modify JVM configuration after it has already
>> started. So this line does nothing since it can't re-launch your
>> application with a new JVM.
>> 
>>> sparkSession.conf.set("spark.executor.extraClassPath", ALLUXIO_SPARK_CLIENT)
>> 
>> There is a lot of configuration that you cannot set after the
>> application has already started. For example, after the session is
>> created, most probably this option will be ignored, since executors
>> will already have started.
>> 
>> I'm not so sure about what happens when you use dynamic allocation,
>> but these post-hoc config changes in general are not expected to take
>> effect.
>> 
>> The documentation could be clearer about this (especially stuff that
>> only applies to spark-submit), but that's the gist of it.
>> 
>> 
>> -- 
>> Marcelo
> 
> 



Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Gene Pang
Hi Jason,

Alluxio does work with Spark in master=local mode. This is because both
spark-submit and spark-shell have command-line options to set the classpath
for the JVM that is being started.

If you are not using spark-submit or spark-shell, you will have to figure
out how to configure that JVM instance with the proper properties.

Thanks,
Gene

On Fri, Apr 13, 2018 at 10:47 AM, Jason Boorn  wrote:

> Ok thanks - I was basing my design on this:
>
> https://databricks.com/blog/2016/08/15/how-to-use-
> sparksession-in-apache-spark-2-0.html
>
> Wherein it says:
> Once the SparkSession is instantiated, you can configure Spark’s runtime
> config properties.
> Apparently the suite of runtime configs you can change does not include
> classpath.
>
> So the answer to my original question is basically this:
>
> When using local (pseudo-cluster) mode, there is no way to add external
> jars to the spark instance.  This means that Alluxio will not work with
> Spark when Spark is run in master=local mode.
>
> Thanks again - often getting a definitive “no” is almost as good as a
> yes.  Almost ;)
>
> On Apr 13, 2018, at 1:21 PM, Marcelo Vanzin  wrote:
>
> There are two things you're doing wrong here:
>
> On Thu, Apr 12, 2018 at 6:32 PM, jb44  wrote:
>
> Then I can add the alluxio client library like so:
> sparkSession.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)
>
>
> First one, you can't modify JVM configuration after it has already
> started. So this line does nothing since it can't re-launch your
> application with a new JVM.
>
> sparkSession.conf.set("spark.executor.extraClassPath",
> ALLUXIO_SPARK_CLIENT)
>
>
> There is a lot of configuration that you cannot set after the
> application has already started. For example, after the session is
> created, most probably this option will be ignored, since executors
> will already have started.
>
> I'm not so sure about what happens when you use dynamic allocation,
> but these post-hoc config changes in general are not expected to take
> effect.
>
> The documentation could be clearer about this (especially stuff that
> only applies to spark-submit), but that's the gist of it.
>
>
> --
> Marcelo
>
>
>


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Jason Boorn
Ok thanks - I was basing my design on this:

https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
 


Wherein it says:
Once the SparkSession is instantiated, you can configure Spark’s runtime config 
properties. 
Apparently the suite of runtime configs you can change does not include 
classpath.  

So the answer to my original question is basically this:

When using local (pseudo-cluster) mode, there is no way to add external jars to 
the spark instance.  This means that Alluxio will not work with Spark when 
Spark is run in master=local mode.

Thanks again - often getting a definitive “no” is almost as good as a yes.  
Almost ;)

> On Apr 13, 2018, at 1:21 PM, Marcelo Vanzin  wrote:
> 
> There are two things you're doing wrong here:
> 
> On Thu, Apr 12, 2018 at 6:32 PM, jb44  wrote:
>> Then I can add the alluxio client library like so:
>> sparkSession.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)
> 
> First one, you can't modify JVM configuration after it has already
> started. So this line does nothing since it can't re-launch your
> application with a new JVM.
> 
>> sparkSession.conf.set("spark.executor.extraClassPath", ALLUXIO_SPARK_CLIENT)
> 
> There is a lot of configuration that you cannot set after the
> application has already started. For example, after the session is
> created, most probably this option will be ignored, since executors
> will already have started.
> 
> I'm not so sure about what happens when you use dynamic allocation,
> but these post-hoc config changes in general are not expected to take
> effect.
> 
> The documentation could be clearer about this (especially stuff that
> only applies to spark-submit), but that's the gist of it.
> 
> 
> -- 
> Marcelo



Re: Live Stream Code Reviews :)

2018-04-13 Thread Holden Karau
Thank you :)

Just a reminder that this is going to start in under 20 minutes. If anyone has a
PR they'd like reviewed live, please respond and I'll add it to the list
(otherwise I'll go stick to the normal list of folks who have opted in to
live reviews).

On Thu, Apr 12, 2018 at 2:08 PM, Gourav Sengupta 
wrote:

> Hi,
>
> This is definitely one of the best messages ever in this group. The videos
> are absolutely fantastic in case anyone is trying to learn about
> contributing to SPARK; I have been through one of them. Just trying to
> repeat the steps in the video (without, of course, doing anything really
> stupid) makes a person learn quite a lot.
>
> Thanks a ton, Holden for the great help.
>
> Also, if you click on the link to the video, it does show in how many
> hours the session will be active, so we do not have to worry about the time
> zone, I guess.
>
> Regards,
> Gourav Sengupta
>
> On Thu, Apr 12, 2018 at 8:23 PM, Holden Karau 
> wrote:
>
>> Hi Y'all,
>>
>> If you're interested in learning more about how the development process in
>> Apache Spark works, I've been doing a weekly live-streamed code review most
>> Fridays at 11am. This week's will be on twitch/youtube (
>> https://www.twitch.tv/holdenkarau / https://www.youtube.com/watch?v=vGVSa9KnD80 ).
>> If you have a PR into Spark (or a related project)
>> you'd like me to review live let me know and I'll add it to my queue.
>>
>> Cheers,
>>
>> Holden :)
>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>
>


-- 
Twitter: https://twitter.com/holdenkarau


Re: Shuffling Data After Union and Write

2018-04-13 Thread Rahul Nandi
You can add a new column, say "order", to each of the DFs, with values 1, 2, and 3 for
df1, df2, and df3 respectively. Then you can sort the data based on that
order.

On Fri 13 Apr, 2018, 21:56 SNEHASISH DUTTA, 
wrote:

> Hi,
>
> I am currently facing an issue while performing a union on three data
> frames, say df1, df2, df3. Once the operation is performed and I am trying to
> save the data, the data is getting shuffled, so the ordering of data in
> df1, df2, df3 is not maintained.
>
> When I save the data as text/csv file the content of the data gets
> shuffled within.
> There is no way to order the dataframe as these 3 dataframes don't share
> any common field/constraint.
>
> Let me know if there is a work around to maintain the ordering of the
> dataframes after union and write.
>
> Regards,
> Snehasish
>
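
A minimal sketch of Rahul's suggestion. The stand-in frames and output path are assumptions; note this orders the frames relative to each other, it does not restore row order within each frame.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("OrderedUnion").getOrCreate()

# Stand-ins for df1/df2/df3; the real frames come from elsewhere.
df1 = spark.createDataFrame([("a", 1)], ["k", "v"])
df2 = spark.createDataFrame([("b", 2)], ["k", "v"])
df3 = spark.createDataFrame([("c", 3)], ["k", "v"])

# Tag each frame with its position, union, then sort on the tag.
tagged = (df1.withColumn("src_order", F.lit(1))
          .union(df2.withColumn("src_order", F.lit(2)))
          .union(df3.withColumn("src_order", F.lit(3))))

ordered = tagged.orderBy("src_order").drop("src_order")
ordered.write.mode("overwrite").csv("/tmp/ordered_union")  # assumed output path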


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Marcelo Vanzin
There are two things you're doing wrong here:

On Thu, Apr 12, 2018 at 6:32 PM, jb44  wrote:
> Then I can add the alluxio client library like so:
> sparkSession.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)

First one, you can't modify JVM configuration after it has already
started. So this line does nothing since it can't re-launch your
application with a new JVM.

> sparkSession.conf.set("spark.executor.extraClassPath", ALLUXIO_SPARK_CLIENT)

There is a lot of configuration that you cannot set after the
application has already started. For example, after the session is
created, most probably this option will be ignored, since executors
will already have started.

I'm not so sure about what happens when you use dynamic allocation,
but these post-hoc config changes in general are not expected to take
effect.

The documentation could be clearer about this (especially stuff that
only applies to spark-submit), but that's the gist of it.


-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
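
As an illustration of the timing issue Marcelo describes (the jar path below is hypothetical, and this is not a claim that builder-time config alone makes Alluxio work in local mode: for the driver classpath the jar still has to be on the classpath the JVM was launched with):

from pyspark.sql import SparkSession

ALLUXIO_SPARK_CLIENT = "/path/to/alluxio-spark-client.jar"  # hypothetical path

# Builder-time config is folded into the SparkConf before the context starts ...
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.executor.extraClassPath", ALLUXIO_SPARK_CLIENT) \
    .getOrCreate()

# ... whereas this runs after the (single, local) JVM already exists, so a
# classpath setting made here can never take effect.
spark.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)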



Spark parse fixed length file [Java]

2018-04-13 Thread lsn24
Hello,

 We are running into issues while trying to process fixed length files using
spark.

The approach we took is as follows:

1. Read the .bz2 file  into a dataset from hdfs using
spark.read().textFile() API. Create a temporary view.

 Dataset<String> rawDataset = sparkSession.read().textFile(filePath);
 rawDataset.createOrReplaceTempView(tempView);

2. Run a sql query on the view, to slice and dice the data the way we need
it (using substring).

 (SELECT 
 TRIM(SUBSTRING(value,1 ,16)) AS record1 ,
 TRIM(SUBSTRING(value,17 ,8)) AS record2 ,
 TRIM(SUBSTRING(value,25 ,5)) AS record3 ,
 TRIM(SUBSTRING(value,30 ,16)) AS record4 ,
 CAST(SUBSTRING(value,46 ,8) AS BIGINT) AS record5 , 
 CAST(SUBSTRING(value,54 ,6) AS BIGINT) AS record6 , 
 CAST(SUBSTRING(value,60 ,3) AS BIGINT) AS record7 , 
 CAST(SUBSTRING(value,63 ,6) AS BIGINT) AS record8 , 
 TRIM(SUBSTRING(value,69 ,20)) AS record9 ,
 TRIM(SUBSTRING(value,89 ,40)) AS record10 ,
 TRIM(SUBSTRING(value,129 ,32)) AS record11 ,
 TRIM(SUBSTRING(value,161 ,19)) AS record12,
 TRIM(SUBSTRING(value,180 ,1)) AS record13 ,
 TRIM(SUBSTRING(value,181 ,9)) AS record14 ,
 TRIM(SUBSTRING(value,190 ,3)) AS record15 ,
 CAST(SUBSTRING(value,193 ,8) AS BIGINT) AS record16 , 
 CAST(SUBSTRING(value,201 ,8) AS BIGINT) AS record17 
 FROM tempView)

3.Write the output of sql query to a parquet file.
 loadDataset.write().mode(SaveMode.Append).parquet(outputDirectory);

Problem :

  Step #2 takes longer if the length of a line is ~2000
characters. If each line in the file is only 1000 characters, it takes
only 4 minutes to process 20 million lines. If we increase the line length
to 2000 characters, it takes 20 minutes to process 20 million lines.


Is there a better way in spark to parse fixed length lines?


*Note:* The Spark version we use is 2.2.0 and we are using Spark with Java.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Jason Boorn
Thanks - I’ve seen this SO post; it covers spark-submit, which I am not using.

Regarding the ALLUXIO_SPARK_CLIENT variable, it is located on the machine that 
is running the job which spawns the master=local spark.  According to the Spark 
documentation, this should be possible, but it appears it is not.

Once again - I’m trying to solve the use case for master=local, NOT for a 
cluster and NOT with spark-submit.  

> On Apr 13, 2018, at 12:47 PM, yohann jardin  wrote:
> 
> Hey Jason,
> Might be related to what is behind your variable ALLUXIO_SPARK_CLIENT and 
> where the lib is located (is it on HDFS, on the node that submits the job, or 
> locally on all spark workers?)
> There is a great post on SO about it: https://stackoverflow.com/a/37348234 
> 
> We might as well check that you provide the jar correctly based on its 
> location. I have found it tricky in some cases.
> As a debugging step, if the jar is not on HDFS, you can copy it there and then 
> specify the full path in the extraClassPath property. 
> Regards,
> Yohann Jardin
> 
> Le 4/13/2018 à 5:38 PM, Jason Boorn a écrit :
>> I do, and this is what I will fall back to if nobody has a better idea :)
>> 
>> I was just hoping to get this working as it is much more convenient for my 
>> testing pipeline.
>> 
>> Thanks again for the help
>> 
>>> On Apr 13, 2018, at 11:33 AM, Geoff Von Allmen wrote:
>>> 
>>> Ok - `LOCAL` makes sense now.
>>> 
>>> Do you have the option to still use `spark-submit` in this scenario, but 
>>> using the following options:
>>> 
>>> ```bash
>>> --master "local[*]" \
>>> --deploy-mode "client" \
>>> ...
>>> ```
>>> 
>>> I know in the past, I have setup some options using `.config("Option", 
>>> "value")` when creating the spark session, and then other runtime options 
>>> as you describe above with `spark.conf.set`. At this point though I've just 
>>> moved everything out into a `spark-submit` script.
>>> 
>>> On Fri, Apr 13, 2018 at 8:18 AM, Jason Boorn >> > wrote:
>>> Hi Geoff -
>>> 
>>> Appreciate the help here - I do understand what you’re saying below.  And I 
>>> am able to get this working when I submit a job to a local cluster.
>>> 
>>> I think part of the issue here is that there’s ambiguity in the 
>>> terminology.  When I say “LOCAL” spark, I mean an instance of spark that is 
>>> created by my driver program, and is not a cluster itself.  It means that 
>>> my master node is “local”, and this mode is primarily used for testing.
>>> 
>>> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
>>>  
>>> 
>>> 
>>> While I am able to get alluxio working with spark-submit, I am unable to 
>>> get it working when using local mode.  The mechanisms for setting class 
>>> paths during spark-submit are not available in local mode.  My 
>>> understanding is that all one is able to use is:
>>> 
>>> spark.conf.set(“”)
>>> 
>>> To set any runtime properties of the local instance.  Note that it is 
>>> possible (and I am more convinced of this as time goes on) that alluxio 
>>> simply does not work in spark local mode as described above.
>>> 
>>> 
 On Apr 13, 2018, at 11:09 AM, Geoff Von Allmen wrote:
 
 I fought with a 
 ClassNotFoundException for quite some time, but it was for kafka.
 
 The final configuration that got everything working was running 
 spark-submit with the following options:
 
 --jars "/path/to/.ivy2/jars/package.jar" \
 --driver-class-path "/path/to/.ivy2/jars/package.jar" \
 --conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
 --packages org.some.package:package_name:version
 While this was needed for me to run in 
 cluster mode, it works equally well for 
 client mode as well.
 
 One other note when needing to supply multiple items to these args - 
 --jars and 
 --packages should be comma separated, 
 --driver-class-path and 
 extraClassPath should be 
 : separated
 
 HTH
 
 
 On Fri, Apr 13, 2018 at 4:28 AM, jb44 wrote:
 Haoyuan -
 
 As I mentioned below, I've been through the documentation already.  It has
 not helped me to resolve the issue.
 
 Here is what I have tried so far:
 
 - setting extraClassPath as explained below
 - adding fs.alluxio.impl through sparkconf
 - adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
 this matters in my case)
 - compiling the client from source 
 
 Do you have any other suggestions on how to get this working?  
 
 Thanks
 
 
 
 --
 Sent from: 

Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread yohann jardin
Hey Jason,

Might be related to what is behind your variable ALLUXIO_SPARK_CLIENT and where 
the lib is located (is it on HDFS, on the node that submits the job, or locally 
on all spark workers?)
There is a great post on SO about it: https://stackoverflow.com/a/37348234

We might as well check that you provide the jar correctly based on its 
location. I have found it tricky in some cases.
As a debugging step, if the jar is not on HDFS, you can copy it there and then 
specify the full path in the extraClassPath property.

Regards,

Yohann Jardin

Le 4/13/2018 à 5:38 PM, Jason Boorn a écrit :
I do, and this is what I will fall back to if nobody has a better idea :)

I was just hoping to get this working as it is much more convenient for my 
testing pipeline.

Thanks again for the help

On Apr 13, 2018, at 11:33 AM, Geoff Von Allmen wrote:

Ok - `LOCAL` makes sense now.

Do you have the option to still use `spark-submit` in this scenario, but using 
the following options:

```bash
--master "local[*]" \
--deploy-mode "client" \
...
```

I know in the past, I have setup some options using `.config("Option", 
"value")` when creating the spark session, and then other runtime options as 
you describe above with `spark.conf.set`. At this point though I've just moved 
everything out into a `spark-submit` script.

On Fri, Apr 13, 2018 at 8:18 AM, Jason Boorn wrote:
Hi Geoff -

Appreciate the help here - I do understand what you’re saying below.  And I am 
able to get this working when I submit a job to a local cluster.

I think part of the issue here is that there’s ambiguity in the terminology.  
When I say “LOCAL” spark, I mean an instance of spark that is created by my 
driver program, and is not a cluster itself.  It means that my master node is 
“local”, and this mode is primarily used for testing.

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html

While I am able to get alluxio working with spark-submit, I am unable to get it 
working when using local mode.  The mechanisms for setting class paths during 
spark-submit are not available in local mode.  My understanding is that all one 
is able to use is:

spark.conf.set(“”)

To set any runtime properties of the local instance.  Note that it is possible 
(and I am more convinced of this as time goes on) that alluxio simply does not 
work in spark local mode as described above.


On Apr 13, 2018, at 11:09 AM, Geoff Von Allmen wrote:


I fought with a ClassNotFoundException for quite some time, but it was for 
kafka.

The final configuration that got everything working was running spark-submit 
with the following options:

--jars "/path/to/.ivy2/jars/package.jar" \
--driver-class-path "/path/to/.ivy2/jars/package.jar" \
--conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
--packages org.some.package:package_name:version


While this was needed for me to run in cluster mode, it works equally well for 
client mode as well.

One other note when needing to supply multiple items to these args - --jars 
and --packages should be comma separated, --driver-class-path and 
extraClassPath should be : separated

HTH


On Fri, Apr 13, 2018 at 4:28 AM, jb44 wrote:
Haoyuan -

As I mentioned below, I've been through the documentation already.  It has
not helped me to resolve the issue.

Here is what I have tried so far:

- setting extraClassPath as explained below
- adding fs.alluxio.impl through sparkconf
- adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
this matters in my case)
- compiling the client from source

Do you have any other suggestions on how to get this working?

Thanks



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org








Shuffling Data After Union and Write

2018-04-13 Thread SNEHASISH DUTTA
Hi,

I am currently facing an issue while performing a union on three data frames,
say df1, df2, df3. Once the operation is performed and I am trying to save the
data, the data is getting shuffled, so the ordering of data in df1, df2, df3
is not maintained.

When I save the data as text/csv file the content of the data gets shuffled
within.
There is no way to order the dataframe as these 3 dataframes don't share
any common field/constraint.

Let me know if there is a work around to maintain the ordering of the
dataframes after union and write.

Regards,
Snehasish


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Jason Boorn
I do, and this is what I will fall back to if nobody has a better idea :)

I was just hoping to get this working as it is much more convenient for my 
testing pipeline.

Thanks again for the help

> On Apr 13, 2018, at 11:33 AM, Geoff Von Allmen  wrote:
> 
> Ok - `LOCAL` makes sense now.
> 
> Do you have the option to still use `spark-submit` in this scenario, but 
> using the following options:
> 
> ```bash
> --master "local[*]" \
> --deploy-mode "client" \
> ...
> ```
> 
> I know in the past, I have setup some options using `.config("Option", 
> "value")` when creating the spark session, and then other runtime options as 
> you describe above with `spark.conf.set`. At this point though I've just 
> moved everything out into a `spark-submit` script.
> 
> On Fri, Apr 13, 2018 at 8:18 AM, Jason Boorn wrote:
> Hi Geoff -
> 
> Appreciate the help here - I do understand what you’re saying below.  And I 
> am able to get this working when I submit a job to a local cluster.
> 
> I think part of the issue here is that there’s ambiguity in the terminology.  
> When I say “LOCAL” spark, I mean an instance of spark that is created by my 
> driver program, and is not a cluster itself.  It means that my master node is 
> “local”, and this mode is primarily used for testing.
> 
> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
>  
> 
> 
> While I am able to get alluxio working with spark-submit, I am unable to get 
> it working when using local mode.  The mechanisms for setting class paths 
> during spark-submit are not available in local mode.  My understanding is 
> that all one is able to use is:
> 
> spark.conf.set(“”)
> 
> To set any runtime properties of the local instance.  Note that it is 
> possible (and I am more convinced of this as time goes on) that alluxio 
> simply does not work in spark local mode as described above.
> 
> 
>> On Apr 13, 2018, at 11:09 AM, Geoff Von Allmen wrote:
>> 
>> I fought with a ClassNotFoundException for quite some time, but it was for 
>> kafka.
>> 
>> The final configuration that got everything working was running spark-submit 
>> with the following options:
>> 
>> --jars "/path/to/.ivy2/jars/package.jar" \
>> --driver-class-path "/path/to/.ivy2/jars/package.jar" \
>> --conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
>> --packages org.some.package:package_name:version
>> While this was needed for me to run in cluster mode, it works equally well 
>> for client mode as well.
>> 
>> One other note when needing to supply multiple items to these args - 
>> --jars and --packages should be comma separated, --driver-class-path and 
>> extraClassPath should be : separated
>> 
>> HTH
>> 
>> 
>> On Fri, Apr 13, 2018 at 4:28 AM, jb44 wrote:
>> Haoyuan -
>> 
>> As I mentioned below, I've been through the documentation already.  It has
>> not helped me to resolve the issue.
>> 
>> Here is what I have tried so far:
>> 
>> - setting extraClassPath as explained below
>> - adding fs.alluxio.impl through sparkconf
>> - adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
>> this matters in my case)
>> - compiling the client from source 
>> 
>> Do you have any other suggestions on how to get this working?  
>> 
>> Thanks
>> 
>> 
>> 
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ 
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> 
>> 
>> 
> 
> 



Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Geoff Von Allmen
Ok - `LOCAL` makes sense now.

Do you have the option to still use `spark-submit` in this scenario, but
using the following options:

```bash
--master "local[*]" \
--deploy-mode "client" \
...
```

I know in the past, I have setup some options using `.config("Option",
"value")` when creating the spark session, and then other runtime options
as you describe above with `spark.conf.set`. At this point though I've just
moved everything out into a `spark-submit` script.

On Fri, Apr 13, 2018 at 8:18 AM, Jason Boorn  wrote:

> Hi Geoff -
>
> Appreciate the help here - I do understand what you’re saying below.  And
> I am able to get this working when I submit a job to a local cluster.
>
> I think part of the issue here is that there’s ambiguity in the
> terminology.  When I say “LOCAL” spark, I mean an instance of spark that is
> created by my driver program, and is not a cluster itself.  It means that
> my master node is “local”, and this mode is primarily used for testing.
>
> https://jaceklaskowski.gitbooks.io/mastering-apache-
> spark/content/spark-local.html
>
> While I am able to get alluxio working with spark-submit, I am unable to
> get it working when using local mode.  The mechanisms for setting class
> paths during spark-submit are not available in local mode.  My
> understanding is that all one is able to use is:
>
> spark.conf.set(“”)
>
> To set any runtime properties of the local instance.  Note that it is
> possible (and I am more convinced of this as time goes on) that alluxio
> simply does not work in spark local mode as described above.
>
>
> On Apr 13, 2018, at 11:09 AM, Geoff Von Allmen 
> wrote:
>
> I fought with a ClassNotFoundException for quite some time, but it was
> for kafka.
>
> The final configuration that got everything working was running
> spark-submit with the following options:
>
> --jars "/path/to/.ivy2/jars/package.jar" \
> --driver-class-path "/path/to/.ivy2/jars/package.jar" \
> --conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
> --packages org.some.package:package_name:version
>
> While this was needed for me to run in cluster mode, it works equally
> well for client mode as well.
>
> One other note when needing to supply multiple items to these args -
> --jars and --packages should be comma separated, --driver-class-path and
> extraClassPath should be : separated
>
> HTH
> ​
>
> On Fri, Apr 13, 2018 at 4:28 AM, jb44  wrote:
>
>> Haoyuan -
>>
>> As I mentioned below, I've been through the documentation already.  It has
>> not helped me to resolve the issue.
>>
>> Here is what I have tried so far:
>>
>> - setting extraClassPath as explained below
>> - adding fs.alluxio.impl through sparkconf
>> - adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
>> this matters in my case)
>> - compiling the client from source
>>
>> Do you have any other suggestions on how to get this working?
>>
>> Thanks
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Jason Boorn
Hi Geoff -

Appreciate the help here - I do understand what you’re saying below.  And I am 
able to get this working when I submit a job to a local cluster.

I think part of the issue here is that there’s ambiguity in the terminology.  
When I say “LOCAL” spark, I mean an instance of spark that is created by my 
driver program, and is not a cluster itself.  It means that my master node is 
“local”, and this mode is primarily used for testing.

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
 


While I am able to get alluxio working with spark-submit, I am unable to get it 
working when using local mode.  The mechanisms for setting class paths during 
spark-submit are not available in local mode.  My understanding is that all one 
is able to use is:

spark.conf.set(“”)

To set any runtime properties of the local instance.  Note that it is possible 
(and I am more convinced of this as time goes on) that alluxio simply does not 
work in spark local mode as described above.


> On Apr 13, 2018, at 11:09 AM, Geoff Von Allmen  wrote:
> 
> I fought with a ClassNotFoundException for quite some time, but it was for 
> kafka.
> 
> The final configuration that got everything working was running spark-submit 
> with the following options:
> 
> --jars "/path/to/.ivy2/jars/package.jar" \
> --driver-class-path "/path/to/.ivy2/jars/package.jar" \
> --conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
> --packages org.some.package:package_name:version
> While this was needed for me to run in cluster mode, it works equally well 
> for client mode as well.
> 
> One other note when needing to supplied multiple items to these args - --jars 
> and --packages should be comma separated, --driver-class-path and 
> extraClassPath should be : separated
> 
> HTH
> 
> 
> On Fri, Apr 13, 2018 at 4:28 AM, jb44 wrote:
> Haoyuan -
> 
> As I mentioned below, I've been through the documentation already.  It has
> not helped me to resolve the issue.
> 
> Here is what I have tried so far:
> 
> - setting extraClassPath as explained below
> - adding fs.alluxio.impl through sparkconf
> - adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
> this matters in my case)
> - compiling the client from source 
> 
> Do you have any other suggestions on how to get this working?  
> 
> Thanks
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 



Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Geoff Von Allmen
I fought with a ClassNotFoundException for quite some time, but it was for
kafka.

The final configuration that got everything working was running spark-submit
with the following options:

--jars "/path/to/.ivy2/jars/package.jar" \
--driver-class-path "/path/to/.ivy2/jars/package.jar" \
--conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
--packages org.some.package:package_name:version

While this was needed for me to run in cluster mode, it works equally well
for client mode as well.

One other note when needing to supply multiple items to these args -
--jars and --packages should be comma separated, --driver-class-path and
extraClassPath should be : separated

HTH

On Fri, Apr 13, 2018 at 4:28 AM, jb44  wrote:

> Haoyuan -
>
> As I mentioned below, I've been through the documentation already.  It has
> not helped me to resolve the issue.
>
> Here is what I have tried so far:
>
> - setting extraClassPath as explained below
> - adding fs.alluxio.impl through sparkconf
> - adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
> this matters in my case)
> - compiling the client from source
>
> Do you have any other suggestions on how to get this working?
>
> Thanks
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Task failure to read input files

2018-04-13 Thread Srikanth
Hello,

I'm running a Spark job on AWS EMR that reads many lzo files from an S3 bucket
partitioned by date.
Sometimes I see errors in logs similar to

18/04/13 11:53:52 WARN TaskSetManager: Lost task 151177.0 in stage
43.0 (TID 1516123, ip-10-10-2-6.ec2.internal, executor 57):
java.io.IOException: Corrupted uncompressed block
at 
com.hadoop.compression.lzo.LzopInputStream.verifyChecksums(LzopInputStream.java:219)
at 
com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:284)
at 
com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:261)


I don't see the jobs fail. I assume this task succeeded when it was retried.
If the input file is actually corrupted, even task retries should fail and
eventually the job will fail based on the "spark.task.maxFailures" config, right?

Is there a way to make the Spark/Hadoop lzo library print the full file name
when such failures happen, so that I can then manually check whether the file is
indeed corrupted?

Thanks,
Srikanth


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread jb44
Haoyuan -

As I mentioned below, I've been through the documentation already.  It has
not helped me to resolve the issue.

Here is what I have tried so far:

- setting extraClassPath as explained below
- adding fs.alluxio.impl through sparkconf
- adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
this matters in my case)
- compiling the client from source 

Do you have any other suggestions on how to get this working?  

Thanks



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Structured Streaming on Kubernetes

2018-04-13 Thread Tathagata Das
Structured streaming is stable in production! At Databricks, we and our
customers collectively process almost 100s of billions of records per day
using SS. However, we are not using kubernetes :)

Though I don't think it will matter too much as long as kubes are correctly
provisioned+configured and you are checkpointing to HDFS (for
fault-tolerance guarantees).

TD

On Fri, Apr 13, 2018, 12:28 AM Krishna Kalyan 
wrote:

> Hello All,
> We were evaluating Spark Structured Streaming on Kubernetes (Running on
> GCP). It would be awesome if the spark community could share their
> experience around this. I would like to know more about your production
> experience and the monitoring tools you are using.
>
> Since spark on kubernetes is a relatively new addition to spark, I was
> wondering if structured streaming is stable in production. We were also
> evaluating Apache Beam with Flink.
>
> Regards,
> Krishna
>
>
>


Passing Hive Context to FPGrowth.

2018-04-13 Thread Sbf xyz
Hi,

I am using Apache Spark 2.2 and the mllib library in Python. I have to pass a
Hive context to the FPGrowth algorithm. For that, I converted a DF to an RDD. I am
struggling with some pickling errors. After going through Stack Overflow,
it seems we need to convert an RDD to a pipelineRDD. Could anyone suggest how
that could be done?


Thanks.


Transforming json string in structured streaming problem

2018-04-13 Thread Junfeng Chen
Hi all,

I need to read some string data in JSON format from Kafka, convert it
to a dataframe, and finally write it to a parquet file.
But now I have met some problems. spark.readStream().json() can only
read JSON files from a specified location; it cannot accept a Dataset
the way spark.read.json can.
I found a potential solution in
https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming ,
but it needs a StructType to be constructed, while the structure of my JSON
data is variable.

So how to solve it?

Thanks!


Regard,
Junfeng Chen
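
For reference, a sketch of the from_json approach the linked answer describes; it only helps when a StructType can be fixed up front, which is exactly the sticking point when the JSON structure is variable. Broker, topic, schema, and paths below are assumptions.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("KafkaJsonToParquet").getOrCreate()

# A fixed schema is required by from_json; variable structures do not fit this directly.
schema = StructType([
    StructField("field1", StringType()),
    StructField("field2", StringType()),
])

raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "some_topic") \
    .load()

parsed = raw.select(from_json(col("value").cast("string"), schema).alias("json")) \
    .select("json.*")

query = parsed.writeStream \
    .format("parquet") \
    .option("path", "/path/to/output") \
    .option("checkpointLocation", "/path/to/checkpoints") \
    .start()

query.awaitTermination()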


Structured Streaming on Kubernetes

2018-04-13 Thread Krishna Kalyan
Hello All,
We were evaluating Spark Structured Streaming on Kubernetes (Running on
GCP). It would be awesome if the spark community could share their
experience around this. I would like to know more about your production
experience and the monitoring tools you are using.

Since spark on kubernetes is a relatively new addition to spark, I was
wondering if structured streaming is stable in production. We were also
evaluating Apache Beam with Flink.

Regards,
Krishna