Re[2]: Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

2021-02-26 Thread Люльченко Юрий Николаевич

Dawid,
 
Thank you again for the reply. It really does look like this happened 
because of the idle parallel instances.
 
Best,
Yuri L.
  
>Friday, 26 February 2021, 15:40 +03:00, from Dawid Wysakowicz:
> 
>Hi,
>What exactly is the problem? Is it that no patterns are being generated?
>Usually the problem is idle parallel instances[1]. You need to have data 
>flowing in each of the parallel instances for the watermark to progress. You can 
>also read about this in the context of Kafka partitions[2].
>Best,
>Dawid
>[1]  
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
>[2]  
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>On 26/02/2021 13:21, Люльченко Юрий Николаевич wrote:
>>Hello,
>> 
>>I already asked a question earlier today and got an answer:  
>>http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html
>> , and it is now clear to me how PatternStream works in processing time.
>> 
>>But I need help again: I can’t write proper code to run a PatternStream 
>>in event-time mode.
>>I think the problem is in how I assign the watermark strategy.
 
 
Люльченко Юрий
 

Re: Flink CEP: can't process PatternStream

2021-02-26 Thread Maminspapin
Hello, Dawid.

Yes, I’m using 1.12. And my code is now working. Thank you very much for
your comment.
 
Yuri L.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

2021-02-26 Thread Dawid Wysakowicz
Hi,

What exactly is the problem? Is it that no patterns are being generated?

Usually the problem is idle parallel instances[1]. You need to have
data flowing in each of the parallel instances for the watermark to
progress. You can also read about this in the context of Kafka partitions[2].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
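
To illustrate the point about idle parallel instances, here is a minimal sketch in plain Java. It is a simplified model, not Flink's actual implementation: the class name, the sentinel value and the method are assumptions made up for the example. It only shows the rule that a downstream operator's watermark is the minimum over its inputs, so a single silent input that is not marked idle keeps the watermark from progressing.

```java
/** Simplified model (not Flink's code) of combining watermarks from parallel inputs. */
public class WatermarkModel {

    /** Hypothetical sentinel meaning "this input has not emitted a watermark yet". */
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    /**
     * Combined watermark = minimum over all inputs that are not marked idle.
     * If every input is idle, nothing can advance and NO_WATERMARK is returned.
     */
    public static long combinedWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, inputWatermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Four parallel source instances, but only instance 0 receives data.
        long[] watermarks = {1_000L, NO_WATERMARK, NO_WATERMARK, NO_WATERMARK};

        // No input is marked idle: the silent instances pin the minimum.
        boolean[] noneIdle = {false, false, false, false};
        System.out.println(combinedWatermark(watermarks, noneIdle) == NO_WATERMARK); // true

        // Silent instances are marked idle: the watermark follows instance 0.
        boolean[] silentMarkedIdle = {false, true, true, true};
        System.out.println(combinedWatermark(watermarks, silentMarkedIdle)); // 1000
    }
}
```

In Flink 1.12 the corresponding knob is WatermarkStrategy#withIdleness(Duration), described under [1]. For the strategy in the question that could look like WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1)), where the one-minute timeout is only an illustrative value and not something taken from this thread.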

On 26/02/2021 13:21, Люльченко Юрий Николаевич wrote:
> Hello,
>
> I already asked a question earlier today and got an answer:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html
> , and it is now clear to me how PatternStream works in processing time.
>
> But I need help again: I can’t write proper code to run a
> PatternStream in event-time mode.
> I think the problem is in how I assign the watermark strategy.




Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

2021-02-26 Thread Люльченко Юрий Николаевич

Hello,
 
I already asked a question earlier today and got an answer:  
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html
 , and it is now clear to me how PatternStream works in processing time.
 
But I need help again: I can’t write proper code to run a PatternStream in 
event-time mode.
I think the problem is in how I assign the watermark strategy.
 
My code is below; the Flink version is 1.12:
 
public class Main {

    public static void main(String[] args) throws Exception {

        Properties properties = new Properties();
        properties.put("group.id", "Flink");
        properties.put("bootstrap.servers", "broker:9092");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "test",
                new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env
                .addSource(consumer)
                .map((MapFunction<String, String>) s -> {
                    // Just getting an object model
                    return model.toString();
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner((event, timestamp) -> {
                                    Model model = new Gson().fromJson(event, Model.class);
                                    return model.getServerTime();
                                }));

        stream.print("Stream");

        Pattern<String, ?> firstPattern = Pattern
                .<String>begin("first")
                .where(new IterativeCondition<String>() {
                    @Override
                    public boolean filter(String s, Context<String> context) throws Exception {
                        return s.contains("Start");
                    }
                });

        DataStream<String> result = CEP
                .pattern(stream, firstPattern)
                .inEventTime() // default TimeCharacteristic for 1.12
                .process(new PatternProcessFunction<String, String>() {
                    @Override
                    public void processMatch(Map<String, List<String>> map,
                            Context context, Collector<String> collector) throws Exception {
                        collector.collect(map.get("first").get(0));
                    }
                });

        result.print("Result");

        env.execute();
    }

}
 
Please help me correct the code :)
 
Thanks, Yuri L.
 
 
 

Re: Flink CEP: can't process PatternStream

2021-02-26 Thread Dawid Wysakowicz
Hi Yuri,

Which Flink version are you using? Is it 1.12? In 1.12 we changed the
default TimeCharacteristic to EventTime. Therefore you need watermarks
and timestamps[1] for your program to work correctly. If you want to
apply your pattern in processing time, you can do:

PatternStream patternStream = CEP.pattern(stream,
pattern).inProcessingTime();

Basically you are facing exactly the same problem as described in the
stackoverflow entry you posted.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#generating-watermarks
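
As a rough illustration of why an event-time pattern produces nothing without watermarks, here is a toy model in plain Java. It is a sketch under stated assumptions, not Flink's CEP operator: the class and method names are invented for the example, and events are reduced to their timestamps. It shows the general idea that in event time incoming events are first buffered in timestamp order and are only handed on for matching once a watermark covers them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model (not Flink's CEP operator) of event-time buffering ahead of pattern matching. */
public class EventTimeBufferModel {

    // Buffered event timestamps, smallest first.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();

    /** An arriving event is only buffered; nothing is matched yet. */
    public void onEvent(long timestamp) {
        buffered.add(timestamp);
    }

    /** A watermark releases, in timestamp order, every buffered event it covers. */
    public List<Long> onWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peek() <= watermark) {
            released.add(buffered.poll());
        }
        return released;
    }

    /** Number of events still waiting for a watermark. */
    public int pending() {
        return buffered.size();
    }

    public static void main(String[] args) {
        EventTimeBufferModel op = new EventTimeBufferModel();
        op.onEvent(300L);
        op.onEvent(100L);
        op.onEvent(200L);

        // No watermark so far: all three events are still waiting.
        System.out.println(op.pending());         // 3

        // A watermark of 250 releases the two earlier events, in order.
        System.out.println(op.onWatermark(250L)); // [100, 200]
        System.out.println(op.pending());         // 1
    }
}
```

If no watermark ever arrives, onWatermark is never called in this model and every event stays buffered, which matches the symptom of a job that runs but prints no matches.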

On 26/02/2021 09:18, Люльченко Юрий Николаевич wrote:
> Hello everyone.
>  
> I’m trying to use the Flink CEP library to fetch some events by
> pattern. As a first step I created a simple HelloWorld project, but I
> have a problem exactly like the one described
> here:
> https://stackoverflow.com/questions/39575991/flink-cep-no-results-printed
>
> You can see my code on this
> page: https://gist.github.com/Maminspapin/07615706f4ce975eb3cf5f0b407b0644
>
> Nothing happens in this block:
>
>         DataStream<String> alerts = patternStream
>                 .process(new PatternProcessFunction<String, String>() {
>                     @Override
>                     public void processMatch(Map<String, List<String>> map,
>                             Context context, Collector<String> collector)
>                             throws Exception {
>
>                         String first = map.get("first").get(0);
>                         System.out.println("First: " + first);
>                     }
>                 });
>         alerts.print();
>  
> Can someone help me understand the cause?
>  
> Thanks,
> Yuri L.
>  




Flink CEP: can't process PatternStream

2021-02-26 Thread Люльченко Юрий Николаевич

Hello everyone.
 
I’m trying to use the Flink CEP library to fetch some events by pattern. 
As a first step I created a simple HelloWorld project, but I have a problem 
exactly like the one described here:  
https://stackoverflow.com/questions/39575991/flink-cep-no-results-printed
 
You can see my code on this page:  
https://gist.github.com/Maminspapin/07615706f4ce975eb3cf5f0b407b0644
 
Nothing happens in this block:
 
        DataStream<String> alerts = patternStream
                .process(new PatternProcessFunction<String, String>() {
                    @Override
                    public void processMatch(Map<String, List<String>> map,
                            Context context, Collector<String> collector)
                            throws Exception {

                        String first = map.get("first").get(0);
                        System.out.println("First: " + first);
                    }
                });
        alerts.print();
 
Can someone help me understand the cause?
 
Thanks,
Yuri L.