OK, this indicates that the operator following the source is the bottleneck. If that is the WindowOperator, it makes sense to try refactoring the WindowFunction as sketched below. Alternatively, you can try to run that operator with a higher parallelism.
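Just to illustrate the refactoring on your snippet, here is a rough sketch (ANA, Put and buildPut(...) are the names from your code; please double-check the exact apply() overload against the 1.0.3 Javadocs, since older versions may require the WindowFunction to emit the same type as the fold accumulator):

final SingleOutputStreamOperator<Put> puts = ws.apply(
    // initial accumulator for each window
    new SummaryStatistics(),
    // FoldFunction: applied eagerly for every element, so the window state is a
    // single SummaryStatistics instead of all ANA records of the day
    new FoldFunction<ANA, SummaryStatistics>() {
        @Override
        public SummaryStatistics fold(final SummaryStatistics acc, final ANA value) {
            acc.addValue(value.getValue());
            return acc;
        }
    },
    // WindowFunction: only assembles the output record when the window fires
    new WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>() {
        @Override
        public void apply(final Tuple key, final TimeWindow window,
                final Iterable<SummaryStatistics> input, final Collector<Put> out) {
            // the iterable contains exactly one element: the folded statistics
            final SummaryStatistics stats = input.iterator().next();
            out.collect(buildPut((String) key.getField(0), window.getStart(), stats));
        }
    });

If the window operator (and not the source) turns out to be the bottleneck, you can also raise the parallelism of just that operator via puts.setParallelism(...) instead of increasing the parallelism of the whole job.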
2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:

> Hi Fabian,
>
> Thanks for the help, I will try that. The backpressure was on the source
> (HBase).
>
> Christophe
>
> 2016-06-09 16:38 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Christophe,
>>
>> where does the backpressure appear? In front of the sink operator or
>> before the window operator?
>>
>> In any case, I think you can improve your WindowFunction if you convert
>> parts of it into a FoldFunction<ANA, SummaryStatistics>.
>> The FoldFunction would take care of the statistics computation and the
>> WindowFunction would only assemble the result record, including extracting
>> the start time of the window.
>>
>> Then you could do:
>>
>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new YourWindowFunction());
>>
>> This is more efficient because the FoldFunction is eagerly applied whenever
>> a new element is added to a window. Hence, the window only holds a single
>> value (SummaryStatistics) instead of all elements added to the window. In
>> contrast, the WindowFunction is called when the window is finally evaluated.
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <christophe.salperw...@gmail.com>:
>>
>>> Hi,
>>>
>>> I am writing a program to read time series from HBase and do some daily
>>> aggregations (Flink streaming). For now I am just computing some averages,
>>> so nothing very heavy, but my HBase reads get slower and slower (I have a
>>> few billion points to read). The backpressure is almost all the time close
>>> to 1.
>>>
>>> I use custom timestamps:
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> so I implemented a custom extractor based on AscendingTimestampExtractor.
>>>
>>> At the beginning I get 5M reads/s, after 15 min only 1M reads/s, and then
>>> it gets worse and worse. Even when I cancel the job, data are still being
>>> written to HBase (I wrote a sink similar to the example, with a cache of
>>> 100s of HBase Puts to be a bit more efficient).
>>>
>>> When I don't add a sink, it seems to stay at 1M reads/s.
>>>
>>> Do you have an idea why?
>>>
>>> Here is a bit of code if needed:
>>>
>>> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
>>>     .assignTimestampsAndWatermarks(new xxxxAscendingTimestampExtractor())
>>>     .keyBy(0)
>>>     .timeWindow(Time.days(1));
>>>
>>> final SingleOutputStreamOperator<Put> puts = ws.apply(
>>>     new WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>>>
>>>         @Override
>>>         public void apply(final Tuple key, final TimeWindow window,
>>>                 final Iterable<ANA> input, final Collector<Put> out) throws Exception {
>>>
>>>             final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>>             for (final ANA ana : input) {
>>>                 summaryStatistics.addValue(ana.getValue());
>>>             }
>>>             final Put put = buildPut((String) key.getField(0), window.getStart(),
>>>                 summaryStatistics);
>>>             out.collect(put);
>>>         }
>>>     });
>>>
>>> And here is how I started Flink on YARN:
>>>
>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>>     -Dtaskmanager.network.numberOfBuffers=4096
>>>
>>> Thanks for any feedback!
>>>
>>> Christophe