Hi Sam, could you please also send the code of the timestamp/watermark assigner? This could also affect things.
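
To make the concern concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code, and the class TwoInputWatermark and its method names are made up for illustration. It assumes the usual event-time rule that an operator with two inputs advances its own watermark to the minimum of the watermarks seen on each input, and that an event-time window fires once the watermark reaches the window end minus one millisecond. Under that assumption, a control stream that never emits watermarks would hold back every downstream window:

```java
// Toy model of event-time progress in a two-input operator (no Flink dependency).
// All names here are hypothetical and exist only for this illustration.
final class TwoInputWatermark {
    private long input1 = Long.MIN_VALUE; // last watermark seen on the data input
    private long input2 = Long.MIN_VALUE; // last watermark seen on the control input

    // Watermarks never move backwards, so keep the maximum per input.
    void onWatermark1(long watermark) { input1 = Math.max(input1, watermark); }
    void onWatermark2(long watermark) { input2 = Math.max(input2, watermark); }

    // The operator's own event time is the slower of the two inputs.
    long current() { return Math.min(input1, input2); }

    // An event-time window [start, end) fires once the watermark reaches end - 1.
    static boolean windowFires(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        TwoInputWatermark op = new TwoInputWatermark();

        // Only the data stream reports progress; the control stream stays silent.
        op.onWatermark1(60_000L);
        System.out.println(windowFires(10_000L, op.current())); // false

        // Once the control input also reports progress, the window can fire.
        op.onWatermark2(Long.MAX_VALUE);
        System.out.println(windowFires(10_000L, op.current())); // true
    }
}
```

If this is what is happening in your job, assigning watermarks on the control stream as well may be one way out, but I can't tell without seeing the assigner.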

Best,
Aljoscha

On Thu, Mar 9, 2017, at 19:58, Sam Huang wrote:
> Hi Aljoscha,
>
> Here's the code:
>
> private static class DataFilterFunImpl extends
>         RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
>
>     private JSONParser parser;
>     private Map<String, Map<String, ControlJsonConfig>> whiteListMap = new HashMap<>();
>
>     // tuple5(domain, device_type, type, key, count_or_sum)
>     @Override
>     public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector) throws Exception {
>         String type = dataTuple.f2;
>         String[] keyValue = dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
>         String key = keyValue[0];
>         switch (type) {
>             case RawEventExtractor.Constants.VALUE_COUNT: {
>                 if (whiteListMap.containsKey(key)) {
>                     ControlJsonConfig ruleConfig =
>                             whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT);
>                     if (ruleConfig != null) {
>                         String value = keyValue.length > 1 ? keyValue[1] : "";
>                         String bucket = ruleConfig.getBucketName(value);
>                         if (bucket != null) {
>                             dataTuple.setField(
>                                     String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket), 3);
>                             collector.collect(dataTuple);
>                         }
>                     } else {
>                         collector.collect(dataTuple);
>                     }
>                 }
>                 break;
>             }
>             case RawEventExtractor.Constants.VALUE_SUM: {
>                 if (whiteListMap.containsKey(key)
>                         && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) {
>                     collector.collect(dataTuple);
>                 }
>                 break;
>             }
>             default:
>                 collector.collect(dataTuple);
>         }
>     }
>
>     @Override
>     public void flatMap2(String jsonStr, Collector<KVTuple6> collector) throws Exception {
>         // Map<String, Map<String, ControlJsonConfig>> whiteListMap = whiteListMapState.value();
>         try {
>             if (parser == null) {
>                 parser = new JSONParser();
>             }
>             JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr);
>             Tuple2<String, Map<String, ControlJsonConfig>> config =
>                     RawEventExtractor.getKeyConfig(jsonConfig);
>             if (config.f1 == null) {
>                 whiteListMap.remove(config.f0);
>             } else {
>                 whiteListMap.put(config.f0, config.f1);
>             }
>         } catch (Exception e) {}
>     }
> }
>
> FYI, if I call setParallelism on both the control stream and the data
> stream, the window function works. Is it necessary to do so when using
> broadcast()?
>
> On Thu, Mar 9, 2017 at 2:26 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi Sam,
>> could you please also send the code for the DataFilterFunImpl and
>> your timestamps/watermark assigner? That could help in figuring out
>> the problem.
>>
>> Best,
>> Aljoscha
>>
>> On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote:
>>> Hi Timo,
>>>
>>> The window function sinks the data into InfluxDB, and it's not
>>> triggered.
>>> If I comment out the ".timeWindow" and print the results after the
>>> reduce function, it works.
>>> Code for the window function is here:
>>>
>>> private static class WindowFunImpl implements
>>>         WindowFunction<KVTuple6, Point, Tuple, TimeWindow> {
>>>
>>>     @Override
>>>     public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
>>>             Collector<Point> collector) throws Exception {
>>>         KVTuple6 kvTypeTuple = iterable.iterator().next();
>>>         System.out.println("window: " + kvTypeTuple);
>>>         // Doesn't work here if broadcast() is used
>>>         Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
>>>                 .time(window.getStart(), TimeUnit.MILLISECONDS)
>>>                 .tag(TAG_DOMAIN, kvTypeTuple.f0)
>>>                 .tag(TAG_DEVICE, kvTypeTuple.f1)
>>>                 .tag(TAG_TYPE, kvTypeTuple.f2)
>>>                 .tag(TAG_KEY, kvTypeTuple.f3)
>>>                 .addField(FIELD, kvTypeTuple.f4);
>>>         collector.collect(builder.build());
>>>     }
>>> }
>>>
>>> On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther <twal...@apache.org> wrote:
>>>> Hi Sam,
>>>>
>>>> could you explain the behavior a bit more? How does the window
>>>> function behave? Is it not triggered or what is the content? What
>>>> is the result if you don't use a window function?
>>>>
>>>> Timo
>>>>
>>>> On 08/03/17 at 02:59, Sam Huang wrote:
>>>>
>>>>> btw, the reduce function works well. I've printed out the data,
>>>>> and it is all correct. So are the timestamps and watermarks. And
>>>>> if I remove ".broadcast()", the data is successfully written to
>>>>> the sink.
>>>>>
>>>>> Any help?
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/window-function-not-working-when-control-stream-broadcast-tp12093p12094.html
>>>>> Sent from the Apache Flink User Mailing List archive at Nabble.com.