Hi Sam,

could you please also send the code of the timestamp/watermark assigner?
This could also affect things.
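
For reference, the kind of assigner I mean usually looks roughly like the sketch below (the timestamp field and the out-of-orderness bound are only placeholders, not your actual values):

    // sketch only: assumes the event-time timestamp is carried in one of the tuple fields
    dataStream.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<KVTuple6>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(KVTuple6 element) {
                    return element.f5;   // placeholder field
                }
            });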


Best,

Aljoscha





On Thu, Mar 9, 2017, at 19:58, Sam Huang wrote:

> Hi Aljoscha,

> 

> Here's the code:

> private static class DataFilterFunImpl extends RichCoFlatMapFunction<KVTuple6, String, KVTuple6> {
>
>     private JSONParser parser;
>     private Map<String, Map<String, ControlJsonConfig>> whiteListMap = new HashMap<>();
>
>     // data stream side: filter/rebucket tuples against the whitelist
>     // tuple5(domain, device_type, type, key, count_or_sum)
>     @Override
>     public void flatMap1(KVTuple6 dataTuple, Collector<KVTuple6> collector) throws Exception {
>         String type = dataTuple.f2;
>         String[] keyValue = dataTuple.f3.split(RawEventExtractor.Constants.DEFAULT_VALUE_SP);
>         String key = keyValue[0];
>         switch (type) {
>             case RawEventExtractor.Constants.VALUE_COUNT: {
>                 if (whiteListMap.containsKey(key)) {
>                     ControlJsonConfig ruleConfig = whiteListMap.get(key).get(RawEventExtractor.Constants.VALUE_COUNT);
>                     if (ruleConfig != null) {
>                         String value = keyValue.length > 1 ? keyValue[1] : "";
>                         String bucket = ruleConfig.getBucketName(value);
>                         if (bucket != null) {
>                             dataTuple.setField(String.join(RawEventExtractor.Constants.DEFAULT_VALUE_SP, key, bucket), 3);
>                             collector.collect(dataTuple);
>                         }
>                     } else {
>                         collector.collect(dataTuple);
>                     }
>                 }
>                 break;
>             }
>             case RawEventExtractor.Constants.VALUE_SUM: {
>                 if (whiteListMap.containsKey(key)
>                         && whiteListMap.get(key).containsKey(RawEventExtractor.Constants.VALUE_SUM)) {
>                     collector.collect(dataTuple);
>                 }
>                 break;
>             }
>             default:
>                 collector.collect(dataTuple);
>         }
>     }
>
>     // control stream side: update the whitelist from JSON config messages
>     @Override
>     public void flatMap2(String jsonStr, Collector<KVTuple6> collector) throws Exception {
>         // Map<String, Map<String, ControlJsonConfig>> whiteListMap = whiteListMapState.value();
>         try {
>             if (parser == null) {
>                 parser = new JSONParser();
>             }
>             JSONObject jsonConfig = (JSONObject) parser.parse(jsonStr);
>             Tuple2<String, Map<String, ControlJsonConfig>> config = RawEventExtractor.getKeyConfig(jsonConfig);
>             if (config.f1 == null) {
>                 whiteListMap.remove(config.f0);
>             } else {
>                 whiteListMap.put(config.f0, config.f1);
>             }
>         } catch (Exception e) {}
>     }
> }
> 

> 

> FYI, if I set the parallelism of both the control stream and the data stream (via setParallelism), the window function works. Is it necessary to do so when using the broadcast() function?
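> 
> For reference, the two streams are connected roughly like the sketch below (simplified; the actual variable names differ):
> 
>     // data stream carries KVTuple6 records, control stream carries JSON whitelist configs
>     DataStream<KVTuple6> filtered = dataStream
>             .connect(controlStream.broadcast())   // broadcast control updates to every parallel subtask
>             .flatMap(new DataFilterFunImpl());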
> 

> 

> On Thu, Mar 9, 2017 at 2:26 AM, Aljoscha Krettek
> <aljos...@apache.org> wrote:

>> Hi Sam,

>> could you please also send the code for the DataFilterFunImpl and
>> your timestamp/watermark assigner? That could help in figuring out
>> the problem.
>> 

>> Best,

>> Aljoscha

>> 

>> 

>> On Wed, Mar 8, 2017, at 19:56, Sam Huang wrote:

>>> Hi Timo,

>>> 

>>> The window function sinks the data into InfluxDB, and it is not
>>> triggered.
>>> If I comment out the ".timeWindow" and print the results after the
>>> reduce function, it works.
>>> The code for the window function is here (the surrounding wiring is
>>> sketched after it):

>>> 

>>> private static class WindowFunImpl implements WindowFunction<KVTuple6, Point, Tuple, TimeWindow> {
>>>
>>>     @Override
>>>     public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
>>>                       Collector<Point> collector) throws Exception {
>>>         KVTuple6 kvTypeTuple = iterable.iterator().next();
>>>         System.out.println("window: " + kvTypeTuple);   // Doesn't work here if broadcast is used
>>>         Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
>>>                 .time(window.getStart(), TimeUnit.MILLISECONDS)
>>>                 .tag(TAG_DOMAIN, kvTypeTuple.f0)
>>>                 .tag(TAG_DEVICE, kvTypeTuple.f1)
>>>                 .tag(TAG_TYPE, kvTypeTuple.f2)
>>>                 .tag(TAG_KEY, kvTypeTuple.f3)
>>>                 .addField(FIELD, kvTypeTuple.f4);
>>>         collector.collect(builder.build());
>>>     }
>>> }
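>>> 
>>> The surrounding wiring looks roughly like the sketch below (simplified; ReduceFunImpl, the key fields, the window size, and the sink variable are stand-ins for the real ones):
>>> 
>>>     dataStream
>>>             .connect(controlStream.broadcast())
>>>             .flatMap(new DataFilterFunImpl())
>>>             .keyBy(0, 1, 2, 3)                                 // domain, device_type, type, key
>>>             .timeWindow(Time.minutes(1))
>>>             .apply(new ReduceFunImpl(), new WindowFunImpl())   // incremental reduce + window function
>>>             .addSink(influxSink);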
>>> 

>>> 
>>> 
>>> 

>>> 

>>> On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther <twal...@apache.org>
>>> wrote:
>>>> Hi Sam,

>>>> 

>>>> could you explain the behavior a bit more? How does the window
>>>> function behave? Is it not triggered at all, or is its content
>>>> wrong? What is the result if you don't use a window function?
>>>> 

>>>> Timo

>>>> 

>>>> 

>>>> On 08/03/17 at 02:59, Sam Huang wrote:

>>>> 

>>>>> btw, the reduce function works well; I've printed out the data,
>>>>> and they are
>>>>> all correct. So are the timestamps and watermarks. And if I remove
>>>>> ".broadcast()", the data is successfully written to the sink.

>>>>> 

>>>>> Any help?

>>>>> 

>>>>> 

>>>>> 

>>>>> --

>>>>> View this message in context:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/window-function-not-working-when-control-stream-broadcast-tp12093p12094.html
>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>> archive at Nabble.com.
>>>> 

>> 

