Re: Spark structured streaming watermarks on nested attributes

2019-05-07 Thread Joe Ammann
Hi Yuanjian

On 5/7/19 4:55 AM, Yuanjian Li wrote:
> Hi Joe
> 
> I think you hit this issue: https://issues.apache.org/jira/browse/SPARK-27340
> You can check the description in the Jira and the PR. We also hit this in
> our production env and fixed it with the provided PR.
> 
> The PR is still in review. cc Liangchang Zhu (zhuliangch...@baidu.com),
> who's the author of the fix.

Yes, this very much looks like the issue I'm having. As an exercise (I've 
never built Spark locally), I will try to build your branch and see if it 
fixes my issue.
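
In case it helps anyone else trying the same: the documented default Spark 
build command should be enough for a plain local build (assuming Maven and a 
suitable JDK are available; nothing here is specific to this branch):

./build/mvn -DskipTests clean package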

-- 
CU, Joe




Re: Spark structured streaming watermarks on nested attributes

2019-05-06 Thread Yuanjian Li
Hi Joe

I think you hit this issue:
https://issues.apache.org/jira/browse/SPARK-27340
You can check the description in the Jira and the PR. We also hit this in our
production env and fixed it with the provided PR.

The PR is still in review. cc Liangchang Zhu (zhuliangch...@baidu.com), who's
the author of the fix.

Best,
Yuanjian

Joe Ammann wrote on Tue, May 7, 2019 at 4:53 AM:

> On 5/6/19 6:23 PM, Pat Ferrel wrote:
> > Streams have no end until watermarked or closed. Joins need bounded
> datasets, et voila. Something tells me you should consider the streaming
> nature of your data and whether your joins need to use increments/snippets
> of infinite streams or to re-join the entire contents of the streams
> accumulated at checkpoints.
>
> I certainly don't question the need for watermarks.
>
> What I was getting at is that when I use fields called
> "entityX_LAST_MODIFICATION" for watermarks/conditions, my joins work as
> expected.
>
> But when I nest the attributes and use "entityX.LAST_MODIFICATION"
> (note the dot for the nesting), the joins fail.
>
> I have a feeling that the Spark execution plan gets somewhat confused,
> because in the latter case there are multiple fields called
> "LAST_MODIFICATION" with differing nesting prefixes.
>
> > From: Joe Ammann <j...@pyx.ch>
> > Reply: Joe Ammann <j...@pyx.ch>
> > Date: May 6, 2019 at 6:45:13 AM
> > To: user@spark.apache.org
> > Subject: Spark structured streaming watermarks on nested attributes
> >
> >> Hi all
> >>
> >> I'm pretty new to Spark and implementing my first non-trivial
> structured streaming job with outer joins. My environment is a Hortonworks
> HDP 3.1 cluster with Spark 2.3.2, working with Python.
> >>
> >> I understood that I need to provide watermarks and join conditions for
> left outer joins to work. All my incoming Kafka streams have an attribute
> "LAST_MODIFICATION" which is well suited to indicate the event time, so I
> chose that for watermarking. Since I'm joining from multiple topics where
> the incoming messages have common attributes, I thought I'd prefix/nest all
> incoming messages. Something like
> >>
> >>
> entity1DF.select(struct("*").alias("entity1")).withWatermark("entity1.LAST_MODIFICATION")
> >>
> entity2DF.select(struct("*").alias("entity2")).withWatermark("entity2.LAST_MODIFICATION")
> >>
> >> Now when I try to join two such streams, the join fails and tells me
> that I need to use watermarks.
> >>
> >> When I leave the watermarking attribute "at the top level", everything
> works as expected, e.g.
> >>
> >> entity1DF.select(struct("*").alias("entity1"),
> col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION")).withWatermark("entity1_LAST_MODIFICATION")
> >>
> >> Before I hunt this down any further, is this kind of a known
> limitation? Or am I doing something fundamentally wrong?
>
>
> --
> CU, Joe
>
>
>


Re: Spark structured streaming watermarks on nested attributes

2019-05-06 Thread Joe Ammann
On 5/6/19 6:23 PM, Pat Ferrel wrote:
> Streams have no end until watermarked or closed. Joins need bounded datasets, 
> et voila. Something tells me you should consider the streaming nature of your 
> data and whether your joins need to use increments/snippets of infinite 
> streams or to re-join the entire contents of the streams accumulated at 
> checkpoints.

I certainly don't question the need for watermarks. 

What I was getting at is that when I use fields called 
"entityX_LAST_MODIFICATION" for watermarks/conditions, my joins work as 
expected.

But when I nest the attributes and use "entityX.LAST_MODIFICATION" (note the 
dot for the nesting), the joins fail.

I have a feeling that the Spark execution plan gets somewhat confused, because 
in the latter case there are multiple fields called "LAST_MODIFICATION" with 
differing nesting prefixes.
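
One way to see where the watermark gets lost is to look at the analyzed plan 
(just a sketch: entity1DF stands for a streaming DataFrame already read from 
Kafka, and the "10 minutes" delay is an arbitrary placeholder):

from pyspark.sql.functions import struct

nested = (entity1DF
    .select(struct("*").alias("entity1"))
    .withWatermark("entity1.LAST_MODIFICATION", "10 minutes"))

# The analyzed logical plan should contain an EventTimeWatermark node;
# whether the watermark survives into the join input is what seems to
# differ between the nested and the top-level variant.
nested.explain(True)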

> From: Joe Ammann <j...@pyx.ch>
> Reply: Joe Ammann <j...@pyx.ch>
> Date: May 6, 2019 at 6:45:13 AM
> To: user@spark.apache.org
> Subject: Spark structured streaming watermarks on nested attributes
> 
>> Hi all
>>
>> I'm pretty new to Spark and implementing my first non-trivial structured 
>> streaming job with outer joins. My environment is a Hortonworks HDP 3.1 
>> cluster with Spark 2.3.2, working with Python.
>>
>> I understood that I need to provide watermarks and join conditions for left 
>> outer joins to work. All my incoming Kafka streams have an attribute 
>> "LAST_MODIFICATION" which is well suited to indicate the event time, so I 
>> chose that for watermarking. Since I'm joining from multiple topics where 
>> the incoming messages have common attributes, I thought I'd prefix/nest all 
>> incoming messages. Something like
>>
>> entity1DF.select(struct("*").alias("entity1")).withWatermark("entity1.LAST_MODIFICATION")
>> entity2DF.select(struct("*").alias("entity2")).withWatermark("entity2.LAST_MODIFICATION")
>>
>> Now when I try to join two such streams, the join fails and tells me that I 
>> need to use watermarks.
>>
>> When I leave the watermarking attribute "at the top level", everything works 
>> as expected, e.g.
>>
>> entity1DF.select(struct("*").alias("entity1"), 
>> col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION")).withWatermark("entity1_LAST_MODIFICATION")
>>
>> Before I hunt this down any further, is this kind of a known limitation? Or 
>> am I doing something fundamentally wrong?


-- 
CU, Joe




Re: Spark structured streaming watermarks on nested attributes

2019-05-06 Thread Pat Ferrel
Streams have no end until watermarked or closed. Joins need bounded
datasets, et voila. Something tells me you should consider the streaming
nature of your data and whether your joins need to use increments/snippets
of infinite streams or to re-join the entire contents of the streams
accumulated at checkpoints.


From: Joe Ammann <j...@pyx.ch>
Reply: Joe Ammann <j...@pyx.ch>
Date: May 6, 2019 at 6:45:13 AM
To: user@spark.apache.org
Subject: Spark structured streaming watermarks on nested attributes

Hi all

I'm pretty new to Spark and implementing my first non-trivial structured
streaming job with outer joins. My environment is a Hortonworks HDP 3.1
cluster with Spark 2.3.2, working with Python.

I understood that I need to provide watermarks and join conditions for left
outer joins to work. All my incoming Kafka streams have an attribute
"LAST_MODIFICATION" which is well suited to indicate the event time, so I
chose that for watermarking. Since I'm joining from multiple topics where
the incoming messages have common attributes, I thought I'd prefix/nest all
incoming messages. Something like

entity1DF.select(struct("*").alias("entity1")).withWatermark("entity1.LAST_MODIFICATION")

entity2DF.select(struct("*").alias("entity2")).withWatermark("entity2.LAST_MODIFICATION")


Now when I try to join two such streams, the join fails and tells me that I
need to use watermarks.

When I leave the watermarking attribute "at the top level", everything
works as expected, e.g.

entity1DF.select(struct("*").alias("entity1"),
col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION")).withWatermark("entity1_LAST_MODIFICATION")


Before I hunt this down any further, is this kind of a known limitation? Or
am I doing something fundamentally wrong?

-- 
CU, Joe



Spark structured streaming watermarks on nested attributes

2019-05-06 Thread Joe Ammann
Hi all

I'm pretty new to Spark and implementing my first non-trivial structured 
streaming job with outer joins. My environment is a Hortonworks HDP 3.1 cluster 
with Spark 2.3.2, working with Python.

I understood that I need to provide watermarks and join conditions for left 
outer joins to work. All my incoming Kafka streams have an attribute 
"LAST_MODIFICATION" which is well suited to indicate the event time, so I chose 
that for watermarking. Since I'm joining from multiple topics where the 
incoming messages have common attributes, I thought I'd prefix/nest all incoming 
messages. Something like


entity1DF.select(struct("*").alias("entity1")).withWatermark("entity1.LAST_MODIFICATION")

entity2DF.select(struct("*").alias("entity2")).withWatermark("entity2.LAST_MODIFICATION")
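
(Spelled out fully, since withWatermark also takes a delay threshold as its 
second argument; the "10 minutes" here is just an arbitrary placeholder:)

from pyspark.sql.functions import struct

# Nest all columns under a prefix, then watermark the nested event-time
# field -- this is the variant that later fails at join time.
entity1 = (entity1DF
    .select(struct("*").alias("entity1"))
    .withWatermark("entity1.LAST_MODIFICATION", "10 minutes"))
entity2 = (entity2DF
    .select(struct("*").alias("entity2"))
    .withWatermark("entity2.LAST_MODIFICATION", "10 minutes"))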

Now when I try to join two such streams, the join fails and tells me that I 
need to use watermarks.
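
(The join itself looks roughly like the sketch below; the key columns 
ID/ENTITY1_ID and the one-hour bound are made up for illustration:)

from pyspark.sql.functions import expr

# Left outer join with an event-time range condition, as required for
# stream-stream outer joins.
joined = entity1.join(
    entity2,
    expr("""
        entity1.ID = entity2.ENTITY1_ID AND
        entity2.LAST_MODIFICATION >= entity1.LAST_MODIFICATION AND
        entity2.LAST_MODIFICATION <= entity1.LAST_MODIFICATION + interval 1 hour
    """),
    "leftOuter")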

When I leave the watermarking attribute "at the top level", everything works as 
expected, e.g.

entity1DF.select(struct("*").alias("entity1"), 
col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION")).withWatermark("entity1_LAST_MODIFICATION")

Before I hunt this down any further, is this kind of a known limitation? Or am 
I doing something fundamentally wrong?

-- 
CU, Joe
