Flink multiple windows

2018-06-08 Thread antonio saldivar
Hello

Has anyone worked with this approach? I am asking because I have to compute the
aggregations (sum and count) for multiple window sizes (10 mins, 20 mins, 30 mins).
Please let me know if this works properly, or whether there is a better solution.


DataStream<String> data = ...;
// append a Long 1 to each record to count it.
DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne());

DataStream<Tuple2<String, Long>> oneMinCnts = withOnes
  // key by String field
  .keyBy(0)
  // define time window
  .timeWindow(Time.of(1, MINUTES))
  // sum the ones in the Long field
  // in practice you want to use an incrementally aggregating ReduceFunction and
  // a WindowFunction to extract the start/end timestamp of the window
  .sum(1);

// emit 1-min counts to wherever you need them
oneMinCnts.addSink(new YourSink());

// compute 5-min counts based on 1-min counts
DataStream<Tuple2<String, Long>> fiveMinCnts = oneMinCnts
  // key by String field
  .keyBy(0)
  // define time window of 5 minutes
  .timeWindow(Time.of(5, MINUTES))
  // sum the 1-minute counts in the Long field
  .sum(1);

// emit 5-min counts to wherever you need them
fiveMinCnts.addSink(new YourSink());

// continue with 1 day window and 1 week window
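As the comment in the snippet above suggests, each window can also be computed with an incrementally aggregating ReduceFunction plus a WindowFunction that attaches the window start/end timestamps. A rough sketch of that variant for a 10-minute window, reusing the Tuple2<String, Long> stream from above (the output type and variable names are only illustrative):

// (key, window start, window end, count)
DataStream<Tuple4<String, Long, Long, Long>> tenMinCnts = withOnes
  .keyBy(0)
  .timeWindow(Time.minutes(10))
  .reduce(
    // the ReduceFunction pre-aggregates incrementally, so only one element
    // per key and window is kept in window state
    (a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1),
    // the WindowFunction only attaches the window bounds to the result
    new WindowFunction<Tuple2<String, Long>, Tuple4<String, Long, Long, Long>, Tuple, TimeWindow>() {
      @Override
      public void apply(Tuple key, TimeWindow window,
                        Iterable<Tuple2<String, Long>> input,
                        Collector<Tuple4<String, Long, Long, Long>> out) {
        Tuple2<String, Long> cnt = input.iterator().next();
        out.collect(new Tuple4<>(cnt.f0, window.getStart(), window.getEnd(), cnt.f1));
      }
    });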

Thank you,
Regards


Re: Conceptual question

2018-06-08 Thread TechnoMage
Thank you all.  This discussion is very helpful.  It sounds like I can wait for 
1.6 though given our development status.

Michael

> On Jun 8, 2018, at 1:08 PM, David Anderson  wrote:
> 
> Hi all,
> 
> I think I see a way to eagerly do full state migration without writing your 
> own Operator, but it's kind of hacky and may have flaws I'm not aware of. 
> 
> In Flink 1.5 we now have the possibility to connect BroadcastStreams to 
> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant 
> because in the processBroadcastElement() method you can supply a 
> KeyedStateFunction to the Context.applyToKeyedState() method, and this 
> KeyedStateFunction will be applied to every item of keyed state associated with 
> the state descriptor you specify. I've been doing some experiments with this, 
> and it's quite powerful in cases where it's useful to operate on all of your 
> application's state.
> 
> I believe this was intended for cases where an update to an item of broadcast 
> state has implications for associated keyed state, but I see nothing that 
> prevents you from essentially ignoring the broadcast stream and using this 
> mechanism to implement keyed state migration.
> 
> David
> 
> 
> 
> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski  > wrote:
> Hi,
> 
> Yes it should be feasible. As I said before, with Flink 1.6 there will be 
> better way for migrating a state, but for now you either need to lazily 
> convert the state, or iterate over the keys and do the job manually.
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 15:52, Tony Wei > > wrote:
>> 
>> Hi Piotrek,
>> 
>> So my question is: is that feasible to migrate state from `ProcessFunction` 
>> to my own operator then use `getKeyedStateBackend()` to migrate the states?
>> If yes, is there anything I need to be careful with? If no, why and can it 
>> be available in the future? Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski > >:
>> Hi,
>> 
>> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the 
>> function and you can not migrate your state that way.
>> 
>> As far as I know yes, at the moment in order to convert everything at once 
>> (without getKeys you still can implement lazy conversion) you would have to 
>> write your own operator.
>> 
>> Piotrek
>> 
>> 
>>> On 7 Jun 2018, at 15:26, Tony Wei >> > wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> I used `ProcessFunction` to implement it, but it seems that I can't call 
>>> `getKeyedStateBackend()` like `WindowOperator` did.
>>> I found that `getKeyedStateBackend()` is the method in 
>>> `AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
>>> Does that mean I can't look up all keys and migrate the entire previous 
>>> states to the new states in `ProcessFunction#open()`?
>>> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to 
>>> migrate state in the manner shown in `WindowOperator`? 
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski >> >:
>>> What function are you implementing and how are you using it?
>>> 
>>> Usually it’s enough if your function implements RichFunction (or rather 
>>> extend from AbstractRichFunction) and then you could use RichFunction#open 
>>> in the similar manner as in the code that I posted in previous message. 
>>> Flink in many places performs instanceof checks like: 
>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>>> 
>>> public static void openFunction(Function function, Configuration 
>>> parameters) throws Exception{
>>>if (function instanceof RichFunction) {
>>>   RichFunction richFunction = (RichFunction) function;
>>>   richFunction.open(parameters);
>>>}
>>> }
>>> 
>>> Piotrek
>>> 
>>> 
 On 7 Jun 2018, at 11:07, Tony Wei >>> > wrote:
 
 Hi Piotrek,
 
 It seems that this was implemented with the `Operator` API, which is a
 lower-level API compared to the `Function` API.
 Since at the `Function` API level we can only migrate state when an event is
 triggered, it is more convenient in this way to migrate state by iterating
 over all keys in the `open()` method.
 If I implemented the state operator with the `ProcessFunction` API, is it
 possible to port it to `KeyedProcessOperator` and do the state migration
 that you mentioned?
 And are there any concerns or difficulties that could lead to restoring the
 state failing, or other problems? Thank you!
 
 Best Regards,
 Tony Wei
 
 2018-06-07 16:10 GMT+08:00 Piotr Nowojski >>> >:
 Hi,
 
 General solution for state/schema migration is under development and it 
 might be released with Flink 1.6.0.
 
 Before 

Re: [DISCUSS] Flink 1.6 features

2018-06-08 Thread Elias Levy
Since wishes are free:

- Standalone cluster job isolation:
https://issues.apache.org/jira/browse/FLINK-8886
- Proper sliding window joins (not overlapping hopping window joins):
https://issues.apache.org/jira/browse/FLINK-6243
- Sharing state across operators:
https://issues.apache.org/jira/browse/FLINK-6239
- Synchronizing streams: https://issues.apache.org/jira/browse/FLINK-4558

Seconded:
- Atomic cancel-with-savepoint:
https://issues.apache.org/jira/browse/FLINK-7634
- Support dynamically changing CEP patterns :
https://issues.apache.org/jira/browse/FLINK-7129


On Fri, Jun 8, 2018 at 1:31 PM, Stephan Ewen  wrote:

> Hi all!
>
> Thanks for the discussion and good input. Many suggestions fit well with
> the proposal above.
>
> Please bear in mind that with a time-based release model, we would release
> whatever is mature by end of July.
> The good thing is we could schedule the next release not too far after
> that, so that the features that did not quite make it will not be delayed
> too long.
> In some sense, you could read this as a "*what to do first*" list,
> rather than "*this goes in, other things stay out*".
>
> Some thoughts on some of the suggestions
>
> *Kubernetes integration:* An opaque integration with Kubernetes should be
> supported through the "as a library" mode. For a deeper integration, I know
> that some committers have experimented with some PoC code. I would let Till
> add some thoughts, he has worked the most on the deployment parts recently.
>
> *Per partition watermarks with idleness:* Good point, could one implement
> that on the current interface, with a periodic watermark extractor?
>
> *Atomic cancel-with-savepoint:* Agreed, this is important. Making this
> work with all sources needs a bit more work. We should have this in the
> roadmap.
>
> *Elastic Bloomfilters:* This seems like an interesting new feature - the
> above suggested feature set was more about addressing some longer standing
> issues/requests. However, nothing should prevent contributors to work on
> that.
>
> Best,
> Stephan
>
>
> On Wed, Jun 6, 2018 at 6:23 AM, Yan Zhou [FDS Science] 
> wrote:
>
>> +1 on https://issues.apache.org/jira/browse/FLINK-5479
>> [FLINK-5479] Per-partition watermarks in ...
>> 
>> issues.apache.org
>> Reported in ML: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-
>> skewness-causes-watermark-not-being-emitted-td11008.html It's normally
>> not a common case to have Kafka partitions not producing any data, but
>> it'll probably be good to handle this as well. I ...
>>
>> --
>> *From:* Rico Bergmann 
>> *Sent:* Tuesday, June 5, 2018 9:12:00 PM
>> *To:* Hao Sun
>> *Cc:* d...@flink.apache.org; user
>> *Subject:* Re: [DISCUSS] Flink 1.6 features
>>
>> +1 on K8s integration
>>
>>
>>
>> Am 06.06.2018 um 00:01 schrieb Hao Sun :
>>
>> adding my vote to K8S Job mode, maybe it is this?
>> > Smoothen the integration in Container environment, like "Flink as a
>> Library", and easier integration with Kubernetes services and other proxies.
>>
>>
>>
>> On Mon, Jun 4, 2018 at 11:01 PM Ben Yan 
>> wrote:
>>
>> Hi Stephan,
>>
>> Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]
>> (Per-partition watermarks in FlinkKafkaConsumer should consider idle
>> partitions) be included in 1.6? As we are seeing more users with this
>> issue on the mailing lists.
>>
>> Thanks.
>> Ben
>>
>> 2018-06-05 5:29 GMT+08:00 Che Lui Shum :
>>
>> Hi Stephan,
>>
>> Will FLINK-7129 (Support dynamically changing CEP patterns) be included
>> in 1.6? There were discussions about possibly including it in 1.6:
>> http://mail-archives.apache.org/mod_mbox/flink-user/201803.m
>> box/%3cCAMq=OU7gru2O9JtoWXn1Lc1F7NKcxAyN6A3e58kxctb4b508RQ@m
>> ail.gmail.com%3e
>>
>> Thanks,
>> Shirley Shum
>>
>>
>> From: Stephan Ewen 
>> To: d...@flink.apache.org, user 
>> Date: 06/04/2018 02:21 AM
>> Subject: [DISCUSS] Flink 1.6 features
>> --
>>
>>
>>
>> Hi Flink Community!
>>
>> The release of Apache Flink 1.5 has happened (yay!) - so it is a good
>> time to start talking about what to do for release 1.6.
>>
>> *== Suggested release timeline ==*
>>
>> I would propose to release around *end of July* (that is 8-9 weeks from
>> now).
>>
>> The rationale behind that: There was a lot of effort in release testing
>> automation (end-to-end tests, scripted stress tests) as part of release
>> 1.5. You may have noticed the big set of new modules under
>> "flink-end-to-end-tests" in the Flink repository. It delayed the 1.5
>> release a bit, and needs to continue as part of the coming release cycle,
>> but should help make 

Re: [DISCUSS] Flink 1.6 features

2018-06-08 Thread Stephan Ewen
Hi all!

Thanks for the discussion and good input. Many suggestions fit well with
the proposal above.

Please bear in mind that with a time-based release model, we would release
whatever is mature by end of July.
The good thing is we could schedule the next release not too far after
that, so that the features that did not quite make it will not be delayed
too long.
In some sense, you could read this as a "*what to do first*" list, rather
than "*this goes in, other things stay out*".

Some thoughts on some of the suggestions

*Kubernetes integration:* An opaque integration with Kubernetes should be
supported through the "as a library" mode. For a deeper integration, I know
that some committers have experimented with some PoC code. I would let Till
add some thoughts, he has worked the most on the deployment parts recently.

*Per partition watermarks with idleness:* Good point, could one implement
that on the current interface, with a periodic watermark extractor?

*Atomic cancel-with-savepoint:* Agreed, this is important. Making this work
with all sources needs a bit more work. We should have this in the roadmap.

*Elastic Bloomfilters:* This seems like an interesting new feature - the
above suggested feature set was more about addressing some longer standing
issues/requests. However, nothing should prevent contributors to work on
that.

Best,
Stephan


On Wed, Jun 6, 2018 at 6:23 AM, Yan Zhou [FDS Science] 
wrote:

> +1 on https://issues.apache.org/jira/browse/FLINK-5479
> [FLINK-5479] Per-partition watermarks in ...
> 
> issues.apache.org
> Reported in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.
> nabble.com/Kafka-topic-partition-skewness-causes-watermark-
> not-being-emitted-td11008.html It's normally not a common case to have
> Kafka partitions not producing any data, but it'll probably be good to
> handle this as well. I ...
>
> --
> *From:* Rico Bergmann 
> *Sent:* Tuesday, June 5, 2018 9:12:00 PM
> *To:* Hao Sun
> *Cc:* d...@flink.apache.org; user
> *Subject:* Re: [DISCUSS] Flink 1.6 features
>
> +1 on K8s integration
>
>
>
> Am 06.06.2018 um 00:01 schrieb Hao Sun :
>
> adding my vote to K8S Job mode, maybe it is this?
> > Smoothen the integration in Container environment, like "Flink as a
> Library", and easier integration with Kubernetes services and other proxies.
>
>
>
> On Mon, Jun 4, 2018 at 11:01 PM Ben Yan 
> wrote:
>
> Hi Stephan,
>
> Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]
> (Per-partition watermarks in FlinkKafkaConsumer should consider idle
> partitions) be included in 1.6? As we are seeing more users with this
> issue on the mailing lists.
>
> Thanks.
> Ben
>
> 2018-06-05 5:29 GMT+08:00 Che Lui Shum :
>
> Hi Stephan,
>
> Will FLINK-7129 (Support dynamically changing CEP patterns) be included in
> 1.6? There were discussions about possibly including it in 1.6:
> http://mail-archives.apache.org/mod_mbox/flink-user/201803.
> mbox/%3cCAMq=OU7gru2O9JtoWXn1Lc1F7NKcxAyN6A3e58kxctb4b508RQ@
> mail.gmail.com%3e
>
> Thanks,
> Shirley Shum
>
>
> From: Stephan Ewen 
> To: d...@flink.apache.org, user 
> Date: 06/04/2018 02:21 AM
> Subject: [DISCUSS] Flink 1.6 features
> --
>
>
>
> Hi Flink Community!
>
> The release of Apache Flink 1.5 has happened (yay!) - so it is a good time
> to start talking about what to do for release 1.6.
>
> *== Suggested release timeline ==*
>
> I would propose to release around *end of July* (that is 8-9 weeks from
> now).
>
> The rationale behind that: There was a lot of effort in release testing
> automation (end-to-end tests, scripted stress tests) as part of release
> 1.5. You may have noticed the big set of new modules under
> "flink-end-to-end-tests" in the Flink repository. It delayed the 1.5
> release a bit, and needs to continue as part of the coming release cycle,
> but should help make releasing more lightweight from now on.
>
> (Side note: There are also some nightly stress tests that we created and
> run at data Artisans, and where we are looking whether and in which way it
> would make sense to contribute them to Flink.)
>
> *== Features and focus areas ==*
>
> We had a lot of big and heavy features in Flink 1.5, with FLIP-6, the new
> network stack, recovery, SQL joins and client, ... Following something like
> a "tick-tock-model", I would suggest to focus the next release more on
> integrations, tooling, and reducing user friction.
>
> Of course, this does not mean that no other pull request gets reviewed, and
> no other topic will be examined - it is simply meant as a help to
> understand where to expect more activity during the next release 

Take elements from window

2018-06-08 Thread Antonio Saldivar Lezama
Hello


I am wondering if it is possible to handle the following scenario: store
all events by event time in a general window and process elements from a
smaller time frame (a sketch follows the list below).

1.- Store elements in a general SlidingWindow (60 mins, 10 mins)
- Rule 1 -> gets the 10-min elements from the general window and computes
aggregations
- Rule 2 -> gets the 20-min elements from the general window and computes
aggregations
- Rule 3 -> gets the 30-min elements from the general window and computes
aggregations
2.- Send results
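For reference, a rough sketch of the simpler pattern from the "Flink multiple windows" thread above, where each rule is just its own event-time window over the same keyed stream (the element type and field positions are assumed):

DataStream<Tuple2<String, Long>> events = ...;
KeyedStream<Tuple2<String, Long>, Tuple> keyed = events.keyBy(0);

// each rule runs as an independent tumbling window over the same keyed stream
DataStream<Tuple2<String, Long>> rule1 = keyed.timeWindow(Time.minutes(10)).sum(1);
DataStream<Tuple2<String, Long>> rule2 = keyed.timeWindow(Time.minutes(20)).sum(1);
DataStream<Tuple2<String, Long>> rule3 = keyed.timeWindow(Time.minutes(30)).sum(1);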

Thank you
Regards

Re: Conceptual question

2018-06-08 Thread David Anderson
Hi all,

I think I see a way to eagerly do full state migration without writing your
own Operator, but it's kind of hacky and may have flaws I'm not aware of.

In Flink 1.5 we now have the possibility to connect BroadcastStreams to
KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant
because in the processBroadcastElement() method you can supply a
KeyedStateFunction to the Context.applyToKeyedState() method, and this
KeyedStateFunction
will be applied to every item of keyed state associated with the state
descriptor you specify. I've been doing some experiments with this, and
it's quite powerful in cases where it's useful to operate on all of your
application's state.

I believe this was intended for cases where an update to an item of
broadcast state has implications for associated keyed state, but I see
nothing that prevents you from essentially ignoring the broadcast stream
and using this mechanism to implement keyed state migration.
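A rough, untested sketch of this idea (Event and Command are placeholder types; the "stateV1"/"stateV2" descriptors are only illustrative):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class StateMigratingFunction
        extends KeyedBroadcastProcessFunction<String, Event, Command, Event> {

    private final ValueStateDescriptor<Long> oldDesc =
            new ValueStateDescriptor<>("stateV1", Long.class);

    private transient ValueState<String> newState;

    @Override
    public void open(Configuration parameters) {
        // handle for the new keyed state; it binds to whatever key is current when accessed
        newState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("stateV2", String.class));
    }

    @Override
    public void processElement(Event value, ReadOnlyContext ctx, Collector<Event> out) {
        // regular keyed processing against the new state
        out.collect(value);
    }

    @Override
    public void processBroadcastElement(Command cmd, Context ctx, Collector<Event> out) throws Exception {
        // visit every key that has "stateV1" and rewrite it into "stateV2"
        ctx.applyToKeyedState(oldDesc, new KeyedStateFunction<String, ValueState<Long>>() {
            @Override
            public void process(String key, ValueState<Long> state) throws Exception {
                Long old = state.value();
                if (old != null) {
                    // the key context is set to `key` while this callback runs,
                    // so newState reads/writes the same key
                    newState.update(String.valueOf(old));
                    state.clear();
                }
            }
        });
    }
}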

David



On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski 
wrote:

> Hi,
>
> Yes it should be feasible. As I said before, with Flink 1.6 there will be
> better way for migrating a state, but for now you either need to lazily
> convert the state, or iterate over the keys and do the job manually.
>
> Piotrek
>
>
> On 7 Jun 2018, at 15:52, Tony Wei  wrote:
>
> Hi Piotrek,
>
> So my question is: is that feasible to migrate state from
> `ProcessFunction` to my own operator then use `getKeyedStateBackend()` to
> migrate the states?
> If yes, is there anything I need to be careful with? If no, why and can it
> be available in the future? Thank you.
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski :
>
>> Hi,
>>
>> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
>> function and you can not migrate your state that way.
>>
>> As far as I know yes, at the moment in order to convert everything at
>> once (without getKeys you still can implement lazy conversion) you would
>> have to write your own operator.
>>
>> Piotrek
>>
>>
>> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
>>
>> Hi Piotrek,
>>
>> I used `ProcessFunction` to implement it, but it seems that I can't call
>> `getKeyedStateBackend()` like `WindowOperator` did.
>> I found that `getKeyedStateBackend()` is the method in
>> `AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
>> Does that mean I can't look up all keys and migrate the entire previous
>> states to the new states in `ProcessFunction#open()`?
>> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator`
>> to migrate state in the manner shown in `WindowOperator`?
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski :
>>
>>> What function are you implementing and how are you using it?
>>>
>>> Usually it’s enough if your function implements RichFunction (or rather
>>> extend from AbstractRichFunction) and then you could use RichFunction#open
>>> in the similar manner as in the code that I posted in previous message.
>>> Flink in many places performs instanceof checks like:
>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>>>
>>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>>if (function instanceof RichFunction) {
>>>   RichFunction richFunction = (RichFunction) function;
>>>   richFunction.open(parameters);
>>>}
>>> }
>>>
>>> Piotrek
>>>
>>>
>>> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
>>>
>>> Hi Piotrek,
>>>
>>> It seems that this was implemented with the `Operator` API, which is a
>>> lower-level API compared to the `Function` API.
>>> Since at the `Function` API level we can only migrate state when an event
>>> is triggered, it is more convenient in this way to migrate state by
>>> iterating over all keys in the `open()` method.
>>> If I implemented the state operator with the `ProcessFunction` API, is it
>>> possible to port it to `KeyedProcessOperator` and do the state migration
>>> that you mentioned?
>>> And are there any concerns or difficulties that could lead to restoring
>>> the state failing, or other problems? Thank you!
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski :
>>>
 Hi,

 General solution for state/schema migration is under development and it
 might be released with Flink 1.6.0.

 Before that, you need to manually handle the state migration in your
 operator’s open method. Let’s assume that your OperatorV1 has a state field
 “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible
 with the previous version. What you can do is add logic to the open method
 to check:
 1. If “stateV2” is non-empty, do nothing
 2. If there is no “stateV2”, iterate over all of the keys and manually
 migrate “stateV1” to “stateV2”

 In your OperatorV3 you could drop the support for “stateV1”.
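A rough sketch of the lazy variant of this recipe at the Function level (Event is a placeholder type and the Long-to-String conversion is only illustrative); the eager iterate-over-all-keys variant needs access to the keyed state backend and therefore a custom operator, as discussed elsewhere in this thread:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class MyFunctionV2 extends ProcessFunction<Event, Event> {

    private transient ValueState<Long> stateV1;   // old representation
    private transient ValueState<String> stateV2; // new representation

    @Override
    public void open(Configuration parameters) {
        stateV1 = getRuntimeContext().getState(new ValueStateDescriptor<>("stateV1", Long.class));
        stateV2 = getRuntimeContext().getState(new ValueStateDescriptor<>("stateV2", String.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        // 1. if stateV2 already exists for this key, do nothing
        // 2. otherwise migrate this key's stateV1 (if any) and drop it
        if (stateV2.value() == null) {
            Long old = stateV1.value();
            if (old != null) {
                stateV2.update(String.valueOf(old));
                stateV1.clear();
            }
        }
        // ... regular processing against stateV2 ...
        out.collect(value);
    }
}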

 I have once implemented something like that here:

 

Heap Problem with Checkpoints

2018-06-08 Thread Fabian Wollert
Hi, in this email thread here, I tried to set up S3 as a filesystem backend for
checkpoints. Now everything is working (Flink v1.5.0), but the JobMaster is
accumulating heap space, eventually killing itself with a heap-space OOM after
several hours. If I don't enable checkpointing, then everything is fine. I'm
using the Flink S3 shaded libs (tried both the Hadoop and the Presto lib, no
difference in this regard) from the tutorial. My checkpoint settings are
this (job level):

env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

Another clue why I suspect the S3 checkpointing is that the heap-space dump
contains a lot of char[] objects with some logs about S3 operations.

anyone has an idea where to look further on this?

Cheers

--


*Fabian WollertZalando SE*

E-Mail: fabian.woll...@zalando.de

Tamara-Danz-Straße 1
10243 Berlin
Fax: +49 (0)30 2759 46 93
E-mail: legalnot...@zalando.co.uk
Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30
2000889349

Management Board:
Robert Gentz, David Schneider, Rubin Ritter

Chairman of the Supervisory Board:
Lothar Lanz

Person responsible for providing the contents of Zalando SE acc. to Art. 55
RStV [Interstate Broadcasting Agreement]: Rubin Ritter
Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
VAT registration number: DE 260543043


State life-cycle for different state-backend implementations

2018-06-08 Thread Rinat
Hi mates, got a question about different state backends.

As far as I understand, on every checkpoint Flink flushes its current
state into the backend. In the case of FsStateBackend we’ll have a separate file for
each checkpoint, and during the job lifecycle we risk accumulating
a huge number of state files in HDFS, which is not very good for the Hadoop
NameNode.

Does Flink have any clean-up strategies for its state in the different
backend implementations? If you could provide any links where I could read
more details about this process, it’ll be awesome ))

Thx a lot for your help.

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



[BucketingSink] notify on moving into pending/ final state

2018-06-08 Thread Rinat
Hi mates, I have a proposal about the functionality of BucketingSink.

While implementing one of our tasks we ran into the following need: create a
meta-file, with the path and additional information about the file created by
BucketingSink, once it has been moved into its final place.
Unfortunately such behaviour is currently not available to us.

We’ve implemented our own sink that provides an opportunity to register
notifiers that are called when the file state changes, but the current API
doesn’t allow us to add such behaviour using inheritance ...

It seems that such functionality could be useful and could become a part of the
BucketingSink API.
What do you think, should I make a PR?

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Rinat
Chesnay, thx for your reply, I’ve created one 
https://issues.apache.org/jira/browse/FLINK-9558 



> On 8 Jun 2018, at 12:58, Chesnay Schepler  wrote:
> 
> I agree, if the sink doesn't work properly without checkpointing we should 
> make sure that it fails early if it is used that way.
> 
> It would be great if you could open a JIRA.
> 
> On 08.06.2018 10:08, Rinat wrote:
>> Piotr, thx for your reply, for now everything is pretty clear. But from my 
>> point of view, it’s better to add some information about leaks in case of 
>> disabled checkpointing into BucketingSink documentation
>> 
>>> On 8 Jun 2018, at 10:35, Piotr Nowojski >> > wrote:
>>> 
>>> Hi,
>>> 
>>> BucketingSink is designed to provide exactly-once writes to file system, 
>>> which is inherently tied to checkpointing. As you just saw, without 
>>> checkpointing, BucketingSink is never notified that it can commit pending 
>>> files. 
>>> 
>>> If you do not want to use checkpointing for some reasons, you could always 
>>> use for example 
>>> org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat 
>>> and write your own simple `OutputFormat` or look if   one 
>>> of the existing ones meet your needs.
>>> 
>>> Piotrek
>>> 
 On 7 Jun 2018, at 14:23, Rinat >>> > wrote:
 
 Hi mates, we got some Flink jobs, that are writing data from kafka into 
 hdfs, using Bucketing-Sink.
 For some reasons, those jobs are running without checkpointing. For now,
 it is not a big problem for us if some files remain open in case of
 job reloading.
 
 Periodically, those jobs fail with an OutOfMemory exception, and it seems that
 I found a strange thing in the implementation of BucketingSink.
 
 During the sink lifecycle, we have a state object, implemented as a map, 
 where key is a bucket path, and value is a state, that contains 
 information about opened files and list of pending files.
 After researching of the heap dump, I found, that those state stores 
 information about ~ 1_000 buckets and their state, all this stuff weights 
 ~ 120 Mb.
 
 I’ve looked through the code, and found, that we removing the buckets from 
 the state, in notifyCheckpointComplete method. 
 
 @Override
 public void notifyCheckpointComplete(long checkpointId) throws Exception {
   Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
      state.bucketStates.entrySet().iterator();
   while (bucketStatesIt.hasNext()) {
      BucketState<T> bucketState = bucketStatesIt.next().getValue();
      if (!bucketState.isWriterOpen &&
            bucketState.pendingFiles.isEmpty() &&
            bucketState.pendingFilesPerCheckpoint.isEmpty()) {

         // We've dealt with all the pending files and the writer for this bucket is not currently open.
         // Therefore this bucket is currently inactive and we can remove it from our state.
         bucketStatesIt.remove();
      }
   }
 }
 
 So, this looks like an issue when you are using this sink in a
 checkpointless environment, because data is always added to the state
 but never removed.
 Of course, we could enable checkpointing and use one of the available
 backends, but to me it seems like unexpected behaviour: I
 have the opportunity to run the job without checkpointing, but if I really
 do so,
 I get an exception in the sink component.
 
 What do you think about this ? Do anyone got the same problem, and how’ve 
 you solved it ?
 
 Sincerely yours,
 Rinat Sharipov
 Software Engineer at 1DMP CORE Team
 
 email: r.shari...@cleverdata.ru 
 mobile: +7 (925) 416-37-26
 
 CleverDATA
 make your data clever
 
>>> 
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.shari...@cleverdata.ru 
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Can not submit flink job via IP or VIP of jobmanager

2018-06-08 Thread xie wei
Hello Flink team,

We use Flink on DCOS and have problems submitting a Flink job from within a
container to the Flink cluster. Both the container and the Flink cluster
are running inside DCOS, on different nodes.



We have the following setup: Flink was installed on DCOS using the package
from the catalog. According to the Flink UI ([DCOS-URL]/service/flink/) the
Flink job manager settings are:



jobmanager.rpc.address
ip-10-0-1-95.eu-central-1.compute.internal

jobmanager.rpc.port  14503

jobmanager.web.port   14502

mesos.artifact-server.port  14505



where "ip-10-0-1-95.eu-central-1.compute.internal" is the host name of the
DCOS node with IP 10.0.1.95 on which the container with the job manager is
running.



Furthermore for both the job manager RPC port and the job manager web port
a VIP is configured:



job manager RPC port: flink.marathon.l4lb.thisdcos.directory:6123

job manager Web port: flink.marathon.l4lb.thisdcos.directory:8081





Now if we try to submit a Flink job to the job manager via the Flink cli
performing the following steps:

1) log into the DCOS master node:

dcos node ssh --leader --master-proxy

2) start an interactive session inside a Docker container using the
Mesosphere Flink image:

docker run --rm -it mesosphere/dcos-flink:1.4.2-1.0 /bin/bash

3) submit a Flink job to the Flink job manager:

cd /flink-1.4.2

./bin/flink run -m ip-10-0-1-95.eu-central-1.compute.internal:14503
examples/streaming/WordCount.jar



everything works fine. The job appears as an entry within the Flink UI and
we get the results we expect.



But if we try to submit the same job to the job manager using the VIP of
the job manager flink.marathon.l4lb.thisdcos.directory:6123 using:



./bin/flink run -m flink.marathon.l4lb.thisdcos.directory:6123
examples/streaming/WordCount.jar



or if we try to submit the job to the job manager using the IP of the DCOS
node instead of its host name:



./bin/flink run -m 10.0.1.95:14503 examples/streaming/WordCount.jar



the job can not be submitted. Apparently the connection to the job manager
can not be established and nothing appears within the Flink UI. You can
find the output in the attachment.

Submitting to the jobmanager using the URL from Mesos DNS is also not
working.



Why is this not working, and why can we only submit jobs using the hostname
(ip-10-0-1-95.eu-central-1.compute.internal) of the job manager and not the
IP or the VIP?


Thank you!


Best regards

Wei
Cluster configuration: Standalone cluster with JobManager at 
flink.marathon.l4lb.thisdcos.directory/***.***.***.***:6123
Using address flink.marathon.l4lb.thisdcos.directory:6123 to connect to 
JobManager.
JobManager web interface address 
http://flink.marathon.l4lb.thisdcos.directory:8081
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: 22d5a45ee33edf71792c6bde8fc75211. Waiting for job 
completion.


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
at java.security.AccessController.doPrivileged(Native Method)
at 

Implementation of ElasticsearchSinkFunction, how to handle class level variables

2018-06-08 Thread Jayant Ameta
Hi,
I'm trying to integrate ElasticsearchSink into my pipeline. The example
shows using an anonymous class which implements ElasticsearchSinkFunction. This is
passed as a constructor argument to another anonymous class which extends
ElasticsearchSink.

Can I create a separate class instead of using anonymous classes?
If I do so, I'll have to have index and type as class level fields. How is
state maintenance done for class level fields? Do I need to mark the fields
as transient, and implement CheckpointedFunction?

Thanks,
Jayant


Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Chesnay Schepler
I agree, if the sink doesn't work properly without checkpointing we 
should make sure that it fails early if it is used that way.


It would be great if you could open a JIRA.

On 08.06.2018 10:08, Rinat wrote:
Piotr, thx for your reply, for now everything is pretty clear. But 
from my point of view, it’s better to add some information about leaks 
in case of disabled checkpointing into BucketingSink documentation


On 8 Jun 2018, at 10:35, Piotr Nowojski > wrote:


Hi,

BucketingSink is designed to provide exactly-once writes to file 
system, which is inherently tied to checkpointing. As you just saw, 
without checkpointing, BucketingSink is never notified that it can 
commit pending files.


If you do not want to use checkpointing for some reasons, you could 
always use for example 
org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat 
and write your own simple `OutputFormat` or look if one of the 
existing ones meet your needs.


Piotrek

On 7 Jun 2018, at 14:23, Rinat > wrote:


Hi mates, we got some Flink jobs, that are writing data from kafka 
into hdfs, using Bucketing-Sink.
For some reasons, those jobs are running without checkpointing. For 
now, it not a big problem for us, if some files are remained opened 
in case of job reloading.


Periodically, those jobs fail with *OutOfMemory *exception, and 
seems, that I found a strange thing in the implementation of 
BucketingSink.


During the sink lifecycle, we have a state object, implemented as a 
map, where key is a bucket path, and value is a state, that contains 
information about opened files and list of pending files.
After researching of the heap dump, I found, that those state stores 
information about ~ 1_000 buckets and their state, all this stuff 
weights ~ 120 Mb.


I’ve looked through the code, and found, that we removing the 
buckets from the state, in *notifyCheckpointComplete *method.


@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
   Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
      state.bucketStates.entrySet().iterator();
   while (bucketStatesIt.hasNext()) {
      BucketState<T> bucketState = bucketStatesIt.next().getValue();
      if (!bucketState.isWriterOpen &&
            bucketState.pendingFiles.isEmpty() &&
            bucketState.pendingFilesPerCheckpoint.isEmpty()) {

         // We've dealt with all the pending files and the writer for this bucket is not currently open.
         // Therefore this bucket is currently inactive and we can remove it from our state.
         bucketStatesIt.remove();
      }
   }
}

So, this looks like an issue, when you are using this sink in 
checkpointless environment, because the data always added to the 
state, but never removed.
Of course, we could enabled checkpointing, and use one of available 
backends, but as for me, it seems like a non expected behaviour, 
like I have an opportunity to run the job without checkpointing, but 
really, if I do so,

I got an exception in sink component.

What do you think about this ? Do anyone got the same problem, and 
how’ve you solved it ?


Sincerely yours,
*Rinat Sharipov*
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever





Sincerely yours,
*Rinat Sharipov*
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever





Re: Late data before window end is even close

2018-06-08 Thread Fabian Hueske
Thanks for reporting back and the debugging advice!

Best, Fabian

2018-06-08 9:00 GMT+02:00 Juho Autio :

> Flink was NOT at fault. Turns out our Kafka producer had OS level clock
> sync problems :(
>
> Because of that, our Kafka occasionally had some messages in between with
> an incorrect timestamp. In practice they were about 7 days older than they
> should.
>
> I'm really sorry for wasting your time on this. But thank you once more
> for taking the time to answer.
>
> For any similar case, I would first advise user to extra carefully compare
> the actual timestamps of their input data. For me it was helpful to make
> this change in my Flink job: for late data output, include both processing
> time (DateTime.now()) along with the event time (original timestamp).
>
> On Mon, May 14, 2018 at 12:42 PM, Fabian Hueske  wrote:
>
>> Thanks for correcting me Piotr. I didn't look close enough at the code.
>> With the presently implemented logic, a record should not be emitted to a
>> side output if its window wasn't closed yet.
>>
>> 2018-05-11 14:13 GMT+02:00 Piotr Nowojski :
>>
>>> Generally speaking, the best practice is always to simplify your program as
>>> much as possible to narrow down the scope of the search. Replace the data
>>> source with statically generated events, remove unnecessary components, etc.
>>> Either such a process helps you figure out what's wrong on your own, or, if
>>> not, sharing such a minimal program that reproduces the issue with us will
>>> allow us to debug it.
>>>
>>> Piotrek
>>>
>>>
>>> On 11 May 2018, at 13:54, Juho Autio  wrote:
>>>
>>> Thanks for that code snippet, I should try it out to simulate my DAG..
>>> If any suggestions how to debug futher what's causing late data on a
>>> production stream job, please let me know.
>>>
>>> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski >> > wrote:
>>>
 Hey,

 Actually I think Fabian initial message was incorrect. As far as I can
 see in the code of WindowOperator (last lines of 
 org.apache.flink.streaming.
 runtime.operators.windowing.WindowOperator#processElement ), the
 element is sent to late side output if it is late AND it wasn’t assigned to
 any of the existing windows (because they were late as well). In other
 words, it should work as you Juho are wishing: element should be marked as
 late once they are overdue/late for the window after one full day.

 I have tested it and it works as expected. Following program:

 https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a

 Prints only ONE number to the standard err:

 > 1394

 And there is nothing on the side output.

 Piotrek

 On 11 May 2018, at 12:32, Juho Autio  wrote:

 Thanks. What I still don't get is why my message got filtered in the
 first place. Even if the allowed lateness filtering would be done "on the
 window", data should not be dropped as late if it's not in fact late by
 more than the allowedLateness setting.

 Assuming that these conditions hold:
 - messages (and thus the extracted timestamps) were not out of order by
 more than 5 secods (as far as I didn't make any mistake in my
 partition-level analysis)
 - allowedLateness=1 minute
 - watermarks are assigned on kafka consumer meaning that they are
 synchronized across all partitions

 I don't see how the watermark could have ever been more than 5 seconds
 further when the message arrives on the isElementLate filter. Do you have
 any idea on this? Is there some existing test that simulates out of order
 input to flink's kafka consumer? I could try to build a test case based on
 that to possibly reproduce my problem. I'm not sure how to gather enough
 debug information on the production stream so that it would clearly show
 the watermarks, how they progressed on each kafka partition & later in the
 chain in case isElementLate filters something.

 On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske 
 wrote:

> Hi Juho,
>
> Thanks for bringing up this topic! I share your intuition.
> IMO, records should only be filtered out and sent to a side output if
> any of the windows they would be assigned to is closed already.
>
> I had a look into the code and found that records are filtered out as
> late based on the following condition:
>
> protected boolean isElementLate(StreamRecord<IN> element) {
>    return (windowAssigner.isEventTime()) &&
>       (element.getTimestamp() + allowedLateness <=
>          internalTimerService.currentWatermark());
> }
>
>
> This code shows that your analysis is correct.
> Records are filtered out based on their timestamp and the current
> watermark, even though they arrive before the window is closed.
>
> OTOH, filtering out records based on the window they would end up in
> can also be tricky if records are 

Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Rinat
Piotr, thx for your reply, for now everything is pretty clear. But from my 
point of view, it’s better to add some information about leaks in case of 
disabled checkpointing into BucketingSink documentation

> On 8 Jun 2018, at 10:35, Piotr Nowojski  wrote:
> 
> Hi,
> 
> BucketingSink is designed to provide exactly-once writes to file system, 
> which is inherently tied to checkpointing. As you just saw, without 
> checkpointing, BucketingSink is never notified that it can commit pending 
> files. 
> 
> If you do not want to use checkpointing for some reasons, you could always 
> use for example 
> org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat 
> and write your own simple `OutputFormat` or look if one of the existing ones 
> meet your needs.
> 
> Piotrek
> 
>> On 7 Jun 2018, at 14:23, Rinat > > wrote:
>> 
>> Hi mates, we got some Flink jobs, that are writing data from kafka into 
>> hdfs, using Bucketing-Sink.
>> For some reasons, those jobs are running without checkpointing. For now, it 
>> not a big problem for us, if some files are remained opened in case of job 
>> reloading.
>> 
>> Periodically, those jobs fail with OutOfMemory exception, and seems, that I 
>> found a strange thing in the implementation of BucketingSink.
>> 
>> During the sink lifecycle, we have a state object, implemented as a map, 
>> where key is a bucket path, and value is a state, that contains information 
>> about opened files and list of pending files.
>> After researching of the heap dump, I found, that those state stores 
>> information about ~ 1_000 buckets and their state, all this stuff weights ~ 
>> 120 Mb.
>> 
>> I’ve looked through the code, and found, that we removing the buckets from 
>> the state, in notifyCheckpointComplete method. 
>> 
>> @Override
>> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>>   Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
>>      state.bucketStates.entrySet().iterator();
>>   while (bucketStatesIt.hasNext()) {
>>      BucketState<T> bucketState = bucketStatesIt.next().getValue();
>>      if (!bucketState.isWriterOpen &&
>>            bucketState.pendingFiles.isEmpty() &&
>>            bucketState.pendingFilesPerCheckpoint.isEmpty()) {
>>
>>         // We've dealt with all the pending files and the writer for this bucket is not currently open.
>>         // Therefore this bucket is currently inactive and we can remove it from our state.
>>         bucketStatesIt.remove();
>>      }
>>   }
>> }
>> 
>> So, this looks like an issue, when you are using this sink in checkpointless 
>> environment, because the data always added to the state, but never removed.
>> Of course, we could enabled checkpointing, and use one of available 
>> backends, but as for me, it seems like a non expected behaviour, like I have 
>> an opportunity to run the job without checkpointing, but really, if I do so,
>> I got an exception in sink component.
>> 
>> What do you think about this ? Do anyone got the same problem, and how’ve 
>> you solved it ?
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.shari...@cleverdata.ru 
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Flink kafka consumer stopped committing offsets

2018-06-08 Thread Juho Autio
Hi,

We have a Flink stream job that uses Flink kafka consumer. Normally it
commits consumer offsets to Kafka.

However this stream ended up in a state where it's otherwise working just
fine, but it isn't committing offsets to Kafka any more. The job keeps
writing correct aggregation results to the sink, though. At the time of
writing this, the job has been running 14 hours without committing offsets.

Below is an extract from taskmanager.log. As you can see, it didn't log
anything until ~2018-06-07 22:08. Also that's where the log ends, these are
the last lines so far.

Could you help check if this is a know bug, possibly already fixed, or
something new?

I'm using a self-built Flink package 1.5-SNAPSHOT, flink commit
8395508b0401353ed07375e22882e7581d46ac0e which is not super old.

Cheers,
Juho

2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser
 - Kafka version : 0.10.2.1
2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser
 - Kafka commitId : e89bffd6b2eff799
2018-06-06 10:01:33,560 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
Discovered coordinator
my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550
rack: null) for group
aggregate-all_server_measurements_combined-20180606-1000.
2018-06-06 10:01:33,563 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
Discovered coordinator
my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550
rack: null) for group
aggregate-all_server_measurements_combined-20180606-1000.
2018-06-07 22:08:28,773 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092
(id: 2147483550 rack: null) dead for group
aggregate-all_server_measurements_combined-20180606-1000
2018-06-07 22:08:28,776 WARN
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550,
metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''},
topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''},
topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''},
topic2-1=OffsetAndMetadata{offset=89817267, metadata=''},
topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for
group aggregate-all_server_measurements_combined-20180606-1000: Offset
commit failed with a retriable exception. You should retry committing
offsets.
2018-06-07 22:08:29,840 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092
(id: 2147483550 rack: null) dead for group
aggregate-all_server_measurements_combined-20180606-1000
2018-06-07 22:08:29,841 WARN
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875,
metadata=''}, topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''},
topic1-14=OffsetAndMetadata{offset=12299972108, metadata=''}} failed for
group aggregate-all_server_measurements_combined-20180606-1000: Offset
commit failed with a retriable exception. You should retry committing
offsets.


Re: Datastream[Row] convert to table exception

2018-06-08 Thread Timo Walther
Yes, that's a workaround. I found the cause of the problem. It is a 
Scala API specific problem.


See: https://issues.apache.org/jira/browse/FLINK-9556

Thanks for reporting it!

Regards,
Timo

Am 08.06.18 um 09:43 schrieb 孙森:
Yes, I really did override the method, but it did not work. Finally, I
used ds.map()(Types.ROW()), then it works fine, but I didn't know why.
The code is

val inputStream: DataStream[Row] = 
env.addSource(myConsumer)(Types.ROW(fieldNameArray, flinkTypeArray))


在 2018年6月8日,下午3:15,Timo Walther > 写道:


Can you verify with a debugger whether you really override the method? It 
seems that your created type information is either not 
called or not used.


Regards,
Timo

Am 07.06.18 um 09:03 schrieb 孙森:

Hi,Timo

       Thank you for the reply.The `inputStream.getType` is 
GenericTypeInfo.


Thanks~

sen


在 2018年6月7日,下午2:28,Timo Walther > 写道:


Sorry, I didn't see you last mail. The code looks good actually. 
What is the result of `inputStream.getType` if you print it to the 
console?


Timo

Am 07.06.18 um 08:24 schrieb Timo Walther:

Hi,

Row is a very special datatype for which Flink cannot generate 
serializers based on the generics. By default, 
DeserializationSchema uses reflection-based type analysis; you 
need to override the getResultType() method in 
WormholeDeserializationSchema and specify the type information 
manually there.


Hope this helps.

Regards,
Timo
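A rough sketch of supplying an explicit RowTypeInfo from a custom schema (the field names and types below are placeholders); in Flink's DeserializationSchema interface the hook for this is getProducedType(), inherited from ResultTypeQueryable:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class MyRowDeserializationSchema implements DeserializationSchema<Row> {

    private final String[] fieldNames = {"id", "name"};                         // placeholder schema
    private final TypeInformation<?>[] fieldTypes = {Types.LONG, Types.STRING}; // placeholder schema

    @Override
    public Row deserialize(byte[] message) {
        // ... parse the payload into a Row matching the schema above ...
        Row row = new Row(fieldNames.length);
        row.setField(0, 0L);
        row.setField(1, new String(message));
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }
}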

Am 06.06.18 um 13:22 schrieb 孙森:

Hi ,

I've tried to to specify such a schema, when I read from kafka, 
and covert inputstream to table . But I got the exception:


Exception in thread "main" org.apache.flink.table.api.TableException:
An input of GenericTypeInfo cannot be converted to Table.
Please specify the type of the input with a RowTypeInfo

And the code here:


private def getSchemaMap(jsonSchema: String) = {
  val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
  val fields = umsSchema.fields_get
  val fieldNameList = ListBuffer.empty[String]
  val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
  fields.foreach { field =>
    fieldNameList.append(field.name)
    fieldTypeList.append(fieldTypeMatch(field.`type`))
  }
  println(fieldNameList)
  println(fieldTypeList)
  (fieldNameList.toArray, fieldTypeList.toArray)
}

private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
  umsFieldType match {
    case STRING => Types.STRING
    case INT => Types.INT
    case LONG => Types.LONG
    case FLOAT => Types.FLOAT
    case DOUBLE => Types.DOUBLE
    case BOOLEAN => Types.BOOLEAN
    case DATE => Types.SQL_DATE
    case DATETIME => Types.SQL_TIMESTAMP
    case DECIMAL => Types.DECIMAL
  }
}

val myConsumer: FlinkKafkaConsumer010[Row] =
  new FlinkKafkaConsumer010(topics, new WormholeDeserializationSchema(jsonSchema), properties)
val inputStream: DataStream[Row] = env.addSource(myConsumer)
val tableEnv = TableEnvironment.getTableEnvironment(env) // <<— exception here




Thanks !
sen















Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Piotr Nowojski
Hi,

BucketingSink is designed to provide exactly-once writes to file system, which 
is inherently tied to checkpointing. As you just saw, without checkpointing, 
BucketingSink is never notified that it can commit pending files. 

If you do not want to use checkpointing for some reasons, you could always use 
for example 
org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat and 
write your own simple `OutputFormat` or look if one of the existing ones meet 
your needs.
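For illustration, a minimal fragment of that suggestion (the output path is just an example), using the existing TextOutputFormat:

DataStream<String> events = ...;
events.writeUsingOutputFormat(new TextOutputFormat<>(new Path("hdfs:///tmp/output")));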

Piotrek

> On 7 Jun 2018, at 14:23, Rinat  wrote:
> 
> Hi mates, we got some Flink jobs, that are writing data from kafka into hdfs, 
> using Bucketing-Sink.
> For some reasons, those jobs are running without checkpointing. For now, it 
> not a big problem for us, if some files are remained opened in case of job 
> reloading.
> 
> Periodically, those jobs fail with OutOfMemory exception, and seems, that I 
> found a strange thing in the implementation of BucketingSink.
> 
> During the sink lifecycle, we have a state object, implemented as a map, 
> where key is a bucket path, and value is a state, that contains information 
> about opened files and list of pending files.
> After researching of the heap dump, I found, that those state stores 
> information about ~ 1_000 buckets and their state, all this stuff weights ~ 
> 120 Mb.
> 
> I’ve looked through the code, and found, that we removing the buckets from 
> the state, in notifyCheckpointComplete method. 
> 
> @Override
> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>   Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
>      state.bucketStates.entrySet().iterator();
>   while (bucketStatesIt.hasNext()) {
>      BucketState<T> bucketState = bucketStatesIt.next().getValue();
>      if (!bucketState.isWriterOpen &&
>            bucketState.pendingFiles.isEmpty() &&
>            bucketState.pendingFilesPerCheckpoint.isEmpty()) {
>
>         // We've dealt with all the pending files and the writer for this bucket is not currently open.
>         // Therefore this bucket is currently inactive and we can remove it from our state.
>         bucketStatesIt.remove();
>      }
>   }
> }
> 
> So, this looks like an issue, when you are using this sink in checkpointless 
> environment, because the data always added to the state, but never removed.
> Of course, we could enabled checkpointing, and use one of available backends, 
> but as for me, it seems like a non expected behaviour, like I have an 
> opportunity to run the job without checkpointing, but really, if I do so,
> I got an exception in sink component.
> 
> What do you think about this ? Do anyone got the same problem, and how’ve you 
> solved it ?
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 



Re: Conceptual question

2018-06-08 Thread Piotr Nowojski
Hi,

Yes, it should be feasible. As I said before, with Flink 1.6 there will be a 
better way of migrating state, but for now you either need to lazily convert 
the state or iterate over the keys and do the job manually.

Piotrek

> On 7 Jun 2018, at 15:52, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> So my question is: is that feasible to migrate state from `ProcessFunction` 
> to my own operator then use `getKeyedStateBackend()` to migrate the states?
> If yes, is there anything I need to be careful with? If no, why and can it be 
> available in the future? Thank you.
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski  >:
> Hi,
> 
> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the 
> function and you can not migrate your state that way.
> 
> As far as I know yes, at the moment in order to convert everything at once 
> (without getKeys you still can implement lazy conversion) you would have to 
> write your own operator.
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 15:26, Tony Wei > > wrote:
>> 
>> Hi Piotrek,
>> 
>> I used `ProcessFunction` to implement it, but it seems that I can't call 
>> `getKeyedStateBackend()` like `WindowOperator` did.
>> I found that `getKeyedStateBackend()` is the method in 
>> `AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
>> Does that mean I can't look up all keys and migrate the entire previous 
>> states to the new states in `ProcessFunction#open()`?
>> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to 
>> migrate state in the manner shown in `WindowOperator`? 
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski > >:
>> What function are you implementing and how are you using it?
>> 
>> Usually it’s enough if your function implements RichFunction (or rather 
>> extend from AbstractRichFunction) and then you could use RichFunction#open 
>> in the similar manner as in the code that I posted in previous message. 
>> Flink in many places performs instanceof checks like: 
>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>> 
>> public static void openFunction(Function function, Configuration parameters) 
>> throws Exception {
>>    if (function instanceof RichFunction) {
>>       RichFunction richFunction = (RichFunction) function;
>>       richFunction.open(parameters);
>>    }
>> }
>> 
>> Piotrek
>> 
>> 
>>> On 7 Jun 2018, at 11:07, Tony Wei wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> It seems that this was implemented with the `Operator` API, which is a 
>>> lower-level API compared to the `Function` API.
>>> Since at the `Function` API level we can only migrate state when an event is 
>>> triggered, it is more convenient this way to migrate state by iterating over 
>>> all keys in the `open()` method.
>>> If I implemented the stateful operator with the `ProcessFunction` API, is it 
>>> possible to port it to `KeyedProcessOperator` and do the state migration 
>>> that you mentioned?
>>> And are there any concerns or difficulties that could lead to state 
>>> restoration failures or other problems? Thank you!
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski :
>>> Hi,
>>> 
>>> A general solution for state/schema migration is under development and 
>>> might be released with Flink 1.6.0.
>>> 
>>> Before that, you need to handle the state migration manually in your 
>>> operator’s open method. Let's assume that your OperatorV1 has a state field 
>>> “stateV1”. Your OperatorV2 defines a field “stateV2”, which is incompatible 
>>> with the previous version. What you can do is add logic to the open method 
>>> to check:
>>> 1. If “stateV2” is non-empty, do nothing
>>> 2. If there is no “stateV2”, iterate over all of the keys and manually 
>>> migrate “stateV1” to “stateV2”
>>> 
>>> In your OperatorV3 you could drop the support for “stateV1”.
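>>>
>>> A rough sketch of those two steps inside a custom operator's open() could 
>>> look like the following. OperatorV2, StateV1, StateV2 and convert() are 
>>> made-up placeholders, and the exact backend calls (getKeys, setCurrentKey, 
>>> getPartitionedState) may differ slightly between Flink versions, so treat 
>>> this as an illustration rather than a drop-in implementation:
>>>
>>> public class OperatorV2<K, IN, OUT> extends AbstractStreamOperator<OUT>
>>>     implements OneInputStreamOperator<IN, OUT> {
>>>
>>>   @Override
>>>   public void open() throws Exception {
>>>     super.open();
>>>     ValueState<StateV1> stateV1 = getPartitionedState(
>>>         new ValueStateDescriptor<>("stateV1", StateV1.class));
>>>     ValueState<StateV2> stateV2 = getPartitionedState(
>>>         new ValueStateDescriptor<>("stateV2", StateV2.class));
>>>
>>>     KeyedStateBackend<K> backend = getKeyedStateBackend();
>>>     // iterate over every key that still has "stateV1" and migrate it eagerly
>>>     try (Stream<K> keys = backend.getKeys("stateV1", VoidNamespace.INSTANCE)) {
>>>       Iterator<K> it = keys.iterator();
>>>       while (it.hasNext()) {
>>>         backend.setCurrentKey(it.next());
>>>         if (stateV2.value() == null) {               // 1. if "stateV2" is already there, skip
>>>           stateV2.update(convert(stateV1.value()));  // 2. otherwise migrate "stateV1" -> "stateV2"
>>>           stateV1.clear();
>>>         }
>>>       }
>>>     }
>>>   }
>>>
>>>   @Override
>>>   public void processElement(StreamRecord<IN> element) throws Exception {
>>>     // normal per-element processing, reading and writing "stateV2" only
>>>   }
>>>
>>>   private StateV2 convert(StateV1 old) {
>>>     return new StateV2(old); // however your V1 -> V2 mapping looks
>>>   }
>>> }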
>>> 
>>> I have once implemented something like that here:
>>> 
>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>  
>>> 
>>> 
>>> Hope that helps!
>>> 
>>> Piotrek
>>> 
>>> 
 On 6 Jun 2018, at 17:04, TechnoMage wrote:
 
 We are still pretty new to Flink and I have a conceptual / DevOps question.
 
 When a job is modified and we want to deploy the new version, what is the 
 preferred method?  Our jobs have a lot of keyed state.
 
 If we use snapshots we have old state that may no longer apply to 

Re: Late data before window end is even close

2018-06-08 Thread Juho Autio
Flink was NOT at fault. Turns out our Kafka producer had OS level clock
sync problems :(

Because of that, our Kafka occasionally had some messages in between with
an incorrect timestamp. In practice they were about 7 days older than they
should have been.

I'm really sorry for wasting your time on this. But thank you once more for
taking the time to answer.

For any similar case, I would first advise users to compare the actual
timestamps of their input data extra carefully. For me it was helpful to make
this change in my Flink job: for the late data output, include the processing
time (DateTime.now()) along with the event time (the original timestamp).
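
In case it helps anyone else, the change was roughly like the following sketch
(events, the event type, its timestamp field, the key selector and the
window/aggregation with its Result type are placeholders, not the real job):

final OutputTag<Event> lateTag = new OutputTag<Event>("late-data") {};

SingleOutputStreamOperator<Result> windowed = events
    .keyBy(e -> e.key)
    .timeWindow(Time.days(1))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateTag)
    .aggregate(new MyAggregate());

windowed
    .getSideOutput(lateTag)
    // pair the original event time with the wall-clock time at which the record was dropped
    .map(e -> "eventTime=" + e.timestamp + ", processingTime=" + System.currentTimeMillis())
    .addSink(new YourLateDataSink());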

On Mon, May 14, 2018 at 12:42 PM, Fabian Hueske  wrote:

> Thanks for correcting me, Piotr. I didn't look closely enough at the code.
> With the presently implemented logic, a record should not be emitted to a
> side output if its window hasn't been closed yet.
>
> 2018-05-11 14:13 GMT+02:00 Piotr Nowojski :
>
>> Generally speaking, the best practice is always to simplify your program as
>> much as possible to narrow down the scope of the search. Replace the data
>> source with statically generated events, remove unnecessary components, etc.
>> Either such a process helps you figure out what's wrong on your own, or, if
>> not, sharing such a minimal program that reproduces the issue with us will
>> allow us to debug it.
>>
>> Piotrek
>>
>>
>> On 11 May 2018, at 13:54, Juho Autio  wrote:
>>
>> Thanks for that code snippet, I should try it out to simulate my DAG. If
>> you have any suggestions on how to debug further what's causing late data on
>> a production stream job, please let me know.
>>
>> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski 
>> wrote:
>>
>>> Hey,
>>>
>>> Actually I think Fabian's initial message was incorrect. As far as I can
>>> see in the code of WindowOperator (the last lines of
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement),
>>> the element is sent to the late side output if it is late AND it wasn't
>>> assigned to any of the existing windows (because they were late as well).
>>> In other words, it should work as you, Juho, are wishing: elements should
>>> only be marked as late once they are overdue/late for the window after one
>>> full day.
>>>
>>> I have tested it and it works as expected. Following program:
>>>
>>> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>>>
>>> Prints only ONE number to the standard err:
>>>
>>> > 1394
>>>
>>> And there is nothing on the side output.
>>>
>>> Piotrek
>>>
>>> On 11 May 2018, at 12:32, Juho Autio  wrote:
>>>
>>> Thanks. What I still don't get is why my message got filtered in the
>>> first place. Even if the allowed-lateness filtering were done "on the
>>> window", data should not be dropped as late if it's not in fact late by
>>> more than the allowedLateness setting.
>>>
>>> Assuming that these conditions hold:
>>> - messages (and thus the extracted timestamps) were not out of order by
>>> more than 5 seconds (assuming I didn't make any mistake in my
>>> partition-level analysis)
>>> - allowedLateness=1 minute
>>> - watermarks are assigned on kafka consumer meaning that they are
>>> synchronized across all partitions
>>>
>>> I don't see how the watermark could ever have been more than 5 seconds
>>> ahead when the message arrives at the isElementLate filter. Do you have
>>> any idea on this? Is there some existing test that simulates out of order
>>> input to flink's kafka consumer? I could try to build a test case based on
>>> that to possibly reproduce my problem. I'm not sure how to gather enough
>>> debug information on the production stream so that it would clearly show
>>> the watermarks, how they progressed on each kafka partition & later in the
>>> chain in case isElementLate filters something.
>>>
>>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske 
>>> wrote:
>>>
 Hi Juho,

 Thanks for bringing up this topic! I share your intuition.
 IMO, records should only be filtered out and sent to a side output if
 any of the windows they would be assigned to is closed already.

 I had a look into the code and found that records are filtered out as
 late based on the following condition:

 protected boolean isElementLate(StreamRecord<IN> element) {
    return (windowAssigner.isEventTime()) &&
       (element.getTimestamp() + allowedLateness <=
          internalTimerService.currentWatermark());
 }


 This code shows that your analysis is correct.
 Records are filtered out based on their timestamp and the current
 watermark, even though they arrive before the window is closed.

 OTOH, filtering out records based on the windows they would end up in
 can also be tricky if records are assigned to multiple windows (e.g.,
 sliding windows).
 In this case, a side-outputted record could still be in some windows
 and not in others.
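
 To make that concrete, here is a small illustration (the numbers, the key
 selector and the "value" field are made up, not from this job):

 // Size 10 min, slide 1 min: an element with timestamp 10:09:30 belongs to ten
 // windows, [10:00, 10:10), [10:01, 10:11), ..., [10:09, 10:19).
 // With allowedLateness of 1 minute, the earliest of them is purged once the
 // watermark passes roughly 10:11, while the latest stays around until roughly
 // 10:20, so the same element can be too late for some of its windows but not
 // for others.
 stream
     .keyBy(e -> e.key)
     .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
     .allowedLateness(Time.minutes(1))
     .sum("value");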

 @Aljoscha (CC) might have an explanation for the current behavior.