Re: Nullpointerexception error when in repartition

2018-04-12 Thread Junfeng Chen
Hi,
I know that, but my purpose is to transform a JSON string Dataset<String> into
a Dataset<Row>, while spark.readStream can only read JSON files from a
specified path.
https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming
describes a useful approach, but the formats of my JSON records are not all the same.
The Spark Java API also does not seem to support syntax like

.select(from_json($"value", colourSchema))
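
A rough Java-API sketch of that call, using functions.col in place of the
Scala $ syntax (the schema fields and the input column name "value" are
placeholders for illustration), would look like:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.functions;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // placeholder schema for the JSON payload
    StructType colourSchema = new StructType()
            .add("colour", DataTypes.StringType)
            .add("value", DataTypes.DoubleType);

    // parse the JSON string column and flatten the resulting struct
    Dataset<Row> parsed = df
            .select(functions.from_json(functions.col("value"), colourSchema).alias("data"))
            .select("data.*");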



Regard,
Junfeng Chen

On Fri, Apr 13, 2018 at 7:09 AM, Tathagata Das 
wrote:

> Have you read through the documentation of Structured Streaming?
> https://spark.apache.org/docs/latest/structured-streaming-
> programming-guide.html
>
> One of the basic mistakes you are making is defining the dataset with
> `spark.read()`. You define a streaming Dataset with `spark.readStream()`.
>
> On Thu, Apr 12, 2018 at 3:02 AM, Junfeng Chen  wrote:
>
>> Hi, Tathagata
>>
>> I have tried structured streaming, but in line
>>
>>> Dataset<Row> rowDataset = spark.read().json(jsondataset);
>>
>>
>> Always throw
>>
>>> Queries with streaming sources must be executed with writeStream.start()
>>
>>
>> But what I need to do in this step is only transforming JSON string data
>> to Dataset<Row>. How can I fix it?
>>
>> Thanks!
>>
>>
>> Regard,
>> Junfeng Chen
>>
>> On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> It's not very surprising that doing this sort of RDD to DF conversion
>>> inside DStream.foreachRDD has weird corner cases like this. In fact, you
>>> are going to have additional problems with partial parquet files (when
>>> there are failures) in this approach. I strongly suggest that you use
>>> Structured Streaming, which is designed to do this sort of processing. It
>>> will take care of tracking the written parquet files correctly.
>>>
>>> TD
>>>
>>> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen 
>>> wrote:
>>>
 I wrote a program that reads some JSON data from Kafka, with the goal of saving
 it to Parquet files on HDFS.
 Here is my code:

> JavaInputDStream<ConsumerRecord<String, String>> stream = ...
> JavaDStream<String> rdd = stream.map...
> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>     Dataset<Row> df = spark.read().json( stringjavardd ); // convert json to df
>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...  // add some new fields
>     StructType type = df.schema()...; // construct a new type for the added fields
>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create new dataframe
>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>         .partitionBy("appname").parquet(savepath); // save to parquet
> })



 However, if I remove the repartition call on newdf in the Parquet-writing
 stage, the program always throws a NullPointerException in the JSON
 conversion line:

 Java.lang.NullPointerException
>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.
> scala:1783)
> ...


 This seems to make no sense, since the Parquet-writing operation should be in
 a different stage from the JSON-transforming operation.
 So how to solve it? Thanks!

 Regard,
 Junfeng Chen

>>>
>>>
>>
>


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-12 Thread Haoyuan Li
This link should be helpful:
https://alluxio.org/docs/1.7/en/Running-Spark-on-Alluxio.html

Best regards,

Haoyuan (HY)

alluxio.com | alluxio.org | powered by Alluxio


On Thu, Apr 12, 2018 at 6:32 PM, jb44  wrote:

> I'm running spark in LOCAL mode and trying to get it to talk to alluxio.
> I'm
> getting the error: java.lang.ClassNotFoundException: Class
> alluxio.hadoop.FileSystem not found
> The cause of this error is apparently that Spark cannot find the alluxio
> client jar in its classpath.
>
> I have looked at the page here:
> https://www.alluxio.org/docs/master/en/Debugging-Guide.
> html#q-why-do-i-see-exceptions-like-javalangruntimeexception-
> javalangclassnotfoundexception-class-alluxiohadoopfilesystem-not-found
>
> That page details the steps to take in this situation, but I'm not finding
> success.
>
> According to the Spark documentation, I can instantiate a local Spark session like so:
>
> SparkSession.builder
>   .appName("App")
>   .getOrCreate
>
> Then I can add the alluxio client library like so:
> sparkSession.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)
> sparkSession.conf.set("spark.executor.extraClassPath",
> ALLUXIO_SPARK_CLIENT)
>
> I have verified that the proper jar file exists in the right location on my
> local machine with:
> logger.error(sparkSession.conf.get("spark.driver.extraClassPath"))
> logger.error(sparkSession.conf.get("spark.executor.extraClassPath"))
>
> But I still get the error. Is there anything else I can do to figure out
> why
> Spark is not picking the library up?
>
> Please note I am not using spark-submit - I am aware of the methods for
> adding the client jar to a spark-submit job. My Spark instance is being
> created as local within my application and this is the use case I want to
> solve.
>
> As an FYI there is another application in the cluster which is connecting
> to
> my alluxio using the fs client and that all works fine. In that case,
> though, the fs client is being packaged as part of the application through
> standard sbt dependencies.
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Does partition by and order by works only in stateful case?

2018-04-12 Thread Tathagata Das
Traditional SQL window functions with `over` are not supported in streaming. Only
time-based windows, that is, `window("timestamp", "10 minutes")`, are
supported in streaming.
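
For example, a minimal sketch with the Java API (assuming a streaming
Dataset<Row> called `events` with `id` and `timestamp` columns; the names are
placeholders):

    import static org.apache.spark.sql.functions.*;

    // count events per id over 10-minute event-time windows
    Dataset<Row> counts = events
            .groupBy(window(col("timestamp"), "10 minutes"), col("id"))
            .count();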

On Thu, Apr 12, 2018 at 7:34 PM, kant kodali  wrote:

> Hi All,
>
> Does partition by and order by work only in the stateful case?
>
> For example:
>
> select row_number() over (partition by id order by timestamp) from table
>
> gives me
>
> *SEVERE: Exception occured while submitting the query:
> java.lang.RuntimeException: org.apache.spark.sql.AnalysisException:
> Non-time-based windows are not supported on streaming DataFrames/Datasets;;*
>
> I wonder what a time-based window means? Is it not the window from the over()
> clause, or does it mean group by(window('timestamp', '10 minutes')) like the
> stateful case?
>
> Thanks
>


Does partition by and order by works only in stateful case?

2018-04-12 Thread kant kodali
Hi All,

Does partition by and order by work only in the stateful case?

For example:

select row_number() over (partition by id order by timestamp) from table

gives me

*SEVERE: Exception occured while submitting the query:
java.lang.RuntimeException: org.apache.spark.sql.AnalysisException:
Non-time-based windows are not supported on streaming DataFrames/Datasets;;*

I wonder what a time-based window means? Is it not the window from the over()
clause, or does it mean group by(window('timestamp', '10 minutes')) like the
stateful case?

Thanks


Spark LOCAL mode and external jar (extraClassPath)

2018-04-12 Thread jb44
I'm running spark in LOCAL mode and trying to get it to talk to alluxio. I'm
getting the error: java.lang.ClassNotFoundException: Class
alluxio.hadoop.FileSystem not found
The cause of this error is apparently that Spark cannot find the alluxio
client jar in its classpath.

I have looked at the page here:
https://www.alluxio.org/docs/master/en/Debugging-Guide.html#q-why-do-i-see-exceptions-like-javalangruntimeexception-javalangclassnotfoundexception-class-alluxiohadoopfilesystem-not-found

That page details the steps to take in this situation, but I'm not finding
success.

According to the Spark documentation, I can instantiate a local Spark session like so:

SparkSession.builder
  .appName("App")
  .getOrCreate

Then I can add the alluxio client library like so:
sparkSession.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)
sparkSession.conf.set("spark.executor.extraClassPath", ALLUXIO_SPARK_CLIENT)

I have verified that the proper jar file exists in the right location on my
local machine with:
logger.error(sparkSession.conf.get("spark.driver.extraClassPath"))
logger.error(sparkSession.conf.get("spark.executor.extraClassPath"))

But I still get the error. Is there anything else I can do to figure out why
Spark is not picking the library up?

Please note I am not using spark-submit - I am aware of the methods for
adding the client jar to a spark-submit job. My Spark instance is being
created as local within my application and this is the use case I want to
solve.

As an FYI there is another application in the cluster which is connecting to
my alluxio using the fs client and that all works fine. In that case,
though, the fs client is being packaged as part of the application through
standard sbt dependencies.





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
Have you read through the documentation of Structured Streaming?
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

One of the basic mistakes you are making is defining the dataset with
`spark.read()`. You define a streaming Dataset with `spark.readStream()`.
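
For example, a minimal sketch for the Kafka-to-Parquet case discussed below
(the bootstrap servers, topic, JSON schema, and paths are placeholders):

    import static org.apache.spark.sql.functions.*;
    import org.apache.spark.sql.streaming.StreamingQuery;

    Dataset<Row> raw = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "host1:9092")     // placeholder
            .option("subscribe", "mytopic")                      // placeholder
            .load();

    Dataset<Row> parsed = raw
            .selectExpr("CAST(value AS STRING) AS json")
            .select(from_json(col("json"), jsonSchema).alias("data"))  // jsonSchema: schema of your JSON
            .select("data.*");

    StreamingQuery query = parsed.writeStream()
            .format("parquet")
            .option("path", "/path/to/output")                   // placeholder
            .option("checkpointLocation", "/path/to/checkpoint") // placeholder
            .partitionBy("appname")
            .outputMode("append")
            .start();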

On Thu, Apr 12, 2018 at 3:02 AM, Junfeng Chen  wrote:

> Hi, Tathagata
>
> I have tried structured streaming, but in line
>
>> Dataset<Row> rowDataset = spark.read().json(jsondataset);
>
>
> Always throw
>
>> Queries with streaming sources must be executed with writeStream.start()
>
>
> But what I need to do in this step is only transforming JSON string data
> to Dataset<Row>. How can I fix it?
>
> Thanks!
>
>
> Regard,
> Junfeng Chen
>
> On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> It's not very surprising that doing this sort of RDD to DF conversion
>> inside DStream.foreachRDD has weird corner cases like this. In fact, you
>> are going to have additional problems with partial parquet files (when
>> there are failures) in this approach. I strongly suggest that you use
>> Structured Streaming, which is designed to do this sort of processing. It
>> will take care of tracking the written parquet files correctly.
>>
>> TD
>>
>> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen  wrote:
>>
>>> I wrote a program that reads some JSON data from Kafka, with the goal of saving
>>> it to Parquet files on HDFS.
>>> Here is my code:
>>>
 JavaInputDStream<ConsumerRecord<String, String>> stream = ...
 JavaDStream<String> rdd = stream.map...
 rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
     Dataset<Row> df = spark.read().json( stringjavardd ); // convert json to df
     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...  // add some new fields
     StructType type = df.schema()...; // construct a new type for the added fields
     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create new dataframe
     newdf.repartition(taskNum).write().mode(SaveMode.Append)
         .partitionBy("appname").parquet(savepath); // save to parquet
 })
>>>
>>>
>>> However, if I remove the repartition call on newdf in the Parquet-writing
>>> stage, the program always throws a NullPointerException in the JSON
>>> conversion line:
>>>
>>> Java.lang.NullPointerException
  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.
 scala:1783)
 ...
>>>
>>>
>>> This seems to make no sense, since the Parquet-writing operation should be in
>>> a different stage from the JSON-transforming operation.
>>> So how to solve it? Thanks!
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>
>>
>


Re: Live Stream Code Reviews :)

2018-04-12 Thread Gourav Sengupta
Hi,

This is definitely one of the best messages ever in this group. The videos
are absolutely fantastic in case anyone is trying to learn about
contributing to Spark; I have been through one of them. Just trying to
repeat the steps in the video (without, of course, doing anything really
stupid) makes a person learn quite a lot.

Thanks a ton, Holden, for the great help.

Also, if you click on the link to the video, it shows within how many
hours the session will be active, so I guess we do not have to worry about the
time zone.

Regards,
Gourav Sengupta

On Thu, Apr 12, 2018 at 8:23 PM, Holden Karau  wrote:

> Hi Y'all,
>
> If you're interested in learning more about how the development process in
> Apache Spark works, I've been doing a weekly live-streamed code review most
> Fridays at 11am. This weeks will be on twitch/youtube (
> https://www.twitch.tv/holdenkarau / https://www.youtube.com/
> watch?v=vGVSa9KnD80 ). If you have a PR into Spark (or a related project)
> you'd like me to review live let me know and I'll add it to my queue.
>
> Cheers,
>
> Holden :)
>
> --
> Twitter: https://twitter.com/holdenkarau
>


Re: Live Stream Code Reviews :)

2018-04-12 Thread Holden Karau
Ah yes, really good point: 11am Pacific :)

On Thu, Apr 12, 2018 at 1:01 PM, Marco Mistroni  wrote:

> PST, I believe, like last time.
> Works out to 9pm BST & 10pm CET if I'm correct.
>
> On Thu, Apr 12, 2018, 8:47 PM Matteo Olivi  wrote:
>
>> Hi,
>> 11 am in which timezone?
>>
>> Il gio 12 apr 2018, 21:23 Holden Karau  ha scritto:
>>
>>> Hi Y'all,
>>>
>>> If you're interested in learning more about how the development process in
>>> Apache Spark works, I've been doing a weekly live-streamed code review most
>>> Fridays at 11am. This weeks will be on twitch/youtube (
>>> https://www.twitch.tv/holdenkarau / https://www.youtube.com/
>>> watch?v=vGVSa9KnD80 ). If you have a PR into Spark (or a related
>>> project) you'd like me to review live let me know and I'll add it to my
>>> queue.
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>


-- 
Twitter: https://twitter.com/holdenkarau


Re: Live Stream Code Reviews :)

2018-04-12 Thread Marco Mistroni
PST, I believe, like last time.
Works out to 9pm BST & 10pm CET if I'm correct.

On Thu, Apr 12, 2018, 8:47 PM Matteo Olivi  wrote:

> Hi,
> 11 am in which timezone?
>
> Il gio 12 apr 2018, 21:23 Holden Karau  ha scritto:
>
>> Hi Y'all,
>>
>> If you're interested in learning more about how the development process in
>> Apache Spark works, I've been doing a weekly live-streamed code review most
>> Fridays at 11am. This weeks will be on twitch/youtube (
>> https://www.twitch.tv/holdenkarau /
>> https://www.youtube.com/watch?v=vGVSa9KnD80 ). If you have a PR into
>> Spark (or a related project) you'd like me to review live let me know and
>> I'll add it to my queue.
>>
>> Cheers,
>>
>> Holden :)
>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>


Re: Live Stream Code Reviews :)

2018-04-12 Thread Matteo Olivi
Hi,
11 am in which timezone?

Il gio 12 apr 2018, 21:23 Holden Karau  ha scritto:

> Hi Y'all,
>
> If you're interested in learning more about how the development process in
> Apache Spark works, I've been doing a weekly live-streamed code review most
> Fridays at 11am. This weeks will be on twitch/youtube (
> https://www.twitch.tv/holdenkarau /
> https://www.youtube.com/watch?v=vGVSa9KnD80 ). If you have a PR into
> Spark (or a related project) you'd like me to review live let me know and
> I'll add it to my queue.
>
> Cheers,
>
> Holden :)
>
> --
> Twitter: https://twitter.com/holdenkarau
>


Live Stream Code Reviews :)

2018-04-12 Thread Holden Karau
Hi Y'all,

If you're interested in learning more about how the development process in
Apache Spark works, I've been doing a weekly live-streamed code review most
Fridays at 11am. This weeks will be on twitch/youtube (
https://www.twitch.tv/holdenkarau /
https://www.youtube.com/watch?v=vGVSa9KnD80 ). If you have a PR into Spark
(or a related project) you'd like me to review live let me know and I'll
add it to my queue.

Cheers,

Holden :)

-- 
Twitter: https://twitter.com/holdenkarau


Re: Spark Kubernetes Volumes

2018-04-12 Thread Anirudh Ramanathan
There's a JIRA, SPARK-23529, that deals with mounting hostPath volumes.
I propose we extend that PR/JIRA to encompass all the different volume
types and allow mounting them into the driver/executors.

On Thu, Apr 12, 2018 at 10:55 AM Yinan Li  wrote:

> Hi Marius,
>
> Spark on Kubernetes does not yet support mounting user-specified volumes
> natively. But mounting volume is supported in
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator. Please see
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md#mounting-volumes
> .
>
> On Thu, Apr 12, 2018 at 7:50 AM, Marius  wrote:
>
>> Hey,
>>
>> i have a question regarding the Spark on Kubernetes feature. I would like
>> to mount a pre-populated Kubernetes volume into the execution pods of
>> Spark. One of my tools that i invoke using the Sparks pipe command requires
>> these files to be available on a POSIX compatible FS and they are too large
>> to justify copying them around using addFile. If this is not possible i
>> would like to know if the community be interested in such a feature.
>>
>> Cheers
>>
>> Marius
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>

-- 
Anirudh Ramanathan


Re: Spark Kubernetes Volumes

2018-04-12 Thread Yinan Li
Hi Marius,

Spark on Kubernetes does not yet support mounting user-specified volumes
natively. But mounting volume is supported in
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator. Please see
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md#mounting-volumes
.

On Thu, Apr 12, 2018 at 7:50 AM, Marius  wrote:

> Hey,
>
> i have a question regarding the Spark on Kubernetes feature. I would like
> to mount a pre-populated Kubernetes volume into the execution pods of
> Spark. One of my tools that i invoke using the Sparks pipe command requires
> these files to be available on a POSIX compatible FS and they are too large
> to justify copying them around using addFile. If this is not possible i
> would like to know if the community be interested in such a feature.
>
> Cheers
>
> Marius
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark is only using one worker machine when more are available

2018-04-12 Thread Gourav Sengupta
Hi,

Just for the sake of clarity, can you please give the full statement for
reading the data from the largest table? I mean not the programmatic one
but the one that has the full statement in it.


Regards,
Gourav Sengupta




On Thu, Apr 12, 2018 at 7:19 AM, Jhon Anderson Cardenas Diaz <
jhonderson2...@gmail.com> wrote:

> Hi.
>
> On spark standalone i think you can not specify the number of workers
> machines to use but you can achieve that in this way:
> https://stackoverflow.com/questions/39399205/spark-
> standalone-number-executors-cores-control.
>
> For example, if you want that your jobs run on the 10 machines using all
> their cores (10 executors, each one in a different machine and with 40
> cores), you can use this configuration:
>
> spark.cores.max= 400
> spark.executor.cores  = 40
>
> If you want more executors with less cores each one (lets say 20
> executors, each one with 20 cores):
>
> spark.cores.max= 400
> spark.executor.cores  = 20
>
> Note that in the last case each worker machine will run two executors.
>
> In summary, use this trick:
>
> number-of-executors = spark.cores.max / spark.executor.cores.
>
> And have in mind that the executors will be divided among the available
> workers.
>
> Regards.
>
>
> 2018-04-11 21:39 GMT-05:00 宋源栋 :
>
>> Hi
>>  1. Spark version : 2.3.0
>>  2. jdk: oracle jdk 1.8
>>  3. os version: centos 6.8
>>  4. spark-env.sh: null
>>  5. spark session config:
>>
>>
>> SparkSession.builder().appName("DBScale")
>> .config("spark.sql.crossJoin.enabled", "true")
>> .config("spark.sql.adaptive.enabled", "true")
>> .config("spark.scheduler.mode", "FAIR")
>> .config("spark.executor.memory", "1g")
>> .config("spark.executor.cores", 1)
>> .config("spark.driver.memory", "20")
>> .config("spark.serializer", 
>> "org.apache.spark.serializer.KryoSerializer")
>> .config("spark.executor.extraJavaOptions",
>> "-XX:+UseG1GC -XX:+PrintFlagsFinal 
>> -XX:+PrintReferenceGC " +
>> "-verbose:gc -XX:+PrintGCDetails " +
>> "-XX:+PrintGCTimeStamps 
>> -XX:+PrintAdaptiveSizePolicy")
>> .master(this.spark_master)
>> .getOrCreate();
>>
>>   6. core code:
>>
>>
>>  for (SparksqlTableInfo tableInfo: this.dbtable){ // this loop reads 
>> data from mysql
>> String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
>> String[] pred = new String[tableInfo.partition_num];
>> if (tableInfo.partition_num > 0) {
>> for (int j = 0; j < tableInfo.partition_num; j++) {
>> String str = "some where clause to split mysql table 
>> into many partitions";
>> pred[j] = str;
>> }
>> Dataset jdbcDF = ss.read().jdbc(this.url, dt, pred, 
>> connProp); //this.url is mysql-jdbc-url (mysql://XX.XX.XX.XX:)
>> jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
>> } else {
>> logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
>> Dataset jdbcDF = ss.read().jdbc(this.url, dt, connProp);
>> jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
>> }
>> }
>>
>>
>> // Then run a query and write the result set to mysql
>>
>> Dataset result = ss.sql(this.sql);
>> result.explain(true);
>> connProp.put("rewriteBatchedStatements", "true");
>> connProp.put("sessionVariables", "sql_log_bin=off");
>> result.write().jdbc(this.dst_url, this.dst_table, connProp);
>>
>>
>>
>> --
>> From: Jhon Anderson Cardenas Diaz 
>> Sent: April 11, 2018 (Wednesday) 22:42
>> To: 宋源栋 
>> Cc: user 
>> Subject: Re: Spark is only using one worker machine when more are available
>>
>> Hi, could you please share the environment variables values that you are
>> sending when you run the jobs, spark version, etc.. more details.
>> Btw, you should take a look on SPARK_WORKER_INSTANCES and
>> SPARK_WORKER_CORES if you are using spark 2.0.0
>> .
>>
>> Regards.
>>
>> 2018-04-11 4:10 GMT-05:00 宋源栋 :
>>
>>
>> Hi all,
>>
>> I hava a standalone mode spark cluster without HDFS with 10 machines that
>> each one has 40 cpu cores and 128G RAM.
>>
>> My application is a sparksql application that reads data from database
>> "tpch_100g" in mysql and run tpch queries. When loading tables from myql to
>> spark, I spilts the biggest table "lineitem" into 600 partitions.
>>
>> When my application runs, there are only 40
>> executor(spark.executor.memory = 1g, 

Re: Broadcasting huge array or persisting on HDFS to read on executors - both not working

2018-04-12 Thread surender kumar
The question was not what kind of sampling, but random sampling per user. There's no
value associated with the items to create strata. If you read Matteo's answer,
that's the way to go about it.
-Surender

On Thursday, 12 April, 2018, 5:49:43 PM IST, Gourav Sengupta 
 wrote:  
 
 Hi,
There is an option for Stratified Sampling available in SPARK: 
https://spark.apache.org/docs/latest/mllib-statistics.html#stratified-sampling. 
Also there is a method called randomSplit which may be called on dataframes in 
case we want to split them into training and test data.
Please let me know whether using any of these built in functions helps or not.

Regards,
Gourav
On Thu, Apr 12, 2018 at 3:25 AM, surender kumar  
wrote:

Thanks Matteo, this should work!
-Surender 

On Thursday, 12 April, 2018, 1:13:38 PM IST, Matteo Cossu 
 wrote:  
 
 I don't think it's trivial. Anyway, the naive solution would be a cross join 
between user x items. But this can be very very expensive. I've encountered 
once a similar problem, here how I solved it:   
   - create a new RDD with (itemID, index) where the index is a unique integer 
between 0 and the number of items   

   - for every user sample n items by generating randomly n distinct integers 
between 0 and the number of items (e.g. with rand.randint()), so you have a new 
RDD (userID, [sample_items])
   - flatten all the list in the previously created RDD and join them back with 
the RDD with (itemID, index) using index as join attribute
You can do the same things with DataFrame using UDFs.
On 11 April 2018 at 23:01, surender kumar  wrote:

Right, this is what I did when I said I tried to persist and create an RDD out
of it to sample from. But how do I do it for each user? You have one RDD of users on
one hand and an RDD of items on the other. How do I go from here? Am I missing
something trivial?

On Thursday, 12 April, 2018, 2:10:51 AM IST, Matteo Cossu 
 wrote:  
 
 Why broadcasting this list then? You should use an RDD or DataFrame. For 
example, RDD has a method sample() that returns a random sample from it.
On 11 April 2018 at 22:34, surender kumar  wrote:

I'm using pySpark. I have a list of 1 million items (all float values) and 1
million users. For each user I want to randomly sample some items from the item
list. Broadcasting the item list results in an OutOfMemory error on the driver;
I tried setting driver memory up to 10G. I tried to persist this array on disk,
but I'm not able to figure out a way to read it on the workers.
Any suggestion would be appreciated.

  

  

  

Spark Kubernetes Volumes

2018-04-12 Thread Marius

Hey,

I have a question regarding the Spark on Kubernetes feature. I would
like to mount a pre-populated Kubernetes volume into the execution pods
of Spark. One of the tools that I invoke using Spark's pipe command
requires these files to be available on a POSIX-compatible FS, and they
are too large to justify copying them around using addFile. If this is
not possible, I would like to know whether the community would be interested
in such a feature.


Cheers

Marius

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark is only using one worker machine when more are available

2018-04-12 Thread Jhon Anderson Cardenas Diaz
Hi.

On Spark standalone I think you cannot specify the number of worker
machines to use, but you can achieve that in this way:
https://stackoverflow.com/questions/39399205/spark-standalone-number-executors-cores-control

For example, if you want your jobs to run on the 10 machines using all
their cores (10 executors, each one on a different machine and with 40
cores), you can use this configuration:

spark.cores.max = 400
spark.executor.cores = 40

If you want more executors with fewer cores each (let's say 20 executors,
each one with 20 cores):

spark.cores.max = 400
spark.executor.cores = 20

Note that in the last case each worker machine will run two executors.

In summary, use this trick:

number-of-executors = spark.cores.max / spark.executor.cores.

And keep in mind that the executors will be divided among the available
workers.
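
For example, a minimal sketch of setting this from the Java SparkSession
builder (the master URL is a placeholder and the 400/20 values are just the
example numbers above):

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
            .appName("DBScale")
            .master("spark://master-host:7077")     // placeholder master URL
            .config("spark.cores.max", "400")       // total cores to acquire on the cluster
            .config("spark.executor.cores", "20")   // cores per executor -> 400 / 20 = 20 executors
            .getOrCreate();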

Regards.


2018-04-11 21:39 GMT-05:00 宋源栋 :

> Hi
>  1. Spark version : 2.3.0
>  2. jdk: oracle jdk 1.8
>  3. os version: centos 6.8
>  4. spark-env.sh: null
>  5. spark session config:
>
>
> SparkSession.builder().appName("DBScale")
> .config("spark.sql.crossJoin.enabled", "true")
> .config("spark.sql.adaptive.enabled", "true")
> .config("spark.scheduler.mode", "FAIR")
> .config("spark.executor.memory", "1g")
> .config("spark.executor.cores", 1)
> .config("spark.driver.memory", "20")
> .config("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
> .config("spark.executor.extraJavaOptions",
> "-XX:+UseG1GC -XX:+PrintFlagsFinal 
> -XX:+PrintReferenceGC " +
> "-verbose:gc -XX:+PrintGCDetails " +
> "-XX:+PrintGCTimeStamps 
> -XX:+PrintAdaptiveSizePolicy")
> .master(this.spark_master)
> .getOrCreate();
>
>   6. core code:
>
>
>  for (SparksqlTableInfo tableInfo: this.dbtable){ // this loop reads 
> data from mysql
> String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
> String[] pred = new String[tableInfo.partition_num];
> if (tableInfo.partition_num > 0) {
> for (int j = 0; j < tableInfo.partition_num; j++) {
> String str = "some where clause to split mysql table into 
> many partitions";
> pred[j] = str;
> }
> Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, pred, 
> connProp); //this.url is mysql-jdbc-url (mysql://XX.XX.XX.XX:)
> jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
> } else {
> logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
> Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, connProp);
> jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
> }
> }
>
>
> // Then run a query and write the result set to mysql
>
> Dataset<Row> result = ss.sql(this.sql);
> result.explain(true);
> connProp.put("rewriteBatchedStatements", "true");
> connProp.put("sessionVariables", "sql_log_bin=off");
> result.write().jdbc(this.dst_url, this.dst_table, connProp);
>
>
>
> --
> From: Jhon Anderson Cardenas Diaz 
> Sent: April 11, 2018 (Wednesday) 22:42
> To: 宋源栋 
> Cc: user 
> Subject: Re: Spark is only using one worker machine when more are available
>
> Hi, could you please share the environment variables values that you are
> sending when you run the jobs, spark version, etc.. more details.
> Btw, you should take a look on SPARK_WORKER_INSTANCES and
> SPARK_WORKER_CORES if you are using spark 2.0.0
> .
>
> Regards.
>
> 2018-04-11 4:10 GMT-05:00 宋源栋 :
>
>
> Hi all,
>
> I have a standalone-mode Spark cluster without HDFS, with 10 machines that
> each have 40 CPU cores and 128G RAM.
>
> My application is a SparkSQL application that reads data from the database
> "tpch_100g" in MySQL and runs TPC-H queries. When loading tables from MySQL to
> Spark, I split the biggest table "lineitem" into 600 partitions.
>
> When my application runs, there are only 40 executors (spark.executor.memory
> = 1g, spark.executor.cores = 1) on the executors page of the Spark web UI,
> and all executors are on the same machine. It is too slow because all tasks
> are running in parallel on only one machine.
>
>
>
>
>
>


Re: Broadcasting huge array or persisting on HDFS to read on executors - both not working

2018-04-12 Thread Gourav Sengupta
Hi,

There is an option for Stratified Sampling available in SPARK:
https://spark.apache.org/docs/latest/mllib-statistics.html#stratified-sampling
.

Also there is a method called randomSplit which may be called on dataframes
in case we want to split them into training and test data.

Please let me know whether using any of these built in functions helps or
not.
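
A rough sketch of both (shown here with the Java API; the same methods exist
in pySpark, and the fractions, seed, and stratum keys below are placeholders):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.api.java.JavaPairRDD;

    // randomSplit: split a DataFrame into train/test sets
    Dataset<Row>[] splits = df.randomSplit(new double[]{0.8, 0.2}, 42L);
    Dataset<Row> train = splits[0];
    Dataset<Row> test = splits[1];

    // stratified sampling on a key-value RDD: sample 10% of each stratum
    Map<String, Double> fractions = new HashMap<>();
    fractions.put("strataA", 0.1);   // placeholder stratum keys
    fractions.put("strataB", 0.1);
    JavaPairRDD<String, Float> sample = pairRdd.sampleByKey(false, fractions, 42L);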


Regards,
Gourav

On Thu, Apr 12, 2018 at 3:25 AM, surender kumar <
skiit...@yahoo.co.uk.invalid> wrote:

> Thanks Matteo, this should work!
>
> -Surender
>
>
> On Thursday, 12 April, 2018, 1:13:38 PM IST, Matteo Cossu <
> elco...@gmail.com> wrote:
>
>
> I don't think it's trivial. Anyway, the naive solution would be a cross
> join between user x items. But this can be very very expensive. I've
> encountered once a similar problem, here how I solved it:
>
>- create a new RDD with (itemID, index) where the index is a unique
>integer between 0 and the number of items
>- for every user sample n items by generating randomly n distinct
>integers between 0 and the number of items (e.g. with rand.randint()), so
>you have a new RDD (userID, [sample_items])
>- flatten all the list in the previously created RDD and join them
>back with the RDD with (itemID, index) using index as join attribute
>
> You can do the same things with DataFrame using UDFs.
>
> On 11 April 2018 at 23:01, surender kumar  wrote:
>
> right, this is what I did when I said I tried to persist and create an RDD
> out of it to sample from. But how to do for each user?
> You have one rdd of users on one hand and rdd of items on the other. How
> to go from here? Am I missing something trivial?
>
>
> On Thursday, 12 April, 2018, 2:10:51 AM IST, Matteo Cossu <
> elco...@gmail.com> wrote:
>
>
> Why broadcasting this list then? You should use an RDD or DataFrame. For
> example, RDD has a method sample() that returns a random sample from it.
>
> On 11 April 2018 at 22:34, surender kumar 
> wrote:
>
> I'm using pySpark.
> I've list of 1 million items (all float values ) and 1 million users. for
> each user I want to sample randomly some items from the item list.
> Broadcasting the item list results in Outofmemory error on the driver,
> tried setting driver memory till 10G.  I tried to persist this array on
> disk but I'm not able to figure out a way to read the same on the workers.
>
> Any suggestion would be appreciated.
>
>
>
>


unsubscribe

2018-04-12 Thread varma dantuluri
-- 
Regards,

Varma Dantuluri


Re: Broadcasting huge array or persisting on HDFS to read on executors - both not working

2018-04-12 Thread surender kumar
Thanks Matteo, this should work!
-Surender 

On Thursday, 12 April, 2018, 1:13:38 PM IST, Matteo Cossu 
 wrote:  
 
 I don't think it's trivial. Anyway, the naive solution would be a cross join 
between user x items. But this can be very very expensive. I've encountered 
once a similar problem, here how I solved it:   
   - create a new RDD with (itemID, index) where the index is a unique integer 
between 0 and the number of items   

   - for every user sample n items by generating randomly n distinct integers 
between 0 and the number of items (e.g. with rand.randint()), so you have a new 
RDD (userID, [sample_items])
   - flatten all the list in the previously created RDD and join them back with 
the RDD with (itemID, index) using index as join attribute
You can do the same things with DataFrame using UDFs.
On 11 April 2018 at 23:01, surender kumar  wrote:

right, this is what I did when I said I tried to persist and create an RDD out 
of it to sample from. But how to do for each user?You have one rdd of users on 
one hand and rdd of items on the other. How to go from here? Am I missing 
something trivial?  

On Thursday, 12 April, 2018, 2:10:51 AM IST, Matteo Cossu 
 wrote:  
 
 Why broadcasting this list then? You should use an RDD or DataFrame. For 
example, RDD has a method sample() that returns a random sample from it.
On 11 April 2018 at 22:34, surender kumar  wrote:

I'm using pySpark.I've list of 1 million items (all float values ) and 1 
million users. for each user I want to sample randomly some items from the item 
list.Broadcasting the item list results in Outofmemory error on the driver, 
tried setting driver memory till 10G.  I tried to persist this array on disk 
but I'm not able to figure out a way to read the same on the workers.
Any suggestion would be appreciated.

  

  

Re: Driver aborts on Mesos when unable to connect to one of external shuffle services

2018-04-12 Thread Szuromi Tamás
Hi Igor,

Have you started the external shuffle service manually?

Cheers

2018-04-12 10:48 GMT+02:00 igor.berman :

> Hi,
> any input regarding is it expected:
> Driver starts and unable to connect to external shuffle service on one of
> the nodes(no matter what is the reason)
> This makes framework to go to Inactive mode in Mesos UI
> However it seems that driver doesn't exits and continues to execute
> tasks(or
> tries to). The attached stacktrace below shows few lines around the
> connection error and aborting message
>
> The question is is it expected behaviour?
>
> Here is stacktracke
>
> I0412 07:31:25.827283   274 sched.cpp:759] Framework registered with
> 15d9838f-b266-413b-842d-f7c3567bd04a-0051
> Exception in thread "Thread-295" java.io.IOException: Failed to connect to
> my-company.com/x.x.x.x:7337
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(
> TransportClientFactory.java:232)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(
> TransportClientFactory.java:182)
> at
> org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient.
> registerDriverWithShuffleService(MesosExternalShuffleClient.java:75)
> at
> org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBac
> kend.statusUpdate(MesosCoarseGrainedSchedulerBackend.scala:537)
> Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection refused:my-company.com/x.x.x.x:7337
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(
> NioSocketChannel.java:257)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(
> AbstractNioChannel.java:291)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(
> NioEventLoop.java:631)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(
> NioEventLoop.java:566)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(
> NioEventLoop.java:480)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:131)
> at
> io.netty.util.concurrent.DefaultThreadFactory$
> DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
> at java.lang.Thread.run(Thread.java:748)
> I0412 07:35:12.032925   277 sched.cpp:2055] Asked to abort the driver
> I0412 07:35:12.033035   277 sched.cpp:1233] Aborting framework
> 15d9838f-b266-413b-842d-f7c3567bd04a-0051
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Nullpointerexception error when in repartition

2018-04-12 Thread Junfeng Chen
Hi, Tathagata

I have tried structured streaming, but in line

> Dataset<Row> rowDataset = spark.read().json(jsondataset);


Always throw

> Queries with streaming sources must be executed with writeStream.start()


But what I need to do in this step is only transforming JSON string data to
Dataset<Row>. How can I fix it?

Thanks!


Regard,
Junfeng Chen

On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das 
wrote:

> It's not very surprising that doing this sort of RDD to DF conversion
> inside DStream.foreachRDD has weird corner cases like this. In fact, you
> are going to have additional problems with partial parquet files (when
> there are failures) in this approach. I strongly suggest that you use
> Structured Streaming, which is designed to do this sort of processing. It
> will take care of tracking the written parquet files correctly.
>
> TD
>
> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen  wrote:
>
>> I wrote a program that reads some JSON data from Kafka, with the goal of saving
>> it to Parquet files on HDFS.
>> Here is my code:
>>
>>> JavaInputDStream<ConsumerRecord<String, String>> stream = ...
>>> JavaDStream<String> rdd = stream.map...
>>> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>>>     Dataset<Row> df = spark.read().json( stringjavardd ); // convert json to df
>>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...  // add some new fields
>>>     StructType type = df.schema()...; // construct a new type for the added fields
>>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create new dataframe
>>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>>         .partitionBy("appname").parquet(savepath); // save to parquet
>>> })
>>
>>
>>
>> However, if I remove the repartition call on newdf in the Parquet-writing
>> stage, the program always throws a NullPointerException in the JSON
>> conversion line:
>>
>> Java.lang.NullPointerException
>>>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.
>>> scala:1783)
>>> ...
>>
>>
>> This seems to make no sense, since the Parquet-writing operation should be in
>> a different stage from the JSON-transforming operation.
>> So how to solve it? Thanks!
>>
>> Regard,
>> Junfeng Chen
>>
>
>


Driver aborts on Mesos when unable to connect to one of external shuffle services

2018-04-12 Thread igor.berman
Hi,
any input on whether this is expected:
The driver starts and is unable to connect to the external shuffle service on one of
the nodes (no matter what the reason is).
This makes the framework go to Inactive mode in the Mesos UI.
However, it seems that the driver doesn't exit and continues to execute tasks (or
tries to). The stacktrace attached below shows a few lines around the
connection error and the aborting message.

The question is: is this expected behaviour?

Here is stacktracke

I0412 07:31:25.827283   274 sched.cpp:759] Framework registered with
15d9838f-b266-413b-842d-f7c3567bd04a-0051
Exception in thread "Thread-295" java.io.IOException: Failed to connect to
my-company.com/x.x.x.x:7337
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
at
org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient.registerDriverWithShuffleService(MesosExternalShuffleClient.java:75)
at
org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.statusUpdate(MesosCoarseGrainedSchedulerBackend.scala:537)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused:my-company.com/x.x.x.x:7337
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
I0412 07:35:12.032925   277 sched.cpp:2055] Asked to abort the driver
I0412 07:35:12.033035   277 sched.cpp:1233] Aborting framework
15d9838f-b266-413b-842d-f7c3567bd04a-0051



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Structured Streaming] File source, Parquet format: use of the mergeSchema option.

2018-04-12 Thread Gerard Maas
Hi,

I'm looking into the Parquet format support for the File source in
Structured Streaming.
The docs mention the use of the option 'mergeSchema' to merge the schemas
of the part files found.[1]

What would be the practical use of that in a streaming context?

In its batch counterpart, `mergeSchema` would infer the schema superset of
the part-files found.


When using the File source + parquet format in streaming mode, we must
provide a schema to the readStream.schema(...) builder and that schema is
fixed for the duration of the stream.
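
For concreteness, a minimal sketch of that setup (the schema fields and the
input path are placeholders):

    StructType schema = new StructType()
            .add("id", DataTypes.LongType)
            .add("value", DataTypes.StringType);   // placeholder schema

    Dataset<Row> stream = spark.readStream()
            .schema(schema)                        // fixed for the lifetime of the query
            .option("mergeSchema", "true")         // the option under discussion
            .parquet("/data/in");                  // placeholder input directory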

My current understanding is that:

- Files containing a subset of the fields declared in the schema will
render null values for the non-existing fields.
- For files containing a superset of the fields, the additional data fields
will be lost.
- Files not matching the schema set on the streaming source, will render
all fields null for each record in the file.

Is the 'mergeSchema' option playing another role? From the user
perspective, they may think that this option would help their job cope with
schema evolution at runtime, but that does not seem to be the case.

What is the use of this option?

-kr, Gerard.


[1] https://github.com/apache/spark/blob/master/sql/core/src
/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L376


Re: Broadcasting huge array or persisting on HDFS to read on executors - both not working

2018-04-12 Thread Matteo Cossu
I don't think it's trivial. Anyway, the naive solution would be a cross
join between users x items. But this can be very, very expensive. I once
encountered a similar problem; here is how I solved it:

   - create a new RDD with (itemID, index) where the index is a unique
   integer between 0 and the number of items
   - for every user sample n items by generating randomly n distinct
   integers between 0 and the number of items (e.g. with rand.randint()), so
   you have a new RDD (userID, [sample_items])
   - flatten all the list in the previously created RDD and join them back
   with the RDD with (itemID, index) using index as join attribute

You can do the same things with DataFrame using UDFs.
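
A rough sketch of those steps (shown here with the Java RDD API; the original
question uses pySpark, where the same calls exist. The RDD names, the sample
size n, and the handling of duplicate draws are all simplifications):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    // (index, item): give every item a unique index between 0 and numItems - 1
    JavaPairRDD<Long, Float> indexedItems = items          // items: JavaRDD<Float>
            .zipWithIndex()                                // (item, index)
            .mapToPair(t -> new Tuple2<>(t._2(), t._1())); // (index, item)

    final long numItems = indexedItems.count();
    final int n = 10;                                      // samples per user

    // for every user draw n random item indexes: (index, userID)
    // (duplicate draws are not removed in this sketch)
    JavaPairRDD<Long, String> userDraws = users.flatMapToPair(user -> {  // users: JavaRDD<String>
        Random rnd = new Random();
        List<Tuple2<Long, String>> draws = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            draws.add(new Tuple2<>((long) (rnd.nextDouble() * numItems), user));
        }
        return draws.iterator();
    });

    // join back on the index to get (userID, item) pairs
    JavaPairRDD<String, Float> sampled = userDraws
            .join(indexedItems)        // (index, (userID, item))
            .mapToPair(t -> t._2());   // (userID, item)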

On 11 April 2018 at 23:01, surender kumar  wrote:

> right, this is what I did when I said I tried to persist and create an RDD
> out of it to sample from. But how to do for each user?
> You have one rdd of users on one hand and rdd of items on the other. How
> to go from here? Am I missing something trivial?
>
>
> On Thursday, 12 April, 2018, 2:10:51 AM IST, Matteo Cossu <
> elco...@gmail.com> wrote:
>
>
> Why broadcast this list then? You should use an RDD or DataFrame. For
> example, RDD has a method sample() that returns a random sample from it.
>
> On 11 April 2018 at 22:34, surender kumar 
> wrote:
>
> I'm using pySpark.
> I have a list of 1 million items (all float values) and 1 million users. For
> each user I want to randomly sample some items from the item list.
> Broadcasting the item list results in an OutOfMemory error on the driver;
> I tried setting driver memory up to 10G. I tried to persist this array on
> disk but I'm not able to figure out a way to read it on the workers.
>
> Any suggestion would be appreciated.
>
>
>


Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
It's not very surprising that doing this sort of RDD to DF conversion
inside DStream.foreachRDD has weird corner cases like this. In fact, you
are going to have additional problems with partial parquet files (when
there are failures) in this approach. I strongly suggest that you use
Structured Streaming, which is designed to do this sort of processing. It
will take care of tracking the written parquet files correctly.

TD

On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen  wrote:

> I wrote a program that reads some JSON data from Kafka, with the goal of saving
> it to Parquet files on HDFS.
> Here is my code:
>
>> JavaInputDStream<ConsumerRecord<String, String>> stream = ...
>> JavaDStream<String> rdd = stream.map...
>> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>>     Dataset<Row> df = spark.read().json( stringjavardd ); // convert json to df
>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...  // add some new fields
>>     StructType type = df.schema()...; // construct a new type for the added fields
>>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create new dataframe
>>     newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>         .partitionBy("appname").parquet(savepath); // save to parquet
>> })
>
>
>
> However, if I remove the repartition call on newdf in the Parquet-writing
> stage, the program always throws a NullPointerException in the JSON
> conversion line:
>
> Java.lang.NullPointerException
>>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.
>> scala:1783)
>> ...
>
>
> This seems to make no sense, since the Parquet-writing operation should be in
> a different stage from the JSON-transforming operation.
> So how to solve it? Thanks!
>
> Regard,
> Junfeng Chen
>


Re: Does structured streaming support Spark Kafka Direct?

2018-04-12 Thread Tathagata Das
The parallelism is the same for Structured Streaming. In fact, the Kafka
Structured Streaming source is based on the same principle as the DStream
Kafka Direct source, hence it has very similar behavior.


On Tue, Apr 10, 2018 at 11:03 PM, SRK  wrote:

> hi,
>
> We have code based on Spark Kafka Direct in production and we want to port
> this code to Structured Streaming. Does structured streaming support spark
> kafka direct? What are the configs for parallelism and scalability in
> structured streaming? In Spark Kafka Direct, the number of kafka partitions
> take care of parallelism when doing the consumption. Is it the same case
> with Structured Streaming?
>
> Thanks for the help in Advance!
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Fwd: pyspark:APID iS coming as null

2018-04-12 Thread nirav nishith
insert_push_body_df = spark.sql('''select CASE WHEN t2.offer_id='' then
NULL else t2.offer_id end as offer_id,\
CASE WHEN t2.content_set_id='' then NULL else t2.content_set_id end as
content_set_id,\
CASE WHEN t2.post_id='' then NULL else t2.post_id end as
post_id,t2.nuid,t2.apid,\
t2.push_id,t2.event_id as id1,t2.offset,t2.event_occurre
d,min(t2.event_processed),\
t2.payload,t2.group_id,t2.trimmed,t2.event_type,from_unixtime(unix_timestamp(),'-MM-dd
hh:mm:ss') as load_date,\
t2.app_id as app_id,CAST(REGEXP_REPLACE(SUBSTR(t2.event_occurred, 1, 10),
'-', '') AS INT) AS occurred_part FROM \
(SELECT regexp_extract(t1.pl_unbase,'"offer_id":"([^"]*)"',1) as offer_id,\
regexp_extract(t1.pl_unbase,'"content_set_id":"([^"]*)"',1) as
content_set_id,\
regexp_extract(t1.pl_unbase,'"post_id":"([^"]*)"',1) as post_id,\
regexp_replace(COALESCE(get_json_object(t1.pl_unbase, '$.audience.alias'),\
get_json_object(t1.pl_unbase,'$.audience.or.alias')),'\\[|\\"|\\]','') as
nuid,\
regexp_replace(COALESCE(get_json_object(t1.pl_unbase, '$.audience.apid'),\
get_json_object(t1.pl_unbase, '$.audience.or.apid') ), '\\[|\\"|\\]','') as
apid,\
t1.push_id  as push_id,t1.event_id  as event_id,t1.offset as offset,\
t1.occurred as event_occurred,t1.processed as event_processed,t1.pl_unbase
as payload,\
t1.group_id as group_id,t1.trimmed as trimmed,\
t1.event_type as event_type,t1.app_id as app_id FROM (select
cast(unbase64(body.payload) as string) as pl_unbase,\
body.push_id as push_id,id as event_id,offset as offset,occurred as
occurred,\
processed as processed,body.group_id as group_id,body.trimmed as trimmed,\
type as event_type,app_id as app_id from dev_stg_urban_airship.push_body_stage)
t1 ) t2 GROUP BY t2.offer_id,\
t2.content_set_id,t2.post_id,t2.nuid,t2.apid,t2.push_id,t2.e
vent_id,t2.offset,t2.event_occurred,t2.payload,t2.group_id,
t2.trimmed,t2.event_type,t2.app_id''')



In the above code, when run as HQL the APID value gets populated, but
when run through spark_sql it gives NULL.
It is line 12:
regexp_replace(COALESCE(get_json_object(t1.pl_unbase, '$.audience.apid'),\
get_json_object(t1.pl_unbase, '$.audience.or.apid') ), '\\[|\\"|\\]','') as
apid,\

How does get_json_object work in Spark?

Regards
Nirav


Re: How to use disk instead of just InMemoryRelation when use JDBC datasource in SPARKSQL?

2018-04-12 Thread Takeshi Yamamuro
You want to use `Dataset.persist(StorageLevel.MEMORY_AND_DISK)`?
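
For example, a minimal sketch on the JDBC side (the SparkSession, JDBC url,
and connection properties are placeholders; "region" is the table from the
plan below):

    import org.apache.spark.storage.StorageLevel;

    Dataset<Row> region = spark.read().jdbc(url, "region", connProp);
    region.persist(StorageLevel.MEMORY_AND_DISK());   // partitions that do not fit in memory spill to disk
    region.createOrReplaceTempView("region");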

On Thu, Apr 12, 2018 at 1:12 PM, Louis Hust  wrote:

> We want to extract data from MySQL and do the calculation in SparkSQL.
> The SQL explain output looks like below.
>
>
> REGIONKEY#177,N_COMMENT#178] PushedFilters: [], ReadSchema:
> struct
>   +- *(20) Sort [r_regionkey#203 ASC NULLS FIRST], false, 0
>  +- Exchange(coordinator id: 266374831)
> hashpartitioning(r_regionkey#203, 200), coordinator[target post-shuffle
> partition size: 67108864]
> +- *(19) Project [R_REGIONKEY#203]
>+- *(19) Filter ((isnotnull(r_name#204) &&
> (r_name#204 = AFRICA)) && isnotnull(r_regionkey#203))
>   +- InMemoryTableScan [R_REGIONKEY#203,
> r_name#204], [isnotnull(r_name#204), (r_name#204 = AFRICA),
> isnotnull(r_regionkey#203)]
> +- InMemoryRelation [R_REGIONKEY#203,
> R_NAME#204, R_COMMENT#205], true, 1, StorageLevel(disk, memory, 1
> replicas)
>   +- *(1) Scan
> JDBCRelation(region) [numPartitions=1] 
> [R_REGIONKEY#203,R_NAME#204,R_COMMENT#205]
> PushedFilters: [], ReadSchema: struct string,R_COMMENT:string>
>
>
> As you can see, every JDBCRelation is converted to an InMemoryRelation. Because the JDBC
> table is so big, all the data cannot fit into memory, and OOM occurs.
> Is there some option to make SparkSQL use disk if memory is not enough?
>



-- 
---
Takeshi Yamamuro


Problem running Kubernetes example v2.2.0-kubernetes-0.5.0

2018-04-12 Thread Rico Bergmann
Hi!

I was trying to get the SparkPi example running using the spark-on-k8s
distro from kubespark. But I get the following error:
+ /sbin/tini -s -- driver
[FATAL tini (11)] exec driver failed: No such file or directory

Did anyone get the example running on a Kubernetes cluster?

Best,
Rico.

invoked cmd:
bin/spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://https://cluster:port \
  --conf spark.executor.instances=2 \
  --conf spark.app.name=spark-pi \
  --conf
spark.kubernetes.container.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.5.0
\
  --conf
spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.5.0
\
  --conf
spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.2.0-kubernetes-0.5.0
\
 
local:///opt/spark/examples/jars/spark-examples_2.11-v2.2.0-kubernetes-0.5.0.jar

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org