Re: Access Sliding window

2017-08-03 Thread Raj Kumar
Thanks Fabian. Your suggestion helped, but I am stuck at the third step.

1. I didn't completely understand step 3. What should the process function
look like? Why does it need to be stateful? Can you please provide
more details on this?
2. In the stateful function, do we need a value state? Knowing what details
we need to store would be helpful for implementing the use case.
3. Moreover, I see that RichProcessFunction is deprecated. What can we
use in place of RichProcessFunction?





Cannot restore from savepoint after adding a sink operator

2017-08-03 Thread Sam Huang
Hi all! I added an S3 bucketing sink operator to my Flink job and tried to
start it from a savepoint using the --allowNonRestoredState option, and it's
showing me this error:

[error message not preserved in the archive]

I found the official Flink documentation saying:

[documentation excerpt not preserved in the archive]
So I suppose the restoration should succeed even if the operator number
doesn't match. Can someone explain to me why this happens, and what's the
possible solution?

FYI, some specs:
Flink version: 1.2.1
Job parallelism: 10
S3 sink parallelism: 1
Job execution graph:

[execution graph image not preserved in the archive]


Thanks






Re: Custom InputFormat

2017-08-03 Thread Ted Yu
Did you use StreamExecutionEnvironment.createFileInput() ?

What did the modification times of the 2 files look like (were they the
newest) ?

Cheers

On Mon, Jul 31, 2017 at 12:42 PM, Mohit Anchlia 
wrote:

> Thanks! When I give the path to a directory, Flink reads only 2 files. It
> seems to be picking these 2 files randomly.
>
> On Mon, Jul 31, 2017 at 12:05 AM, Fabian Hueske  wrote:
>
>> Hi Mohit,
>>
>> as Ted said, there are plenty of InputFormats which are based on
>> FileInputFormat.
>> FileInputFormat also supports reading all files in a directory. Simply
>> specify the path of the directory.
>>
>> Check StreamExecutionEnvironment.createFileInput(), which takes several
>> parameters, such as a FileInputFormat and a time interval at which the
>> directory is periodically checked.
>>
>> Best, Fabian
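For what it's worth, a minimal sketch of that continuous directory monitoring,
using the public readFile() entry point (which exposes the same mechanism as
createFileInput()); the TextInputFormat, path, and interval are illustrative:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class DirectoryMonitorJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        String dir = "file:///tmp/input";  // illustrative directory
        TextInputFormat format = new TextInputFormat(new Path(dir));

        // re-scan the directory for new files every 10 seconds
        DataStream<String> lines = env.readFile(
                format, dir, FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

        lines.print();
        env.execute("directory monitor sketch");
    }
}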
>>
>> 2017-07-30 21:31 GMT+02:00 Ted Yu :
>>
>>> For #1, you can find quite a few classes which extend FileInputFormat.
>>> e.g.
>>>
>>> flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java: public class AvroInputFormat extends FileInputFormat implements ResultTypeQuer...
>>> flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java: public abstract class BinaryInputFormat extends FileInputFormat
>>> flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java: public abstract class DelimitedInputFormat extends FileInputFormat implements Checkpoi...
>>>
>>> flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java: extends FileInputFormat
>>>
>>> FYI
>>>
>>> On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia 
>>> wrote:
>>>
 Thanks. Few more questions:

 - Is there an example for FileInputFormat?
 - how to make it read all the files in a directory?
 - how to make an inputformat a streaming input instead of batch? Eg:
 read as new files come to a dir.

 Thanks again.

 On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske 
 wrote:

> Hi,
>
> Flink calls the reachedEnd() method before it calls nextRecord() and
> closes the IF when reachedEnd() returns true.
> So, it should not return true until nextRecord() has been called and the
> first (and last) record has been emitted.
>
> You might also want to build your PDFFileInputFormat on
> FileInputFormat and set unsplittable to true.
> FileInputFormat comes with lots of built-in functionality such as
> InputSplit generation.
>
> Cheers, Fabian
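A rough sketch of that suggestion, reading each file as a single byte[]
record; the class name and record type are illustrative:

import java.io.IOException;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class WholeFileInputFormat extends FileInputFormat<byte[]> {

    private boolean end;

    public WholeFileInputFormat(Path path) {
        super(path);
        this.unsplittable = true;  // one split per file, files are never split
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        end = false;
    }

    @Override
    public boolean reachedEnd() {
        // must stay false until the single record has been emitted,
        // otherwise nextRecord() is never called
        return end;
    }

    @Override
    public byte[] nextRecord(byte[] reuse) throws IOException {
        // read the whole split (= the whole file) into one record
        byte[] contents = new byte[(int) splitLength];
        int offset = 0;
        while (offset < contents.length) {
            int read = stream.read(contents, offset, contents.length - offset);
            if (read < 0) {
                throw new IOException("unexpected end of file");
            }
            offset += read;
        }
        end = true;
        return contents;
    }
}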
>
> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia :
>
>> Hi,
>>
>> I created a custom input format. The idea behind this is to read all
>> binary files from a directory and use each file as its own split. Each
>> split is read as one whole record. When I run it in Flink I don't get any
>> error but I am not seeing any output from .print. Am I missing something?
>>
>> 
>>
>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>
>>   private static final Logger logger =
>>       LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>
>>   PDFFileInputSplit current = null;
>>
>>   public static void main(String... args) throws Exception {
>>     PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>     InputSplit[] splits = pdfReader.createInputSplits(1);
>>     pdfReader.open(splits[0]);
>>     pdfReader.nextRecord(null);
>>
>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>     env.fromElements(1, 2, 3)
>>         // returns the squared i
>>         .print();
>>
>>     PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>     InputFormatSourceFunction<StringValue> reader =
>>         new InputFormatSourceFunction<>(format, TypeInformation.of(StringValue.class));
>>
>>     env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>   }
>>
>>   String path = null;
>>
>>   public PDFFileInputFormat(String path) {
>>     this.path = path;
>>   }
>>
>>   public void configure(Configuration parameters) {
>>     // TODO Auto-generated method stub
>>   }
>>
>>   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>     // TODO Auto-generated method stub
>>     return cachedStatistics;
>>   }
>>
>>   public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>     final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>     Files.list(Paths.get(path)).forEach(f -> {
>>       PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>       splits.ad
>> [remainder of the message truncated in the archive]

Re: CEP condition expression and its event consuming strategy

2017-08-03 Thread Chao Wang
Thank you, Dawid. FYI, I've implemented the discarding logic with a
CoFlatMapFunction, for the special case where there are only two input
streams: I maintain a logical state (no match, input1 matched, or input2
matched) and use private variables to store the matched event so far,
which waits to be processed along with the event from the other input
source.


Chao
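For reference, a minimal sketch of that CoFlatMapFunction logic, assuming two
String inputs and a join that pairs one event from each side; as described
above, the pending events live in plain fields and are therefore not part of
Flink's checkpointed state:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class JoinAndDiscard
        implements CoFlatMapFunction<String, String, Tuple2<String, String>> {

    // event from one side waiting for a match from the other side
    private String pendingFromFirst;
    private String pendingFromSecond;

    @Override
    public void flatMap1(String value, Collector<Tuple2<String, String>> out)
            throws Exception {
        if (pendingFromSecond != null) {
            out.collect(Tuple2.of(value, pendingFromSecond));
            pendingFromSecond = null;  // consume the matched event
        } else {
            pendingFromFirst = value;  // wait for the other input
        }
    }

    @Override
    public void flatMap2(String value, Collector<Tuple2<String, String>> out)
            throws Exception {
        if (pendingFromFirst != null) {
            out.collect(Tuple2.of(pendingFromFirst, value));
            pendingFromFirst = null;
        } else {
            pendingFromSecond = value;
        }
    }
}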


On 07/31/2017 02:13 AM, Dawid Wysakowicz wrote:

Ad. 1 Yes, it returns an Iterable to support times and oneOrMore patterns
(which can accept more than one event).

Ad. 2 Some use case for not discarding used events could be e.g. looking for 
some shapes in our data, e.g. W-shapes. In this case one W-shape could start on 
the middle peak of the previous one.

Unfortunately, I personally can't point you to any in-use applications. Maybe 
Kostas, whom I've added to the discussion, knows of any.

Anyway, thanks for your interest in the CEP library. We will be happy to hear 
any comments and suggestions for future improvements.




On 28 Jul 2017, at 21:54, Chao Wang  wrote:

Hi Dawid,

Thank you.

Ad. 1 I noticed that the method getEventsForPattern() returns an Iterable 
and we need to further invoke .iterator().next() to get access to the event value.

Ad. 2 Here is a bit about a use case we have that calls for such discarding 
semantics. In the event processing project I am currently working on, input 
event streams are sensor data, and we join streams and do Kalman filtering, 
FFT, etc. We therefore choose to discard the accepted events once the data they 
carry have been processed; otherwise, it may cause duplicated processing as 
well as incorrect join semantics.

We came up with this question while doing an empirical comparison of Flink and 
our system (implemented with the TAO real-time event service). We implemented 
in our system such semantics, by removing input events once CEP emits the 
corresponding output events.

Could you provide some use cases where the discarding semantics are not needed? 
I guess I am wired into processing sensor data and thus cannot think of a case 
where reusing accepted events would be of interest. Also, could you share some 
pointers to streaming application in-use? We are seeking to make our research 
work more relevant to current practice.

Thank you very much,

Chao

On 07/27/2017 02:17 AM, Dawid Wysakowicz wrote:

Hi Chao,

Ad. 1 You could implement it with IterativeCondition. Sth like this:

Pattern<Event, ?> pattern = Pattern.<Event>begin("first").where(new
SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
   return value.equals("A") || value.equals("B");
}
}).followedBy("second").where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
   return (value.equals("A") || value.equals("B")) &&
!value.equals(ctx.getEventsForPattern("first").iterator().next());
}
});

Ad. 2 Unfortunately, right now, as you said, the pattern restarts for every
event, and it is not possible to change that strategy. There is ongoing work to
introduce AfterMatchSkipStrategy [1], but at best it will be merged in 1.4.0. I
have not given it much thought, but I would try implementing some discarding
logic.

Regards,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-7169


On 26 Jul 2017, at 22:45, Chao Wang  wrote:

Hi,

I have two questions regarding the use of the Flink CEP library 
(flink-cep_2.11:1.3.1), as follows:

1. I'd like to know how to use the API to express "emit event C in the presence of events A and B, with 
no restriction on the arriving order of A and B"? I've tried by creating two patterns, one for "A 
and then B" and the other for "B and then A", and consequently using two patternStreams to 
handle each case, which emits C. It worked but to me this approach seems redundant.

2. Given the above objective expression, how to consume the accepted events so 
that they will not be used for future matchings? For example, with the arriving 
sequence {A, B, A}, the CEP should only emit one C (due to the matching of 
{A,B}), not two Cs (due to {A,B} and {B,A}). Similarly, with the arriving 
sequence {B, A, B, A}, the CEP should only emit two Cs, not three.


Thanks,

Chao





Re: json mapper

2017-08-03 Thread Eron Wright
I think your snippet looks good.  The Jackson ObjectMapper is designed to
be reused by numerous threads, and routinely stored as a static field.  It
is somewhat expensive to create.

Hope this helps,
-Eron

On Thu, Aug 3, 2017 at 7:46 AM, Nico Kruber  wrote:

> Hi Peter,
> I'm no Scala developer but I may be able to help with some concepts:
>
> * a static reference used inside a [Map]Function will certainly cause
> problems when executed in parallel in the same JVM, e.g. a TaskManager
> with multiple slots, depending on whether this static object is stateful
> and/or thread-safe
> * additionally, not all parallel instances of your map may be executed in
> the same JVM, e.g. on multiple TaskManagers, so you cannot assume that the
> state of the JsonMapper is consistent among them
> * if the ObjectMapper does not store any state that is worth recovering
> during a failure (none that I see from
> https://fasterxml.github.io/jackson-databind/javadoc/2.3.0/com/fasterxml/jackson/databind/ObjectMapper.html
> if that is the one you are using), then you don't need to put it into
> flink state but can either initialise it as a (non-static) member of your
> MapFunction class or even in your map function itself
> * for the correct use of keyed/non-keyed state, please refer to my other
> email or [1]
> * for 'class' vs. 'object': if you're using
> com.fasterxml.jackson.databind.ObjectMapper as described above, you'll
> have state again ("It will use instances of JsonParser and JsonGenerator
> for implementing actual reading/writing of JSON." from the docs) but in
> general, it is a good question whether the singleton would work for
> stateless operators and whether it actually improves performance.
>
>
> Nico
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>
> On Thursday, 3 August 2017 12:41:36 CEST Peter Ertl wrote:
> > Hi flink users,
> >
> > I just wanted to ask if this kind of scala map function is correct?
> >
> > object JsonMapper {
> >   private val mapper: ObjectMapper = new ObjectMapper()
> > }
> >
> > class JsonMapper extends MapFunction[String, ObjectNode] {
> >   override def map(value: String): ObjectNode =
> > JsonMapper.mapper.readValue(value, classOf[ObjectNode]) }
> >
> > Is using a static reference to ObjectMapper fine or will this cause
> issues
> > on a distributed cluster / with checkpoint / serializing state /
> whatever ?
> >
> > Or should I instead use a non-transient property initialized in ctor
> > (ObjectMapper is java.io.Serializable) ?
> >
> > Or should I initialize it with RichMapFunction.open into a transient
> > property?
> >
> > Also I am wondering if replacing 'class' with 'object' (=> singleton)
> >
> > object JsonMapper extends MapFunction[String, ObjectNode] { /* ..*/ }
> >
> > is ok (actually the mapper is stateless so no obvious need to
> re-instantiate
> > it again and again ? )
> >
> > Thanks and best regards
> > Peter
>
>


Re: Akka Quarantine & Old YARN Versions

2017-08-03 Thread Konstantin Knauf
Hi Nico,

thanks for the quick response! No, this was not enabled :( Since we are
in the process of upgrading to 1.3.1: I did not find this option in 1.3,
only in 1.2. Is this the default behaviour in 1.3, or is this configuration
just not documented?

Cheers,

Konstantin

On 03.08.2017 17:11, Nico Kruber wrote:
> Hi Konstantin,
> I dug through the linked pull requests (of
> https://issues.apache.org/jira/browse/FLINK-3347) a bit just to notice that
> the fix-version tag was wrong (should have been 1.2.1, not 1.2.0) but you
> have that already.
> 
> In there, it was also mentioned that the quarantine monitor is disabled by 
> default and can be enabled by setting `taskmanager.exit-on-fatal-akka-error` 
> to true. If enabled, it should detect a quarantined task manager and shut it 
> down. In that case, YARN should notice it and start a new one, if I'm not 
> mistaken.
> 
> Are you already working with `taskmanager.exit-on-fatal-akka-error` enabled?
> 
> 
> Nico
> 
> On Thursday, 3 August 2017 10:53:00 CEST Konstantin Knauf wrote:
>> Hi everyone,
>>
>> we are running Flink 1.2.1 on YARN 2.4 (I know, way too old :().
>> Correlated with the last Flink Upgrade from 1.1.3 -> 1.2.1 we are
>> experiencing regular TaskManager failures due to
>>
>> [Taskmanager Logs]
>> 2017-07-10 15:25:26,448 ERROR Remoting
>>- Association to
>> [akka.tcp://flink@:45303] with UID [-382428140]
>> irrecoverably failed. Quarantining address.
>> java.lang.IllegalStateException: Error encountered while processing
>> system message acknowledgement buffer: [1 {0, 1}] ack: ACK[3, {}]
>> at
>> akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoi
>> nt.scala:289) at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at ...
>>
>> As far as I understand https://issues.apache.org/jira/browse/FLINK-3345
>> the taskmanager should be restarted in this case. In our case YARN does
>> not start a new taskmanager container, but the container is just missing
>> indefinitely. Is it known that this does not work on YARN 2.4?
>>
>> If it helps, I can also provide the full job and taskmanager logs...
>>
>> Cheers & Thanks,
>>
>> Konstantin
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082





RE: Event-time and first watermark

2017-08-03 Thread Gwenhael Pasquiers
We're not using a window but a more basic ProcessFunction to handle sessions. 
We made this choice because we have to handle (millions of) sessions that can 
last from 10 seconds to 24 hours, so we wanted to handle things manually using 
the State class.

We're using the watermark as an event-time "clock" to:
* compute the "lateness" of a message relative to the watermark (the most
recent message from the stream)
* fire timer events

We're using event time instead of processing time because our stream will be 
late and data arrive in hourly bursts.

Maybe we're misusing the watermark?

B.R.
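For illustration, a sketch of that watermark-as-clock pattern on a keyed
stream of (key, timestamp) tuples; the session timeout is made up:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class SessionFunction
        extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long SESSION_TIMEOUT = 60_000L;  // illustrative

    @Override
    public void processElement(Tuple2<String, Long> event, Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        // the current watermark acts as the event-time "clock"
        long watermark = ctx.timerService().currentWatermark();
        if (watermark > event.f1) {
            // event is late relative to the most recent watermark
        }
        // fire in event time when the session would expire
        ctx.timerService().registerEventTimeTimer(event.f1 + SESSION_TIMEOUT);
        out.collect(event);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        // the session expired at event time `timestamp`: emit / clean up here
    }
}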

-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com] 
Sent: jeudi 3 août 2017 16:30
To: user@flink.apache.org
Cc: Gwenhael Pasquiers 
Subject: Re: Event-time and first watermark

Hi Gwenhael,
"A Watermark(t) declares that event time has reached time t in that stream, 
meaning that there should be no more elements from the stream with a timestamp 
t’ <= t (i.e. events with timestamps older or equal to the watermark)." [1]

Therefore, they should be behind the actual event with timestamp t.

What is it that you want to achieve in the end? What do you want to use the 
watermark for? They are basically a means of defining when an event-time 
window ends.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks

On Thursday, 3 August 2017 10:24:35 CEST Gwenhael Pasquiers wrote:
> Hi,
> 
> From my tests it seems that the initial watermark value is 
> Long.MIN_VALUE even though my first data passed through the timestamp 
> extractor before arriving into my ProcessFunction. It looks like the 
> watermark "lags" behind the data by one message.
> 
> Is there a way to have a watermark more "up to date" ? Or is the only 
> way to compute it myself into my ProcessFunction ?
> 
> Thanks.



WaterMark & Eventwindow not fired correctly

2017-08-03 Thread aitozi

Hi,

I have encountered a problem: I generate and assign watermarks on the
datastream, then keyBy, apply an event-time window, and apply a window
function.

In the log I can see that the watermark and the event time carried by the
messages are correct, and I think the situation below should trigger the
window function:

1. watermark time >= window_end_time
2. there is data in [window_start_time, window_end_time)

I checked the log and both conditions are satisfied. When I instead apply
trigger(CountTrigger.of(5)), I can see in the log that the window apply
function is invoked.

So I am wondering why the window apply function cannot be triggered by the
event time and watermark alone.

thanks,
aitozi
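For reference, a self-contained sketch of the setup described above (assign
watermarks, keyBy, event-time window, window function); the elements, the
out-of-orderness bound, and the sum standing in for the window apply function
are all illustrative:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeWindowJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(
                Tuple2.of("a", 1_000L), Tuple2.of("a", 4_000L), Tuple2.of("a", 11_000L))
            // watermark = max seen timestamp minus 1 second
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(
                        Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element) {
                        return element.f1;
                    }
                })
            .keyBy(0)
            .timeWindow(Time.seconds(10))
            .sum(1)  // stand-in for the window apply function
            .print();

        env.execute("event-time window sketch");
    }
}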





Re: Akka Quarantine & Old YARN Versions

2017-08-03 Thread Nico Kruber
Hi Konstantin,
I dug through the linked pull requests (of
https://issues.apache.org/jira/browse/FLINK-3347) a bit just to notice that the fix-version tag was wrong 
(should have been 1.2.1, not 1.2.0) but you have that already.

In there, it was also mentioned that the quarantine monitor is disabled by 
default and can be enabled by setting `taskmanager.exit-on-fatal-akka-error` 
to true. If enabled, it should detect a quarantined task manager and shut it 
down. In that case, YARN should notice it and start a new one, if I'm not 
mistaken.

Are you already working with `taskmanager.exit-on-fatal-akka-error` enabled?


Nico

On Thursday, 3 August 2017 10:53:00 CEST Konstantin Knauf wrote:
> Hi everyone,
> 
> we are running Flink 1.2.1 on YARN 2.4 (I know, way too old :().
> Correlated with the last Flink Upgrade from 1.1.3 -> 1.2.1 we are
> experiencing regular TaskManager failures due to
> 
> [Taskmanager Logs]
> 2017-07-10 15:25:26,448 ERROR Remoting
>- Association to
> [akka.tcp://flink@:45303] with UID [-382428140]
> irrecoverably failed. Quarantining address.
> java.lang.IllegalStateException: Error encountered while processing
> system message acknowledgement buffer: [1 {0, 1}] ack: ACK[3, {}]
> at
> akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoi
> nt.scala:289) at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at ...
> 
> As far as I understand https://issues.apache.org/jira/browse/FLINK-3345
> the taskmanager should be restarted in this case. In our case YARN does
> not start a new taskmanager container, but the container is just missing
> indefinitely. Is it known that this does not work on YARN 2.4?
> 
> If it helps, I can also provide the full job and taskmanager logs...
> 
> Cheers & Thanks,
> 
> Konstantin





Re: json mapper

2017-08-03 Thread Nico Kruber
Hi Peter,
I'm no Scala developer but I may be able to help with some concepts:

* a static reference used inside a [Map]Function will certainly cause problems 
when executed in parallel in the same JVM, e.g. a TaskManager with multiple 
slots, depending on whether this static object is stateful and/or thread-safe
* additionally, not all parallel instances of your map may be executed in the 
same JVM, e.g. on multiple TaskManagers, so you cannot assume that the state 
of the JsonMapper is consistent among them
* if the ObjectMapper does not store any state that is worth recovering during 
a failure (none that I see from
https://fasterxml.github.io/jackson-databind/javadoc/2.3.0/com/fasterxml/jackson/databind/ObjectMapper.html
if that is the one you are using), then you don't need to put it into flink state but can 
either initialise it as a (non-static) member of your MapFunction class or 
even in your map function itself
* for the correct use of keyed/non-keyed state, please refer to my other email 
or [1]
* for 'class' vs. 'object': if you're using 
com.fasterxml.jackson.databind.ObjectMapper as described above, you'll have 
state again ("It will use instances of JsonParser and JsonGenerator for 
implementing actual reading/writing of JSON. " from the docs) but in general, 
it is a good question whether the singleton would work for stateless operators 
and whether it actually improves performance.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
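To make the "(non-static) member initialised in open()" option concrete, a
minimal sketch in Java (a Scala version would be analogous); the class name is
illustrative:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class JsonParseFunction extends RichMapFunction<String, ObjectNode> {

    // not serialised with the function; each parallel instance builds its own
    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();
    }

    @Override
    public ObjectNode map(String value) throws Exception {
        return mapper.readValue(value, ObjectNode.class);
    }
}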

On Thursday, 3 August 2017 12:41:36 CEST Peter Ertl wrote:
> Hi flink users,
> 
> I just wanted to ask if this kind of scala map function is correct?
> 
> object JsonMapper {
>   private val mapper: ObjectMapper = new ObjectMapper()
> }
> 
> class JsonMapper extends MapFunction[String, ObjectNode] {
>   override def map(value: String): ObjectNode =
> JsonMapper.mapper.readValue(value, classOf[ObjectNode]) }
> 
> Is using a static reference to ObjectMapper fine or will this cause issues
> on a distributed cluster / with checkpoint / serializing state / whatever ?
> 
> Or should I instead use a non-transient property initialized in ctor
> (ObjectMapper is java.io.Serializable) ?
> 
> Or should I initialize it with RichMapFunction.open into a transient
> property?
> 
> Also I am wondering if replacing 'class' with 'object' (=> singleton)
> 
> object JsonMapper extends MapFunction[String, ObjectNode] { /* ..*/ }
> 
> is ok (actually the mapper is stateless so no obvious need to re-instantiate
> it again and again ? )
> 
> Thanks and best regards
> Peter





State Backend

2017-08-03 Thread Vijay Srinivasaraghavan
Hello,

I would like to know if there are any latency requirements to consider when
choosing an appropriate state backend.

For example, if an HCFS implementation is used as the Flink state backend
(instead of stock HDFS), are there any implications that one needs to know
with respect to performance?

- Frequency of read/write operations, random vs. sequential reads
- Load/usage pattern (frequent small updates vs. bulk operations)
- RocksDB -> HCFS (is this kind of option recommended to mitigate some of the
  challenges outlined above?)
- S3 vs. HDFS: any performance numbers?

Appreciate any inputs on this.

Regards,
Vijay



Re: Event-time and first watermark

2017-08-03 Thread Nico Kruber
Hi Gwenhael,
"A Watermark(t) declares that event time has reached time t in that stream, 
meaning that there should be no more elements from the stream with a timestamp 
t’ <= t (i.e. events with timestamps older or equal to the watermark)." [1]

Therefore, they should be behind the actual event with timestamp t.

What is it that you want to achieve in the end? What do you want to use the 
watermark for? They are basically a means of defining when an event-time 
window ends.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks

On Thursday, 3 August 2017 10:24:35 CEST Gwenhael Pasquiers wrote:
> Hi,
> 
> From my tests it seems that the initial watermark value is Long.MIN_VALUE
> even though my first data passed through the timestamp extractor before
> arriving into my ProcessFunction. It looks like the watermark "lags" behind
> the data by one message.
> 
> Is there a way to have a watermark more "up to date" ? Or is the only way to
> compute it myself into my ProcessFunction ?
> 
> Thanks.





Re: state inside functions

2017-08-03 Thread Nico Kruber
Hi Peter,
there's no need to worry about transient members as the operator itself is not 
serialized - only the state itself, depending on the state back-end.

If you want your state to be recovered by checkpoints you should implement the 
open() method and initialise your state there as in your point (2) and as 
described in [1].

If you want to re-scale your job, you have to take a savepoint and may resume 
from there with a different parallelism [2] but be sure to set a maximum 
parallelism (per job / or operator) and set UUIDs for operators as described 
in [3].


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/savepoints.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/production_ready.html
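As a rough illustration of the two possibilities from the question (all types
and names are made up for the example): option (1) requires the member to be
serialisable, because the function object is shipped to the task managers;
option (2) works for any type, because the member is rebuilt on each parallel
instance.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class TwoWays extends RichMapFunction<String, String> {

    // (1) non-transient, set in the constructor: serialised with the function
    private final String prefix;

    // (2) transient, set in open(): never serialised, rebuilt per instance
    private transient StringBuilder buffer;

    public TwoWays(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = new StringBuilder();
    }

    @Override
    public String map(String value) {
        buffer.setLength(0);
        return buffer.append(prefix).append(value).toString();
    }
}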

On Thursday, 3 August 2017 12:11:14 CEST Peter Ertl wrote:
> Hi,
> 
> can someone elaborate on when I should set properties transient /
> non-transient within operators (e.g. map / flatMap / reduce) ?
> 
> I see these two possibilies:
> 
> (1) initialize a non-transient property from the constructor
> (2) initialize a transient property inside a Rich???Function when
> open(ConfigurationParameters) is invoked
> 
> on what criteria should I choose (1) or (2) ?
> 
> how is this related to checkpointing / rebalancing?
> 
> Thanks in advance
> Peter





Re: Getting JobManager address and port within a running job

2017-08-03 Thread Biplob Biswas
Hi Nico,

This behaviour was on my cluster and not in local mode, as I wanted to
check whether it's an issue with my job or whether the jobmanager behaviour
is consistent everywhere.

When I run my job in yarn-cluster mode, it's not honouring the IP and
port I specified and is randomly assigning a node and port. For this
reason, I created a YARN session and ran my Flink job within that session.

But I am still having issues with fetching the job id within a running Flink
instance.

Thanks 
Biplob
Biplob





Re: Can't find correct JobManager address, job fails with Queryable state

2017-08-03 Thread Biplob Biswas
Hi Nico,

I had actually tried doing that, but I still get the same "Actor not found"
error as before. I then ran on my mock cluster and got the same error,
although I could observe the jobmanager in yarn-cluster mode with a defined
port.

The address and port combination was random and, as mentioned here, the
JobManager is always allocated on different machines.

So to circumvent this issue, I created a Flink YARN session and ran my job
within this session. As the jobmanager is fixed within a YARN session, I
set my jobmanager address and port using the corresponding data.

Now it can connect to the jobmanager (only on the cluster, still not in
local mode), but the job id I specified was wrong and I don't really know
how to fetch the job id of my Flink job within a running instance.

Thanks,
Biplob





Re: Getting JobManager address and port within a running job

2017-08-03 Thread Biplob Biswas
Also, is it possible to get the JobID from a running Flink instance for a
streaming job?

I know I can get it for a batch job with
ExecutionEnvironment.getExecutionEnvironment().getId(), but apparently that
creates a new execution environment and returns the job id of that
environment, for a batch environment and not a streaming one.





Re: Getting JobManager address and port within a running job

2017-08-03 Thread Nico Kruber
Assuming, from your previous email, that you fire up a LocalFlinkMiniCluster: 
this, afaik, does not process flink-conf.yaml but only the configuration given 
to it.

If you start a "real" flink cluster, e.g. by bin/start-cluster.sh, it will show 
the behaviour you desired.


Nico
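A short sketch of handing such a configuration to a local environment
explicitly, using the 1.2/1.3-era ConfigConstants keys (the values are
illustrative); per the caveat above, only the Configuration object passed in
is seen by the mini cluster:

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalEnvWithFixedRpc {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // keys correspond to jobmanager.rpc.address / jobmanager.rpc.port
        config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1");
        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, config);
        // build the job on env, then call env.execute(...)
    }
}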

On Thursday, 3 August 2017 13:20:37 CEST Biplob Biswas wrote:
> Hi,
> 
> Is there a way to fetch the jobmanager address and port from a running flink
> job, I was expecting the address and port to be constant but it changes
> every time I run a job. And somehow it's not honoring the
> jobmanager.rpc.address and jobmanager.rpc.port set in the flink-conf.yaml
> file.
> 
> If I can get the address within the job itself, I can set it up based on
> that value.
> 
> Thanks & regards
> Biplob
> 
> 
> 
> 





Re: Can't find correct JobManager address, job fails with Queryable state

2017-08-03 Thread Nico Kruber
Hi Biplob,
by starting a local environment the way you described, i.e. by using

LocalStreamEnvironment.createLocalEnvironmentWithWebUI(conf);

you are firing up a LocalFlinkMiniCluster which, by default, has the queryable 
state server disabled. You can enable it via:

config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);



Nico
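Putting the two snippets together, a minimal sketch (the job body is omitted;
class names as used in this thread):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalQueryableStateJob {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // enable the queryable state server in the local mini cluster
        config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);

        StreamExecutionEnvironment env =
                LocalStreamEnvironment.createLocalEnvironmentWithWebUI(config);
        // define the job with queryable state on env, then env.execute(...)
    }
}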

On Thursday, 3 August 2017 11:40:12 CEST Biplob Biswas wrote:
> I managed to get the Web UI up and running but I am still getting the error
> with "Actor not found"
> 
> Before the job failed I got the output for the Flink config from the WebUI
> and it seems okay to me, this corresponds to the config I have already set.
> [image: flinkconfig.png]
> 
> 
> But when I try to access my shared store (from the same job), I get the
> error with the following stacktrace:
> 
> 
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.actor.ActorNotFound: Actor not found for:
> ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)]
>   at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>   at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>   at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
>   at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>   at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
>   at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>   at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>   at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
>   at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
>   at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
>   at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
>   at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>   at akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
>   at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
>   at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
>   at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>   at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>   at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>   at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>   at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>   at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   ... 5 more
> 
> 
> 
> 
> 
> 





Re: replacement for KeyedStream.fold(..) ?

2017-08-03 Thread Nico Kruber
Hi Peter,
although unfortunately not documented yet in [1] (rumor has it that that is 
going to change soon) and without a proper replacement note in the deprecation 
javadoc, two things come to mind for replacing fold():

* AggregateFunction and 
[All]WindowedStream#aggregate(AggregateFunction) to create aggregates 
of all events in a window
* a stateful [flat]map (as you mentioned)

These two should cover all cases.
Or did you have any use case in mind that doesn't fit in there?


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html
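For illustration, a sketch of the AggregateFunction route, assuming the Flink
1.3 interface (in which add() mutates the accumulator in place); the
average-of-longs accumulator is illustrative:

import org.apache.flink.api.common.functions.AggregateFunction;

// accumulator: long[]{sum, count}
public class AverageAggregate implements AggregateFunction<Long, long[], Double> {

    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    @Override
    public void add(Long value, long[] acc) {
        acc[0] += value;  // sum
        acc[1] += 1;      // count
    }

    @Override
    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        a[0] += b[0];
        a[1] += b[1];
        return a;
    }
}

It would be applied as stream.keyBy(...).timeWindow(...).aggregate(new
AverageAggregate()).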

On Wednesday, 2 August 2017 23:56:16 CEST Peter Ertl wrote:
> Hi folks,
> 
> since KeyedStream.fold(..) is marked as @deprecated what is the proper
> replacement for that kind of functionality?
> 
> Is mapWithState() and flatMapWithState() a *full* replacement?
> 
> Cheers
> Peter





Getting JobManager address and port within a running job

2017-08-03 Thread Biplob Biswas
Hi,

Is there a way to fetch the jobmanager address and port from a running flink
job, I was expecting the address and port to be constant but it changes
every time I run a job. And somehow it's not honoring the
jobmanager.rpc.address and jobmanager.rpc.port set in the flink-conf.yaml
file.

If I can get the address within the job itself, I can set it up based on
that value. 

Thanks & regards
Biplob






json mapper

2017-08-03 Thread Peter Ertl
Hi flink users,

I just wanted to ask if this kind of scala map function is correct?

object JsonMapper {
  private val mapper: ObjectMapper = new ObjectMapper()
}

class JsonMapper extends MapFunction[String, ObjectNode] {
  override def map(value: String): ObjectNode = 
JsonMapper.mapper.readValue(value, classOf[ObjectNode])
}

Is using a static reference to ObjectMapper fine or will this cause issues on a 
distributed cluster / with checkpoint / serializing state / whatever ?

Or should I instead use a non-transient property initialized in ctor 
(ObjectMapper is java.io.Serializable) ?

Or should I initialize it with RichMapFunction.open into a transient property?

Also I am wondering if replacing 'class' with 'object' (=> singleton)

object JsonMapper extends MapFunction[String, ObjectNode] { /* ..*/ }

is ok? (Actually the mapper is stateless, so there is no obvious need to 
re-instantiate it again and again.)

Thanks and best regards
Peter

state inside functions

2017-08-03 Thread Peter Ertl
Hi,

can someone elaborate on when I should set properties transient / non-transient 
within operators (e.g. map / flatMap / reduce) ?

I see these two possibilities:

(1) initialize a non-transient property from the constructor
(2) initialize a transient property inside a Rich???Function when 
open(ConfigurationParameters) is invoked

on what criteria should I choose (1) or (2) ?

how is this related to checkpointing / rebalancing?

Thanks in advance
Peter

Re: Can't find correct JobManager address, job fails with Queryable state

2017-08-03 Thread Biplob Biswas
I managed to get the Web UI up and running but I am still getting the error
with "Actor not found"

Before the job failed I got the output for the Flink config from the WebUI
and it seems okay to me, this corresponds to the config I have already set.

[image: flinkconfig.png]


But when I try to access my shared store (from the same job), I get the
error with the following stacktrace:


Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/),
Path(/user/jobmanager)]
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at 
akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
at
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
at akka.remote.EndpointWriter.postStop(Endpoint.scala:583)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:437)
at
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
... 5 more








Akka Quarantine & Old YARN Versions

2017-08-03 Thread Konstantin Knauf
Hi everyone,

we are running Flink 1.2.1 on YARN 2.4 (I know, way too old :().
Correlated with the last Flink Upgrade from 1.1.3 -> 1.2.1 we are
experiencing regular TaskManager failures due to

[Taskmanager Logs]
2017-07-10 15:25:26,448 ERROR Remoting
   - Association to
[akka.tcp://flink@:45303] with UID [-382428140]
irrecoverably failed. Quarantining address.
java.lang.IllegalStateException: Error encountered while processing
system message acknowledgement buffer: [1 {0, 1}] ack: ACK[3, {}]
at
akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:289)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at ...

As far as I understand https://issues.apache.org/jira/browse/FLINK-3345
the taskmanager should be restarted in this case. In our case YARN does
not start a new taskmanager container, but the container is just missing
indefinitely. Is it known that this does not work on YARN 2.4?

If it helps, I can also provide the full job and taskmanager logs...

Cheers & Thanks,

Konstantin

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082





Event-time and first watermark

2017-08-03 Thread Gwenhael Pasquiers
Hi,

From my tests it seems that the initial watermark value is Long.MIN_VALUE even 
though my first data passed through the timestamp extractor before arriving 
into my ProcessFunction. It looks like the watermark "lags" behind the data by 
one message.

Is there a way to have a watermark more "up to date"? Or is the only way to 
compute it myself in my ProcessFunction?

Thanks.