Re: Keyed CEP checkpoint fails

2017-08-10 Thread Dawid Wysakowicz
As @Kostas asked in your previous thread, would it be possible for you to share 
your code for that job, or at least a minimal example that reproduces this 
behaviour? I fear we won't be able to help you without any further info.

Regards,
Dawid

> On 10 Aug 2017, at 14:10, Daiqing Li <lidaiqing1...@gmail.com> wrote:
> 
> Hi Flink user,
> 
> I am using the FsStateBackend and AWS EMR YARN to run a CEP job. I got this 
> exception after running for a while. Could anyone give me some help to debug 
> this? I tried parallelism 1, and it has the same problem. I also tried 
> reimplementing the hashCode and equals methods; I use a UUID as the hashCode right now.
> 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88d4) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>     ... 5 more
> 





Re: CEP with Kafka source

2017-08-04 Thread Dawid Wysakowicz
Hi Björn,

You are correct that the CEP library buffers events until a watermark with a 
greater timestamp arrives. That is because the order of events is crucial for CEP.
Imagine a pattern like A next B and the sequence a(t=1) c(t=10) b(t=2). If we did 
not wait for the watermark and sort the events upon its arrival, we would not be 
able to produce proper results.

I don't know what your text-file approach looks like, but if it behaves 
differently I would assume you do not work in event time.
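
As an illustration, here is a minimal sketch of the timestamp/watermark assignment under discussion (the MyEvent class and its getTimestamp() accessor are assumptions, not from your job). With a 24-hour bound, a CEP match that completes at event time t is only emitted once an event with a timestamp greater than t + 24h has advanced the watermark past t:

DataStream<MyEvent> withTimestamps = kafkaStream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.hours(24)) {
            @Override
            public long extractTimestamp(MyEvent element) {
                // event time comes from the record itself (assumed accessor)
                return element.getTimestamp();
            }
        });

Choosing a smaller bound makes CEP emit matches sooner, at the cost of dropping events that arrive later than the bound (FlinkCEP currently drops late events).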

Regards,
Dawid


> On 4 Aug 2017, at 09:40, Björn Hedström <bjorn.e.hedst...@gmail.com> wrote:
> 
> Hi,
> 
> I am writing a small application which monitors a couple of directories for
> files which are read by Kafka and later consumed by Flink. Flink then
> performs some operations on the records (such as extracting the embedded
> timestamp) and tries to find a pattern using CEP. Since the data can be out
> of order I am using a BoundedOutOfOrdernessTimestampExtractor with the
> window allowing for elements to come up to 24 hours late. The
> TimeCharacteristic is set to EventTime.
> 
> However, here is where I run into some issues. I noticed that Flink does not
> start to process the data through the defined pattern until the watermark
> is greater than the timestamp of the record. This issue does not appear
> when using a text-file directly as a source and disregarding Kafka. In
> practice this could mean that a pattern consisting of only two consecutive
> datapoints would not be found until another 22 subsequent datapoints are
> collected. It seems that I am missing something fundamental here and any
> help would be appreciated.
> 
> I am using a FlinkKafkaConsumer010, Flink 1.3.0, Kafka 0.11.0.0
> 
> Best,
> Björn





Re: CEP condition expression and its event consuming strategy

2017-07-31 Thread Dawid Wysakowicz

Ad. 1 Yes, it returns an Iterable to support times and oneOrMore patterns (which 
can accept more than one event).
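
To illustrate why an Iterable is needed, here is a minimal hedged sketch (the Event type and its getType()/getValue() accessors are made up for the example): with oneOrMore, several events may already be bound to a pattern state by the time a later condition inspects it.

Pattern<Event, ?> pattern = Pattern.<Event>begin("middle")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getType().equals("A");
        }
    })
    .oneOrMore()  // "middle" can accumulate several events...
    .followedBy("end")
    .where(new IterativeCondition<Event>() {
        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            // ...so getEventsForPattern returns an Iterable we have to loop over
            double sum = 0;
            for (Event e : ctx.getEventsForPattern("middle")) {
                sum += e.getValue();
            }
            return sum > 100;
        }
    });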

Ad. 2 A use case for not discarding used events could be, e.g., looking for 
certain shapes in the data, such as W-shapes. In that case one W-shape could start on 
the middle peak of the previous one.

Unfortunately, I personally can't point you to any in-use applications. Maybe 
Kostas, whom I've added to the discussion, knows of some.

Anyway, thanks for your interest in the CEP library. We will be happy to hear any 
comments and suggestions for future improvements.



> On 28 Jul 2017, at 21:54, Chao Wang <chaow...@wustl.edu> wrote:
> 
> Hi Dawid,
> 
> Thank you.
> 
> Ad. 1 I noticed that the method getEventsForPattern() returns an Iterable 
> and we need to further invoke .iterator().next() to get access to the event 
> value.
> 
> Ad. 2 Here is a bit about a use case we have that calls for such discarding 
> semantics. In the event processing project I am currently working on, input 
> event streams are sensor data, and we join streams and do Kalman filtering, 
> FFT, etc. We therefore choose to discard the accepted events once the data 
> they carry have been processed; otherwise, it may cause duplicated processing 
> as well as incorrect join semantics.
> 
> We came up with this question while doing an empirical comparison of Flink 
> and our system (implemented with the TAO real-time event service). We 
> implemented in our system such semantics, by removing input events once CEP 
> emits the corresponding output events.
> 
> Could you provide some use cases where the discarding semantics are not 
> needed? I guess I am wired into processing sensor data and thus cannot think 
> of a case where reusing accepted events would be of interest. Also, could you 
> share some pointers to streaming applications in use? We are seeking to make 
> our research work more relevant to current practice.
> 
> Thank you very much,
> 
> Chao
> 
> On 07/27/2017 02:17 AM, Dawid Wysakowicz wrote:
>> Hi Chao,
>> 
>> Ad. 1 You could implement it with IterativeCondition. Something like this:
>> 
>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first").where(new SimpleCondition<Event>() {
>>    @Override
>>    public boolean filter(Event value) throws Exception {
>>       return value.equals("A") || value.equals("B");
>>    }
>> }).followedBy("second").where(new IterativeCondition<Event>() {
>>    @Override
>>    public boolean filter(Event value, Context<Event> ctx) throws Exception {
>>       // compare against the event actually captured for "first" in this branch
>>       return (value.equals("A") || value.equals("B")) &&
>>          !value.equals(ctx.getEventsForPattern("first").iterator().next());
>>    }
>> });
>> 
>> Ad. 2 Unfortunately, right now, as you said, the pattern is restarted for every event 
>> and it is not possible to change that strategy. There is ongoing work to 
>> introduce AfterMatchSkipStrategy[1], but at best it will be merged in 1.4.0. 
>> I did not give it much thought, but I would try to implement some discarding 
>> logic.
>> 
>> Regards,
>> Dawid
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-7169
>> 
>>> On 26 Jul 2017, at 22:45, Chao Wang <chaow...@wustl.edu> wrote:
>>> 
>>> Hi,
>>> 
>>> I have two questions regarding the use of the Flink CEP library 
>>> (flink-cep_2.11:1.3.1), as follows:
>>> 
>>> 1. I'd like to know how to use the API to express "emit event C in the 
>>> presence of events A and B, with no restriction on the arriving order of A 
>>> and B"? I've tried by creating two patterns, one for "A and then B" and the 
>>> other for "B and then A", and consequently using two patternStreams to 
>>> handle each case, which emits C. It worked but to me this approach seems 
>>> redundant.
>>> 
>>> 2. Given the above objective expression, how to consume the accepted events 
>>> so that they will not be used for future matchings? For example, with the 
>>> arriving sequence {A, B, A}, the CEP should only emit one C (due to the 
>>> matching of {A,B}), not two Cs (due to {A,B} and {B,A}). Similarly, with 
>>> the arriving sequence {B, A, B, A}, the CEP should only emit two Cs, not 
>>> three.
>>> 
>>> 
>>> Thanks,
>>> 
>>> Chao
>>> 
> 





Re: CEP condition expression and its event consuming strategy

2017-07-27 Thread Dawid Wysakowicz
Hi Chao,

Ad. 1 You could implement it with IterativeCondition. Something like this:

Pattern<Event, ?> pattern = Pattern.<Event>begin("first").where(new SimpleCondition<Event>() {
   @Override
   public boolean filter(Event value) throws Exception {
      return value.equals("A") || value.equals("B");
   }
}).followedBy("second").where(new IterativeCondition<Event>() {
   @Override
   public boolean filter(Event value, Context<Event> ctx) throws Exception {
      // compare against the event actually captured for "first" in this branch
      return (value.equals("A") || value.equals("B")) &&
         !value.equals(ctx.getEventsForPattern("first").iterator().next());
   }
});

Ad. 2 Unfortunately, right now, as you said, the pattern is restarted for every event and 
it is not possible to change that strategy. There is ongoing work to introduce 
AfterMatchSkipStrategy[1], but at best it will be merged in 1.4.0. I did not 
give it much thought, but I would try to implement some discarding logic.

Regards,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-7169

> On 26 Jul 2017, at 22:45, Chao Wang <chaow...@wustl.edu> wrote:
> 
> Hi,
> 
> I have two questions regarding the use of the Flink CEP library 
> (flink-cep_2.11:1.3.1), as follows:
> 
> 1. I'd like to know how to use the API to express "emit event C in the 
> presence of events A and B, with no restriction on the arriving order of A 
> and B"? I've tried by creating two patterns, one for "A and then B" and the 
> other for "B and then A", and consequently using two patternStreams to handle 
> each case, which emits C. It worked but to me this approach seems redundant.
> 
> 2. Given the above objective expression, how to consume the accepted events 
> so that they will not be used for future matchings? For example, with the 
> arriving sequence {A, B, A}, the CEP should only emit one C (due to the 
> matching of {A,B}), not two Cs (due to {A,B} and {B,A}). Similarly, with the 
> arriving sequence {B, A, B, A}, the CEP should only emit two Cs, not three.
> 
> 
> Thanks,
> 
> Chao
> 





Re: notNext() and next(negation) not yielding same output in Flink CEP

2017-07-23 Thread Dawid Wysakowicz
Hi Yassine,

First of all, notNext(A) is not equal to next(not A). notNext should be 
considered a "stop condition" which says that if an event matching the A 
condition occurs, the current partial match is discarded. The next(not A) version, on the 
other hand, accepts every event that does not match the A condition.

So let's analyze a sequence of events like "b c a1 a2 a3 d". For the first 
version, with next(not A), the output will be "c a1 a2 a3 d", which is what you 
expect, I think. In the other version, with notNext(A), a partial match "c a1" 
will be discarded after "a2", as notNext says that after the A's there 
should be no A.
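
For illustration, here is a compact hedged sketch in Java of the two variants (the Event type and its isA() accessor are assumptions; the original pseudocode used "event is A" / "event is not A"):

// Variant 1: explicit non-A terminator via next(not A)
Pattern<Event, ?> withNext = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override public boolean filter(Event e) throws Exception { return !e.isA(); } })
    .next("middle")
    .where(new SimpleCondition<Event>() {
        @Override public boolean filter(Event e) throws Exception { return e.isA(); } })
    .oneOrMore().consecutive()
    .next("end")
    .where(new SimpleCondition<Event>() {
        @Override public boolean filter(Event e) throws Exception { return !e.isA(); } });

// Variant 2: notNext acts as a stop condition -- if another A directly follows
// the matched A's, the partial match is discarded instead of being extended
Pattern<Event, ?> withNotNext = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override public boolean filter(Event e) throws Exception { return !e.isA(); } })
    .next("middle")
    .where(new SimpleCondition<Event>() {
        @Override public boolean filter(Event e) throws Exception { return e.isA(); } })
    .oneOrMore().consecutive()
    .notNext("end")
    .where(new SimpleCondition<Event>() {
        @Override public boolean filter(Event e) throws Exception { return e.isA(); } });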

I hope this helps understanding how notNext works.

Regards,
Dawid

> On 22 Jul 2017, at 20:32, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
> 
> Hi all,
> 
> I would like to match the maximal consecutive sequences of events of type A 
> in a stream.
> I'm using the following :
> Pattern.begin("start").where(event is not A)
> .next("middle").where(event is A).oneOrMore().consecutive()
> .next("not").where(event is not A)
> This gives the output I want. However, if I use notNext("not").where(event is 
> A) instead of next("not").where(event is not A), the middle patterns contain 
> only sequences of single elements of type A.
> My understanding is that notNext() in this case is equivalent to 
> next(negation), so why is the output different?
> 
> Thank you in advance.
> 
> Best,
> Yassine





Re: Getting Errors when using keyby()

2017-07-12 Thread Dawid Wysakowicz
Hi Sridhar,

Your class is missing a default constructor (without arguments), thus it is not a 
valid POJO in Flink.

You can check the requirements for POJOs here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#pojos
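
As a minimal sketch of the fix (not a drop-in for your exact class), adding a public no-argument constructor while keeping the public getters and setters makes the class a valid Flink POJO:

public class DataStreamObject {
    private String field1;
    private String field2;

    // public no-argument constructor: required for Flink's POJO type extraction
    public DataStreamObject() {
    }

    public DataStreamObject(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    public String getField1() { return field1; }
    public void setField1(String field1) { this.field1 = field1; }

    public String getField2() { return field2; }
    public void setField2(String field2) { this.field2 = field2; }

    // equals(), hashCode() and toString() can stay as in your original class
}

With that in place, keyBy on a field of this class should no longer fall back to GenericType.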


> On 12 Jul 2017, at 19:54, Sridhar Chellappa  wrote:
> 
> I have a DataStream on which I am applying a CEP pattern and grouping the 
> results using keyby(). The DataStream Object is a pojo :
> 
> public class DataStreamObject {
> private String field1;
> private String field2;
> 
> public DataStreamObject(String field1, String field2) {
> this.field1 = field1;
> this.field2 = field2;
> }
> 
> public void setField1(String field1) {
> this.field1 = field1;
> }
> 
> public String getField1() {
> return field1;
> }
> 
> 
> public void setField2(String field2) {
> this.field2 = field2;
> }
> 
> public String getField2() {
> return field2;
> }
> 
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (!(o instanceof DataStreamObject)) return false;
> 
> DataStreamObject that = (DataStreamObject) o;
> 
> if (!getField1().equals(that.getField1())) return false;
> return getField2().equals(that.getField2());
> }
> 
> @Override
> public int hashCode() {
> int result = getField1().hashCode();
> result = 31 * result + getField2().hashCode();
> return result;
> }
> 
> @Override
> public String toString() {
> return "DriverSameAsCustomer{" +
> "field1='" + field1 + '\'' +
> ", field2='" + field2 + '\'' +
> '}';
> }
> }
> 
> When I submit my flinkjob, I get the following error :
> 
> 
> This type (GenericType) cannot be used as key.
>   
> org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
>   
> org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)
>   com.foo.Main.main(Main.java:66)
> 
> 
> As I understand, I do not need to implement Key interface if the class is a 
> POJO (which it is).
> 
> Please help me understand where I am going wrong and suggest a fix.
> 
> 





Re: Should customized Complex Events be Serializable?

2017-07-12 Thread Dawid Wysakowicz
What do you mean by ComplexEvents? Do you mean the elements of the DataStream that the CEP 
library outputs? If so, then yes, they should either be 
Serializable or you should provide a custom TypeSerializer.
> On 12 Jul 2017, at 06:58, Sridhar Chellappa  wrote:
> 
> Folks,
> 
> I am using the CEP library to create ComplexEvents. My question is, should 
> the ComplexEvents be serializable?





Re: Queries regarding FlinkCEP

2017-06-20 Thread Dawid Wysakowicz
For example, if there are no events in one of the partitions, that partition
will always generate Watermark(Long.MIN_VALUE), which will result in the
Watermark not being advanced. There is an open JIRA to improve such
situations: FLINK-5479 <https://issues.apache.org/jira/browse/FLINK-5479>.
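
As a purely conceptual sketch (plain Java, not Flink API) of why one idle partition stalls everything: the operator's event-time clock is the minimum over all its input channels, so a channel stuck at Long.MIN_VALUE pins the whole operator.

long[] channelWatermarks = {Long.MIN_VALUE, 1_497_000_000_000L, 1_497_000_060_000L};

long operatorWatermark = Long.MAX_VALUE;
for (long wm : channelWatermarks) {
    operatorWatermark = Math.min(operatorWatermark, wm);
}
// operatorWatermark == Long.MIN_VALUE here: event time never advances,
// so CEP keeps buffering and never emits matches.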

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-06-20 14:00 GMT+02:00 Biplob Biswas <revolutioni...@gmail.com>:

> Hi dawid,
>
> Yes I am reading from multiple topics and yes a few topics have multiple
> partitions, not all of them.
>
> But I didn't understand the concept of stalled partition.
>
>
>
>


Re: Flink CEP not emitting timed out events properly

2017-06-20 Thread Dawid Wysakowicz
It is possible that two Watermarks will be passed through the stream, but
at an operator that has more than one input stream (in your case, from
different topics and partitions) the smallest Watermark is considered valid
and passed downstream.

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-06-20 13:50 GMT+02:00 Biplob Biswas <revolutioni...@gmail.com>:

> Hi dawid,
>
> First of all congratulations on being a Flink committer, saw your tweet in
> the morning.
>
> Now regarding that link, that talks about multiple partitions for a single
> topic, here I am talking about multiple topics each having different number
> of partitions.
>
> I tried adding the timestamp extractor at the Kafka source, but
> I still observe different watermarks when I am logging the current
> watermark
> timestamp.
>
> If I expect the same behaviour, shouldn't there be only one value for a
> watermark?
>
> Regards,
> Biplob
>
>
>
>


Re: Queries regarding FlinkCEP

2017-06-20 Thread Dawid Wysakowicz
Hi Biplop,

Your recent post on reading from different topics made me realise it may be
a problem with "stalled" partitions. Did your topic have more than one
partition? If it did, the problem may be that the Watermark is generated
independently per partition and then the smallest one is taken as the
"global" Watermark.

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-06-08 15:35 GMT+02:00 Biplob Biswas <revolutioni...@gmail.com>:

> Hi,
>
> Can anyone check whether they can reproduce this issue on their end?
> There's no log yet as to what is happening. Any idea to debug this issue is
> well appreciated.
>
> Regards,
> Biplob
>
>
>
>


Re: Flink CEP not emitting timed out events properly

2017-06-20 Thread Dawid Wysakowicz
Hi Biplop,

If an operator has two inputs, the resulting Watermark is the smallest one
from the two upstreams. More on that you can check here.


Re: FlinkCEP 1.3 scala, cannot apply select function

2017-06-19 Thread Dawid Wysakowicz
Hi,

Because of some optimizations in the Java <-> Scala collection
conversions, the type of Map used for the select method is scala.collection.Map
instead of the Predef.Map imported by default.

Try importing:

import scala.collection.Map


or use the fully qualified name in the function definition:

def myFunction(pattern: scala.collection.Map[String,Iterable[MyEventType]]):
> MyEventType = {
> val startEvent = pattern.get("first").get.head
> val endEvent = pattern.get("second").get.head
> // dummy functionality for illustrating purposes
> endEvent
> }


Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-06-19 9:35 GMT+02:00 Sonex <alfredjens...@gmail.com>:

> Hello, I have created a simple pattern with FlinkCEP 1.3 as well as a simple
> pattern select function. My simple function is as follows:
>
> def myFunction(pattern: Map[String,Iterable[MyEventType]]): MyEventType =
> {
> val startEvent = pattern.get("first").get.head
> val endEvent = pattern.get("second").get.head
> // dummy functionality for illustrating purposes
> endEvent
> }
>
> When I apply the function above to a pattern in the following way:
>
> CEP.pattern(myKeyedStream,myDummyPattern).select(myFunction(_)) it gives
> the
> following error:
>
> Cannot resolve reference myFunction with such signature.
>
> Type mismatch, expected:
> scala.Predef.Map[scala.Predef.String,scala.Iterable[MyEventType]], actual:
> scala.collection.Map[scala.Predef.String,scala.Iterable[MyEventType]]
>
> What is the reason of this behavior?
>
>
>
>
>
>


Re: Stream sql example

2017-06-09 Thread Dawid Wysakowicz
Thanks a lot Timo, after I added the ResultTypeQueryable interface to my
mapper it worked. As for the SongEvent, the reason I tried remapping it to
Row is that it has an enum field on which I want to filter, so my first
approach was to remap it to String in the TableSource. What do you think should
be the way to go in such a case?
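
For reference, here is a rough sketch of such a mapper implementing ResultTypeQueryable (the SongEvent accessors and the resulting column names are made up for the example), so the Table API sees a concrete RowTypeInfo instead of a generic type:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class SongEventToRow implements MapFunction<SongEvent, Row>, ResultTypeQueryable<Row> {

    @Override
    public Row map(SongEvent event) {
        Row row = new Row(2);
        row.setField(0, event.getUserId());
        // the enum field is remapped to its String name so it can be filtered in SQL
        row.setField(1, event.getType().name());
        return row;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return new RowTypeInfo(
            new TypeInformation<?>[] {BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
            new String[] {"userId", "songType"});
    }
}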

After successfully producing a DataStream[Row] I tried something like:

>
> tEnv.toAppendStream(table)(TypeInformation.of(classOf[UserSongsStatistics])).print();
>

The class UserSongsStatistics is a POJO with fields named the same as the
expressions in the SELECT clause. Is such a construct intended to work? Right
now I get an error:

org.apache.flink.table.api.TableException: The field types of physical and
> logical row types do not match.This is a bug and should not happen. Please
> file an issue.


Is it really a bug?

Anyway, thanks for the help. I will file a JIRA for the previous issue tomorrow.

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-06-09 22:25 GMT+02:00 Timo Walther <twal...@apache.org>:

> Hi David,
>
> I think the problem is that the type of the DataStream produced by the
> TableSource does not match the type that is declared in
> `getReturnType()`. A `MapFunction<xxx, Row>` is always a generic type
> (because Row cannot be analyzed). A solution would be that the mapper
> implements `ResultTypeQueryable`. I agree that the error should be thrown
> earlier, not in the CodeGenerator. Can you create an issue for this?
>
> Btw, the Table API supports nested types; it should also work if the
> TableSource returns `SongEvent`.
>
> Regards,
> Timo
>
>
> Am 09.06.17 um 20:19 schrieb Dawid Wysakowicz:
>
> Sorry forgot to add the link:
>
> https://gist.github.com/dawidwys/537d12a6f2355cba728bf93f1af87b45
>
> Z pozdrowieniami! / Cheers!
>
> Dawid Wysakowicz
>
> *Data/Software Engineer*
>
> Skype: dawid_wys | Twitter: @OneMoreCoder
>
> <http://getindata.com/>
>
> 2017-06-09 20:19 GMT+02:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com>:
>
>> Hi,
>> I tried writing a simple sql query with custom StreamTableSource and it
>> fails with error:
>>
>> org.apache.flink.table.codegen.CodeGenException: Arity of result type does not match number of expressions.
>>   at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:940)
>>   at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:883)
>>   at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
>>   at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.generatedConversionFunction(StreamTableSourceScan.scala:35)
>>   at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
>>   at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.convertToInternalRow(StreamTableSourceScan.scala:35)
>>   at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:107)
>>>
>>>
>> You can check the source code here:
>>
>>
>> Z pozdrowieniami! / Cheers!
>>
>> Dawid Wysakowicz
>>
>> *Data/Software Engineer*
>>
>> Skype: dawid_wys | Twitter: @OneMoreCoder
>>
>> <http://getindata.com/>
>>
>
>
>


Re: Stream sql example

2017-06-09 Thread Dawid Wysakowicz
Sorry forgot to add the link:

https://gist.github.com/dawidwys/537d12a6f2355cba728bf93f1af87b45

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-06-09 20:19 GMT+02:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com>:

> Hi,
> I tried writing a simple sql query with custom StreamTableSource and it
> fails with error:
>
> org.apache.flink.table.codegen.CodeGenException: Arity of result type does not match number of expressions.
>   at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:940)
>   at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:883)
>   at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
>   at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.generatedConversionFunction(StreamTableSourceScan.scala:35)
>   at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
>   at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.convertToInternalRow(StreamTableSourceScan.scala:35)
>   at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:107)
>>
>>
> You can check the source code here:
>
>
> Z pozdrowieniami! / Cheers!
>
> Dawid Wysakowicz
>
> *Data/Software Engineer*
>
> Skype: dawid_wys | Twitter: @OneMoreCoder
>
> <http://getindata.com/>
>


Stream sql example

2017-06-09 Thread Dawid Wysakowicz
Hi,
I tried writing a simple sql query with custom StreamTableSource and it
fails with error:

> org.apache.flink.table.codegen.CodeGenException: Arity of result type does not match number of expressions.
>   at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:940)
>   at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:883)
>   at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
>   at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.generatedConversionFunction(StreamTableSourceScan.scala:35)
>   at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
>   at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.convertToInternalRow(StreamTableSourceScan.scala:35)
>   at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:107)
>
>
You can check the source code here:


Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>


Re: Problem with WebUI

2017-06-09 Thread Dawid Wysakowicz
I had a look into the YARN logs and found this exception:

> 2017-06-09 17:10:20,922 ERROR org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler  - Caught exception
> java.lang.AbstractMethodError
>   at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
>   at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
>   at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
>   at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
>   at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
>   at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
>   at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
>   at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
>   at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
>   at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
>   at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
>   at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
>   at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
>   at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
>   at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>   at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
>   at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
>   at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
>   at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>   at java.lang.Thread.run(Thread.java:745)


Any idea how to tackle it?

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-06-09 16:17 GMT+02:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com>:

> Hi,
>
> I am trying to run a flink job on yarn. When I submit the job with
> following command
>
> bin/flink run -m yarn-cluster -yn 2 -c ${className} ${jarName}
>
>
> I cannot access the WebUI, it seem

Problem with WebUI

2017-06-09 Thread Dawid Wysakowicz
Hi,

I am trying to run a flink job on yarn. When I submit the job with
following command

bin/flink run -m yarn-cluster -yn 2 -c ${className} ${jarName}


I cannot access the WebUI. The job seems to run correctly, but I can't access
the UI; I get a 500 response code.

When I run yarn-session, the WebUI is accessible and when I attach the same
job to the running session everything is fine (the UI is still accessible)
and the job runs correctly.

Any ideas what may be the problem?

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>


Re: Queries regarding FlinkCEP

2017-06-06 Thread Dawid Wysakowicz
Hi Biplop,

The CEP library must internally ensure the order of the incoming events. It sorts
events upon Watermark arrival; at that time it sorts the events with timestamp
< Watermark.

With the BoundedOutOfOrdernessTimestampExtractor, a Watermark with time *t* is
generated once an event with timestamp *t + maxOutOfOrderness* arrives.

Try adding an event like 12,b,7,6 to your test set and some alerts should
be generated.
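
As a tiny numeric illustration of that rule (the maxOutOfOrderness of 5 below is just an assumed value, not taken from your job):

// BoundedOutOfOrdernessTimestampExtractor keeps track of the largest timestamp
// seen so far and emits watermark = maxSeenTimestamp - maxOutOfOrderness.
long maxOutOfOrderness = 5;
long maxSeenTimestamp = 12;          // e.g. after the suggested "12,b,7,6" event arrives
long watermark = maxSeenTimestamp - maxOutOfOrderness;  // = 7
// CEP can now sort and process all buffered events with timestamp < 7.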

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-06-06 15:17 GMT+02:00 Biplob Biswas <revolutioni...@gmail.com>:

> Sorry to bombard you with so many messages, but one last thing: the example
> would produce an alert if the line specifying event time is commented out.
>
> More specifically, this one:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
> Only with event time, there is no alert.
>
> Thanks, Biplob
>
>
>
>


Re: Queries regarding FlinkCEP

2017-06-05 Thread Dawid Wysakowicz
I think Till answered all your questions, but just to rephrase a bit.

1. The within operator and the TimeCharacteristic work on different levels. The
TimeCharacteristic tells how events are assigned a timestamp. The within
operator specifies the maximal time between the first and last event of a
matched sequence (the time here corresponds to the chosen
TimeCharacteristic). So if we have within(Time.minutes(10)) in EventTime,
upon Watermark arrival the events are sorted by the assigned timestamp
and then the within constraint is applied.

3. Looking at your code, there is nothing wrong with it. As I don't know what
the timestamps of your events look like, I can only guess, but I would say
either

   - there are no matching sequences of events in your stream that fit into a
   10-minute window,
   - or your events are out of order by more than 60 seconds. Consider this
   example: we have events with timestamps {t1=600s, t2=620s, t3=550s}. The event
   with t3=550s cannot match with t1 because it lags 70s > 60s behind t2.
   FlinkCEP right now drops all late events.

For deeper understanding of Event/Processing Time I would suggest having a
look at :
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#event-time


Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-06-02 18:22 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Biplob,
>
> 1. The CEPPatternOperator can use either processing time or event time for
> its internal processing logic. It only depends on what TimeCharacteristic
> you have set for your program. Consequently, with event time, your example
> should be detected as an alert.
>
> 2. If you don't provide a keyed input stream, then Flink will execute the
> CEP operator only with a parallelism of 1. Thus, all events pass through
> the same instance of the CEP operator.
>
> 3. It's hard to tell but I would assume that something with the watermark
> generation does not properly work. For example, it could be that you've set
> the out of orderness to a very large value such that it will take a long
> time until you can be sure that you've seen all events for a given
> watermark on the input without monotonically increasing timestamps. The
> easiest way to debug the problem would be a self-contained example program
> which reproduces the problem.
>
> Cheers,
> Till
>
> On Fri, Jun 2, 2017 at 1:10 PM, Biplob Biswas <revolutioni...@gmail.com>
> wrote:
>
>> Hi ,
>>
>> Thanks a lot for the help last time, I have a few more questions and I
>> chose
>> to create a new topic as the problem in the previous topic was solved,
>> thanks to useful inputs from Flink Community. The questions are as follows
>>
>> *1.* What time does the "within" operator works on "Event Time" or
>> "Processing Time", I am asking this as I wanted to know whether something
>> like the following would be captured or not.
>>
>> MaxOutofOrderness is set to 10 mins, and "within" operator is specified
>> for
>> 5 mins. So if a first events event time is at 1:00  and the corresponding
>> next event is has an event time of 1:04 but it arrives in the system at
>> 1:06. Would this still be processed and alert would be generated or not?
>>
>> *2.* What would happen if I don't have a key to specify, the way 2 events
>> are correlated is by using the ctx of the first event and matching some
>> different id. So, we can't group by some unique field. I tried a test run
>> without specifying a key and it apparently works. But how is the shuffling
>> done then in this case?
>>
>> *3.* This is one of the major issue, So I could use Event Time with
>> ascending event time extractor for one of my kafka topic because its
>> behavior is consistent.  But when i added another topic to read from where
>> the events are not in ascending order, using ascending timestampextractor
>> gave me timestamp monotonicity violation. Then when I am using
>> BoundedOutOfOrdernessTimestampExtractor for the same, I am not getting
>> any
>> warnings anymore but I am no more getting my alerts.
>>
>> If I go back to using processing time, then I am again getting alerts
>> properly. What could be the problem here?
>>
>> *This is the code I am using:*
>>
>> /public class CEPForBAM {
>>
>>
>>   public static void main(String[] args) throws Exception {
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> System.out.println(env.getStreamTimeCharacteristic());
>> env.setStreamTimeCharacteristic(TimeCh

Re: No Alerts with FinkCEP

2017-05-30 Thread Dawid Wysakowicz
Hi Biplob,

The message you mention should not be a problem here. It just says you
can't use your events as POJOs (e.g. you can't use keyBy("chargedAccount")
).
Your code seems fine and without some example data I think it will be hard
to help you.

As for the PART 2 of your first email.
In 1.3 we introduced the NOT pattern, but right now it does not support time
ranges in which a pattern should not occur. What you can do, though, is
specify a positive pattern like ("a" -> "b" within 1s) and select the
timed-out patterns, which in fact are the ones that you want to trigger
alerts for.
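
A hedged sketch of that approach with the Flink 1.3 Java API (the Event and Alert types, their fields, and the keyedInput stream are assumptions): the timed-out partial matches come out on the Left side of the Either stream and are exactly the "a happened but no b within 1s" cases.

Pattern<Event, ?> pattern = Pattern.<Event>begin("a")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.getType().equals("a");
        }
    })
    .followedBy("b")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.getType().equals("b");
        }
    })
    .within(Time.seconds(1));

DataStream<Either<Alert, String>> result = CEP.pattern(keyedInput, pattern).select(
    // partial matches that timed out: "a" arrived but no "b" followed within 1s
    new PatternTimeoutFunction<Event, Alert>() {
        @Override
        public Alert timeout(Map<String, List<Event>> partialMatch, long timeoutTimestamp) {
            return new Alert(partialMatch.get("a").get(0), timeoutTimestamp);
        }
    },
    // complete matches are not interesting in this scenario
    new PatternSelectFunction<Event, String>() {
        @Override
        public String select(Map<String, List<Event>> match) {
            return "matched";
        }
    });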


Re: FlinkCEP latency/throughput

2017-05-19 Thread Dawid Wysakowicz
Hello Alfred,

Just some considerations from my side regarding latency. I think the
first step should be defining what "latency" really means for a CEP library.
The first thing that comes to my mind is the time period between the
arrival of an event that should trigger a match (the ending pattern) and the actual
time when the match is emitted (for that, the select function is a good
place to measure, I think).
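
One rough way to get at that number, sketched below under assumptions (a hypothetical Event class with a mutable arrivalTime field and a key field, and whatever Pattern you have already defined): stamp the wall-clock arrival time right before the CEP operator, then subtract it from the wall clock inside the select function that emits the match.

DataStream<Event> stamped = input.map(new MapFunction<Event, Event>() {
    @Override
    public Event map(Event e) {
        e.arrivalTime = System.currentTimeMillis();  // arrival at the pipeline
        return e;
    }
});

DataStream<Long> latenciesMillis = CEP.pattern(stamped.keyBy("key"), pattern)
    .select(new PatternSelectFunction<Event, Long>() {
        @Override
        public Long select(Map<String, List<Event>> match) {
            // "end" is assumed to be the name of the last (triggering) pattern state
            Event triggering = match.get("end").get(0);
            return System.currentTimeMillis() - triggering.arrivalTime;
        }
    });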

I think Kostas was also referring to similar kind of issue.

Hope it will be helpful.

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-05-19 10:59 GMT+02:00 Sonex <alfredjens...@gmail.com>:

> Hello Kostas,
>
> thanks for your response. Regarding throughput, it makes sense.
>
> But there is still one question remaining. How can I measure the latency of
> my FlinkCEP application ???
>
> Maybe you answered it, but I didn`t quite get that. As far as your number 2
> question about measuring latency, the answer is yes, the first element in
> the matching pattern will wait inevitably longer than the last one
>
> Thank you for your time!!!
>
>
>
>


Re: CEP memory requirements

2017-05-04 Thread Dawid Wysakowicz
Yes, you are right: prior to 1.3.0 the state per key was never cleared.
Right now, due to FLINK-5174
<https://issues.apache.org/jira/browse/FLINK-5174>, in the master branch it is
stored only if necessary.

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-05-04 22:12 GMT+02:00 Elias Levy <fearsome.lucid...@gmail.com>:

> Looking at the code I gather that 1.2 does not clear the per key NFA state
> even if there is no state to keep, whereas this appears fixed in the master
> branch. Yes?
>
> On Thu, May 4, 2017 at 11:25 AM, Elias Levy <fearsome.lucid...@gmail.com>
> wrote:
>
>> I am observing odd memory behavior with the CEP library and I am
>> wondering if it is expected.
>>
>> If I write a simple local streaming Flink job that reads from a 65MB
>> compressed file of JSON objects, one per line, parses the JSON, performs a
>> filter operation, and then a keyBy, heap usage is stable, staying below
>> 250MB throughout per VisualVM.
>>
>> But if I create a CEP pattern that matches nothing
>> (Pattern.begin[T]("foo").where( _ => false )) and match it against the
>> stream produced by the last keyBy (CEP.pattern(stream, pattern).select),
>> then memory balloons until the program terminates, steadily growing until
>> 3GB.
>>
>> The VisualVM memory profiler appears unable to account for that used heap
>> space.  If I add the Live Bytes column I'd get only between 200-100 MB.
>>
>> Any idea what is going on?
>>
>> Flink 1.2.  Java 8.
>>
>>
>


Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Dawid Wysakowicz
Hi,

This is expected behaviour. After the "ar" event there may still occur
another "ar" event that would also trigger a match.
To be more precise, in all versions prior to 1.3.0 there are two different
consuming strategies:

   - STRICT (the next operator) - accepts an event only if it occurs
   directly after the previous one
   - SKIP TILL ANY (the followedBy operator) - accepts every matching
   event that follows, even if an earlier event has already matched this
   pattern

Because after "ni" event we could match with some other "ar" events, the
match is timeouted after 5 seconds.

In FLINK-6208 <https://issues.apache.org/jira/browse/FLINK-6208> we
introduced a third consuming strategy:

   - SKIP TILL NEXT (this is the strategy for followedBy right now) - the
   event does not have to occur directly after the previous one, but only one
   event can be matched

and you can still use SKIP TILL ANY by using followedByAny. I believe the
SKIP TILL NEXT strategy is the one you expected.
You can check it out on the master branch. We introduced lots of new features
and bugfixes to CEP for the 1.3.0 version, so any comments,
tests or suggestions are welcome.


Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-04-29 12:14 GMT+02:00 Moiz S Jinia <moiz.ji...@gmail.com>:

> When using "next", this pattern works fine for the both a match as well as
> a timeout:
>
> Pattern<Event, Event> pattern = Pattern.begin("start")
> .where(evt -> evt.value.equals("ni"))
> .next("last").where(evt -> evt.value.equals("ar")).
> within(Time.seconds(5));
>
> 1. "ni" then "ar" within 5 seconds - triggers match
> 2. "ni" then no "ar" within 5 seconds - triggers timeout
>
> But with "followedBy", this does not behave as expected:
>
> Pattern<Event, Event> pattern = Pattern.begin("start")
> .where(evt -> evt.value.equals("ni"))
> .followedBy("last").where(evt -> evt.value.equals("ar")).
> within(Time.seconds(5));
>
> "ni" then "ar" within 5 seconds - triggers match and also triggers timeout.
>
> Why is the timeout triggered when using followedBy (when there is a match)?
>
> Version - 1.1.5.
>


Re: CEP join across events

2017-04-26 Thread Dawid Wysakowicz
Hi Elias,

You can do it with 1.3 and IterativeConditions. The method
ctx.getEventsForPattern("foo") returns only those events that were matched
in the "foo" pattern in that particular branch.
I mean that for a sequence like (type=1, value_b=X); (type=1,
value_b=Y); (type=2, value_b=X), both events of type = 1 create a separate
pattern branch, and the event with type = 2 will be checked for a match
once for each of those branches.
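
A sketch in Java of that join-style condition (the Event type and its getType()/getValueB() accessors are assumptions; the stream is keyed by value_a as in the original question): "bar" only accepts an event whose value_b equals the value_b of the event captured for "foo" in the same branch.

Pattern<Event, ?> pattern = Pattern.<Event>begin("foo")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.getType() == 1;
        }
    })
    .followedBy("bar")
    .where(new IterativeCondition<Event>() {
        @Override
        public boolean filter(Event e, Context<Event> ctx) throws Exception {
            if (e.getType() != 2) {
                return false;
            }
            // exactly one "foo" event exists per branch, so this is a 1:1 join
            Event foo = ctx.getEventsForPattern("foo").iterator().next();
            return foo.getValueB().equals(e.getValueB());
        }
    });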

Regards,
Dawid

2017-04-26 7:48 GMT+02:00 Elias Levy <fearsome.lucid...@gmail.com>:

> There doesn't appear to be a way to join events across conditions using
> the CEP library.
>
> Consider events of the form (type, value_a, value_b) on a stream keyed by
> the value_a field.
>
> Under 1.2 you can create a pattern that for a given value_a, as specified
> by the stream key, there is a match if an event of type 1 is followed by an
> event of type 2 (e.g. begin("foo").where(_.type==1).
> followedBy("bar").where(_.type==2).  But this will return a match
> regardless of whether value_b in the first event matches value_b in the
> second event.
>
> 1.3 snapshot introduces iterative conditions, but this is insufficient.
> In 1.3 you can do:
>
> begin("foo").where(_.type==1).followedBy("bar").where(
> (v, ctx) => {
>v.type == 2 &&
>ctx.getEventsForPattern("foo").asScala.exists(prev => prev.value_b
> == v.value_b)
> })
>
> This will accept the current event if any if any previously had a value_b
> that matches the current event. But the matches will include all previous
> events, even those that did not match the current event at value_b, instead
> of only matching the previous event where value_b equals the current event.
>
> Is there a way to only output the match there previous event matches the
> current event value_b (e.g. foo == (type=1, value_a=K, value_b=X) and bar
> == (type=2, value_a=K, value_b=X)?
>
>
>


Re: Grafana plug-in for Flink

2017-04-24 Thread Dawid Wysakowicz
Hi,
As far as I am aware, Jamie used the example JSON datasource for Grafana:
https://github.com/grafana/simple-json-datasource .

At least I used it when I recreated his example for some introductory
purposes. You can browse my example here:
https://github.com/dawidwys/flink-intro.

Best,
Dawid

2017-04-24 18:04 GMT+02:00 Andy Gibraltar <andy.gibral...@gmail.com>:

> Hi,
>
> In his presentation “The Stream Processor as a Database”, Jamie Grier
> mentions a Grafana plugin for Flink. Does anyone know where can I find it.
> Unfortunately Google search did not yield anything. I’m not sure if he
> “open-sourced” it.
>
> https://image.slidesharecdn.com/jamiegrier-thestreamprocessorasadatabase-
> buildingonlineapplicationsdirectlyonstreams-160919123750/95/
> jamie-grier-the-stream-processor-as-a-database-
> building-online-applications-directly-on-streams-19-638.jpg?cb=1474290476
>
>
> Thanks,
> Andy
>


Starting flink HA cluster with start-cluster.sh

2017-03-08 Thread Dawid Wysakowicz
Hi,

I've tried to start a cluster in HA mode as described in the docs, but with
the current state of bin/config.sh I failed.

I think there is a bug in how the HIGH_AVAILABILITY variable is configured in
this block of bin/config.sh:

if [ -z "${HIGH_AVAILABILITY}" ]; then
 HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} ""
"${YAML_CONF}")
 if [ -z "${HIGH_AVAILABILITY}" ]; then
# Try deprecated value
DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
if [ -z "${DEPRECATED_HA}" ]; then
HIGH_AVAILABILITY="none"
elif [ ${DEPRECATED_HA} == "standalone" ]; then
# Standalone is now 'none'
HIGH_AVAILABILITY="none"
else
HIGH_AVAILABILITY=${DEPRECATED_HA}
fi
 else
 HIGH_AVAILABILITY="none"
 fi
fi

if value "zookeeper" is read from config file the variable will be reset to
"none" with the else branch.

I just want to confirm it is a bug before filing a JIRA.

Regards
Dawid


Re: Issues with Event Time and Kafka

2017-03-07 Thread Dawid Wysakowicz
Hi Ethan,

I believe it is because the Watermark and timestamps in your
implementation are uncorrelated. A Watermark is really a marker that
says there will be no more elements with a timestamp smaller than the value of
this watermark. For more info on the concept see [1]
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks>.

In your case, as you say that events can "lag" by up to 30 minutes, you should
try the BoundedOutOfOrdernessTimestampExtractor. It is designed exactly for
a case like yours.
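
A sketch (in Java, under the assumption that your Row class exposes its minTime field via a getter) of that approach: derive the watermark from the event timestamps themselves rather than from the wall clock.

public class RowTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Row> {

    public RowTimestampExtractor() {
        super(Time.minutes(30));  // allow events to arrive up to 30 minutes late
    }

    @Override
    public long extractTimestamp(Row element) {
        return element.getMinTime();  // assumed accessor for the minTime field
    }
}

With this extractor, the watermark trails the highest event timestamp seen so far by 30 minutes, so windows and CEP matches fire based on the data's own time rather than System.currentTimeMillis().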

Regards,
Dawid

2017-03-07 22:33 GMT+01:00 ext.eformichella <ext.eformiche...@riotgames.com>
:

> Hi Dawid, I'm working with Max on the project
> Our code for the TimestampAndWatermarkAssigner is:
> ```
> class TimestampAndWatermarkAssigner(val maxLateness: Long) extends
> AssignerWithPeriodicWatermarks[Row] {
>
>   override def extractTimestamp(element: Row, previousElementTimestamp:
> Long): Long = {
> element.minTime
>   }
>
>   override def getCurrentWatermark(): Watermark = {
> new Watermark(System.currentTimeMillis() - maxLateness)
>   }
> }
> ```
>
> Where Row is a class representing the incoming JSON object coming from
> Kafka, which includes the timestamp
>
> Thanks,
> -Ethan
>
>
>
>


Re: Issues with Event Time and Kafka

2017-03-07 Thread Dawid Wysakowicz
Hi Max,
How do you assign timestamps to your events (in the event-time case)? Could you
post the whole code of your TimestampAndWatermarkAssigner?

Regards,
Dawid

2017-03-07 20:59 GMT+01:00 ext.mwalker <ext.mwal...@riotgames.com>:

> Hi Stephan,
>
> The right number of events seem to leave the source and enter the windows,
> but it shows that 0 exit the windows.
>
> Also I have tried 30 minutes and not setting the watermark interval, I am
> not sure what I am supposed to put there the docs seem vague about that.
>
> Best,
>
> Max
>
> On Tue, Mar 7, 2017 at 1:54 PM, Stephan Ewen [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>> Hi!
>>
>> At a first glance, your code looks correct to assign the Watermarks. What
>> is your watermark interval in the config?
>>
>> Can you check with the Flink metrics (if you are using 1.2) to see how
>> many rows leave the source, how many enter/leave the window operators, etc?
>>
>> That should help figuring out why there are so few result rows...
>>
>> Stephan
>>
>>
>> On Mon, Mar 6, 2017 at 8:57 PM, ext.mwalker <[hidden email]> wrote:
>>
>>> Hi Folks,
>>>
>>> We are working on a Flink job to proccess a large amount of data coming
>>> in
>>> from a Kafka stream.
>>>
>>> We selected Flink because the data is sometimes out of order or late,
>>> and we
>>> need to roll up the data into 30-minutes event time windows, after which
>>> we
>>> are writing it back out to an s3 bucket.
>>>
>>> We have hit a couple issues:
>>>
>>> 1) The job works fine using processing time, but when we switch to event
>>> time (almost) nothing seems to be written out.
>>> Our watermark code looks like this:
>>> ```
>>>   override def getCurrentWatermark(): Watermark = {
>>> new Watermark(System.currentTimeMillis() - maxLateness);
>>>   }
>>> ```
>>> And we are doing this:
>>> ```
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> ```
>>> and this:
>>> ```
>>> .assignTimestampsAndWatermarks(new
>>> TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
>>> ```
>>>
>>> However even though we get millions of records per hour (the vast
>>> majority
>>> of which are no more that 30 minutes late) we get like 2 - 10 records per
>>> hour written out to the s3 bucket.
>>> We are using a custom BucketingFileSink Bucketer if folks believe that is
>>> the issue I would be happy to provide that code here as well.
>>>
>>> 2) On top of all this, we would really prefer to write the records
>>> directly
>>> to Aurora in RDS rather than to an intermediate s3 bucket, but it seems
>>> that
>>> the JDBC sink connector is unsupported / doesn't exist.
>>> If this is not the case we would love to know.
>>>
>>> Thanks in advance for all the help / insight on this,
>>>
>>> Max Walker
>>>
>>>
>>>
>>>
>>
>>
>>
>>
>
>
>


Re: FlinkKafkaProducer usage

2017-02-01 Thread Dawid Wysakowicz
Have a look at
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
.

On 02.02.2017 at 1:07 AM, "Boris Lublinsky" wrote:

> I am trying to write a quick sample of streaming word count using Beam
> APIs and FlinkBeamRunner.
> The problem that I am getting into is that
>
> apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))
>
> Does not work in this way - it assumes bounded stream and mine is
> unbounded.
>
> I have not found any unbounded equivalent for Write, So I tried to
> implement a custom ParDo function:
>
> /**
>  * Write content to Kafka.
>  *
>  */
> static class WriteToKafkaFn extends DoFn, 
> Tuple2> {
>
> private FlinkKafkaProducer09> kafkaSink;
> private boolean opened = false;
>
> public WriteToKafkaFn(FlinkKafkaProducer09> 
> kafkaSink){
> this.kafkaSink = kafkaSink;
> }
>
> @ProcessElement
> public void processElement(ProcessContext c) {
> if(!opened){
> kafkaSink.open(new Configuration());
> opened = true;
> }
> Tuple2 record = c.element();
> try {
> kafkaSink.invoke(record);
> }catch(Throwable t){
> System.out.println("Error writing record " + record + " to 
> Kafka");
> t.printStackTrace();
> }
> }
> }
>
>
>
> The problem with this approach is that ParDo is not initialized with
> Streaming context, that FlinkKafkaConsumer relies upon, so open fails.
>
>
> Any suggestions?
>


Re: Queryable State

2017-01-27 Thread Dawid Wysakowicz
Hi Nico,

No problem at all, I've already presented my showcase with
ValueStateDescriptor.

Anyway, if I could help you somehow with the QueryableState, let me know. I
will be happy to contribute some code.

2017-01-25 14:47 GMT+01:00 Nico Kruber <n...@data-artisans.com>:

> Hi Dawid,
> sorry for the late reply, I was fixing some issues for queryable state and
> may
> now have gotten to the point of your error: you may be seeing a race
> condition
> with the MemoryStateBackend state backend (the default) as described here:
> https://issues.apache.org/jira/browse/FLINK-5642
> I'm currently working on a fix.
>
> KvStateRequestSerializer#deserializeList(), however, is the right
> function to
> de-serialise list state! - KvStateRequestSerializer#deserializeValue()
> will
> not work!
>
> Thanks for the tip regarding KvStateRequestSerializer#serializeList, this
> was
> indeed not used since the list state backends had their own serialisation
> function.
> We removed KvStateRequestSerializer#serializeList as well as the queryable
> list state sink for 1.2 and up.
>
>
> Nico
>
> On Monday, 16 January 2017 14:47:59 CET Dawid Wysakowicz wrote:
> > Hi Nico, Ufuk,
> >
> > Thanks for diving into this issue.
> >
> > @Nico
> >
> > I don't think that's the problem. The code can be exactly reproduced in
> > java. I am using other constructor for ListDescriptor than you did:
> >
> > You used:
> > > public ListStateDescriptor(String name, TypeInformation typeInfo)
> >
> > While I used:
> > >  public ListStateDescriptor(String name, Class typeClass)
> >
> > I think the problem is with the way I deserialized the value on the
> > QueryClient side as I tried to use:
> >
> >
> >
> > KvStateRequestSerializer.deserializeList(serializedResult, {
> >
> >   TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]()
> {})
> >
> > .createSerializer(new ExecutionConfig)
> >
> > })
> >
> > I have not checked it, but now I suspect this code would work:
> > > KvStateRequestSerializer.deserializeValue(serializedResult, {
> > >
> > >   TypeInformation.of(new
> > >
> > > TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
> > >
> > > .createSerializer(new ExecutionConfig)
> > >
> > > })
> >
> > Regarding removing the queryable state list I agree, using it seems
> > pointless. Moreover while removing it I would take a second look at those
> >
> > functions:
> > > KvStateRequestSerializer::deserializeList
> >
> >  KvStateRequestSerializer.serializeList
> >
> >
> > As I think they are not used at all even right now. Thanks for your time.
> >
> > Regards
> > Dawid Wysakowicz
> >
> > 2017-01-16 13:25 GMT+01:00 Nico Kruber <n...@data-artisans.com>:
> > > Hi Dawid,
> > > regarding the original code, I couldn't reproduce this with the Java
> code
> > > I
> > > wrote and my guess is that the second parameter of the
> ListStateDescriptor
> > > is
> > >
> > > wrong:
> > >   .asQueryableState(
> > >
> > > "type-time-series-count",
> > > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > >
> > >   "type-time-series-count",
> > >   classOf[KeyedDataPoint[java.lang.Integer]]))
> > >
> > > this should rather be
> > >
> > > TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {}
> > >
> > > as in the query itself. It sounds strange to me that you don't get ant
> > > ClassCastException or a compile-time error due to the type being wrong
> but
> > > I
> > > lack some Scala knowledge to get to the ground of this.
> > >
> > >
> > > Regarding the removal of the queryable list state "sink", I created a
> JIRA
> > > issue for it and will open a PR:
> > > https://issues.apache.org/jira/browse/FLINK-5507
> > >
> > >
> > > Nico
> > >
> > > On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > > > Hi Nico,
> > > >
> > > > Recently I've tried the queryable state a bit differently, by using
> > > > ValueState with a value of a util.ArrayList and a ValueSerializer for
> > > > util.ArrayList and it works as expected.
> > > >
> > > > The non-working example you can browse here:
> > > > http

Re: Queryable State

2017-01-16 Thread Dawid Wysakowicz
Hi Nico, Ufuk,

Thanks for diving into this issue.

@Nico

I don't think that's the problem. The code can be exactly reproduced in
java. I am using other constructor for ListDescriptor than you did:

You used:

> public ListStateDescriptor(String name, TypeInformation<T> typeInfo)
>

While I used:

>  public ListStateDescriptor(String name, Class<T> typeClass)


I think the problem is with the way I deserialized the value on the
QueryClient side as I tried to use:

KvStateRequestSerializer.deserializeList(serializedResult, {
  TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
    .createSerializer(new ExecutionConfig)
})


I have not checked it, but now I suspect this code would work:

> KvStateRequestSerializer.deserializeValue(serializedResult, {
>   TypeInformation.of(new
> TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
> .createSerializer(new ExecutionConfig)
> })


Regarding removing the queryable state list I agree, using it seems
pointless. Moreover while removing it I would take a second look at those
functions:

> KvStateRequestSerializer::deserializeList
> KvStateRequestSerializer.serializeList


As far as I can tell, they are not used at all right now. Thanks for your time.

Regards
Dawid Wysakowicz

2017-01-16 13:25 GMT+01:00 Nico Kruber <n...@data-artisans.com>:

> Hi Dawid,
> regarding the original code, I couldn't reproduce this with the Java code I
> wrote and my guess is that the second parameter of the ListStateDescriptor
> is
> wrong:
>
>   .asQueryableState(
> "type-time-series-count",
> new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
>   "type-time-series-count",
>   classOf[KeyedDataPoint[java.lang.Integer]]))
>
> this should rather be
>
> TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
>
> as in the query itself. It sounds strange to me that you don't get any
> ClassCastException or a compile-time error due to the type being wrong, but
> I lack some Scala knowledge to get to the bottom of this.
>
>
> Regarding the removal of the queryable list state "sink", I created a JIRA
> issue for it and will open a PR:
> https://issues.apache.org/jira/browse/FLINK-5507
>
>
> Nico
>
> On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > Hi Nico,
> >
> > Recently I've tried the queryable state a bit differently, by using
> > ValueState with a value of a util.ArrayList and a ValueSerializer for
> > util.ArrayList and it works as expected.
> >
> > The non-working example you can browse here:
> > https://github.com/dawidwys/flink-intro/tree/
> c66f01117b0fe3c0adc8923000543a7
> > 0a6fe2219 The working example here:
> > https://github.com/dawidwys/flink-intro/tree/master
> > (The QueryableJob is in module flink-queryable-job and the QueryClient in
> > flink-state-server)
> >
> > Sure, I am aware of the downfall of the ListState. I need it just for
> > presentational purpose, but you may be right there might not be any
> > production use for this state and it should be removed.
> > Maybe the problem is just with the ListState and removing it would
> resolve
> > also my problem :)
> >
> > Regards
> > Dawid Wysakowicz
> >
> > 2017-01-13 18:50 GMT+01:00 Nico Kruber <n...@data-artisans.com>:
> > > Hi Dawid,
> > > I'll try to reproduce the error in the next couple of days. Can you
> also
> > > share
> > > the value deserializer you use? Also, have you tried even smaller
> examples
> > > in
> > > the meantime? Did they work?
> > >
> > > As a side-note in general regarding the queryable state "sink" using
> > > ListState
> > > (".asQueryableState(, ListStateDescriptor)"): everything that
> enters
> > > this operator will be stored forever and never cleaned. Eventually, it
> > > will
> > > pile up too much memory and is thus of limited use. Maybe it should
> even
> > > be
> > > removed from the API.
> > >
> > >
> > > Nico
> > >
> > > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > > Hey Ufuk.
> > > > Did you maybe had a while to have a look at that problem?
> > > >
> > > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> > > > > Hey Dawid! Thanks for reporting this. I will try to have a look
> over
> > > > > the course of the day. From a first impression, this seems like a
> bug
> > > > > to me.

Re: Queryable State

2017-01-14 Thread Dawid Wysakowicz
Hi Nico,

Recently I've tried the queryable state a bit differently, by using
ValueState with a value of a util.ArrayList and a ValueSerializer for
util.ArrayList and it works as expected.

The non-working example you can browse here:
https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
The working example here:
https://github.com/dawidwys/flink-intro/tree/master
(The QueryableJob is in module flink-queryable-job and the QueryClient in
flink-state-server)
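
Roughly, the working variant boils down to sharing one serializer for the
list type between the ValueState descriptor and the query client, and
decoding the answer with deserializeValue. A minimal sketch (not the exact
code from the repo; KeyedDataPoint is the type from my example):

import java.{lang, util}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer

// One serializer for util.ArrayList[KeyedDataPoint[lang.Integer]], used both
// when declaring the queryable ValueState and when decoding the query answer.
val listSerializer = TypeInformation
  .of(new TypeHint[util.ArrayList[KeyedDataPoint[lang.Integer]]]() {})
  .createSerializer(new ExecutionConfig)

// Job side: a ValueState whose value is the whole list.
val stateDescriptor = new ValueStateDescriptor(
  "type-time-series-count",
  listSerializer,
  new util.ArrayList[KeyedDataPoint[lang.Integer]]())

// Client side: a ValueState answer is a single serialized value, so it is
// decoded with deserializeValue rather than deserializeList.
def deserializeResponse(serializedResult: Array[Byte])
    : util.ArrayList[KeyedDataPoint[lang.Integer]] =
  KvStateRequestSerializer.deserializeValue(serializedResult, listSerializer)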

Sure, I am aware of the downside of the ListState. I need it just for
presentational purposes, but you may be right that there might not be any
production use for this state and it should be removed.
Maybe the problem is just with the ListState and removing it would also
resolve my problem :)

Regards
Dawid Wysakowicz


2017-01-13 18:50 GMT+01:00 Nico Kruber <n...@data-artisans.com>:

> Hi Dawid,
> I'll try to reproduce the error in the next couple of days. Can you also
> share
> the value deserializer you use? Also, have you tried even smaller examples
> in
> the meantime? Did they work?
>
> As a side-note in general regarding the queryable state "sink" using
> ListState
> (".asQueryableState(<name>, ListStateDescriptor)"): everything that enters
> this operator will be stored forever and never cleaned. Eventually, it will
> pile up too much memory and is thus of limited use. Maybe it should even be
> removed from the API.
>
>
> Nico
>
> On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > Hey Ufuk.
> > Did you maybe had a while to have a look at that problem?
> >
> > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > the course of the day. From a first impression, this seems like a bug
> > > to me.
> > >
> > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > >
> > > <wysakowicz.da...@gmail.com> wrote:
> > > > Hi I was experimenting with the Query State feature and I have some
> > >
> > > problems
> > >
> > > > querying the state.
> > > >
> > > > The code which I use to produce the queryable state is:
> > > > env.addSource(kafkaConsumer).map(
> > > >
> > > >   e => e match {
> > > >
> > > > case LoginClickEvent(_, t) => ("login", 1, t)
> > > > case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > >
> > > >   }).keyBy(0).timeWindow(Time.seconds(1))
> > > >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3,
> > > >   e2._3)))
> > > >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3,
> e._2))
> > > >   .keyBy("key")
> > > >   .asQueryableState(
> > > >
> > > > "type-time-series-count",
> > > > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > >
> > > >   "type-time-series-count",
> > > >   classOf[KeyedDataPoint[java.lang.Integer]]))
> > > >
> > > > As you see it is a rather simple job, in which I try to count events
> of
> > > > different types in windows and then query by event type.
> > > >
> > > > In client code I do:
> > > > // Query Flink state
> > > > val future = client.getKvState(jobId, "type-time-series-count",
> > > >
> > > > key.hashCode, seralizedKey)
> > > >
> > > > // Await async result
> > > > val serializedResult: Array[Byte] = Await.result(
> > > >
> > > >   future, new FiniteDuration(
> > > >
> > > > 10,
> > > > duration.SECONDS))
> > > >
> > > > // Deserialize response
> > > > val results = deserializeResponse(serializedResult)
> > > >
> > > > results
> > > >
> > > >   }
> > > >
> > > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > > util.List[KeyedDataPoint[lang
> > > >
> > > >   .Integer]] = {
> > > >
> > > > KvStateRequestSerializer.deserializeList(serializedResult,
> > > >
> > > > getValueSerializer())
> > > >
> > > >   }

Re: Queryable State

2017-01-10 Thread Dawid Wysakowicz
Hey Ufuk.
Did you maybe have a moment to have a look at that problem?

2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:

> Hey Dawid! Thanks for reporting this. I will try to have a look over
> the course of the day. From a first impression, this seems like a bug
> to me.
>
> On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> <wysakowicz.da...@gmail.com> wrote:
> > Hi I was experimenting with the Query State feature and I have some
> problems
> > querying the state.
> >
> > The code which I use to produce the queryable state is:
> >
> > env.addSource(kafkaConsumer).map(
> >   e => e match {
> > case LoginClickEvent(_, t) => ("login", 1, t)
> > case LogoutClickEvent(_, t) => ("logout", 1, t)
> > case ButtonClickEvent(_, _, t) => ("button", 1, t)
> >   }).keyBy(0).timeWindow(Time.seconds(1))
> >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> >   .keyBy("key")
> >   .asQueryableState(
> > "type-time-series-count",
> > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> >   "type-time-series-count",
> >   classOf[KeyedDataPoint[java.lang.Integer]]))
> >
> > As you see it is a rather simple job, in which I try to count events of
> > different types in windows and then query by event type.
> >
> > In client code I do:
> > // Query Flink state
> > val future = client.getKvState(jobId, "type-time-series-count",
> > key.hashCode, seralizedKey)
> >
> > // Await async result
> > val serializedResult: Array[Byte] = Await.result(
> >   future, new FiniteDuration(
> > 10,
> > duration.SECONDS))
> >
> > // Deserialize response
> > val results = deserializeResponse(serializedResult)
> >
> > results
> >   }
> >
> >   private def deserializeResponse(serializedResult: Array[Byte]):
> > util.List[KeyedDataPoint[lang
> >   .Integer]] = {
> > KvStateRequestSerializer.deserializeList(serializedResult,
> > getValueSerializer())
> >   }
> >
> > As I was trying to debug the issue I see the first element in list gets
> > deserialized correctly, but it fails on the second one. It seems like the
> > serialized result is broken. Do you have any idea if I am doing sth
> wrong or
> > there is some bug?
> >
> >
> > The exception I get is:
> > java.io.EOFException: null
> > at
> > org.apache.flink.runtime.util.DataInputDeserializer.readFully(
> DataInputDeserializer.java:157)
> > at
> > org.apache.flink.runtime.util.DataInputDeserializer.readUTF(
> DataInputDeserializer.java:240)
> > at
> > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(
> PojoSerializer.java:386)
> > at
> > org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.
> deserializeList(KvStateRequestSerializer.java:487)
> > at
> > com.dataartisans.stateserver.queryclient.QueryClient.
> deserializeResponse(QueryClient.scala:44)
> >
> > You can browse the exact code at: https://github.com/dawidwys/
> flink-intro
> >
> > I would be grateful for any advice.
> >
> > Regards
> > Dawid Wysakowicz
>


Queryable State

2017-01-08 Thread Dawid Wysakowicz
Hi, I was experimenting with the Queryable State feature and I have some
problems querying the state.

The code which I use to produce the queryable state is:

env.addSource(kafkaConsumer).map(
  e => e match {
case LoginClickEvent(_, t) => ("login", 1, t)
case LogoutClickEvent(_, t) => ("logout", 1, t)
case ButtonClickEvent(_, _, t) => ("button", 1, t)
  }).keyBy(0).timeWindow(Time.seconds(1))
  .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
  .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
  .keyBy("key")
  .asQueryableState(
"type-time-series-count",
new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
  "type-time-series-count",
  classOf[KeyedDataPoint[java.lang.Integer]]))

As you see it is a rather simple job, in which I try to count events of
different types in windows and then query by event type.

In client code I do:
// Query Flink state
val future = client.getKvState(jobId, "type-time-series-count",
key.hashCode, seralizedKey)

// Await async result
val serializedResult: Array[Byte] = Await.result(
  future, new FiniteDuration(
10,
duration.SECONDS))

// Deserialize response
val results = deserializeResponse(serializedResult)

results
  }

  private def deserializeResponse(serializedResult: Array[Byte]):
      util.List[KeyedDataPoint[lang.Integer]] = {
KvStateRequestSerializer.deserializeList(serializedResult,
getValueSerializer())
  }

While debugging the issue I saw that the first element in the list gets
deserialized correctly, but it fails on the second one. It seems like the
serialized result is broken. Do you have any idea if I am doing something
wrong or if there is some bug?


The exception I get is:
java.io.EOFException: null
at
org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
at
org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
at
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
at
com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)

You can browse the exact code at: https://github.com/dawidwys/flink-intro

I would be grateful for any advice.

Regards
Dawid Wysakowicz


Re: Strange behaviour of windows

2015-12-08 Thread Dawid Wysakowicz
Thanks for the explanation. That was a really stupid mistake on my side. By
the way, I really like the whole idea and API. Really good job!

Regards
Dawid

2015-12-08 12:30 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
> an important concept of the Flink API is that transformations do not
> modify the original stream (or dataset) but return a new stream with the
> modifications applied. In your example the result of the
> extractTimestamps() call should be used for further processing. I attached
> your source code with the required modifications.
>
> Other than that, I think you understood the watermarks quite well. :D
>
> Let us know if you need more information.
>
> Cheers,
> Aljoscha
>
>
> > On 07 Dec 2015, at 20:34, Dawid Wysakowicz <wysakowicz.da...@gmail.com>
> wrote:
> >
> > Forgot to mention. I've checked it both on 0.10 and current master.
> >
> > 2015-12-07 20:32 GMT+01:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com
> >:
> > Hi,
> >
> > I have recently experimented a bit with windowing and event-time
> mechanism in flink and either I do not understand how should it work or
> there is some kind of a bug.
> >
> > I have prepared two Source Functions. One that emits watermark itself
> and one that does not, but I have prepared a TimestampExtractor that should
> produce same results that the previous Source Function, at least from my
> point of view.
> >
> > Afterwards I've prepared a simple summing over an EventTimeTriggered
> Sliding Window.
> >
> > What I expected is a sum of 3*(t_sum) property of Event regardless of
> the sleep time in Source Function. That is how the EventTimeSourceFunction
> works, but for the SourceFunction it depends on the sleep and does not
> equals 3*(t_sum).
> >
> > I have done some debugging and for the SourceFunction the output of
> ExtractTimestampsOperator does not chain to the aggregator operator(the
> property output.allOutputs is empty).
> >
> > Do I understand the mechanism correctly and should my code work as I
> described? If not could you please explain a little bit? The code I've
> attached to this email.
> >
> > I would be grateful.
> >
> > Regards
> > Dawid Wysakowicz
> >
> >
> >
>
>
>


Strange behaviour of windows

2015-12-07 Thread Dawid Wysakowicz
Hi,

I have recently experimented a bit with windowing and the event-time
mechanism in Flink, and either I do not understand how it should work or
there is some kind of a bug.

I have prepared two Source Functions: one that emits watermarks itself and
one that does not. For the latter I have prepared a TimestampExtractor that
should produce the same results as the previous Source Function, at least
from my point of view.

Afterwards I've prepared a simple summing over an EventTimeTriggered
Sliding Window.

What I expected is a sum of 3*(t_sum) (t_sum being the toSum property of
Event) regardless of the sleep time in the Source Function. That is how the
EventTimeSourceFunction behaves, but for the SourceFunction the result
depends on the sleep and does not equal 3*(t_sum).

I have done some debugging, and for the SourceFunction the output of
ExtractTimestampsOperator does not chain to the aggregator operator (the
property output.allOutputs is empty).

Do I understand the mechanism correctly, and should my code work as I
described? If not, could you please explain a little bit? I've attached the
code to this email.

I would be grateful.

Regards
Dawid Wysakowicz

import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.TimestampExtractor
import org.apache.flink.streaming.api.functions.source.{SourceFunction, EventTimeSourceFunction}
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger

object FlinkWindows {

  case class Event(name: String, toSum: Int, timestamp: Long)


  private var hostName: String = null
  private var port: Int = 0

  def main(args: Array[String]) {

hostName = args(0)
port = args(1).toInt

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//env.getConfig.setAutoWatermarkInterval(0)



//Read from a socket stream at map it to StockPrice objects
//val stream = env.socketTextStream(hostName, port).map(x => {
//  val split = x.split(",")
//  Event(split(0), split(1).toInt, split(2).toLong)
//})
val stream = env.addSource(new EventSourceWithTimestamp)
  //val stream = env.fromCollection(genCarStream())
  .keyBy("name")


stream.assignTimestamps(new TimestampExtractor[Event] {

  private var currentWatermark: Long = Long.MinValue

  override def getCurrentWatermark: Long = currentWatermark

  override def extractWatermark(element: Event, currentTimestamp: Long): Long = {
if (element.timestamp > currentWatermark) {
  currentWatermark = element.timestamp - 999
}
currentWatermark
  }

  override def extractTimestamp(element: Event, currentTimestamp: Long): Long = element.timestamp
})
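// NOTE: assignTimestamps returns a new DataStream with timestamps and
// watermarks attached; the returned stream is not captured here, so the
// windowing below still runs on the original `stream` without event time.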

val sumed = stream
  .window(SlidingTimeWindows.of(
Time.of(3, TimeUnit.SECONDS),
Time.of(1, TimeUnit.SECONDS)
  ))
  .trigger(EventTimeTrigger.create())
  .sum("toSum")
  .print()



env.execute("Stock stream")
  }



  class EventSource extends SourceFunction[Event] {
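    // Plain SourceFunction: does not attach timestamps or emit watermarks;
    // it is meant to be paired with the TimestampExtractor assigned in main().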

var isRunning = true
val offset = System.currentTimeMillis()

override def cancel(): Unit = isRunning = false

override def run(ctx: SourceContext[Event]): Unit = {

  var idx = 0
  while (isRunning) {
Thread.sleep(1500)
ctx.collect(Event("a", 3, offset + idx * 999))
ctx.collect(Event("b", 2, offset + idx * 999))
idx += 1
  }
}
  }

  class EventSourceWithTimestamp extends EventTimeSourceFunction[Event] {
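    // EventTimeSourceFunction: attaches a timestamp to every record and emits
    // watermarks itself, so no TimestampExtractor is needed downstream.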

var isRunning = true
val offset = System.currentTimeMillis()

override def cancel(): Unit = isRunning = false

override def run(ctx: SourceContext[Event]): Unit = {

  var idx = 0
  while (isRunning) {
Thread.sleep(1500)
ctx.collectWithTimestamp(Event("a", 3, offset + idx * 999), offset + idx * 999)
ctx.collectWithTimestamp(Event("b", 2, offset + idx * 999), offset + idx * 999)
ctx.emitWatermark(new Watermark(offset + (idx - 1) * 999))
idx += 1
  }
}
  }

}
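
Per Aljoscha's reply above, the fix is to keep working with the stream
returned by assignTimestamps instead of the original one. A minimal sketch of
the corrected wiring (not Aljoscha's actual attachment; it reuses the Event,
EventSource and imports from the file above):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Assign timestamps/watermarks on the raw source and keep the returned stream.
val withTimestamps = env
  .addSource(new EventSource)
  .assignTimestamps(new TimestampExtractor[Event] {
    private var currentWatermark: Long = Long.MinValue

    override def getCurrentWatermark: Long = currentWatermark

    override def extractWatermark(element: Event, currentTimestamp: Long): Long = {
      if (element.timestamp > currentWatermark) {
        currentWatermark = element.timestamp - 999
      }
      currentWatermark
    }

    override def extractTimestamp(element: Event, currentTimestamp: Long): Long =
      element.timestamp
  })

// Window over the timestamped stream, not over the original one.
withTimestamps
  .keyBy("name")
  .window(SlidingTimeWindows.of(
    Time.of(3, TimeUnit.SECONDS),
    Time.of(1, TimeUnit.SECONDS)))
  .trigger(EventTimeTrigger.create())
  .sum("toSum")
  .print()

env.execute("Event-time windows")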

Re: Strange behaviour of windows

2015-12-07 Thread Dawid Wysakowicz
Forgot to mention. I've checked it both on 0.10 and current master.

2015-12-07 20:32 GMT+01:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com>:

> Hi,
>
> I have recently experimented a bit with windowing and event-time mechanism
> in flink and either I do not understand how should it work or there is some
> kind of a bug.
>
> I have prepared two Source Functions. One that emits watermark itself and
> one that does not, but I have prepared a TimestampExtractor that should
> produce same results that the previous Source Function, at least from my
> point of view.
>
> Afterwards I've prepared a simple summing over an EventTimeTriggered
> Sliding Window.
>
> What I expected is a sum of 3*(t_sum) property of Event regardless of the
> sleep time in Source Function. That is how the EventTimeSourceFunction
> works, but for the SourceFunction it depends on the sleep and does not
> equals 3*(t_sum).
>
> I have done some debugging and for the SourceFunction the output of
> ExtractTimestampsOperator does not chain to the aggregator operator(the
> property output.allOutputs is empty).
>
> Do I understand the mechanism correctly and should my code work as I
> described? If not could you please explain a little bit? The code I've
> attached to this email.
>
> I would be grateful.
>
> Regards
> Dawid Wysakowicz
>
>
>

