Hi Christophe, where does the backpressure appear? In front of the sink operator or before the window operator?
In any case, I think you can improve your WindowFunction by converting part of it into a FoldFunction<ANA, SummaryStatistics>. The FoldFunction would take care of the statistics computation, and the WindowFunction would only assemble the result record, including extracting the start time of the window. Then you could do:

ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());

This is more efficient because the FoldFunction is eagerly applied whenever a new element is added to a window. Hence, the window only holds a single value (the SummaryStatistics) instead of all elements added to the window. In contrast, the WindowFunction is only called when the window is finally evaluated.

Hope this helps,
Fabian

2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:

> Hi,
>
> I am writing a program to read time series from HBase and do some daily
> aggregations (Flink streaming). For now I am just computing some averages, so
> not very consuming, but my HBase reads get slower and slower (I have a few
> billion points to read). The backpressure is almost all the time close
> to 1.
>
> I use custom timestamps:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> so I implemented a custom extractor based on:
> AscendingTimestampExtractor
>
> At the beginning I have 5M reads/s, and after 15 min I have just 1M reads/s;
> then it gets worse and worse. Even when I cancel the job, data are still
> being written to HBase (I did a sink similar to the example, with a cache
> of 100s of HBase Puts to be a bit more efficient).
>
> When I don't put a sink, it seems to stay at 1M reads/s.
>
> Do you have an idea why?
>
> Here is a bit of code if needed:
>
> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>     .keyBy(0)
>     .timeWindow(Time.days(1));
>
> final SingleOutputStreamOperator<Put> puts = ws.apply(
>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>
>         @Override
>         public void apply(final Tuple key, final TimeWindow window,
>                 final Iterable<ANA> input, final Collector<Put> out) throws Exception {
>
>             final SummaryStatistics summaryStatistics = new SummaryStatistics();
>             for (final ANA ana : input) {
>                 summaryStatistics.addValue(ana.getValue());
>             }
>             final Put put = buildPut((String) key.getField(0), window.getStart(),
>                     summaryStatistics);
>             out.collect(put);
>         }
>     });
>
> And how I started Flink on YARN:
>
> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>     -Dtaskmanager.network.numberOfBuffers=4096
>
> Thanks for any feedback!
>
> Christophe
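To illustrate the suggested fold pattern outside of Flink, here is a rough, self-contained Java sketch. The `FoldFunction` interface and the `Stats` class below are simplified local stand-ins (assumptions for illustration), not the real Flink or commons-math types; the point is only to show that the fold is applied once per arriving element, so per-window state is a single accumulator rather than a buffer of all elements:

```java
// Simplified stand-in for Flink's FoldFunction (the real one lives in
// org.apache.flink.api.common.functions); shown here only as a sketch.
interface FoldFunction<I, A> {
    A fold(A accumulator, I value) throws Exception;
}

// Tiny stand-in for commons-math SummaryStatistics so the example runs
// without extra dependencies.
class Stats {
    long count;
    double sum;

    void addValue(double v) { count++; sum += v; }
    double getMean() { return count == 0 ? Double.NaN : sum / count; }
}

public class FoldSketch {
    public static void main(String[] args) throws Exception {
        // The fold is applied eagerly: window state is one Stats object.
        FoldFunction<Double, Stats> foldFn = (acc, value) -> {
            acc.addValue(value);
            return acc;
        };

        Stats acc = new Stats();          // plays the role of new SummaryStatistics()
        for (double v : new double[] {1.0, 2.0, 3.0, 4.0}) {
            acc = foldFn.fold(acc, v);    // called once per arriving element
        }

        // The WindowFunction part would then only assemble the result record
        // (here we just print what would go into the Put).
        System.out.println("count=" + acc.count + " mean=" + acc.getMean());
        // prints: count=4 mean=2.5
    }
}
```

In the real job, the same split means the window backend stores one SummaryStatistics per key and window instead of every ANA element, which can reduce both memory pressure and the cost of the final window evaluation.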