Re: Pyspark Partitioning

2018-09-30 Thread ayan guha
Hi

There is a set of functions which can be used with the construct
over (partition by col order by col).

You can search for rank and window functions in the Spark documentation.
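
For example, a minimal PySpark sketch (column names taken from the question
below; df and the choice of rank() are only illustrative) of a function applied
over a window partitioned by Group_Id:

```
from pyspark.sql import functions as F, Window

# Window equivalent of "over (partition by Group_Id order by Id)"
w = Window.partitionBy("Group_Id").orderBy("Id")

# rank() is one of the window functions usable with this construct; other
# window/aggregate functions can be applied over the same window.
ranked = df.withColumn("rank", F.rank().over(w))
```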

On Mon, 1 Oct 2018 at 5:29 am, Riccardo Ferrari  wrote:

> Hi Dimitris,
>
> I believe the methods partitionBy and mapPartitions are specific to RDDs,
> while you're talking about DataFrames.
> I guess you have a few options, including:
> 1. Use the DataFrame.rdd call and process the returned RDD. Please note
> the return type for this call is an RDD of Row.
> 2. Use the groupBy from DataFrames and start from there; this may involve
> defining a UDF or leveraging the existing GroupedData functions.
>
> It really depends on your use-case and your performance requirements.
> HTH
>
> On Sun, Sep 30, 2018 at 8:31 PM dimitris plakas 
> wrote:
>
>> Hello everyone,
>>
>> I am trying to split a dataframe into partitions and I want to apply a
>> custom function on every partition. More precisely, I have a dataframe like
>> the one below:
>>
>> Group_Id | Id  | Points
>> 1        | id1 | Point1
>> 2        | id2 | Point2
>>
>> I want to have a partition for every Group_Id and apply a function defined
>> by me on every partition.
>> I have tried partitionBy('Group_Id').mapPartitions(), but I receive an
>> error.
>> Could you please advise me on how to do it?
>>
> --
Best Regards,
Ayan Guha


Re: Pyspark Partitioning

2018-09-30 Thread Riccardo Ferrari
Hi Dimitris,

I believe the methods partitionBy and mapPartitions are specific to RDDs,
while you're talking about DataFrames.
I guess you have a few options, including:
1. Use the DataFrame.rdd call and process the returned RDD. Please note the
return type for this call is an RDD of Row.
2. Use the groupBy from DataFrames and start from there; this may involve
defining a UDF or leveraging the existing GroupedData functions.

It really depends on your use-case and your performance requirements.
HTH
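
For reference, a minimal sketch of option 1 (assuming the Group_Id column from
the question and a user-supplied my_function that consumes an iterator of Rows):

```
# Option 1: drop to the RDD API and partition by Group_Id before mapPartitions.
num_groups = df.select("Group_Id").distinct().count()

result_rdd = (
    df.rdd                                  # RDD[Row]
      .keyBy(lambda row: row["Group_Id"])   # (Group_Id, Row) pairs
      .partitionBy(num_groups)              # hash-partition by key
      .values()
      .mapPartitions(my_function)           # custom function per partition
)
```

Note that hash partitioning can still co-locate two groups in one partition; a
custom partitionFunc can be passed as the second argument to partitionBy if an
exact one-group-per-partition mapping is required.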

On Sun, Sep 30, 2018 at 8:31 PM dimitris plakas 
wrote:

> Hello everyone,
>
> I am trying to split a dataframe into partitions and I want to apply a
> custom function on every partition. More precisely, I have a dataframe like
> the one below:
>
> Group_Id | Id  | Points
> 1        | id1 | Point1
> 2        | id2 | Point2
>
> I want to have a partition for every Group_Id and apply a function defined
> by me on every partition.
> I have tried partitionBy('Group_Id').mapPartitions(), but I receive an
> error.
> Could you please advise me on how to do it?
>


Pyspark Partitioning

2018-09-30 Thread dimitris plakas
Hello everyone,

I am trying to split a dataframe into partitions and I want to apply a custom
function on every partition. More precisely, I have a dataframe like the one
below:

Group_Id | Id  | Points
1        | id1 | Point1
2        | id2 | Point2

I want to have a partition for every Group_Id and apply a function defined by
me on every partition.
I have tried partitionBy('Group_Id').mapPartitions(), but I receive an error.
Could you please advise me on how to do it?


Re: Watermarking without aggregation with Structured Streaming

2018-09-30 Thread peay
Thanks for the pointers. I guess right now the only workaround would be to 
apply a "dummy" aggregation (e.g., group by the timestamp itself) only to have 
the stateful processing logic kick in and apply the filtering?
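
For concreteness, a rough sketch of that dummy-aggregation workaround (the
column names and watermark delay are placeholders):

```
# df: streaming DataFrame with an event-time column "ts" (placeholder names).
# "Dummy" aggregation: group by the event timestamp plus the payload columns
# so the stateful aggregation path kicks in and drops records older than the
# watermark; the count itself is discarded afterwards.
filtered = (
    df.withWatermark("ts", "10 minutes")
      .groupBy("ts", "key", "value")
      .count()
      .drop("count")
)
```

In append output mode this also delays emission of each row until the watermark
passes its timestamp, which is an additional cost of the workaround.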

For my purposes, an alternative solution to pushing it out to the source would 
be to make the watermark timestamp available through a function so that it can 
be used in a regular filter clause. Based on my experiments, the timestamp is 
computed and updated even when no stateful computations occur. I am not sure 
how easy that would be to contribute though, maybe someone can suggest a 
starting point?

Thanks,

‐‐‐ Original Message ‐‐‐
On Sunday, 30 September 2018 10:41, Jungtaek Lim  wrote:

> The purpose of the watermark is to set a limit on handling records, so that
> state does not grow infinitely. In other cases (non-stateful operations), it
> is pretty normal to handle all records even if they're pretty late.
>
> Btw, there were some comments regarding this: while Spark currently leaves
> filtering out late records to the stateful operations, some of us (including
> me) think filtering out late records in an earlier phase (at the source, or
> just after the source) makes more sense. It just hasn't turned into action,
> but I think it is still valid.
>
> https://github.com/apache/spark/pull/21617#issuecomment-400119049
>
> If we move the phase at which late records are filtered out, what you would
> like to do may become the default behavior. This also means the output may
> change for queries which use non-stateful operations, so it is not a trivial
> change and may require consensus, e.g. via the SPIP process.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Sun, 30 Sep 2018 at 5:19 PM, chandan prakash wrote:
>
>> Interesting question.
>> I do not think watermark is currently supported without any aggregation
>> operation/groupBy.
>>
>> Reason:
>> Watermark in Structured Streaming is used for limiting the size of the state
>> needed to keep intermediate information in memory.
>> And state only comes into the picture in the case of stateful processing.
>> Also, in the code, it seems that filtering out records on the basis of the
>> watermark happens only in the case of stateful operators
>> (statefulOperators.scala).
>> I have not tried running the code though, and would like to know if someone
>> can shed more light on this.
>>
>> Regards,
>> Chandan
>>
>> On Sat, Sep 22, 2018 at 7:43 PM peay  wrote:
>>
>>> Hello,
>>>
>>> I am trying to use watermarking without aggregation, to filter out records 
>>> that are just too late, instead of appending them to the output. My 
>>> understanding is that aggregation is required for `withWatermark` to have 
>>> any effect. Is that correct?
>>>
>>> I am looking for something along the lines of
>>>
>>> ```
>>> df.withWatermark("ts", ...).filter(F.col("ts") < ...)
>>> ```
>>>
>>> Is there any way to get the watermark value to achieve that?
>>>
>>> Thanks!
>>
>> --
>> Chandan Prakash

Re: Watermarking without aggregation with Structured Streaming

2018-09-30 Thread Jungtaek Lim
The purpose of the watermark is to set a limit on handling records, so that
state does not grow infinitely. In other cases (non-stateful operations), it
is pretty normal to handle all records even if they're pretty late.
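
As a concrete example (a sketch with assumed names; `events` is a streaming
DataFrame with event-time column "ts"), the stateful case where the watermark
bounds state:

```
from pyspark.sql import functions as F

# Windowed aggregation: the watermark lets Spark discard window state (and
# drop incoming records) more than ~10 minutes behind the max event time seen.
counts = (
    events.withWatermark("ts", "10 minutes")
          .groupBy(F.window("ts", "5 minutes"), "key")
          .count()
)
```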

Btw, there were some comments regarding this: while Spark currently leaves
filtering out late records to the stateful operations, some of us (including
me) think filtering out late records in an earlier phase (at the source, or
just after the source) makes more sense. It just hasn't turned into action,
but I think it is still valid.

https://github.com/apache/spark/pull/21617#issuecomment-400119049

If we move the phase at which late records are filtered out, what you would
like to do may become the default behavior. This also means the output may
change for queries which use non-stateful operations, so it is not a trivial
change and may require consensus, e.g. via the SPIP process.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Sun, 30 Sep 2018 at 5:19 PM, chandan prakash wrote:

> Interesting question.
> I do not think watermark is currently supported without any aggregation
> operation/groupBy.
>
> Reason:
> Watermark in Structured Streaming is used for limiting the size of the state
> needed to keep intermediate information in memory.
> And state only comes into the picture in the case of stateful processing.
> Also, in the code, it seems that filtering out records on the basis of the
> watermark happens only in the case of stateful operators
> (statefulOperators.scala).
> I have not tried running the code though, and would like to know if someone
> can shed more light on this.
>
> Regards,
> Chandan
>
>
> On Sat, Sep 22, 2018 at 7:43 PM peay  wrote:
>
>> Hello,
>>
>> I am trying to use watermarking without aggregation, to filter out
>> records that are just too late, instead of appending them to the output. My
>> understanding is that aggregation is required for `withWatermark` to have
>> any effect. Is that correct?
>>
>> I am looking for something along the lines of
>>
>> ```
>> df.withWatermark("ts", ...).filter(F.col("ts") < ...)
>> ```
>>
>> Is there any way to get the watermark value to achieve that?
>>
>> Thanks!
>>
>
>
> --
> Chandan Prakash
>
>


Re: Watermarking without aggregation with Structured Streaming

2018-09-30 Thread chandan prakash
Interesting question.
I do not think watermark is currently supported without any aggregation
operation/groupBy.

Reason:
Watermark in Structured Streaming is used for limiting the size of the state
needed to keep intermediate information in memory.
And state only comes into the picture in the case of stateful processing.
Also, in the code, it seems that filtering out records on the basis of the
watermark happens only in the case of stateful operators
(statefulOperators.scala).
I have not tried running the code though, and would like to know if someone
can shed more light on this.
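
To make the point concrete, a sketch (with assumed names; `events` is a
streaming DataFrame with event-time column "ts") of the non-stateful case
where, per the reasoning above, nothing would be dropped:

```
# Non-stateful query: withWatermark only registers the event-time column and
# the delay; without a downstream aggregation/stateful operator, late records
# still flow through to the sink.
passthrough = (
    events.withWatermark("ts", "10 minutes")
          .select("ts", "key", "value")
)
```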

Regards,
Chandan


On Sat, Sep 22, 2018 at 7:43 PM peay  wrote:

> Hello,
>
> I am trying to use watermarking without aggregation, to filter out records
> that are just too late, instead of appending them to the output. My
> understanding is that aggregation is required for `withWatermark` to have
> any effect. Is that correct?
>
> I am looking for something along the lines of
>
> ```
> df.withWatermark("ts", ...).filter(F.col("ts") < ...)
> ```
>
> Is there any way to get the watermark value to achieve that?
>
> Thanks!
>


-- 
Chandan Prakash


Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?

2018-09-30 Thread chandan prakash
Can anyone clear up the doubts on the questions asked here?

Regards,
Chandan

On Sat, Aug 11, 2018 at 10:03 PM chandan prakash 
wrote:

> Hi All,
> I was going through this pull request about the new CheckpointFileManager
> abstraction in Structured Streaming coming in 2.4:
> https://issues.apache.org/jira/browse/SPARK-23966
> https://github.com/apache/spark/pull/21048
>
> I went through the code in detail and found it will introduce a very nice
> abstraction which is much cleaner and extensible for direct-write file
> systems like S3 (in addition to the current HDFS file system).
>
> *But I am unable to understand: is it really solving some problem in the
> State Store code that currently exists in Spark 2.3?*
>
> *My questions related to the above statements about the State Store code:*
> *PR description*: "Checkpoint files must be written atomically such
> that *no partial files are generated*."
> *QUESTION*: When are partial files generated in the current code? I can see
> that data is first written to a temp-delta file and then renamed to the
> version.delta file. If something bad happens, the task will fail due to the
> thrown exception and abort() will be called on the store to close and delete
> tempDeltaFileStream. I think it is quite clean; in what case might partial
> files be generated?
>
> *PR description*: "*State Store behavior is incorrect - HDFS FileSystem
> implementation does not have atomic rename*"
> *QUESTION*: The HDFS filesystem rename operation itself is atomic; I think
> the above line refers to checking whether the destination file already
> exists and then taking the appropriate action, which together makes the
> renaming operation multi-step and hence non-atomic. But why is this
> behaviour incorrect? Even if multiple executors try to write to the same
> version.delta file, only the first of them will succeed; the second one will
> see that the file exists and will delete its temp-delta file. Looks good.
>
> Is there anything I am missing here?
> I am really curious to know which corner cases we are trying to solve with
> this new pull request.
>
> Regards,
> Chandan
>
>
>
>
>
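
For what it's worth, a generic sketch (illustration only, not Spark's actual
code) of why the check-then-rename pattern questioned above is not atomic as a
whole: another writer can create the destination between the existence check
and the rename.

```
import os

def rename_if_absent(tmp_path, final_path):
    # Two separate steps: the existence check and the rename. Another process
    # can create final_path *between* these two calls, so the sequence is not
    # atomic even if the underlying rename itself is.
    if not os.path.exists(final_path):   # step 1: check
        os.rename(tmp_path, final_path)  # step 2: rename
        return True
    os.remove(tmp_path)                  # already written by someone else
    return False
```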

-- 
Chandan Prakash


error in job

2018-09-30 Thread yuvraj singh
Hi, I am getting this error; please help me.


18/09/30 05:14:44 INFO Client:
 client token: N/A
 diagnostics: User class threw exception: java.lang.NoClassDefFoundError:
org/apache/commons/configuration/ConfigurationException
at
org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala:135)
at
org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala)
at
org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:55)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at com.ola.ss.lhf.LoginHourFetcher.main(LoginHourFetcher.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
Caused by: java.lang.ClassNotFoundException:
org.apache.commons.configuration.ConfigurationException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 13 more

 ApplicationMaster host: 10.14.58.163
 ApplicationMaster RPC port: 0
 queue: signals
 start time: 1538284434296
 final status: FAILED
 tracking URL:
http://as-data3.prod-ambari16.olacabs.net:8088/proxy/application_1537943933870_0088/
 user: yubrajsingh
Exception in thread "main" org.apache.spark.SparkException: Application
application_1537943933870_0088 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1269)
at
org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1627)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)