OK, this indicates that the operator following the source is a bottleneck.

If that's the WindowOperator, it makes sense to try the refactoring of the
WindowFunction.
Alternatively, you can try to run that operator with a higher parallelism.

2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <
christophe.salperw...@gmail.com>:

> Hi Fabian,
>
> Thanks for the help, I will try that. The backpressure was on the source
> (HBase).
>
> Christophe
>
> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Christophe,
>>
>> where does the backpressure appear? In front of the sink operator or
>> before the window operator?
>>
>> In any case, I think you can improve your WindowFunction if you convert
>> parts of it into a FoldFunction<ANA, SummaryStatistics>.
>> The FoldFunction would take care of the statistics computation and the
>> WindowFunction would only assemble the result record including extracting
>> the start time of the window.
>>
>> Then you could do:
>>
>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>> YourWindowFunction());
>>
>> This is more efficient because the FoldFunction is eagerly applied when
>> ever a new element is added to a window. Hence, the window does only hold a
>> single value (SummaryStatistics) instead of all element added to the
>> window. In contrast the WindowFunction is called when the window is finally
>> evaluated.
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
>> christophe.salperw...@gmail.com>:
>>
>>> Hi,
>>>
>>> I am writing a program to read timeseries from HBase and do some daily
>>> aggregations (Flink streaming). For now I am just computing some average so
>>> not very consuming but my HBase read get slower and slower (I have few
>>> billions of points to read). The back pressure is almost all the time close
>>> to 1.
>>>
>>> I use custom timestamp:
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> so I implemented a custom extractor based on:
>>> AscendingTimestampExtractor
>>>
>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
>>> read/s then it get worse and worse. Even when I cancel the job, data are
>>> still being written in HBase (I did a sink similar to the example - with a
>>> cache of 100s of HBase Puts to be a bit more efficient).
>>>
>>> When I don't put a sink it seems to stay on 1M reads/s.
>>>
>>> Do you have an idea why ?
>>>
>>> Here is a bit of code if needed:
>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>> .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>> .keyBy(0)
>>> .timeWindow(Time.days(1));
>>>
>>> final SingleOutputStreamOperator<Put> puts = ws.apply(new
>>> WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>
>>> @Override
>>> public void apply(final Tuple key, final TimeWindow window, final
>>> Iterable<ANA> input,
>>> final Collector<Put> out) throws Exception {
>>>
>>> final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>> for (final ANA ana : input) {
>>> summaryStatistics.addValue(ana.getValue());
>>> }
>>> final Put put = buildPut((String) key.getField(0), window.getStart(),
>>> summaryStatistics);
>>> out.collect(put);
>>> }
>>> });
>>>
>>> And how I started Flink on YARN :
>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>> -Dtaskmanager.network.numberOfBuffers=4096
>>>
>>> Thanks for any feedback!
>>>
>>> Christophe
>>>
>>
>>
>

Reply via email to