Re: Different watermark for different kafka partitions in Structured Streaming

2017-09-04 Thread 张万新
@Ryan In our production environment we frequently observe that consumption
rates differ across partitions for various reasons, including performance
differences between the machines hosting the partitions, uneven distribution
of messages, and so on. So I would appreciate some advice on how to design a
per-Kafka-partition watermark in Structured Streaming (SS).
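
One possible design, sketched in plain Python rather than Spark: track the
max event time per partition and take the minimum across partitions before
subtracting the late threshold, so that the slowest partition holds the
watermark back. This is an illustration only; `PartitionAwareWatermark` and
its methods are hypothetical names, not part of any Spark API, and SS has no
hook for plugging such a tracker in.

```python
from datetime import datetime, timedelta


class PartitionAwareWatermark:
    """Hypothetical helper (not a Spark API): one max event time per partition."""

    def __init__(self, partitions, late_threshold):
        self.late_threshold = late_threshold
        # None means "no event seen yet on this partition".
        self.max_event_time = {p: None for p in partitions}

    def observe(self, partition, event_time):
        """Record an event; only a newer event time advances the partition."""
        current = self.max_event_time[partition]
        if current is None or event_time > current:
            self.max_event_time[partition] = event_time

    def watermark(self):
        """Min of the per-partition maxima minus the threshold, or None
        until every partition has reported at least one event."""
        times = list(self.max_event_time.values())
        if any(t is None for t in times):
            return None
        return min(times) - self.late_threshold


# Assumed scenario: partition 1 lags three hours behind partition 0.
noon = datetime(2017, 8, 30, 12, 0)
tracker = PartitionAwareWatermark(partitions=[0, 1],
                                  late_threshold=timedelta(hours=1))
tracker.observe(0, noon)
before_lagging_partition = tracker.watermark()
tracker.observe(1, noon - timedelta(hours=3))
print(tracker.watermark())
```

With these numbers the watermark sits at 08:00 instead of 11:00, so the
lagging partition's events are not treated as late. The cost is that an idle
or stalled partition holds the watermark back for everyone, so state grows
until that partition catches up.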

On Sat, Sep 2, 2017 at 10:36 AM, Ryan wrote:

> I don't think SS currently supports a "partitioned" watermark. And why do
> different partitions' consumption rates vary? If the handling logic is quite
> different, using different topics would be a better approach.


Re: Different watermark for different kafka partitions in Structured Streaming

2017-09-01 Thread Ryan
I don't think SS currently supports a "partitioned" watermark. And why do
different partitions' consumption rates vary? If the handling logic is quite
different, using different topics would be a better approach.

On Fri, Sep 1, 2017 at 4:59 PM, 张万新 wrote:

> Thanks. It's true that a looser watermark guarantees that more data is not
> dropped, but at the same time more state needs to be kept. I was just
> wondering whether something like Flink's Kafka-partition-aware watermark
> might be a better solution in SS.


Re: Different watermark for different kafka partitions in Structured Streaming

2017-09-01 Thread 张万新
Thanks. It's true that a looser watermark guarantees that more data is not
dropped, but at the same time more state needs to be kept. I was just
wondering whether something like Flink's Kafka-partition-aware watermark
might be a better solution in SS.

On Thu, Aug 31, 2017 at 9:13 AM, Tathagata Das wrote:

> Why not set the watermark to be looser, one that works across all
> partitions? The main use of the watermark is to drop state. If you loosen
> the watermark threshold (e.g. from 1 hour to 10 hours), then you will keep
> more state with older data, but you are guaranteed that you will not drop
> important data.


Re: Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread Tathagata Das
Why not set the watermark to be looser, one that works across all
partitions? The main use of the watermark is to drop state. If you loosen the
watermark threshold (e.g. from 1 hour to 10 hours), then you will keep more
state with older data, but you are guaranteed that you will not drop
important data.
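
To make the trade-off concrete, here is a small plain-Python sketch of the
arithmetic (it does not use Spark). The event times and the helper
`events_within_watermark` are made up for illustration. In Spark itself the
threshold is the second argument to `withWatermark`, e.g. "10 hours".

```python
from datetime import datetime, timedelta


def events_within_watermark(event_times, max_event_time, threshold):
    """Return the events that are not older than (max event time - threshold)."""
    watermark = max_event_time - threshold
    return [t for t in event_times if t >= watermark]


# Assumed scenario: the fastest partition has reached 12:00, while events
# from slower partitions arrive 2, 5 and 9 hours behind it.
noon = datetime(2017, 8, 30, 12, 0)
lagging_events = [noon - timedelta(hours=h) for h in (2, 5, 9)]

kept_1h = events_within_watermark(lagging_events, noon, timedelta(hours=1))
kept_10h = events_within_watermark(lagging_events, noon, timedelta(hours=10))

print("kept with 1 hour threshold:  ", len(kept_1h))
print("kept with 10 hour threshold: ", len(kept_10h))
# Oldest event time for which state must still be retained in each case.
print("state horizon, 1 hour:  ", noon - timedelta(hours=1))
print("state horizon, 10 hours:", noon - timedelta(hours=10))
```

With the 1 hour threshold all three lagging events fall behind the watermark.
With 10 hours all three are kept, but state has to be retained back to 02:00
instead of 11:00.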

On Wed, Aug 30, 2017 at 7:41 AM, KevinZwx wrote:

> Hi,
>
> I'm using Structured Streaming to process logs from Kafka, with a
> watermark to handle late events. Currently the watermark is computed as
> (max event time seen by the engine - late threshold), and the same
> watermark is used for all partitions.
>
> But in our production environment it frequently happens that different
> partitions are consumed at different speeds. Consumption of some
> partitions may fall behind, so the newest event time in those partitions
> may be much earlier than in the others. In that case, using the same
> watermark for all partitions may cause heavy data loss.
>
> Is there any way to use a different watermark for each Kafka partition,
> or any plan to work on this?


Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread KevinZwx
Hi,

I'm using Structured Streaming to process logs from Kafka, with a
watermark to handle late events. Currently the watermark is computed as
(max event time seen by the engine - late threshold), and the same watermark
is used for all partitions.

But in our production environment it frequently happens that different
partitions are consumed at different speeds. Consumption of some partitions
may fall behind, so the newest event time in those partitions may be much
earlier than in the others. In that case, using the same watermark for all
partitions may cause heavy data loss.

Is there any way to use a different watermark for each Kafka partition, or
any plan to work on this?
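
A minimal plain-Python model of the problem, for illustration only. It is
not Spark code: the `(partition, event_time)` tuples, the batch contents and
the helper names are all assumptions, and dropping is simplified to a single
comparison against the watermark.

```python
from datetime import datetime, timedelta


def global_watermark(max_event_time, late_threshold):
    """Watermark as described above: max event time seen minus the threshold."""
    return max_event_time - late_threshold


def split_by_watermark(events, watermark):
    """Split (partition, event_time) pairs into (kept, dropped)."""
    kept = [e for e in events if e[1] >= watermark]
    dropped = [e for e in events if e[1] < watermark]
    return kept, dropped


# Assumed data: partition 0 is up to date, partition 1 lags by three hours.
noon = datetime(2017, 8, 30, 12, 0)
previous_batch = [(0, noon)]
next_batch = [
    (0, noon + timedelta(minutes=5)),
    (1, noon - timedelta(hours=3)),
]

threshold = timedelta(hours=1)
# The watermark is driven by the fastest partition seen so far.
watermark = global_watermark(max(t for _, t in previous_batch), threshold)
kept, dropped = split_by_watermark(next_batch, watermark)

print("watermark:", watermark)
print("dropped:  ", dropped)
```

Here the watermark is 11:00 because partition 0 has reached 12:00, so the
09:00 event from partition 1 counts as late even though it was only consumed
late, not produced late.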



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread 张万新
Hi,

I'm using Structured Streaming to process logs from Kafka, with a
watermark to handle late events. Currently the watermark is computed as
(max event time seen by the engine - late threshold), and the same watermark
is used for all partitions.

But in our production environment it frequently happens that different
partitions are consumed at different speeds. Consumption of some partitions
may fall behind, so the newest event time in those partitions may be much
earlier than in the others. In that case, using the same watermark for all
partitions may cause heavy data loss.

Is there any way to use a different watermark for each Kafka partition, or
any plan to work on this?
-- 
Wanxin Zhang,
Master candidate,
National Lab for Parallel and Distributed Processing (PDL),
School of Computer Science,
National University of Defense Technology,
Changsha, Hunan, China