Re: Reduce parallelism without network transfer.

2018-02-05 Thread Piotr Nowojski
Hi,

It should work like this out of the box if you use the rescale method:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#physical-partitioning
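
For example, a minimal sketch of the wiring (operator and class names are placeholders, 
not from your job) - with 120 upstream and 30 downstream subtasks, rescale() forwards each 
upstream subtask’s output only to a subset of the downstream subtasks instead of shuffling 
across all of them:

DataStream<Record> wide = source
    .map(new ParseRecord())
    .setParallelism(120);   // 4 subtasks per task manager

wide
    .rescale()              // each group of 4 upstream subtasks feeds a single downstream subtask
    .map(new Aggregate())
    .setParallelism(30);    // 1 subtask per task manager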
 


If it does not work, please let us know.

Piotrek

> On 3 Feb 2018, at 04:39, Kien Truong  wrote:
> 
> Hi,
> 
> Assuming that I have a streaming job, using 30 task managers with 4 slot 
> each. I want to change the parallelism of 1 operator from 120 to 30. Are 
> there anyway so that each subtask of this operator get data from 4 upstream 
> subtasks running in the same task manager, thus avoiding network completely ?
> 
> Best regards, 
> Kien
> 
> Sent from TypeApp 


Re: Flink not writing last few elements to disk

2018-02-05 Thread Piotr Nowojski
Hi,

FileProcessMode.PROCESS_CONTINUOUSLY processes the file continuously - the 
stream will not end. 

A simple `writeAsCsv(…)`, on the other hand, only flushes the output file at the end 
of the stream (see `OutputFormatSinkFunction`).

You can either use `PROCESS_ONCE` mode or use a more advanced data sink:
- BucketingSink
- re-use the `writeAsCsv(…)` code by extending OutputFormatSinkFunction and 
implementing `CheckpointedFunction` to flush on snapshots (for at-least-once; see 
the sketch below)
- write your own sink by extending `TwoPhaseCommitSinkFunction` (to support 
`exactly-once`)
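
As an illustration of the second option, here is a minimal at-least-once sketch. To keep 
it short it uses a plain RichSinkFunction and a hard-coded file path instead of actually 
re-using the `writeAsCsv(…)`/OutputFormatSinkFunction code:

import java.io.BufferedWriter;
import java.io.FileWriter;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Writes every record to a local file and flushes the writer on each checkpoint,
// so every completed checkpoint implies the data written so far is on disk.
public class FlushOnCheckpointCsvSink extends RichSinkFunction<String>
        implements CheckpointedFunction {

    private transient BufferedWriter writer;

    @Override
    public void open(Configuration parameters) throws Exception {
        writer = new BufferedWriter(new FileWriter("/tmp/output.csv", true));
    }

    @Override
    public void invoke(String value) throws Exception {
        writer.write(value);
        writer.newLine();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        writer.flush();   // flush on every checkpoint -> at-least-once after a restore
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // no managed state needed for this simple example
    }

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
    }
}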

Piotrek

> On 2 Feb 2018, at 18:32, geoff halmo  wrote:
> 
> Hi Flink community:
> 
> I am testing Flink but can't write the final(18 or so elements out to disk)
> 
> Setup:
> Using NYC yellow taxi from data 2017-09.csv, I sorted the data on
> pickup_datetime in bash. I am working in event time.
> 
> Skeleton program:
> val ds = senv.readFile(input_format, input_path,
> FileProcessMode.PROCESS_CONTINUOUSLY, 1000)
> 
> ds.flatMap(row => parse(row)
> .assignAscendingTimestamps( _.datetime)
> .timeWindowAll(Time.hours(1))
> .process( new MyProcessAllWIndowFunction() )
> .writeCsv
> 
> Issue:
> The last line is a half line:
> tail -n1 output.csv
> 150655320,2017-09-27T:19:00-4:00[user@computer]
> 
> When I use .print instead of .writeCsv, the last line on console is
> 150682680,2017-09-30T23:00-400[America/New_York],21353



Re: Latest version of Kafka

2018-02-02 Thread Piotr Nowojski
Hi,

As of now, Flink only provides a connector for Kafka 0.11, which uses the Kafka client 
in version 0.11.x. However, you should be able to use it for reading 
from/writing to Kafka 1.0 - Kafka claims (and as far as I know it’s true) that 
Kafka 1.0 is backward compatible with 0.11. 
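
For example, a minimal sketch of using the 0.11 connector against a cluster running 
Kafka 1.0 (the topic name and properties are made up):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker-running-kafka-1.0:9092");
props.setProperty("group.id", "my-consumer-group");

DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props));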

Piotrek

> On 1 Feb 2018, at 14:46, Marchant, Hayden  wrote:
> 
> What is the newest version of Kafka that is compatible with Flink 1.4.0? I 
> see the last version of Kafka supported is 0.11 , from documentation, but has 
> any testing been done with Kafka 1.0?
> 
> 
> Hayden Marchant
> 



Re: Send ACK when all records of file are processed

2018-01-30 Thread Piotr Nowojski
In case of reading from input files, at the EOF event, readers will send 
Watermark(Long.MAX_VALUE) on all of the output edges and those watermarks will 
be propagated accordingly. So your ACK operator will get 
Watermark(Long.MAX_VALUE) only when it gets it from ALL of its input edges.

When reading from Kafka, you do not have an EOF event, so it would not be 
possible to use this Watermark(Long.MAX_VALUE). In that case you would need to 
emit some dummy EOF record, containing some meta information like the filename, 
along with an event time set to a value greater than that of the original event 
read from Kafka which contained the filename to process. You would have to pass 
this EOF dummy record to your EOF operator. There you would need to create 
some kind of mapping 

fileName -> event time marking EOF

And each time you process an EOF record, you add a new entry to this mapping. Now, 
whenever you process watermarks, you can check for which fileNames this 
watermark guarantees that the file has been processed completely.
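
A rough sketch of such an operator (the EofMarker record and the ack call are made up, 
and the mapping is kept in plain memory, i.e. it is not part of any checkpointed state):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class FileAckOperator extends AbstractStreamOperator<Void>
        implements OneInputStreamOperator<EofMarker, Void> {

    // fileName -> event time of the dummy EOF record for that file
    private final Map<String, Long> pendingFiles = new HashMap<>();

    @Override
    public void processElement(StreamRecord<EofMarker> element) {
        EofMarker marker = element.getValue();
        pendingFiles.put(marker.getFileName(), marker.getEventTime());
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // every file whose EOF timestamp is covered by this watermark is fully processed
        Iterator<Map.Entry<String, Long>> it = pendingFiles.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= mark.getTimestamp()) {
                sendAck(entry.getKey());   // hypothetical call to whatever expects the ACK
                it.remove();               // clean up the mapping to avoid running out of memory
            }
        }
        super.processWatermark(mark);
    }

    private void sendAck(String fileName) {
        // e.g. notify the system that triggered the processing of this file
    }
}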

However, this is more complicated and you would have to handle things like:
- cleaning up the mapping (avoiding OutOfMemory)
- making sure that watermarks are generated without unnecessary latencies (when 
reading from file, EOF immediately emits Watermark(Long.MAX_VALUE), which might 
not always be the case for Kafka: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
 
)

Piotrek

> On 30 Jan 2018, at 15:17, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> 
> Yeh, so this is the current implementation.
> 
> One question regarding the Watermark, since watermark is chosen as minimum 
> value of all of input streams, only one input  stream will have watermark 
> value to LONG.MAX_VALUE which denotes the EOF processing whereas the other 
> streams will not have this value , is my understanding right ? So in this 
> case LONG.MAX_VALUE will always be a greater value than it's input streams. 
> Or the LONG.MAX_VALUE watermark will flow from each input stream ?
> 
> 
> I was thinking of directly reading from Kafka as source in Flink in order to 
> remove the middle layer of independent Kafka Consumer which is triggering 
> Flink job.
> 
> So, the pipeline will be 1. readFrom Kafka -> take the File location -> read 
> using FileReaderOperator
> 
> But in this case how do I determine for which File I have received the 
> LONG.MAX_VALUE, it will get complicated.
> 
> 
> 
> Regards,
> Vinay Patil
> 
> On Tue, Jan 30, 2018 at 1:57 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Thanks for the clarification :)
> 
> Since you have one Job per an ACK, you can just relay on 
> Watermark(Long.MAX_VALUE) to mark the end of the processing.
> 
> More complicated solution (compared to what I proposed before) would be 
> needed if you had one long living job (for example multiple weeks) and it 
> would need to produce multiple ACKs in different point of time.
> 
> Piotrek
> 
> 
>> On 29 Jan 2018, at 15:43, Vinay Patil <vinay18.pa...@gmail.com 
>> <mailto:vinay18.pa...@gmail.com>> wrote:
>> 
>> Sure, here is the complete design that we have :
>> 
>> File metadata (NFS location of file) is stored in kafka , we are having a 
>> Kafka Consumer (not flink one) which will read from each partition and 
>> trigger a Flink job on cluster. 
>> 
>> The Flink job will then read from a file and do the processing as I 
>> mentioned earlier.
>> 
>> The requirement here is we need to trigger a ACK if the validations for all 
>> the records in a file are successful.
>> 
>> P.S I know we are not using Kafka to its full potential and are just using 
>> it for storing metadata :) 
>> 
>> Regards,
>> Vinay Patil
>> 
>> On Thu, Jan 25, 2018 at 11:57 AM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Could you rephrase what is your concern? 
>> 
>> Thanks, Piotrek
>> 
>> 
>>> On 25 Jan 2018, at 18:54, Vinay Patil <vinay18.pa...@gmail.com 
>>> <mailto:vinay18.pa...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> No, to clarify I need to send the ack for each file when it gets processed 
>>> completely and there are multiple files that I am going to read from the 
>>> shared location.
>>> 
>>> Regards,
>>> Vinay Patil
>>> 
>>> On Thu, Ja

Re: Send ACK when all records of file are processed

2018-01-25 Thread Piotr Nowojski
Hi,

If an operator has multiple inputs, its watermark will be the minimum of all 
of the inputs. Thus your hypothetical “ACK Operator” will get 
Watermark(Long.MAX_VALUE) only when all of the preceding operators report 
Watermark(Long.MAX_VALUE). 

Yes, instead of simply adding a sink, you would have to use something like a 
`flatMap` that doesn’t emit anything and only passes the watermark on (the default 
implementations do exactly that).

To access the watermark, you can use the DataStream.transform function and pass your 
own implementation of an operator extending AbstractStreamOperator. 
Probably you would only need to override the processWatermark() method, and there 
you could do the ACK operation once you get 
org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK.
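
For example, a bare-bones sketch of such an operator (the ACK call itself is left as a 
placeholder):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class AckOnMaxWatermarkOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    @Override
    public void processElement(StreamRecord<T> element) {
        output.collect(element);   // pass records through unchanged
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (mark.equals(Watermark.MAX_WATERMARK)) {
            // the EOF watermark has passed through all upstream operators - do the ACK here
        }
        super.processWatermark(mark);
    }
}

It could then be attached with something like 
stream.transform("ACK", stream.getType(), new AckOnMaxWatermarkOperator<>()).setParallelism(1).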

Piotrek

> On 25 Jan 2018, at 17:56, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> 
> Hi Piotrek,
> 
> Thank you for your detailed answer.
> 
> Yes, I want to generate the ack when all the records of the file are written 
> to DB.
> 
> So to understand what you are saying , we will receive a single EOF watermark 
> value at the ack operator when all the downstream operator process all the 
> records of the file. But what I understand regarding the watermark is each 
> parallel instance of the operator will emit the watermark, so how do I ensure 
> that the EOF is reached  or will I receive only one watermark at the ack 
> operator ?
> 
> 
> So the pipeline topology will look like 
> 
> DataStream  readFileStream = env.readFile()
> 
> readFileStream
>  .transform(// ContrinousFileReaderOperator)
>  .key(0)
>  .map(// encrichment)
>   .addSink(// DB)
> 
>  instead of add sink, should it be a  simple map operator which writes to DB 
> so that we can have a next ack operator which will generate the response.
> 
> Also, how do I get/access the Watermark value in the ack operator ? It will 
> be a simple  map operator, right ?
> 
> 
> 
> 
> 
> Regards,
> Vinay Patil
> 
> On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> As you figured out, some dummy EOF record is one solution, however you might 
> try to achieve it also by wrapping an existing CSV function. Your wrapper 
> could emit this dummy EOF record. Another (probably better) idea is to use 
> Watermark(Long.MAX_VALUE) for the EOF marker. Stream source and/or 
> ContrinousFileReaderOperator will do that for you, so you would just need to 
> handle the Watermark.
> 
> The question is, do you need to perform the ACK operation AFTER all of the DB 
> writes, or just after reading the CSV file? If the latter one, you could add 
> some custom ACK operator with parallelism one just after the CSV source that 
> waits for the EOF Watermark. 
> 
> If it is the first one (some kind of committing the DB writes), you would 
> need to to wait until the EOF passes through all of your operators. You would 
> need something like that:
> 
> parallelism 1 for source -> default parallelism for keyBy/enrichment/db 
> writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
> 
> I hope this helps,
> Piotrek
> 
>> On 24 Jan 2018, at 23:19, Vinay Patil <vinay18.pa...@gmail.com 
>> <mailto:vinay18.pa...@gmail.com>> wrote:
>> 
>> Hi Guys,
>> 
>> Following is how my pipeline looks (DataStream API) :
>> 
>> [1] Read the data from the csv file
>> [2] KeyBy it by some id
>> [3] Do the enrichment and write it to DB
>> 
>> [1] reads the data in sequence as it has single parallelism and then I have 
>> default parallelism for the other operators.
>> 
>> I want to generate a response (ack) when all the data of the file is 
>> processed. How can I achieve this ?
>> 
>> One solution I can think of is to have EOF dummy record in a file and a 
>> unique field for all the records in that file. Doing a keyBy on this field 
>> will make sure that all records are sent to a single slot. So, when EOF  
>> dummy records is read I can generate a response/ack.
>> 
>> Is there a better way I can deal with this ?
>> 
>> 
>> Regards,
>> Vinay Patil
> 
> 



Re: Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-25 Thread Piotr Nowojski
If you want to go this way, you could:
- as you proposed use some busy waiting with reading some file from a 
distributed file system
- wait for some network message (opening your own socket)
- use some other external system for this purpose: Kafka? Zookeeper?  

Although all of them seem hacky, and I would prefer (as I proposed before) to 
pre-compute those ids before running/starting the main Flink application. 
That would probably be simpler and easier to maintain.
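
A sketch of that preferred approach (the id-loading helper, the enrichment function and 
the Kafka settings are all assumptions):

public static void main(String[] args) throws Exception {
    // do the slow preloading up front, before the streaming topology is even built
    final List<String> preloadedIds = loadAndCacheIds();   // hypothetical helper

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "my-group");

    env.addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props))
        .map(new EnrichWithIds(preloadedIds))   // hypothetical MapFunction that carries the ids with it
        .print();

    env.execute("job with pre-computed ids");
}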

Piotrek

> On 25 Jan 2018, at 13:47, Ishwara Varnasi <ivarn...@gmail.com> wrote:
> 
> The FLIP-17 is promising. Until it’s available I’m planning to do this: 
> extend Kafka consumer and add logic to hold consuming until other source 
> (fixed set) completes sending and those messages are processed by the 
> application. However the question is to how to let the Kafka consumer know 
> that it should now start consuming messages. What is the correct way to 
> broadcast messages to other tasks at runtime? I’d success with the 
> distributed cache (ie write status to a file in one task and other looks for 
> status in this file), but doesn’t look like good solution although works. 
> Thanks for the pointers.
> Ishwara Varnasi 
> 
> Sent from my iPhone
> 
> On Jan 25, 2018, at 4:03 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> 
>> Hi,
>> 
>> As far as I know there is currently no simple way to do this:
>> Join stream with static data in 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>>  
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API>
>> and
>> https://issues.apache.org/jira/browse/FLINK-6131 
>> <https://issues.apache.org/jira/browse/FLINK-6131>
>> 
>> One walk around might be to buffer on the state the Kafka input in your 
>> TwoInput operator until all of the broadcasted messages have arrived.
>> Another option might be to dynamically start your application. First run 
>> some computation to determine the fixed lists of ids and start the flink 
>> application with those values hardcoded in/passed via command line arguments.
>> 
>> Piotrek 
>> 
>>> On 25 Jan 2018, at 04:10, Ishwara Varnasi <ivarn...@gmail.com 
>>> <mailto:ivarn...@gmail.com>> wrote:
>>> 
>>> Hello,
>>> I have a scenario where I've two sources, one of them is source of fixed 
>>> list of ids for preloading (caching certain info which is slow) and second 
>>> one is the kafka consumer. I need to run Kafka after first one completes. I 
>>> need a mechanism to let the Kafka consumer know that it can start consuming 
>>> messages. How can I achieve this?
>>> thanks
>>> Ishwara Varnasi
>> 



Re: Send ACK when all records of file are processed

2018-01-25 Thread Piotr Nowojski
Hi,

As you figured out, some dummy EOF record is one solution, however you might 
try to achieve it also by wrapping an existing CSV function. Your wrapper could 
emit this dummy EOF record. Another (probably better) idea is to use 
Watermark(Long.MAX_VALUE) for the EOF marker. Stream source and/or 
ContinuousFileReaderOperator will do that for you, so you would just need to 
handle the Watermark.

The question is, do you need to perform the ACK operation AFTER all of the DB 
writes, or just after reading the CSV file? If the latter one, you could add 
some custom ACK operator with parallelism one just after the CSV source that 
waits for the EOF Watermark. 

If it is the first one (some kind of committing the DB writes), you would need 
to wait until the EOF passes through all of your operators. You would need 
something like this:

parallelism 1 for source -> default parallelism for keyBy/enrichment/db writes 
-> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
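
Expressed as a sketch on top of the pipeline from your mail (the key selector, enrichment 
function and ACK operator are placeholders - the ACK operator would override 
processWatermark() and react to Watermark(Long.MAX_VALUE)):

DataStream<Row> enriched = env
    .readFile(csvFormat, inputPath)          // [1] read the CSV with parallelism 1
    .setParallelism(1)
    .keyBy(new IdKeySelector())              // [2] keyBy some id, default parallelism
    .map(new EnrichAndWriteToDb());          // [3] enrichment + DB writes

enriched
    .transform("ACK", enriched.getType(), new AckOperator())
    .setParallelism(1);                      // single instance waiting for Watermark(Long.MAX_VALUE)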

I hope this helps,
Piotrek

> On 24 Jan 2018, at 23:19, Vinay Patil  wrote:
> 
> Hi Guys,
> 
> Following is how my pipeline looks (DataStream API) :
> 
> [1] Read the data from the csv file
> [2] KeyBy it by some id
> [3] Do the enrichment and write it to DB
> 
> [1] reads the data in sequence as it has single parallelism and then I have 
> default parallelism for the other operators.
> 
> I want to generate a response (ack) when all the data of the file is 
> processed. How can I achieve this ?
> 
> One solution I can think of is to have EOF dummy record in a file and a 
> unique field for all the records in that file. Doing a keyBy on this field 
> will make sure that all records are sent to a single slot. So, when EOF  
> dummy records is read I can generate a response/ack.
> 
> Is there a better way I can deal with this ?
> 
> 
> Regards,
> Vinay Patil



Re: Avoiding deadlock with iterations

2018-01-25 Thread Piotr Nowojski
Hi,

This is a known problem and I don’t think there is an easy solution to it. 
Please refer to:
http://mail-archives.apache.org/mod_mbox/flink-user/201704.mbox/%3c5486a7fd-41c3-4131-5100-272825088...@gaborhermann.com%3E
 

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132 


Thanks,
Piotrek

> On 25 Jan 2018, at 05:36, Ken Krugler  wrote:
> 
> Hi all,
> 
> We’ve run into deadlocks with two different streaming workflows that have 
> iterations.
> 
> In both cases, the issue is with fan-out; if any operation in the loop can 
> emit more records than consumed, eventually a network buffer fills up, and 
> then everyone in the iteration loop is blocked.
> 
> One pattern we can use, when the operator that’s causing the fan-out has the 
> ability to decide how much to emit, is to have it behave as an async 
> function, emitting from a queue with multiple threads. If threads start 
> blocking because of back pressure, then the queue begins to fill up, and the 
> function can throttle back how much data it queues up. So this gives us a 
> small (carefully managed) data reservoir we can use to avoid the deadlock.
> 
> Is there a better approach? I didn’t see any way to determine how “full” the 
> various network buffers are, and use that for throttling. Plus there’s the 
> issue of partitioning, where it would be impossible in many cases to know the 
> impact of a record being emitted. So even if we could monitor buffers, I 
> don’t think it’s a viable solution.
> 
> Thanks,
> 
> — Ken
> 
> 
> http://about.me/kkrugler 
> +1 530-210-6378
> 



Re: Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-25 Thread Piotr Nowojski
Hi,

As far as I know there is currently no simple way to do this:
Join stream with static data in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
 

and
https://issues.apache.org/jira/browse/FLINK-6131 


One workaround might be to buffer the Kafka input in the state of your 
TwoInput operator until all of the broadcast messages have arrived.
Another option might be to dynamically start your application. First run some 
computation to determine the fixed list of ids and start the Flink application 
with those values hardcoded in/passed via command line arguments.
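
A rough sketch of the buffering workaround (for brevity it keeps the buffer in a plain 
list rather than in checkpointed state, and the Control type with its end-of-broadcast 
marker is an assumption):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BufferUntilBroadcastDone extends RichCoFlatMapFunction<String, Control, String> {

    private final List<String> buffered = new ArrayList<>();
    private boolean broadcastComplete = false;

    @Override
    public void flatMap1(String kafkaRecord, Collector<String> out) {
        if (broadcastComplete) {
            out.collect(kafkaRecord);
        } else {
            buffered.add(kafkaRecord);          // hold back Kafka data until the ids have arrived
        }
    }

    @Override
    public void flatMap2(Control control, Collector<String> out) {
        if (control.isEndOfBroadcast()) {
            broadcastComplete = true;
            for (String record : buffered) {    // release everything buffered so far
                out.collect(record);
            }
            buffered.clear();
        }
    }
}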

Piotrek 

> On 25 Jan 2018, at 04:10, Ishwara Varnasi  wrote:
> 
> Hello,
> I have a scenario where I've two sources, one of them is source of fixed list 
> of ids for preloading (caching certain info which is slow) and second one is 
> the kafka consumer. I need to run Kafka after first one completes. I need a 
> mechanism to let the Kafka consumer know that it can start consuming 
> messages. How can I achieve this?
> thanks
> Ishwara Varnasi



Re: What's the meaning of "Registered `TaskManager` at akka://flink/deadLetters " ?

2018-01-19 Thread Piotr Nowojski
Hi,

It seems like you have not opened some of the ports. As I pointed out in the 
first mail, please go through all of the config options regarding 
hostnames/ports (not only those that appear in the log files, maybe something 
is not being logged) 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#jobmanager-amp-taskmanager
 

jobmanager.rpc.port
taskmanager.rpc.port
taskmanager.data.port
blob.server.port 

And double check that they are accessible from the appropriate machines, best by 
using some external tool like telnet or ncat. Your network can be configured to 
accept some connections only from specific hosts (like localhost). For example, 
in the case for which you attached the log files, did you check that the job 
manager host can open a connection to `stage_dbq_1:33633` (the task manager 
host and its RPC port - the RPC port is random by default)?

Also make sure that the configurations on the task manager and job manager are 
consistent.

Piotrek

> On 18 Jan 2018, at 08:41, Reza Samee <reza.sa...@gmail.com> wrote:
> 
> Hi, 
> 
> I attached log file,
> 
> Thanks
> 
> On Mon, Jan 15, 2018 at 3:36 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Could you post full job manager and task manager logs from startup until the 
> first signs of the problem?
> 
> Thanks, Piotrek
> 
> 
>> On 15 Jan 2018, at 11:21, Reza Samee <reza.sa...@gmail.com 
>> <mailto:reza.sa...@gmail.com>> wrote:
>> 
>> Thanks for response; 
>> And sorry the passed time.
>> 
>> The JobManager & TaskManager logged ports are open!
>> 
>> 
>> Is this log OK?
>> 2018-01-15 13:40:03,455 INFO  
>> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
>> reachable under akka.tcp://flink@172.16.20.18:6123/user/jobmanager:null 
>> <http://flink@172.16.20.18:6123/user/jobmanager:null>.
>> 
>> When I kill task-manger, the jobmanager logs:
>> 2018-01-15 13:32:41,419 WARN  akka.remote.ReliableDeliverySupervisor 
>>- Association with remote system 
>> [akka.tcp://flink@stage_dbq_1:45532 <>] has failed, address is now gated for 
>> [5000] ms. Reason: [Disassociated] 
>> 
>> But it will not decrement the number of available task-managers!
>> and when I start my signle task-manager again, it logs:
>> 
>> 2018-01-15 13:32:52,753 INFO  
>> org.apache.flink.runtime.instance.InstanceManager - Registered 
>> TaskManager at ??? (akka://flink/deadLetters <>) as 
>> 626846ae27a833cb094eeeb047a6a72c. Current number of registered hosts is 2. 
>> Current number of alive task slots is 40.
>> 
>> 
>> On Wed, Jan 10, 2018 at 11:36 AM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> Search both job manager and task manager logs for ip address(es) and port(s) 
>> that have timeouted. First of all make sure that nodes are visible to each 
>> other using some simple ping. Afterwards please check that those timeouted 
>> ports are opened and not blocked by some firewall (telnet).
>> 
>> You can search the documentation for the configuration parameters with 
>> “port” in name:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html>
>> But note that many of them are random by default.
>> 
>> Piotrek
>> 
>>> On 9 Jan 2018, at 17:56, Reza Samee <reza.sa...@gmail.com 
>>> <mailto:reza.sa...@gmail.com>> wrote:
>>> 
>>> 
>>> I'm running a flink-cluster (a mini one with just one node); but the 
>>> problem is that my TaskManager can't reach to my JobManager!
>>> 
>>> Here are logs from TaskManager
>>> ...
>>> Trying to register at JobManager 
>>> akka.tcp://flink@MY_PRIV_IP/user/jobmanager <> (attempt 20, timeout: 30 
>>> seconds)
>>> Trying to register at JobManager 
>>> akka.tcp://flink@MY_PRIV_IP/user/jobmanager <> (attempt 21, timeout: 30 
>>> seconds)
>>> Trying to register at JobManager 
>>> akka.tcp://flink@MY_PRIV_IP/user/jobmanager <> (attempt 22, timeout: 30 
>>> seconds)
>>> Trying to register at JobManager 
>>> akka.tcp://flink@MY_PRIV_IP/user/jobmanager <> (attempt 23, timeout: 30 
>>> seconds)
>>> Trying to re

Re: What's the meaning of "Registered `TaskManager` at akka://flink/deadLetters " ?

2018-01-15 Thread Piotr Nowojski
Hi,

Could you post full job manager and task manager logs from startup until the 
first signs of the problem?

Thanks, Piotrek

> On 15 Jan 2018, at 11:21, Reza Samee <reza.sa...@gmail.com> wrote:
> 
> Thanks for response; 
> And sorry the passed time.
> 
> The JobManager & TaskManager logged ports are open!
> 
> 
> Is this log OK?
> 2018-01-15 13:40:03,455 INFO  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
> reachable under akka.tcp://flink@172.16.20.18:6123/user/jobmanager:null 
> <http://flink@172.16.20.18:6123/user/jobmanager:null>.
> 
> When I kill task-manger, the jobmanager logs:
> 2018-01-15 13:32:41,419 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@stage_dbq_1:45532] has failed, address is now gated for 
> [5000] ms. Reason: [Disassociated] 
> 
> But it will not decrement the number of available task-managers!
> and when I start my signle task-manager again, it logs:
> 
> 2018-01-15 13:32:52,753 INFO  
> org.apache.flink.runtime.instance.InstanceManager - Registered 
> TaskManager at ??? (akka://flink/deadLetters) as 
> 626846ae27a833cb094eeeb047a6a72c. Current number of registered hosts is 2. 
> Current number of alive task slots is 40.
> 
> 
> On Wed, Jan 10, 2018 at 11:36 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Search both job manager and task manager logs for ip address(es) and port(s) 
> that have timeouted. First of all make sure that nodes are visible to each 
> other using some simple ping. Afterwards please check that those timeouted 
> ports are opened and not blocked by some firewall (telnet).
> 
> You can search the documentation for the configuration parameters with “port” 
> in name:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html>
> But note that many of them are random by default.
> 
> Piotrek
> 
>> On 9 Jan 2018, at 17:56, Reza Samee <reza.sa...@gmail.com 
>> <mailto:reza.sa...@gmail.com>> wrote:
>> 
>> 
>> I'm running a flink-cluster (a mini one with just one node); but the problem 
>> is that my TaskManager can't reach to my JobManager!
>> 
>> Here are logs from TaskManager
>> ...
>> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
>> <> (attempt 20, timeout: 30 seconds)
>> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
>> <> (attempt 21, timeout: 30 seconds)
>> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
>> <> (attempt 22, timeout: 30 seconds)
>> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
>> <> (attempt 23, timeout: 30 seconds)
>> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
>> <> (attempt 24, timeout: 30 seconds)
>> ...
>> 
>> My "JobManager UI" shows my TaskManager with this Path & ID: 
>> "akka://flink/deadLetters <>" ( in TaskManagers tab)
>> And I found these lines in my JobManger stdout:
>> 
>> Resource Manager associating with leading JobManager 
>> Actor[akka://flink/user/jobmanager#-275619168 <>] - leader session null
>> TaskManager ResourceID{resourceId='1132cbdaf2d8204e5e42e321e8592754'} has 
>> started.
>> Registered TaskManager at MY_PRIV_IP (akka://flink/deadLetters <>) as 
>> 7d9568445b4557a74d05a0771a08ad9c. Current number of registered hosts is 1. 
>> Current number of alive task slots is 20.
>> 
>> 
>> What's the meaning of these lines? Where should I look for the solution?
>> 
>> 
>> 
>> 
>> -- 
>> رضا سامعی / http://samee.blog.ir <http://samee.blog.ir/>
> 
> 
> 
> -- 
> رضا سامعی / http://samee.blog.ir <http://samee.blog.ir/>


Re: Stream job failed after increasing number retained checkpoints

2018-01-10 Thread Piotr Nowojski
/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Sink: Discarded events (4/4) 
> (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects 
> from JobManager 
> akka.tcp://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager
>  
> <http://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager>: 
> JobManager is no longer reachable
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:120)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at 
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
>   at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>   at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2018-01-08 22:26:43,069 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Triggering cancellation of task code Sink: Discarded events 
> (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,087 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Sink: CounterSink (async 
> call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).
> 
> 
> José Miguel Tejedor Fernández
> Server developer
> jose.fernan...@rovio.com <mailto:jose.fernan...@rovio.com>
> Rovio Entertainment Ltd.
> Keilaranta 7, FIN - 02150 Espoo, Finland
> www.rovio.com <http://www.rovio.com/>
> 
> 
> On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter <s.rich...@data-artisans.com 
> <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
> there is no known limitation in the strict sense, but you might run out of 
> dfs space or job manager memory if you keep around a huge number checkpoints. 
> I wonder what reason you might have that you ever want such a huge number of 
> retained checkpoints? Usually keeping one checkpoint should do the job, maybe 
> a couple more if you are very afraid about corruption that goes beyond your 
> DFSs capabilities to handle it. Is there any reason for that or maybe a 
> misconception about increasing the number of retained checkpoints is good for?
> 
> Best,
> Stefan 
> 
> 
>> Am 10.01.2018 um 08:54 schrieb Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>>:
>> 
>> Hi,
>> 
>> Increasing akka’s timeouts is rarely a solution for any problems - it either 
>> do not help, or just mask the issue making it less visible. But yes, it is 
>&g

Re: Datastream broadcast with KeyBy

2018-01-10 Thread Piotr Nowojski
Hi,

Could you elaborate on what the problem is that you are having? What exception(s) 
are you getting? I have tested such a simple example and it seems to be working 
as expected:

DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3);

DataStreamSource<Integer> confStream = env.fromElements(42);

input.keyBy(new MyKeySelector()).connect(confStream.broadcast()).process(new 
MyCoProcessFunction()).print();

Thanks, Piotrek

> On 10 Jan 2018, at 10:01, anujk  wrote:
> 
> Currently we have an Flink pipeline running with Data-Src —> KeyBy —>
> ProcessFunction.  State Management (with RocksDB) and Timers are working
> well.
> Now we have to extend this by having another Config Stream which we want to
> broadcast to all process operators. So wanted to connect the Data Stream
> with Config Stream (with Config Stream being broadcast) and use
> CoProcessFunction to handle both streams.
> 
> KeyBy uses Hash based partitioning and also if we write CustomPartitioner it
> can return only one partition (Array of SelectedChannel option as in
> BroadcastPartitioner is not allowed).
> Would have liked this to work —
> dataStream.keyBy().connect(confStream.broadcast()).process(…RichCoProcessFunction()…)
> but it says both stream must be keyed.
> 
> Is there any way to make this work?
> 
> dataStream.connect(confStream.broadcast()).flatMap(...
> RichCoFlatMapFunction() …) ==> broadcast works. But we really want KeyBy and
> processFunction functionality.
> 
> Thanks,
> Anuj
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Custom Partitioning for Keyed Streams

2018-01-10 Thread Piotr Nowojski
Hi,

I don’t think it is possible to enforce scheduling of two keys to different 
nodes, since all of that is based on hashes.

For some cases, doing a pre-aggregation step (an initial aggregation done before 
the keyBy, which is followed by a final aggregation after the keyBy) can be the 
solution for handling data skew. With pre-aggregation, some (most?) of the 
work can be distributed and done on the source node instead of doing all of 
the heavy lifting on the destination node. It has not yet been merged into the 
Flink code, but it is entirely user-space code, which you could copy-paste 
(and adjust) into your project. The pull request containing the pre-aggregation is here:
https://github.com/apache/flink/pull/4626 

Please pay attention to the limitations of this code (documented in the 
Javadoc).

If the above code doesn’t work for you for whatever reason, you can also try to 
implement some custom-tailored pre-aggregation, like having two keyBy steps, 
where in the first you artificially split the A and B keys into a couple of smaller 
ones, and the second keyBy merges/squashes the results.
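
A rough sketch of that idea (the Event/Partial types, field names, window size and salt 
size are all assumptions, not taken from your job):

DataStream<Partial> partials = events
    // 1st keyBy: append a small random "salt" so the hot keys A and B are spread over 8 sub-keys
    .keyBy(new KeySelector<Event, String>() {
        @Override
        public String getKey(Event e) {
            return e.getKey() + "#" + ThreadLocalRandom.current().nextInt(8);
        }
    })
    .timeWindow(Time.seconds(30))
    .aggregate(new PartialAggregator());     // emits (originalKey, partial result) pairs

partials
    // 2nd keyBy: merge the partials per original key - each original key now carries far fewer records
    .keyBy(new KeySelector<Partial, String>() {
        @Override
        public String getKey(Partial p) {
            return p.getOriginalKey();
        }
    })
    .timeWindow(Time.seconds(30))
    .aggregate(new MergePartialsAggregator())
    .print();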

Piotrek

> On 9 Jan 2018, at 21:55, Martin, Nick  wrote:
> 
> Have a set of stateful operators that rely on keyed state. There is 
> substantial skew between keys (i.e. there will be 100 messages on keys A and 
> B, and 10 messages each on keys C-J), and key selection assignment is 
> dictated by the needs of my application such that I can’t choose keys in a 
> way that will eliminate the skew. The skew is somewhat predictable (i.e. I 
> know keys A and B will usually get roughly 10x as many messages as the rest) 
> and fairly consistent on different timescales (i.e. counting the messages on 
> each key for 30 seconds would provide a reasonably good guess as to the 
> distribution of messages that will be received over the next 10-20 minutes).
>  
> The problem I’m having is that often the high volume keys (A and B in the 
> example) end up on the same task slot and slow it down, while the low volume 
> ones are distributed across the other operators, leaving them underloaded. I 
> looked into the available physical partitioning functions, but it looks like 
> that functionality is generally incompatible with keyed streams, and I need 
> access to keyed state to do my actual processing. Is there any way I can get 
> better load balancing while using keyed state?
> 
> Notice: This e-mail is intended solely for use of the individual or entity to 
> which it is addressed and may contain information that is proprietary, 
> privileged and/or exempt from disclosure under applicable law. If the reader 
> is not the intended recipient or agent responsible for delivering the message 
> to the intended recipient, you are hereby notified that any dissemination, 
> distribution or copying of this communication is strictly prohibited. This 
> communication may also contain data subject to U.S. export laws. If so, data 
> subject to the International Traffic in Arms Regulation cannot be 
> disseminated, distributed, transferred, or copied, whether incorporated or in 
> its original form, to foreign nationals residing in the U.S. or abroad, 
> absent the express prior approval of the U.S. Department of State. Data 
> subject to the Export Administration Act may not be disseminated, 
> distributed, transferred or copied contrary to U. S. Department of Commerce 
> regulations. If you have received this communication in error, please notify 
> the sender by reply e-mail and destroy the e-mail message and any physical 
> copies made of the communication.
>  Thank you. 
> *



Re: Stream job failed after increasing number retained checkpoints

2018-01-09 Thread Piotr Nowojski
Hi,

Increasing akka’s timeouts is rarely a solution to any problem - it either does 
not help, or just masks the issue, making it less visible. But yes, it is 
possible to bump the limits: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka
 


I don’t think that state.checkpoints.num-retained was meant to handle such 
large numbers of retained checkpoints, so maybe there are some known/unknown 
limitations. Stefan, do you know something in this regard?

A parallel thing to do is, as with any other akka timeout, to track down its 
root cause. This one warning line doesn’t tell much. Where does it come from? 
Client log? Job manager log? Task manager log? Please search on the opposite 
side of the timed-out connection for the possible root cause of the timeout, 
including:
- possible error/exceptions/warnings
- long GC pauses or other blocking operations (possibly long unnatural gaps in 
the logs)
- machine health (CPU usage, disks usage, network connections)

Piotrek

> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez 
>  wrote:
> 
> Hello,
> 
> I have several stream jobs running (v. 1.3.1 ) in production which always 
> fails after a fixed period of around 30h after being executing. That's the 
> WARN trace before failing:
> 
> Association with remote system 
> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876 
> ] has failed, 
> address is now gated for [5000] ms. Reason: [Association failed with 
> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876 
> ]] Caused by: [No 
> response from remote for outbound association. Handshake timed out after 
> [2 ms].
> 
> The main change done in the job configuration was to increase the 
> state.checkpoints.num-retained from 1 to 2880. I am using asynchronous 
> RocksDB to persists to snapshot the state. (I attach some screenshots with 
> the  checkpoint conf from webUI)
> 
> May my assumption be correct that the increase of checkpoints.num-retained is 
> causing the problem? Any known issue regarding this?
> Besides, Is there any way to increase the Akka handshake timeout from the 
> current 2 ms to a higher value? I considered that it may be convenient to 
> increase the timeout to 1 minute instead.
> 
> BR
> 
> 
>  17.35.18.png>



Re: periodic trigger

2018-01-03 Thread Piotr Nowojski
Hi,

Sorry for late response (because of the holiday period).

You didn’t mention lateness previously, that’s why I proposed such a solution. 

Another approach would be to calculate the max session length per user on the first 
aggregation level and at the same time remember the previously emitted/triggered 
value. Now, whenever you recalculate your window because of a firing trigger, you 
could check what the new value is and what the previously emitted value was. If 
they are the same, you do not have to emit anything. If they are different, you 
would have to issue an “update”/“diff” to the global aggregation on the second 
level. In the case of a simple average of session length, the first level could 
on update emit “-old_value” and “+new_value” (the negative old_value and the 
positive new_value). 

To do this in Flink you could use 

org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction

and store the previously emitted values in

org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context#windowState()

For efficiency reasons it is best to combine it with a reduce function using this call:

org.apache.flink.streaming.api.datastream.WindowedStream#reduce(org.apache.flink.api.common.functions.ReduceFunction,
 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<T,R,K,W>)

Reduce will ensure that your ProcessWindowFunction will not have to process all 
events belonging to the window each time it is triggered, but will only have to 
process the single reduced element.

In your case:

.keyBy("userId", "sessionId")
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
.reduce(MAX_BY_LENGTH_REDUCE_FUNCTION, 
MY_FANCY_PROCESS_WINDOW_FUNCTION_WITH_UPDATES).
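
As a sketch, MY_FANCY_PROCESS_WINDOW_FUNCTION_WITH_UPDATES could look roughly like this 
(the Event type with its length field comes from your earlier snippet, everything else is 
an assumption):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MaxLengthWithUpdates extends ProcessWindowFunction<Event, Long, Tuple, TimeWindow> {

    private final ValueStateDescriptor<Long> lastEmittedDescriptor =
            new ValueStateDescriptor<>("lastEmittedLength", Long.class);

    @Override
    public void process(Tuple key, Context context, Iterable<Event> elements, Collector<Long> out)
            throws Exception {
        // thanks to the reduce pre-aggregation there is exactly one (already reduced) element here
        long newMax = elements.iterator().next().length;

        ValueState<Long> lastEmitted = context.windowState().getState(lastEmittedDescriptor);
        Long previous = lastEmitted.value();

        if (previous == null) {
            out.collect(newMax);       // first firing of this window: just emit the value
        } else if (previous != newMax) {
            out.collect(-previous);    // retract the previously emitted value
            out.collect(newMax);       // and emit the new one
        }
        // if nothing changed, emit nothing at all

        lastEmitted.update(newMax);
    }
}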

Of course the second-level global aggregation would have to understand and take 
those “updates”/“diffs” into account, but that would be simple to implement with 
some custom reduce function/aggregate. You would like the sequence of values 1, 
1, 10, -10, 1 to produce an average of 1 and not 0.6 - negative values have to 
decrease the counter when calculating the average value (to aggregate an average 
value you always need to keep the sum of values and a counter).

.timeWindowAll(Time.seconds(60))
.aggregate(FANCY_AGGREGATE_THAT_HANDLES_UPDATES)
.print();

Hope that helps.

Piotrek

> On 22 Dec 2017, at 14:10, Plamen Paskov <plamen.pas...@next-stream.com> wrote:
> 
> I think it will not solve the problem as if i set ContinuousEventTimeTrigger 
> to 10 seconds and allowedLateness(Time.seconds(60)) as i don't want to 
> discard events from different users received later then i might receive more 
> than one row for a single user based on the number of windows created by the 
> events of this user. That will make the the average computations wrong.
> 
> On 22.12.2017 12:10, Piotr Nowojski wrote:
>> Ok, I think now I understand your problem. 
>> 
>> Wouldn’t it be enough, if you change last global window to something like 
>> this:
>> 
>> lastUserSession
>> .timeWindowAll(Time.seconds(10))
>> .aggregate(new AverageSessionLengthAcrossAllUsers())
>> .print();
>> 
>> (As a side note, maybe you should use ContinousEventTimeTrigger in the first 
>> window). This way it will aggregate and calculate average session length of 
>> only last “preview results” of the 60 seconds user windows (emitted every 10 
>> seconds from the first aggregation).
>> 
>> Piotrek
>> 
>>> On 21 Dec 2017, at 15:18, Plamen Paskov <plamen.pas...@next-stream.com 
>>> <mailto:plamen.pas...@next-stream.com>> wrote:
>>> 
>>> Imagine a case where i want to run a computation every X seconds for 1 day 
>>> window. I want the calculate average session length for current day every X 
>>> seconds. Is there an easy way to achieve that?
>>> 
>>> On 21.12.2017 16:06, Piotr Nowojski wrote:
>>>> Hi,
>>>> 
>>>> You defined a tumbling window 
>>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows
>>>>  
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows>)
>>>>  of 60 seconds, triggered every 10 seconds. This means that each input 
>>>> element can be processed/averaged up to 6 times (there is no other way if 
>>>> you trigger each window multiple times).
>>>> 
>>>> I am not sure what are you trying to achieve, but please refer to the 
>>>> documentation about different window types (tumbling, sliding, session) 
>>>> maybe it will clarify things for y

Re: entrypoint for executing job in task manager

2017-12-22 Thread Piotr Nowojski
I don’t think there is such a hook in the Flink code now. You will have to work 
around this issue somehow in user space. 

Maybe you could make a contract that every operator, before touching Guice, 
should call a static synchronized method `initializeGuiceContext`. This method 
could search the classpath for classes with some specific annotation, for 
example `@MyInitializationHook`, and install/add all of such hooks before 
actually using Guice.
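
A minimal sketch of such a contract (it uses Java’s ServiceLoader instead of real 
annotation scanning, and all names are made up):

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;

public final class GuiceContext {

    private static Injector injector;

    // Every operator calls this from its open() method before touching Guice.
    public static synchronized Injector initializeGuiceContext() {
        if (injector == null) {
            List<Module> modules = new ArrayList<>();
            // discover hooks on the classpath and let each one contribute its modules
            for (InitializationHook hook : ServiceLoader.load(InitializationHook.class)) {
                modules.addAll(hook.modules());
            }
            injector = Guice.createInjector(modules);
        }
        return injector;
    }

    private GuiceContext() {
    }

    // Hypothetical SPI implemented by application code that wants to add Guice modules.
    public interface InitializationHook {
        List<Module> modules();
    }
}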

Piotrek

> On 21 Dec 2017, at 17:49, Steven Wu <stevenz...@gmail.com> wrote:
> 
> We use Guice for dependency injection. We need to install additional Guice 
> modules (for bindings) when setting up this static context of Guice injector.
> 
> Calling the static initializer from operator open method won't really help. 
> Not all operators are implemented by app developer who want to install 
> additional Guice modules. E.g. kafka source operator is implemented/provided 
> by our platform. I think the source operator will open first, which means app 
> operator won't get a chance to initialize the static context. What would 
> really help if there is a entry hook (at task manager) that is executed 
> before any operator opening.
> 
> On Thu, Dec 21, 2017 at 12:27 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Open method is called just before any elements are processed. You can hook in 
> any initialisation logic there, including initialisation of a static context. 
> However keep in mind, that since this context is static, it will be shared 
> between multiple operators (if you are running parallelism > number of task 
> managers), so accesses to it must be synchronized (including initialisation). 
> Another thing to consider is that managing the life cycle of static context 
> can be tricky (when to close it and release it’s resources).
> 
> The questions is, whether you really need a static context?
> 
> Thanks,
> Piotrek
> 
> 
> > On 21 Dec 2017, at 07:53, Steven Wu <stevenz...@gmail.com 
> > <mailto:stevenz...@gmail.com>> wrote:
> >
> > Here is my understanding of how job submission works in Flink. When 
> > submitting a job to job manager via REST API, we provide a entry class. Job 
> > manager then evaluate job graph and ship serialized operators to task 
> > manager. Task manager then open operators and run tasks.
> >
> > My app would typically requires some initialization phase to setup my own 
> > running context in task manager (e.g. calling a static method of some 
> > class). Does Flink provide any entry hook in task manager when executing a 
> > job (and tasks)? As for job manager, the entry class provides such hook 
> > where I can initialize my static context.
> >
> > Thanks,
> > Steven
> 
> 



Re: periodic trigger

2017-12-22 Thread Piotr Nowojski
Ok, I think now I understand your problem. 

Wouldn’t it be enough if you change the last global window to something like this:

lastUserSession
.timeWindowAll(Time.seconds(10))
.aggregate(new AverageSessionLengthAcrossAllUsers())
.print();

(As a side note, maybe you should use ContinuousEventTimeTrigger in the first 
window.) This way it will aggregate and calculate the average session length of 
only the last “preview results” of the 60-second user windows (emitted every 10 
seconds from the first aggregation).

Piotrek

> On 21 Dec 2017, at 15:18, Plamen Paskov <plamen.pas...@next-stream.com> wrote:
> 
> Imagine a case where i want to run a computation every X seconds for 1 day 
> window. I want the calculate average session length for current day every X 
> seconds. Is there an easy way to achieve that?
> 
> On 21.12.2017 16:06, Piotr Nowojski wrote:
>> Hi,
>> 
>> You defined a tumbling window 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows>)
>>  of 60 seconds, triggered every 10 seconds. This means that each input 
>> element can be processed/averaged up to 6 times (there is no other way if 
>> you trigger each window multiple times).
>> 
>> I am not sure what are you trying to achieve, but please refer to the 
>> documentation about different window types (tumbling, sliding, session) 
>> maybe it will clarify things for you:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners>
>> 
>> If you want to avoid duplicated processing, use either tumbling window with 
>> default trigger (triggering at the end of the window), or use session 
>> windows.
>> 
>> Piotrek
>> 
>> 
>>> On 21 Dec 2017, at 13:29, Plamen Paskov <plamen.pas...@next-stream.com 
>>> <mailto:plamen.pas...@next-stream.com>> wrote:
>>> 
>>> Hi guys,
>>> I have the following code:
>>> 
>>> SingleOutputStreamOperator lastUserSession = env
>>> .socketTextStream("localhost", 9000, "\n")
>>> .map(new MapFunction<String, Event>() {
>>> @Override
>>> public Event map(String value) throws Exception {
>>> String[] row = value.split(",");
>>> return new Event(Long.valueOf(row[0]), row[1], 
>>> Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>>> }
>>> })
>>> .assignTimestampsAndWatermarks(new 
>>> BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
>>> @Override
>>> public long extractTimestamp(Event element) {
>>> return element.timestamp;
>>> }
>>> })
>>> .keyBy("userId", "sessionId")
>>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>>> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
>>> .maxBy("length", false);
>>> 
>>> lastUserSession
>>> .timeWindowAll(Time.seconds(60))
>>> .aggregate(new AverageSessionLengthAcrossAllUsers())
>>> .print();
>>> 
>>> What i'm trying to achieve is to calculate the average session length every 
>>> 10 seconds. The problem is that once the window length is 60 seconds and a 
>>> computation is triggered
>>> every 10 seconds i will receive duplicate events in my average calculation 
>>> method so the average will not be correct. If i move 
>>> ContinuousProcessingTimeTrigger down before 
>>> AverageSessionLengthAcrossAllUsers() then it's not triggering every 10 
>>> seconds.
>>> Any other suggestions how to workaround this?
>>> 
>>> Thanks
>> 
> 



Re: periodic trigger

2017-12-21 Thread Piotr Nowojski
Hi,

You defined a tumbling window 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows
 
)
 of 60 seconds, triggered every 10 seconds. This means that each input element 
can be processed/averaged up to 6 times (there is no other way if you trigger 
each window multiple times).

I am not sure what are you trying to achieve, but please refer to the 
documentation about different window types (tumbling, sliding, session) maybe 
it will clarify things for you:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
 


If you want to avoid duplicated processing, use either tumbling window with 
default trigger (triggering at the end of the window), or use session windows.

Piotrek


> On 21 Dec 2017, at 13:29, Plamen Paskov  wrote:
> 
> Hi guys,
> I have the following code:
> 
> SingleOutputStreamOperator lastUserSession = env
> .socketTextStream("localhost", 9000, "\n")
> .map(new MapFunction() {
> @Override
> public Event map(String value) throws Exception {
> String[] row = value.split(",");
> return new Event(Long.valueOf(row[0]), row[1], 
> Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
> }
> })
> .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
> @Override
> public long extractTimestamp(Event element) {
> return element.timestamp;
> }
> })
> .keyBy("userId", "sessionId")
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
> .maxBy("length", false);
> 
> lastUserSession
> .timeWindowAll(Time.seconds(60))
> .aggregate(new AverageSessionLengthAcrossAllUsers())
> .print();
> 
> What i'm trying to achieve is to calculate the average session length every 
> 10 seconds. The problem is that once the window length is 60 seconds and a 
> computation is triggered
> every 10 seconds i will receive duplicate events in my average calculation 
> method so the average will not be correct. If i move 
> ContinuousProcessingTimeTrigger down before 
> AverageSessionLengthAcrossAllUsers() then it's not triggering every 10 
> seconds.
> Any other suggestions how to workaround this?
> 
> Thanks



Re: Metric reporters with non-static ports

2017-12-21 Thread Piotr Nowojski
I am not sure (and because of the holiday season you might not get an answer 
quickly), however I do not see a way to obtain this port other than by looking 
into the log files. On the other hand, I have the impression that the intention of 
this feature was that if you must execute N reporters on one single machine, 
you configure the port range to the size of N. That way you can just assume that 
each port was taken and used.

Maybe Chesnay will be able to answer this question better once he is back from 
the holidays.

Piotrek  

> On 20 Dec 2017, at 17:57, Jared Stehler  
> wrote:
> 
> The prometheus metric reporter allows for a specification of a port range; is 
> there a way I can find out which actual port it found to bind to?
> 
> Also, there doesn’t seem to be a way to reserve an extra port for task 
> managers in mesos to assign to a metric reporter, is that a roadmap item? I’m 
> able to override the port for the app master 
> (-Dmetrics.reporter.prom_reporter.port=$PORT1) but this carries over to the 
> task managers and can collide with the assigned data port, etc.
> 
> 
> 
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
> 
> 
> 



Re: entrypoint for executing job in task manager

2017-12-21 Thread Piotr Nowojski
The open method is called just before any elements are processed. You can hook in 
any initialisation logic there, including initialisation of a static context. 
However, keep in mind that since this context is static, it will be shared 
between multiple operators (if you are running parallelism > number of task 
managers), so accesses to it must be synchronized (including initialisation). 
Another thing to consider is that managing the life cycle of a static context can 
be tricky (when to close it and release its resources).

The question is whether you really need a static context.

Thanks,
Piotrek


> On 21 Dec 2017, at 07:53, Steven Wu  wrote:
> 
> Here is my understanding of how job submission works in Flink. When 
> submitting a job to job manager via REST API, we provide a entry class. Job 
> manager then evaluate job graph and ship serialized operators to task 
> manager. Task manager then open operators and run tasks.
> 
> My app would typically requires some initialization phase to setup my own 
> running context in task manager (e.g. calling a static method of some class). 
> Does Flink provide any entry hook in task manager when executing a job (and 
> tasks)? As for job manager, the entry class provides such hook where I can 
> initialize my static context.
> 
> Thanks,
> Steven



Re: Custom Metrics

2017-12-14 Thread Piotr Nowojski
Hi,

> I have couple more questions related to metrics. I use Influx db reporter to 
> report flink metrics and I see a lot of metrics are bring reported. Is there 
> a way to select only a subset of metrics that we need to monitor the 
> application?

At this point it is up to either the reporter, or the system the metrics are 
reported to. You would need to extend the InfluxDB reporter to add some 
configuration options for ignoring some metrics.

> Also, Is there a way to specify custom metics scope? Basically I register 
> metrics like below, add a custom metric group and then add a meter per user. 
> I would like this to be reported as measurement "Users" and tags with user 
> id. This way I can easily visualize the data in grafana or any other tool by 
> selecting the measurement and group by tag. Is there a way to report like 
> that instead of host, process_type, tm_id, job_name, task_name & 
> subtask_index?


Can you not ignore the first couple of groups/scopes in Grafana? I think you 
can also add more groups in the user scope.

metricGroup.addGroup("Users”).addGroup(“Foo”).addGroup(“Bar”).
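
Building on that, a minimal sketch of registering one meter per tenant lazily, based on the incoming data (the Event type and its getTenant() method are assumptions standing in for your own records):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;

public class TenantMeteringMapper extends RichMapFunction<Event, Event> {

    private transient MetricGroup usersGroup;
    private transient Map<String, Meter> meters;

    @Override
    public void open(Configuration parameters) {
        usersGroup = getRuntimeContext().getMetricGroup().addGroup("Users");
        meters = new HashMap<>();
    }

    @Override
    public Event map(Event event) {
        Meter meter = meters.computeIfAbsent(event.getTenant(),
                tenant -> usersGroup.addGroup(tenant).meter(
                        "events", new DropwizardMeterWrapper(new com.codahale.metrics.Meter())));
        meter.markEvent();
        return event;
    }
}

The tenant group then becomes part of the metric scope, which you can filter or group by on the reporting side.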

Piotrek

> On 13 Dec 2017, at 22:34, Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
> 
> Thanks Pitor.
> 
> I have couple more questions related to metrics. I use Influx db reporter to 
> report flink metrics and I see a lot of metrics are bring reported. Is there 
> a way to select only a subset of metrics that we need to monitor the 
> application?
> 
> Also, Is there a way to specify custom metics scope? Basically I register 
> metrics like below, add a custom metric group and then add a meter per user. 
> I would like this to be reported as measurement "Users" and tags with user 
> id. This way I can easily visualize the data in grafana or any other tool by 
> selecting the measurement and group by tag. Is there a way to report like 
> that instead of host, process_type, tm_id, job_name, task_name & 
> subtask_index?
> 
> metricGroup.addGroup("Users")
> .meter(userId, new DropwizardMeterWrapper(new 
> com.codahale.metrics.Meter()));
> Thanks a bunch.
> 
> On Mon, Dec 11, 2017 at 11:12 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Reporting once per 10 seconds shouldn’t create problems. Best to try it out. 
> Let us know if you get into some troubles :)
> 
> Piotrek
> 
>> On 11 Dec 2017, at 18:23, Navneeth Krishnan <reachnavnee...@gmail.com 
>> <mailto:reachnavnee...@gmail.com>> wrote:
>> 
>> Thanks Piotr. 
>> 
>> Yes, passing the metric group should be sufficient. The subcomponents will 
>> not be able to provide the list of metrics to register since the metrics are 
>> created based on incoming data by tenant. Also I am planning to have the 
>> metrics reported every 10 seconds and hope it shouldn't be a problem. We use 
>> influx and grafana to plot the metrics.
>> 
>> The option 2 that I had in mind was to collect all metrics and use influx db 
>> sink to report it directly inside the pipeline. But it seems reporting per 
>> node might not be possible.
>> 
>> 
>> On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> I’m not sure if I completely understand your issue.
>> 
>> 1.
>> - You don’t have to pass RuntimeContext, you can always pass just the 
>> MetricGroup or ask your components/subclasses “what metrics do you want to 
>> register” and register them at the top level.
>> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for 
>> Flink, as long as you have a reasonable reporting interval. However keep in 
>> mind that Flink only reports your metrics and you still need something to 
>> read/handle/process/aggregate your metrics
>> 2.
>> I don’t think that reporting per node/jvm is possible with Flink’s metric 
>> system. For that you would need some other solution, like report your 
>> metrics using JMX (directly register MBeans from your code)
>> 
>> Piotrek
>> 
>> > On 10 Dec 2017, at 18:51, Navneeth Krishnan <reachnavnee...@gmail.com 
>> > <mailto:reachnavnee...@gmail.com>> wrote:
>> >
>> > Hi,
>> >
>> > I have a streaming pipeline running on flink and I need to collect metrics 
>> > to identify how my algorithm is performing. The entire pipeline is 
>> > multi-tenanted and I also need metrics per tenant. Lets say there would be 
>> > around 20 metrics to be captured per tenant. I have the following ideas 
>> >

Re: Flink flick cancel vs stop

2017-12-14 Thread Piotr Nowojski
Hi,

Yes, we are aware of this issue and we would like to have it fixed soon, but at the 
moment it does not look like a clean shutdown will be ready for Flink 1.5.

Another solution is a Kafka exactly-once producer implemented on top of the 
GenericWriteAheadSink. It could avoid this issue (at the cost of significantly 
higher overhead). There are plans to implement such a producer as an alternative 
to the current one, but I do not know the timeline for that. It should be a 
relatively easy task and we would welcome such a contribution. 

Piotrek

> On 14 Dec 2017, at 01:43, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> 
> I am re-upping this thread now that FlinkKafkaProducer011 is out.  The new 
> producer, when used with the exactly once semantics, has the rather 
> troublesome behavior that it will fallback to at-most-once, rather than 
> at-least-once, if the job is down for longer than the Kafka broker's 
> transaction.max.timeout.ms <http://transaction.max.timeout.ms/> setting.
> 
> In situations that require extended maintenance downtime, this behavior is 
> nearly certain to lead to message loss, as a canceling a job while taking a 
> savepoint will not wait for the Kafka transactions to bet committed and is 
> not atomic.
> 
> So it seems like there is a need for an atomic stop or cancel with savepoint 
> that waits for transactional sinks to commit and then immediately stop any 
> further message processing.
>  
> 
> On Tue, Oct 24, 2017 at 4:46 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> I would propose implementations of NewSource to be not blocking/asynchronous. 
> For example something like
> 
> public abstract Future getCurrent();
> 
> Which would allow us to perform some certain actions while there are no data 
> available to process (for example flush output buffers). Something like this 
> came up recently when we were discussing possible future changes in the 
> network stack. It wouldn’t complicate API by a lot, since default 
> implementation could just:
> 
> public Future getCurrent() {
>   return completedFuture(getCurrentBlocking());
> }
> 
> Another thing to consider is maybe we would like to leave the door open for 
> fetching records in some batches from the source’s input buffers? Source 
> function (like Kafka) have some internal buffers and it would be more 
> efficient to read all/deserialise all data present in the input buffer at 
> once, instead of paying synchronisation/calling virtual method/etc costs once 
> per each record.
> 
> Piotrek
> 
>> On 22 Sep 2017, at 11:13, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> 
>> @Eron Yes, that would be the difference in characterisation. I think 
>> technically all sources could be transformed by that by pushing data into a 
>> (blocking) queue and having the "getElement()" method pull from that.
>> 
>>> On 15. Sep 2017, at 20:17, Elias Levy <fearsome.lucid...@gmail.com 
>>> <mailto:fearsome.lucid...@gmail.com>> wrote:
>>> 
>>> On Fri, Sep 15, 2017 at 10:02 AM, Eron Wright <eronwri...@gmail.com 
>>> <mailto:eronwri...@gmail.com>> wrote:
>>> Aljoscha, would it be correct to characterize your idea as a 'pull' source 
>>> rather than the current 'push'?  It would be interesting to look at the 
>>> existing connectors to see how hard it would be to reverse their 
>>> orientation.   e.g. the source might require a buffer pool.
>>> 
>>> The Kafka client works that way.  As does the QueueingConsumer used by the 
>>> RabbitMQ source.  The Kinesis and NiFi sources also seems to poll. Those 
>>> are all the bundled sources.
>> 
> 
> 



Re: Off heap memory issue

2017-12-13 Thread Piotr Nowojski
Hi,

OOMs from metaspace probably mean that your jars are not releasing some 
resources:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_classloading.html#unloading-of-dynamically-loaded-classes
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_classloading.html#unloading-of-dynamically-loaded-classes>

Regarding the second issue (I guess it is probably somehow related to the first 
one): if it is indeed a heap space OOM, it should be fairly easy to 
analyse/debug. This article describes how to track such issues, especially the 
chapter titled "Using Java VisualVM":
https://www.toptal.com/java/hunting-memory-leaks-in-java 
<https://www.toptal.com/java/hunting-memory-leaks-in-java>
It should allow you to pinpoint the owner and the source of the leak.

Piotrek

> On 12 Dec 2017, at 14:47, Javier Lopez <javier.lo...@zalando.de> wrote:
> 
> Hi Piotr,
> 
> We found out which one was the problem in the workers. After setting a value 
> for XX:MaxMetaspaceSize we started to get OOM exceptions from the metaspace. 
> We found out how Flink manages the User classes here 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.htm
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.htm>l
>  and solved the problem by adding the job's jar file in the /lib of the nodes 
> (master and workers). Now we have a constant memory usage in the workers. 
> 
> Unfortunately, we still have an OOM problem in the master node. We are using 
> the same configuration as in the workers (200MB for MaxMetaspace and 13000MB 
> for Heap) and after ~6000 jobs, the master runs out of memory. The metaspace 
> usage is almost constant, around 50MB and the heap usage grows up to 1MB, 
> then GC does its work and reduces this usage. But we still have the OOM 
> problems. Do you have any other idea of what could cause this problem? Our 
> workaround is to restart the master, but we cannot keep doing this in the 
> long term.
> 
> Thanks for all your support, it has been helpful.
> 
> On 16 November 2017 at 15:27, Javier Lopez <javier.lo...@zalando.de 
> <mailto:javier.lo...@zalando.de>> wrote:
> Hi Piotr,
> 
> Sorry for the late response, I'm out of the office and with limited access to 
> the Internet. I think we are on the right path to solve this problem. Some 
> time ago we did a memory analysis over 3 different cluster we are using, two 
> of them are running jobs 24/7 and the other is the one deploying thousands of 
> jobs. All of those clusters have the same behavior for arrays of Chars and 
> Bytes (as expected), but for this particular Class "java.lang.Class" the 
> clusters that have 24/7 jobs have less than 20K instances of that class, 
> whereas the other cluster has  383,120 
> instances. I don't know if this could be related.
> 
> I hope that we can test this soon, and will let you know if this fixed the 
> problem.
> 
> Thanks.
> 
> 
> On 15 November 2017 at 13:18, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> I have been able to observe some off heap memory “issues” by submitting Kafka 
> job provided by Javier Lopez (in different mailing thread). 
> 
> TL;DR;
> 
> There was no memory leak, just memory pool “Metaspace” and “Compressed Class 
> Space” are growing in size over time and are only rarely garbage collected. 
> In my test case they together were wasting up to ~7GB of memory, while my 
> test case could use as little as ~100MB. Connect with for example jconsole to 
> your JVM, check their size and cut their size by half by setting:
> 
> env.java.opts: -XX:CompressedClassSpaceSize=***M -XX:MaxMetaspaceSize=***M
> 
> In flink-conf.yaml. Everything works fine and memory consumption still too 
> high? Rinse and repeat.
> 
> 
> Long story:
> 
> In default settings, with max heap size of 1GB, off heap memory consumption, 
> memory consumption off non-heap memory pools of “Metaspace” and “Compressed 
> Class Space” was growing in time which seemed like indefinitely, and 
> Metaspace was always around ~6 times larger compared to compressed class 
> space. Default max meatspace size is unlimited, while “Compressed class 
> space” has a default max size of 1GB. 
> 
> When I decreased the CompressedClassSpaceSize down to 100MB, memory 
> consumption grew up to 90MB and then it started bouncing up and down by 
> couple of MB. “Metaspace” was following the same pattern, but using ~600MB. 
> When I decreased down MaxMetaspaceSize to 200MB, memory consumption of both 
> pools was bouncing around ~220MB.
> 
> It seems like there are no gen

Re: How to deal with dynamic types

2017-12-11 Thread Piotr Nowojski
Hi,

For a truly dynamic class you would need a custom TypeInformation or 
DeserializationSchema and store the fields in some kind of Map. Maybe something could be done with inheritance if records that always 
share the same fields could be deserialized to some specific class with 
fixed/predefined fields.

However, in your case it seems like you can ignore all of the dynamic fields 
and just implement a deserializer that skips/ignores all of the fields except 
Dept and Salary. It could produce a simple POJO with those two fields or even a 
Tuple2. If those fields are missing, set them to null and 
discard/filter out the record, since you will not be able to use it for 
calculating your average anyway.
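
A minimal sketch of such a filtering parser, implemented as a FlatMapFunction over the raw CSV lines (the column indexes are assumed to be looked up from your metadata before the job is built):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class DeptSalaryParser implements FlatMapFunction<String, Tuple2<String, Double>> {

    private final int deptIndex;
    private final int salaryIndex;

    public DeptSalaryParser(int deptIndex, int salaryIndex) {
        this.deptIndex = deptIndex;
        this.salaryIndex = salaryIndex;
    }

    @Override
    public void flatMap(String line, Collector<Tuple2<String, Double>> out) {
        String[] fields = line.split(",");
        if (fields.length <= Math.max(deptIndex, salaryIndex)) {
            return; // record does not contain the fields we need - discard it
        }
        try {
            out.collect(Tuple2.of(fields[deptIndex], Double.parseDouble(fields[salaryIndex])));
        } catch (NumberFormatException e) {
            // salary is not a valid number - discard the record
        }
    }
}

The average per department can then be computed on the resulting stream of Tuple2<String, Double>, e.g. by keying/grouping on field 0.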

Piotrek

> On 11 Dec 2017, at 16:13, madan  wrote:
> 
> Hi,
> 
> I am trying some initial samples with flink. I have one doubt regarding data 
> types. Flink support data types Tuple(max 25 fields), Java POJOs, Primitive 
> types, Regular classes etc., 
> In my case I do not have fixed type. I have meta data with filed names & its 
> types. For ex., (Id:int, Name:String, Salary:Double, Dept:String,... etc). I 
> do not know the number of fields, its names or types till I receive metadata. 
> In these what should be the source type I should go with? Please suggest. 
> Small example would be of great help.
> 
> 
> Scenario trying to solve :
> 
> Input :
> Metadata : {"id":"int", 
> "Name":"String","Salary":"Double","Dept":"String"}
> Data file :   csv data file with above fields data 
> 
> Output required is : Calculate average of salary by department wise.
> 
>
> -- 
> Thank you,
> Madan.



Re: Custom Metrics

2017-12-11 Thread Piotr Nowojski
Hi,

Reporting once per 10 seconds shouldn’t create problems. Best to try it out. 
Let us know if you get into some troubles :)

Piotrek

> On 11 Dec 2017, at 18:23, Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
> 
> Thanks Piotr. 
> 
> Yes, passing the metric group should be sufficient. The subcomponents will 
> not be able to provide the list of metrics to register since the metrics are 
> created based on incoming data by tenant. Also I am planning to have the 
> metrics reported every 10 seconds and hope it shouldn't be a problem. We use 
> influx and grafana to plot the metrics.
> 
> The option 2 that I had in mind was to collect all metrics and use influx db 
> sink to report it directly inside the pipeline. But it seems reporting per 
> node might not be possible.
> 
> 
> On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> I’m not sure if I completely understand your issue.
> 
> 1.
> - You don’t have to pass RuntimeContext, you can always pass just the 
> MetricGroup or ask your components/subclasses “what metrics do you want to 
> register” and register them at the top level.
> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for 
> Flink, as long as you have a reasonable reporting interval. However keep in 
> mind that Flink only reports your metrics and you still need something to 
> read/handle/process/aggregate your metrics
> 2.
> I don’t think that reporting per node/jvm is possible with Flink’s metric 
> system. For that you would need some other solution, like report your metrics 
> using JMX (directly register MBeans from your code)
> 
> Piotrek
> 
> > On 10 Dec 2017, at 18:51, Navneeth Krishnan <reachnavnee...@gmail.com 
> > <mailto:reachnavnee...@gmail.com>> wrote:
> >
> > Hi,
> >
> > I have a streaming pipeline running on flink and I need to collect metrics 
> > to identify how my algorithm is performing. The entire pipeline is 
> > multi-tenanted and I also need metrics per tenant. Lets say there would be 
> > around 20 metrics to be captured per tenant. I have the following ideas for 
> > implemention but any suggestions on which one might be better will help.
> >
> > 1. Use flink metric group and register a group per tenant at the operator 
> > level. The disadvantage of this approach for me is I need the 
> > runtimecontext parameter to register a metric and I have various subclasses 
> > to which I need to pass this object to limit the metric scope within the 
> > operator. Also there will be too many metrics reported if there are higher 
> > number of subtasks.
> > How is everyone accessing flink state/ metrics from other classes where you 
> > don't have access to runtimecontext?
> >
> > 2. Use a custom singleton metric registry to add and send these metrics 
> > using custom sink. Instead of using flink metric group to collect metrics 
> > per operatior - subtask, collect per jvm and use influx sink to send the 
> > metric data. What i'm not sure in this case is how to collect only once per 
> > node/jvm.
> >
> > Thanks a bunch in advance.
> 
> 



Re: REST api: how to upload jar?

2017-12-11 Thread Piotr Nowojski
Hi,

Have you tried this?
https://stackoverflow.com/questions/41724269/apache-flink-rest-client-jar-upload-not-working
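
For reference, a sketch of letting curl build the multipart body itself, so the uploaded part contains the file content rather than its path (host, port and jar path are placeholders):

curl -X POST -F "jarfile=@/path/to/jar/file.jar" http://localhost:8081/jars/upload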

Piotrek

> On 11 Dec 2017, at 14:22, Edward  wrote:
> 
> Let me try that again -- it didn't seem to render my commands correctly:
> 
> Thanks for the response, Shailesh. However, when I try with python, I get
> the 
> same error as when I attempted this with cURL: 
> 
> $ python uploadJar.py
> java.io.FileNotFoundException:
> /tmp/flink-web-4bed7801-fa5e-4e5e-abf1-3fa13ba1f528/438eaac1-7647-4716-8d8d-f95acd8129b2_/path/to/jar/file.jar
> (No such file or directory)
> 
> That is, if I tell python (or cURL) that my jar file is at 
> /path/to/jar/file.jar, the file path it uses on the server side includes 
> that entire path in the target file name. And if I try the script with no
> path (i.e. run the script 
> in the folder where file.jar exists), it uploads an empty file named 
> file.jar.  The endpoint at file/upload seems to be take the form-data 
> element "jarfile" and use the fully qualified path when trying to save the 
> jar file on the server side. 
> 
> Here is my equivalent attempt using cURL, which gives the same 
> FileNoFoundException as above: 
> 
> curl 'http://localhost:8081/jars/upload' -H 'Content-Type:
> multipart/form-data; boundary=Boundary' --data-binary
> $'--Boundary\r\nContent-Disposition: form-data; name="jarfile";
> filename="/path/to/jar/file.jar"\r\nContent-Type:
> application/java-archive\r\n\r\n\r\n--Boundary--\r\n' 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Exception when using the time attribute in table API

2017-12-11 Thread Piotr Nowojski
Hi,

NoSuchMethodError probably comes from mismatched compile/runtime versions of 
Flink. Do you have to use the 1.4-SNAPSHOT version? It can change on a daily 
basis, so you have to be more careful about which Flink jars you are using at 
runtime and which at compile time. If you really need some 1.4 features, it 
would be better to rely on the latest RC version (currently that would be 
RC3). 

Regarding 1.3.2, sorry, could you be more specific about what problem you are 
observing and provide more details (stack trace/log)? Is it a compile error? A 
runtime error? 

Piotrek

> On 8 Dec 2017, at 17:36, Sendoh  wrote:
> 
> Hi Flink users,
> 
> I saw this error 
> 
> 12/08/2017 17:31:27   groupBy: (shipmentNumber), window:
> (TumblingGroupWindow('w$, 'rowtime, 360.millis)), select:
> (shipmentNumber, SUM(grandTotal) AS EXPR$1) -> to: Row(3/4) switched to
> FAILED 
> java.lang.NoSuchMethodError:
> org.apache.flink.api.common.functions.AggregateFunction.add(Ljava/lang/Object;Ljava/lang/Object;)V
> 
> The flink version is 1.4-SNAPSHOT. I already implemetend
> DefinedRowtimeAttribute in my table source and return the 
> event time column as row time
> 
> I thought of mvn issue and, and also tried 1.3.2 and it shows the data type
> is not supporting tumble window no matter using Types.LONG() or
> Types.SQL_TIMESTAMP().
> 
> Is there anything I should also notice?
> 
> Best,
> 
> Sendoh
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: ayncIO & TM akka response

2017-12-11 Thread Piotr Nowojski
Hi,

Please search the task manager logs for the potential reason for the 
failure/disconnect around the time when you got this error on the job 
manager. There should be some clearly visible exception. 

Thanks, Piotrek

> On 9 Dec 2017, at 20:35, Chen Qin  wrote:
> 
> Hi there,
> 
> In recent, our production fink jobs observed some weird performance issue. 
> When job tailing kafka source failed and try to catch up, asyncIO after event 
> trigger get much higher load on task thread. Since each TM allocated two 
> virtual CPU in docker, my assumption was akka message between JM and TM 
> shouldn't be impacted.
> 
> What I observed was TM get closed and keep restart with same error message 
> below. Any suggestion is appreciated!
> 
> 
> org.apache.flink.runtime.io 
> .network.netty.exception.RemoteTransportException:
>  Connection unexpectedly closed by remote task manager 
> '​xxx/​xxx:5841'. This might indicate that the remote task manager 
> was lost.
> at org.apache.flink.runtime.io 
> .network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
> at 
> io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
> at 
> io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
> at 
> io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:748)
> 
> ​Chen​ 



Re: Custom Metrics

2017-12-11 Thread Piotr Nowojski
Hi,

I’m not sure if I completely understand your issue.

1.
- You don’t have to pass the RuntimeContext; you can always pass just the 
MetricGroup, or ask your components/subclasses “what metrics do you want to 
register” and register them at the top level (see the sketch below).
- Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for Flink, 
as long as you have a reasonable reporting interval. However keep in mind that 
Flink only reports your metrics and you still need something to 
read/handle/process/aggregate them.
2.
I don’t think that reporting per node/JVM is possible with Flink’s metric 
system. For that you would need some other solution, like reporting your metrics 
via JMX (registering MBeans directly from your code).
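
Regarding point 1, a minimal sketch of handing only the MetricGroup to a subcomponent (AnomalyDetector is a hypothetical class standing in for your own code):

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

public class AnomalyDetector {

    private final Counter processedEvents;

    public AnomalyDetector(MetricGroup metrics) {
        // the subcomponent registers only the metrics it needs
        this.processedEvents = metrics.addGroup("detector").counter("processedEvents");
    }

    public void onEvent(Object event) {
        processedEvents.inc();
        // ... detection logic ...
    }
}

Inside a rich operator you would construct it with new AnomalyDetector(getRuntimeContext().getMetricGroup()).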

Piotrek

> On 10 Dec 2017, at 18:51, Navneeth Krishnan  wrote:
> 
> Hi,
> 
> I have a streaming pipeline running on flink and I need to collect metrics to 
> identify how my algorithm is performing. The entire pipeline is 
> multi-tenanted and I also need metrics per tenant. Lets say there would be 
> around 20 metrics to be captured per tenant. I have the following ideas for 
> implemention but any suggestions on which one might be better will help.
> 
> 1. Use flink metric group and register a group per tenant at the operator 
> level. The disadvantage of this approach for me is I need the runtimecontext 
> parameter to register a metric and I have various subclasses to which I need 
> to pass this object to limit the metric scope within the operator. Also there 
> will be too many metrics reported if there are higher number of subtasks. 
> How is everyone accessing flink state/ metrics from other classes where you 
> don't have access to runtimecontext?
> 
> 2. Use a custom singleton metric registry to add and send these metrics using 
> custom sink. Instead of using flink metric group to collect metrics per 
> operatior - subtask, collect per jvm and use influx sink to send the metric 
> data. What i'm not sure in this case is how to collect only once per node/jvm.
> 
> Thanks a bunch in advance.



Re: How & Where does flink stores data for aggregations.

2017-11-24 Thread Piotr Nowojski
Hi,

Flink will have to maintain the state of the defined aggregations per window 
and key (the more names you have, the bigger the state). Flink’s state backend 
will be used for that (for example heap memory or RocksDB).

However, in most cases the state will be small and not dependent on the length of 
the window, but only on the number of keys. In your case only one counter will be 
maintained per key (name). The same applies to sums and averages (an average 
will use a counter and a sum).

There is no magic way to deal with state that is too large: either add more RAM 
to the cluster, fall back to using disks, or rewrite your query/application so 
that it does not need that much state.
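
If you go the disk route, a minimal sketch of configuring the RocksDB state backend (assuming the flink-statebackend-rocksdb dependency is on the classpath; the checkpoint URI is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // state lives on local disks, checkpoints go to a durable file system
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        env.enableCheckpointing(60_000);
        // ... register the table source, run the windowed aggregation, env.execute() ...
    }
}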

Piotrek

> On 23 Nov 2017, at 20:23, Shivam Sharma <28shivamsha...@gmail.com> wrote:
> 
> Hi All,
> 
> I have a small question regarding where does Flink stores data for doing 
> window aggregations. Lets say I am running following query on Flink table:
> 
> SELECT name, count(*)
> FROM testTable
> GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name
> 
> So, If I understand above query properly so it must be saving data for 1 
> minute somewhere to find aggregations. If Flink is persisting this in memory 
> then my concern is if I increase interval to a DAY or more then it will store 
> the complete data for interval which can cross memory. If persistence is disk 
> then latency will be there.
> 
> Basically how do we solve such kind of use-cases using FLINK where 
> aggregation interval are quite high.
> 
> Thanks in advance
> 
> -- 
> Shivam Sharma
> 



Re: Correlation between data streams/operators and threads

2017-11-21 Thread Piotr Nowojski
> So as long as the parallelism of my kafka source and sink operators is 1, all 
> the subsequent operators (multiple filters to create multiple streams, and 
> then individual CEP and Process operators per stream) will be executed in the 
> same task slot? 

Yes, unless you specify a different resource sharing group for the subsequent 
operators. 

> Regarding approach D, I'm not sure how this is different from the current 
> approach I had provided the code for above, and will it solve this problem of 
> different data streams not getting distributed across slots?

The difference is huge. Without keyBy you can not have multiple instances 
(parallelism > 1) of the source and filtering operators (unless you create 
different Kafka partitions per device, which in your case would solve a 
lot of problems, btw). The solution that you showed earlier will simply not 
scale beyond one machine. You could distribute your business logic among as 
many machines as you want, but there would always be a potential bottleneck in 
the single source/filtering operators. With keyBy you can have multiple source 
operators, and keyBy ensures that events from the same device are always 
processed by one task/machine.
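
A minimal sketch of approach D (DeviceEvent, DeviceEventSchema, the topic name and the device-type values are assumptions standing in for your own types and data):

import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class PerDeviceStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "devices");

        DataStream<DeviceEvent> events = env
                .addSource(new FlinkKafkaConsumer010<>("device-events", new DeviceEventSchema(), props))
                // events of one device always end up on the same parallel task
                .keyBy(new KeySelector<DeviceEvent, String>() {
                    @Override
                    public String getKey(DeviceEvent event) {
                        return event.getDeviceId();
                    }
                });

        // split by device type and attach the type-specific logic to each stream
        DataStream<DeviceEvent> typeA = events.filter(e -> "TYPE_A".equals(e.getDeviceType()));
        DataStream<DeviceEvent> typeB = events.filter(e -> "TYPE_B".equals(e.getDeviceType()));

        // ... apply CEP patterns / process functions per type, then env.execute() ...
    }
}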

Piotrek

> On 21 Nov 2017, at 07:39, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
> 
> Thanks for your time in helping me here.
> 
> So as long as the parallelism of my kafka source and sink operators is 1, all 
> the subsequent operators (multiple filters to create multiple streams, and 
> then individual CEP and Process operators per stream) will be executed in the 
> same task slot? 
> 
> I cannot take approach F as the entire business logic revolves around event 
> timing.
> 
> Regarding approach D, I'm not sure how this is different from the current 
> approach I had provided the code for above, and will it solve this problem of 
> different data streams not getting distributed across slots?
> 
> Thanks again,
> Shailesh
> 
> On Fri, Nov 17, 2017 at 3:01 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Sorry for not responding but I was away.
> 
> Regarding 1.
> 
> One source operator, followed by multiple tasks with parallelism 1 (as 
> visible on your screen shot) that share resource group will collapse to one 
> task slot - only one TaskManager will execute all of your job.
> 
> 
> Because all of your events are written into one Kafka topic, previously 
> proposed solutions A) (multiple jobs), and B) (one job with multiple sources) 
> can not work. In that case what you have to do is either:
> 
> D) set parallelism as you wish in the environment, read from Kafka, keyBy 
> device type, split the stream by filtering by device type (or using side 
> outputs), perform your logic
> 
> This will create TOTAL_DEVICES number of data streams after keyBy on each 
> machine, and filtering will cost you (it will be linear according 
> TOTAL_DEVICES), but should be the easiest solution.
> 
> E) set parallelism as you wish, read from Kafka, keyBy device type, write 
> custom operators with custom logic handling watermarks using KeyedState
> 
> However I would strongly suggest to re-consider
> 
> F) ignore all the issue of assigning different watermarks per device stream, 
> just assign minimal from all of the devices. It would be the easiest to 
> implement.
> 
> Piotrek
> 
> > On 17 Nov 2017, at 09:22, Nico Kruber <n...@data-artisans.com 
> > <mailto:n...@data-artisans.com>> wrote:
> >
> > regarding 3.
> > a) The taskmanager logs are missing, are there any?
> > b) Also, the JobManager logs say you have 4 slots available in total - is 
> > this
> > enough for your 5 devices scenario?
> > c) The JobManager log, however, does not really reveal what it is currently
> > doing, can you set the log level to DEBUG to see more?
> > d) Also, do you still observe CPU load during the 15min as an indication 
> > that
> > it is actually doing something?
> > e) During this 15min period where apparently nothing happens, can you 
> > provide
> > the output of "jstack " (with the PID of your JobManager)?
> > f) You may further be able to debug into what is happening by running this 
> > in
> > your IDE in debug mode and pause the execution when you suspect it to hang.
> >
> >
> > Nico
> >
> > On Tuesday, 14 November 2017 14:27:36 CET Piotr Nowojski wrote:
> >> 3. Nico, can you take a look at this one? Isn’t this a blob server issue?
> >>
> >> Piotrek
> >>
> >>> On 14 Nov 2017, at 11:35, Shailesh Jain <shailesh.j...@stellapps.com 
> >>> <mailto:shailesh.j...@stell

Re: org.apache.flink.runtime.io.network.NetworkEnvironment causing memory leak?

2017-11-17 Thread Piotr Nowojski
Hi,

If the TM is not responding, check the TM logs for long gaps. There are three 
main reasons for such gaps:

1. The machine is swapping - configure your machine/processes so that the machine 
never swaps (best to disable swap altogether)
2. Long GC full stops - analyse those either by printing GC logs (see the snippet 
below) or by attaching a profiler to the JVM.
3. Network issues - but these usually shouldn’t cause gaps in the logs.
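
For point 2, a sketch of enabling GC logs via flink-conf.yaml (the flags are for a HotSpot/Java 8 JVM and the log path is a placeholder):

env.java.opts: -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/flink-gc.log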

Piotrek

> On 16 Nov 2017, at 17:48, Hao Sun  wrote:
> 
> Sorry, the "killed" I mean here is JM lost the TM. The TM instance is still 
> running inside kubernetes, but it is not responding to any requests, probably 
> due to high load. And from JM side, JM lost heartbeat tracking of the TM, so 
> it marked the TM as died.
> 
> The „volume“ of Kafka topics, I mean, the volume of messages for a topic. 
> e.g. 1 msg/sec, I have not check the size of the message yet.
> But overall, as you suggested, I think I need more tuning for my TM params, 
> so it can maintain a reasonable load. I am not sure what params to look for, 
> but I will do my research first.
> 
> Always thanks for your help Stefan.
> 
> On Thu, Nov 16, 2017 at 8:27 AM Stefan Richter  > wrote:
> Hi,
>> In addition to your comments, what are the items retained by 
>> NetworkEnvironment? They grew seems like indefinitely, do they ever reduce?
>> 
> 
> Mostly the network buffers, which should be ok. They are always recycled and 
> should not be released until the network environment is GCed.
> 
>> I think there is a GC issue because my task manager is killed somehow after 
>> a job run. The duration correlates to the volume of Kafka topics. More 
>> volume TM dies quickly. Do you have any tips to debug it?
>> 
> 
> What killed your task manager? For example do you see a see an 
> java.lang.OutOfMemoryError or is the process killed by the OS’s OOM killer? 
> In case of an OOM killer, you might need to grant more process memory or 
> reduce the memory that you have configured for Flink to stay below the 
> configured threshold that would kill the process. What exactly do you mean by 
> „volume“ of Kafka topics? 
> 
> To debug, I suggest that you first figure out why the process is killed, 
> maybe your thresholds are simply to low and the consumption can go beyond 
> with your configuration of Flink. Then you should figure out what is actually 
> growing more than you expect, e.g. is the problem triggered by heap space or 
> native memory? Depending on the answer, e.g. heap dumps could help to spot 
> the problematic objects.
> 
> Best,
> Stefan



Re: Flink memory leak

2017-11-17 Thread Piotr Nowojski
Thank you for those screenshots, they help a lot.

However I do not see any obvious candidate for a memory leak. There is a slight 
upward trend in "G1 Old Gen”, but this can be misleading. To further analyse 
what’s going on you need to run your test case for a longer time. You will also 
need to take two heap dumps and compare their histograms from different points in 
time to calculate the “delta changes” between them.

Please keep in mind that, to avoid random noise, it would be best to stop 
submitting jobs and manually perform a GC (from jconsole) before collecting a heap 
dump. Otherwise it will be “polluted” by non-leaked objects and you might need 
to collect more heap dumps to correctly spot the upward trend among 
all of the noise. 
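
For reference, a sketch of collecting comparable snapshots from the command line (jcmd/jmap ship with the JDK; the PID and file names are placeholders):

jcmd <taskmanager-pid> GC.run
jmap -histo:live <taskmanager-pid> > histogram-1.txt
jmap -dump:live,format=b,file=taskmanager-heap.hprof <taskmanager-pid>

Repeating the first two commands later gives you a second histogram to diff against the first one.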

Piotrek

> On 16 Nov 2017, at 14:26, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
> 
> Hi Piotrek,
> 
> We’ve analysed our task managers memory pool with VisualVm and Console, we 
> attached the screenshot of results.
> Could you help us about evaluating the results?
> 
> -Ebru
> 
> 
>> On 14 Nov 2017, at 19:29, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> 
>> Best would be to analyse memory usage via some profiler. What I have done 
>> was:
>> 
>> 1. Run your scenario on the test cluster until memory consumption goes up
>> 2. Stop submitting new jobs, cancel or running jobs
>> 3. Manually triggered GC couple of times via jconsole (other tools can do 
>> that as well)
>> 4. Analyse memory consumption via:
>> A) Oracle’s Mission Control (Java Mission Control, jmc)
>>   - analyse memory consumption and check which memory pool is growing 
>> (OldGen heap? Metaspace? Code Cache? Non heap?)
>>   - run flight record with checked all memory options
>>   - check which objects were using a lot of memory 
>> B) VisualVM
>>   - take heap dump and analyse what is using up all of this memory
>> C) jconsole
>>   - this can tell you memory pool status of you JVMs, but will not tell you 
>> what objects are actually exhausting the pools
>> 
>> Couple of remarks:
>> - because of GC memory usage can goes up and down. Important is the trend of 
>> local minimums measured just after manually triggered GC
>> - you might have to repeat steps 2, 3, 4 to actually see what has increased 
>> between submitting the jobs
>> - by default network buffers are using 10% of heap space in byte[], so you 
>> can ignore those
>> - this JDK bug that I have reproduced was visible by huge memory consumption 
>> of multiple char[] and ConcurrentHashMap$Node instances   
>> 
>> Piotrek
>> 
>>> On 14 Nov 2017, at 16:08, Flavio Pompermaier <pomperma...@okkam.it 
>>> <mailto:pomperma...@okkam.it>> wrote:
>>> 
>>> What should we do to confirm it? Do you have any github repo start from?
>>> 
>>> On Tue, Nov 14, 2017 at 4:02 PM, Piotr Nowojski <pi...@data-artisans.com 
>>> <mailto:pi...@data-artisans.com>> wrote:
>>> Ebru, Javier, Flavio:
>>> 
>>> I tried to reproduce memory leak by submitting a job, that was generating 
>>> classes with random names. And indeed I have found one. Memory was 
>>> accumulating in `char[]` instances that belonged to 
>>> `java.lang.ClassLoader#parallelLockMap`. OldGen memory pool was growing in 
>>> size up to the point I got:
>>> 
>>> java.lang.OutOfMemoryError: Java heap space
>>> 
>>> This seems like an old known “feature” of JDK:
>>> https://bugs.openjdk.java.net/browse/JDK-8037342 
>>> <https://bugs.openjdk.java.net/browse/JDK-8037342>
>>> 
>>> Can any of you confirm that this is the issue that you are experiencing? If 
>>> not, I would really need more help/information from you to track this down.
>>> 
>>> Piotrek
>>> 
>>>> On 10 Nov 2017, at 15:12, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
>>>> wrote:
>>>> 
>>>> On 2017-11-10 13:14, Piotr Nowojski wrote:
>>>>> jobmanager1.log and taskmanager2.log are the same. Can you also submit
>>>>> files containing std output?
>>>>> Piotrek
>>>>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
>>>>>> wrote:
>>>>>> On 2017-11-10 11:04, Piotr Nowojski wrote:
>>>>>>> Hi,
>>>>>>> Thanks for the logs, h

Re: Off heap memory issue

2017-11-15 Thread Piotr Nowojski
Hi,

I have been able to observe some off heap memory “issues” by submitting Kafka 
job provided by Javier Lopez (in different mailing thread). 

TL;DR;

There was no memory leak, just memory pool “Metaspace” and “Compressed Class 
Space” are growing in size over time and are only rarely garbage collected. In 
my test case they together were wasting up to ~7GB of memory, while my test 
case could use as little as ~100MB. Connect with for example jconsole to your 
JVM, check their size and cut their size by half by setting:

env.java.opts: -XX:CompressedClassSpaceSize=***M -XX:MaxMetaspaceSize=***M

In flink-conf.yaml. Everything works fine and memory consumption still too 
high? Rinse and repeat.


Long story:

With default settings and a max heap size of 1GB, the consumption of the non-heap 
memory pools “Metaspace” and “Compressed Class Space” was growing over time, 
seemingly indefinitely, and Metaspace was always around ~6 times larger compared 
to the compressed class space. The default max metaspace size is unlimited, while 
“Compressed Class Space” has a default max size of 1GB. 

When I decreased the CompressedClassSpaceSize down to 100MB, memory consumption 
grew up to 90MB and then it started bouncing up and down by couple of MB. 
“Metaspace” was following the same pattern, but using ~600MB. When I decreased 
down MaxMetaspaceSize to 200MB, memory consumption of both pools was bouncing 
around ~220MB.

It seems like there are no general guidelines on how to configure those values, 
since it is heavily application dependent. However, this seems like the most 
likely suspect for the apparent OFF HEAP “memory leak” that was reported a couple 
of times in use cases where users are submitting hundreds/thousands of jobs to a 
Flink cluster. For more information please check here:

https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/considerations.html
 


Please let us know if this solves your issues.

Thanks, Piotrek

> On 13 Nov 2017, at 16:06, Flavio Pompermaier  wrote:
> 
> Unfortunately the issue I've opened [1] was not a problem of Flink but was 
> just caused by an ever increasing job plan.
> So no help from that..Let's hope to find out the real source of the problem.
> Maybe using  -Djdk.nio.maxCachedBufferSize could help (but I didn't try it 
> yet)
> 
> Best,
> Flavio
> 
> [1] https://issues.apache.org/jira/browse/FLINK-7845 
> 
> 
> On Wed, Oct 18, 2017 at 2:07 PM, Kien Truong  > wrote:
> Hi,
> 
> We saw a similar issue in one of our job due to ByteBuffer memory leak[1]. 
> We fixed it using the solution in the article, setting 
> -Djdk.nio.maxCachedBufferSize
> 
> This variable is available for Java > 8u102
> 
> Best regards,
> 
> Kien
> [1]http://www.evanjones.ca/java-bytebuffer-leak.html 
> 
> 
> On 10/18/2017 4:06 PM, Flavio Pompermaier wrote:
>> We also faced the same problem, but the number of jobs we can run before 
>> restarting the cluster depends on the volume of the data to shuffle around 
>> the network. We even had problems with a single job and in order to avoid 
>> OOM issues we had to put some configuration to limit Netty memory usage, 
>> i.e.:
>>  - Add to flink.yaml -> env.java.opts: 
>> -Dio.netty.recycler.maxCapacity.default=1
>>  - Edit taskmanager.sh and change TM_MAX_OFFHEAP_SIZE from 8388607T to 5g
>> 
>> At this purpose we wrote a small test to reproduce the problem and we opened 
>> an issue for that [1].
>> We still don't know if the problems are related however..
>> 
>> I hope that could be helpful,
>> Flavio
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-7845 
>> 
>> 
>> On Wed, Oct 18, 2017 at 10:48 AM, Javier Lopez > > wrote:
>> Hi Robert,
>> 
>> Sorry to reply this late. We did a lot of tests, trying to identify if the 
>> problem was in our custom sources/sinks. We figured out that none of our 
>> custom components is causing this problem. We came up with a small test, and 
>> realized that the Flink nodes run out of non-heap JVM memory and crash after 
>> deployment of thousands of jobs. 
>> 
>> When rapidly deploying thousands or hundreds of thousands of Flink jobs - 
>> depending on job complexity in terms of resource consumption - Flink nodes 
>> non-heap JVM memory consumption grows until there is no more memory left on 
>> the machine and the Flink process crashes. Both TaskManagers and JobManager 
>> exhibit the same behavior. The TaskManagers die faster though. The memory 
>> consumption doesn't decrease after stopping the deployment of new jobs, with 
>> the cluster being idle (no running jobs). 
>> 
>> We could replicate the behavior 

Re: Flink memory leak

2017-11-14 Thread Piotr Nowojski
Best would be to analyse memory usage via some profiler. What I have done was:

1. Run your scenario on the test cluster until memory consumption goes up
2. Stop submitting new jobs, cancel all running jobs
3. Manually trigger GC a couple of times via jconsole (other tools can do that 
as well)
4. Analyse memory consumption via:
A) Oracle’s Mission Control (Java Mission Control, jmc)
  - analyse memory consumption and check which memory pool is growing (OldGen 
heap? Metaspace? Code Cache? Non heap?)
  - run a flight recording with all memory options checked
  - check which objects were using a lot of memory 
B) VisualVM
  - take heap dump and analyse what is using up all of this memory
C) jconsole
  - this can tell you the memory pool status of your JVMs, but will not tell you 
what objects are actually exhausting the pools

A couple of remarks:
- because of GC, memory usage can go up and down. What matters is the trend of 
the local minimums measured just after a manually triggered GC
- you might have to repeat steps 2, 3 and 4 to actually see what has increased 
between submitting the jobs
- by default network buffers use 10% of the heap space as byte[], so you can 
ignore those
- the JDK bug that I have reproduced was visible as huge memory consumption by 
multiple char[] and ConcurrentHashMap$Node instances

Piotrek

> On 14 Nov 2017, at 16:08, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> 
> What should we do to confirm it? Do you have any github repo start from?
> 
> On Tue, Nov 14, 2017 at 4:02 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Ebru, Javier, Flavio:
> 
> I tried to reproduce memory leak by submitting a job, that was generating 
> classes with random names. And indeed I have found one. Memory was 
> accumulating in `char[]` instances that belonged to 
> `java.lang.ClassLoader#parallelLockMap`. OldGen memory pool was growing in 
> size up to the point I got:
> 
> java.lang.OutOfMemoryError: Java heap space
> 
> This seems like an old known “feature” of JDK:
> https://bugs.openjdk.java.net/browse/JDK-8037342 
> <https://bugs.openjdk.java.net/browse/JDK-8037342>
> 
> Can any of you confirm that this is the issue that you are experiencing? If 
> not, I would really need more help/information from you to track this down.
> 
> Piotrek
> 
>> On 10 Nov 2017, at 15:12, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>> 
>> On 2017-11-10 13:14, Piotr Nowojski wrote:
>>> jobmanager1.log and taskmanager2.log are the same. Can you also submit
>>> files containing std output?
>>> Piotrek
>>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
>>>> wrote:
>>>> On 2017-11-10 11:04, Piotr Nowojski wrote:
>>>>> Hi,
>>>>> Thanks for the logs, however I do not see before mentioned exceptions
>>>>> in it. It ends with java.lang.InterruptedException
>>>>> Is it the correct log file? Also, could you attach the std output file
>>>>> of the failing TaskManager?
>>>>> Piotrek
>>>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
>>>>>> wrote:
>>>>>> On 2017-11-09 20:08, Piotr Nowojski wrote:
>>>>>>> Hi,
>>>>>>> Could you attach full logs from those task managers? At first glance I
>>>>>>> don’t see a connection between those exceptions and any memory issue
>>>>>>> that you might had. It looks like a dependency issue in one (some?
>>>>>>> All?) of your jobs.
>>>>>>> Did you build your jars with -Pbuild-jar profile as described here:
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>>>>>>>  
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project>
>>>>>>> ?
>>>>>>> If that doesn’t help. Can you binary search which job is causing the
>>>>>>> problem? There might be some Flink incompatibility between different
>>>>>>> versions and rebuilding a job’s jar with a version matching to the
>>>>>>> cluster version might help.
>>>>>>> Piotrek
>>>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA 

Re: Flink memory leak

2017-11-14 Thread Piotr Nowojski
Ebru, Javier, Flavio:

I tried to reproduce the memory leak by submitting a job that was generating classes 
with random names, and indeed I have found one. Memory was accumulating 
in `char[]` instances that belonged to `java.lang.ClassLoader#parallelLockMap`. 
The OldGen memory pool was growing in size up to the point where I got:

java.lang.OutOfMemoryError: Java heap space

This seems like an old known “feature” of JDK:
https://bugs.openjdk.java.net/browse/JDK-8037342 
<https://bugs.openjdk.java.net/browse/JDK-8037342>

Can any of you confirm that this is the issue that you are experiencing? If 
not, I would really need more help/information from you to track this down.

Piotrek

> On 10 Nov 2017, at 15:12, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr> wrote:
> 
> On 2017-11-10 13:14, Piotr Nowojski wrote:
>> jobmanager1.log and taskmanager2.log are the same. Can you also submit
>> files containing std output?
>> Piotrek
>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-10 11:04, Piotr Nowojski wrote:
>>>> Hi,
>>>> Thanks for the logs, however I do not see before mentioned exceptions
>>>> in it. It ends with java.lang.InterruptedException
>>>> Is it the correct log file? Also, could you attach the std output file
>>>> of the failing TaskManager?
>>>> Piotrek
>>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-09 20:08, Piotr Nowojski wrote:
>>>>>> Hi,
>>>>>> Could you attach full logs from those task managers? At first glance I
>>>>>> don’t see a connection between those exceptions and any memory issue
>>>>>> that you might had. It looks like a dependency issue in one (some?
>>>>>> All?) of your jobs.
>>>>>> Did you build your jars with -Pbuild-jar profile as described here:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>>>>>> ?
>>>>>> If that doesn’t help. Can you binary search which job is causing the
>>>>>> problem? There might be some Flink incompatibility between different
>>>>>> versions and rebuilding a job’s jar with a version matching to the
>>>>>> cluster version might help.
>>>>>> Piotrek
>>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>>>>>> Btw, Ebru:
>>>>>>> I don’t agree that the main suspect is NetworkBufferPool. On your
>>>>>>> screenshots it’s memory consumption was reasonable and stable:
>>>>>>> 596MB
>>>>>>> -> 602MB -> 597MB.
>>>>>>> PoolThreadCache memory usage ~120MB is also reasonable.
>>>>>>> Do you experience any problems, like Out Of Memory
>>>>>>> errors/crashes/long
>>>>>>> GC pauses? Or just JVM process is using more memory over time? You
>>>>>>> are
>>>>>>> aware that JVM doesn’t like to release memory back to OS once it
>>>>>>> was
>>>>>>> used? So increasing memory usage until hitting some limit (for
>>>>>>> example
>>>>>>> JVM max heap size) is expected behaviour.
>>>>>>> Piotrek
>>>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com>
>>>>>>> wrote:
>>>>>>> I don’t know if this is relevant to this issue, but I was
>>>>>>> constantly getting failures trying to reproduce this leak using your
>>>>>>> Job, because you were using non deterministic getKey function:
>>>>>>> @Override
>>>>>>> public Integer getKey(Integer event) {
>>>>>>> Random randomGen = new Random((new Date()).getTime());
>>>>>>> return randomGen.nextInt() % 8;
>>>>>>> }
>>>>>>> And quoting Java doc of KeySelector:
>>>>>>> "If invoked multiple times on the same object, the returned key must
>>>>>>> be the same.”
>>>>>>> I’m trying to reproduce this issue with following job:
>>>>>>> https://gist.github.com/pno

Re: Correlation between data streams/operators and threads

2017-11-14 Thread Piotr Nowojski
1. It seems like you have one single data source, not one per device. That 
might make a difference. A single data source followed by a comap might create one 
single operator chain. If you want to go this way, please use my suggested 
solution c), since you will have trouble handling watermarks anyway with 
a single data source.

3. Nico, can you take a look at this one? Isn’t this a blob server issue?

Piotrek

> On 14 Nov 2017, at 11:35, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
> 
> 1. Okay, I understand. My code is similar to what you demonstrated. I have 
> attached a snap of my job plan visualization.
> 
> 3. Have attached the logs and exception raised (15min - configured akka 
> timeout) after submitting the job.
> 
> Thanks,
> Shailesh
> 
> 
> On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> 1. 
> I’m not sure what is your code. However I have tested it and here is the 
> example with multiple streams in one job:
> https://gist.github.com/pnowojski/63fb1c56f2938091769d8de6f513567f 
> <https://gist.github.com/pnowojski/63fb1c56f2938091769d8de6f513567f>
> As expected it created 5 source threads (checked in the debugger) and is 
> printing 5 values to the output every seconds, so clearly those 5 sources are 
> executed simultaneously.
> 
> Number of operators is not related to the number of threads. Number of 
> operator chains is. Simple pipelines like source -> map -> filter -> sink 
> will be chained and executed in one threads, please refer to the 
> documentation link in one of my earlier response.
> 
> Can you share your job code?
> 
> 2. Good point, I forgot to mention that. The job in my example will have 5 
> operator chains, but because of task slot sharing, they will share one single 
> task slot. In order to distribute such job with parallelism 1 across the 
> cluster you have to define different slot sharing groups per each chain:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups>
> Just set it on the sources.
> 
> 3. Can you show the logs from job manager and task manager?
> 
> 4. As long as you have enough heap memory to run your application/tasks there 
> is no upper limit for number of task slots.
> 
> Piotrek 
> 
>> On 14 Nov 2017, at 07:26, Shailesh Jain <shailesh.j...@stellapps.com 
>> <mailto:shailesh.j...@stellapps.com>> wrote:
>> 
>> Hi Piotrek,
>> 
>> I tried out option 'a' mentioned above, but instead of separate jobs, I'm 
>> creating separate streams per device. Following is the test deployment 
>> configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu machine):
>> 
>> akka.client.timeout 15 min
>> jobmanager.heap.mb 1024
>> jobmanager.rpc.address localhost
>> jobmanager.rpc.port 6123
>> jobmanager.web.port 8081
>> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
>> metrics.reporter.jmx.port 8789
>> metrics.reporters jmx
>> parallelism.default 1
>> taskmanager.heap.mb 1024
>> taskmanager.memory.preallocate false
>> taskmanager.numberOfTaskSlots 4
>> 
>> The number of Operators per device stream is 4 (one sink function, 3 CEP 
>> operators).
>> 
>> Observations (and questions):
>> 
>> 1. No. of threads (captured through JMX) is almost the same as the total 
>> number of operators being created. This clears my original question in this 
>> thread.
>> 
>> 2. Even when the number of task slots is 4, on web ui, it shows 3 slots as 
>> free. Is this expected? Why are the subtasks not being distributed across 
>> slots?
>> 
>> 3. Job deployment hangs (never switches to RUNNING) when the number of 
>> devices is greater than 5. Even on increasing the akka client timeout, it 
>> does not help. Will separate jobs being deployed per device instead of 
>> separate streams help here?
>> 
>> 4. Is there an upper limit on number task slots which can be configured? I 
>> know that my operator state size at any given point in time would not be 
>> very high, so it looks OK to deploy independent jobs which can be deployed 
>> on the same task manager across slots.
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Sure, let us know if you have other questions or e

Re: Correlation between data streams/operators and threads

2017-11-14 Thread Piotr Nowojski
Hi,

1. 
I’m not sure what your code looks like. However I have tested it and here is an 
example with multiple streams in one job:
https://gist.github.com/pnowojski/63fb1c56f2938091769d8de6f513567f 
<https://gist.github.com/pnowojski/63fb1c56f2938091769d8de6f513567f>
As expected it created 5 source threads (checked in the debugger) and is 
printing 5 values to the output every second, so clearly those 5 sources are 
executed simultaneously.

The number of operators is not related to the number of threads; the number of 
operator chains is. Simple pipelines like source -> map -> filter -> sink will be 
chained and executed in one thread - please refer to the documentation link in 
one of my earlier responses.

Can you share your job code?

2. Good point, I forgot to mention that. The job in my example will have 5 
operator chains, but because of task slot sharing, they will share one single 
task slot. In order to distribute such a job with parallelism 1 across the 
cluster you have to define a different slot sharing group per chain:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups>
Just set it on the sources.
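
For example, a minimal sketch (DeviceSource, Event and the group names are assumptions standing in for your own sources):

DataStream<Event> deviceA = env
        .addSource(new DeviceSource("device-A"))
        .slotSharingGroup("device-A"); // this chain can be scheduled into its own slot

DataStream<Event> deviceB = env
        .addSource(new DeviceSource("device-B"))
        .slotSharingGroup("device-B");

Downstream operators inherit the slot sharing group of their inputs, so setting it on each source is enough.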

3. Can you show the logs from job manager and task manager?

4. As long as you have enough heap memory to run your application/tasks, there 
is no upper limit on the number of task slots.

Piotrek 

> On 14 Nov 2017, at 07:26, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
> 
> Hi Piotrek,
> 
> I tried out option 'a' mentioned above, but instead of separate jobs, I'm 
> creating separate streams per device. Following is the test deployment 
> configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu machine):
> 
> akka.client.timeout 15 min
> jobmanager.heap.mb 1024
> jobmanager.rpc.address localhost
> jobmanager.rpc.port 6123
> jobmanager.web.port 8081
> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporter.jmx.port 8789
> metrics.reporters jmx
> parallelism.default 1
> taskmanager.heap.mb 1024
> taskmanager.memory.preallocate false
> taskmanager.numberOfTaskSlots 4
> 
> The number of Operators per device stream is 4 (one sink function, 3 CEP 
> operators).
> 
> Observations (and questions):
> 
> 1. No. of threads (captured through JMX) is almost the same as the total 
> number of operators being created. This clears my original question in this 
> thread.
> 
> 2. Even when the number of task slots is 4, on web ui, it shows 3 slots as 
> free. Is this expected? Why are the subtasks not being distributed across 
> slots?
> 
> 3. Job deployment hangs (never switches to RUNNING) when the number of 
> devices is greater than 5. Even on increasing the akka client timeout, it 
> does not help. Will separate jobs being deployed per device instead of 
> separate streams help here?
> 
> 4. Is there an upper limit on number task slots which can be configured? I 
> know that my operator state size at any given point in time would not be very 
> high, so it looks OK to deploy independent jobs which can be deployed on the 
> same task manager across slots.
> 
> Thanks,
> Shailesh
> 
> 
> On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Sure, let us know if you have other questions or encounter some issues.
> 
> Thanks, Piotrek
> 
> 
>> On 13 Nov 2017, at 14:49, Shailesh Jain <shailesh.j...@stellapps.com 
>> <mailto:shailesh.j...@stellapps.com>> wrote:
>> 
>> Thanks, Piotr. I'll try it out and will get back in case of any further 
>> questions.
>> 
>> Shailesh
>> 
>> On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> 1.  It’s a little bit more complicated then that. Each operator chain/task 
>> will be executed in separate thread (parallelism
>>  Multiplies that). You can check in web ui how was your job split into tasks.
>> 
>> 3. Yes that’s true, this is an issue. To preserve the individual 
>> watermarks/latencies (assuming that you have some way to calculate them 
>> individually per each device), you could either:
>> 
>> a) have separate jobs per each device with parallelism 1. Pros: independent 
>> failures/checkpoints, Cons: resource usage (number of threads increases with 
>> number of devices, there are also other resources consumed by each job), 
>> efficiency, 
>> b) have one job with multiple data streams. Cons: resource usage (threads)
>> c) ignore Flink’s watermarks, and implement yo

Re: Correlation between data streams/operators and threads

2017-11-13 Thread Piotr Nowojski
Sure, let us know if you have other questions or encounter some issues.

Thanks, Piotrek

> On 13 Nov 2017, at 14:49, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
> 
> Thanks, Piotr. I'll try it out and will get back in case of any further 
> questions.
> 
> Shailesh
> 
> On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> 1.  It’s a little bit more complicated then that. Each operator chain/task 
> will be executed in separate thread (parallelism
>  Multiplies that). You can check in web ui how was your job split into tasks.
> 
> 3. Yes that’s true, this is an issue. To preserve the individual 
> watermarks/latencies (assuming that you have some way to calculate them 
> individually per each device), you could either:
> 
> a) have separate jobs per each device with parallelism 1. Pros: independent 
> failures/checkpoints, Cons: resource usage (number of threads increases with 
> number of devices, there are also other resources consumed by each job), 
> efficiency, 
> b) have one job with multiple data streams. Cons: resource usage (threads)
> c) ignore Flink’s watermarks, and implement your own code in place of it. You 
> could read all of your data in single data stream, keyBy partition/device and 
> manually handle watermarks logic. You could either try to wrap CEP/Window 
> operators or copy/paste and modify them to suite your needs. 
> 
> I would start and try out from a). If it work for your cluster/scale then 
> that’s fine. If not try b) (would share most of the code with a), and as a 
> last resort try c).
> 
> Kostas, would you like to add something?
> 
> Piotrek
> 
>> On 9 Nov 2017, at 19:16, Shailesh Jain <shailesh.j...@stellapps.com 
>> <mailto:shailesh.j...@stellapps.com>> wrote:
>> 
>> On 1. - is it tied specifically to the number of source operators or to the 
>> number of Datastream objects created. I mean does the answer change if I 
>> read all the data from a single Kafka topic, get a Datastream of all events, 
>> and the apply N filters to create N individual streams?
>> 
>> On 3. - the problem with partitions is that watermarks cannot be different 
>> per partition, and since in this use case, each stream is from a device, the 
>> latency could be different (but order will be correct almost always) and 
>> there are high chances of loosing out on events on operators like Patterns 
>> which work with windows. Any ideas for workarounds here?
>> 
>> 
>> Thanks,
>> Shailesh
>> 
>> On 09-Nov-2017 8:48 PM, "Piotr Nowojski" <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> 1. 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html>
>> 
>> Number of threads executing would be roughly speaking equal to of the number 
>> of input data streams multiplied by the parallelism.
>> 
>> 2. 
>> Yes, you could dynamically create more data streams at the job startup.
>> 
>> 3.
>> Running 1 independent data streams on a small cluster (couple of nodes) 
>> will definitely be an issue, since even with parallelism set to 1, there 
>> would be quite a lot of unnecessary threads. 
>> 
>> It would be much better to treat your data as a single data input stream 
>> with multiple partitions. You could assign partitions between source 
>> instances based on parallelism. For example with parallelism 6:
>> - source 0 could get partitions 0, 6, 12, 18
>> - source 1, could get partitions 1, 7, …
>> …
>> - source 5, could get partitions 5, 11, ...
>> 
>> Piotrek
>> 
>>> On 9 Nov 2017, at 10:18, Shailesh Jain <shailesh.j...@stellapps.com 
>>> <mailto:shailesh.j...@stellapps.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> I'm trying to understand the runtime aspect of Flink when dealing with 
>>> multiple data streams and multiple operators per data stream.
>>> 
>>> Use case: N data streams in a single flink job (each data stream 
>>> representing 1 device - with different time latencies), and each of these 
>>> data streams gets split into two streams, of which one goes into a bunch of 
>>> CEP operators, and one into a process function.
>>> 
>>> Questions:
>>> 1. At runtime, will the engine create one thread per data stream? Or one 
>>> thread per operator?
>>> 2. Is it possible to dynamically create a data stream at runtime when the 
>>> job starts? (i.e. if N is read from a file when the job starts and 
>>> corresponding N streams need to be created)
>>> 3. Are there any specific performance impacts when a large number of 
>>> streams (N ~ 1) are created, as opposed to N partitions within a single 
>>> stream?
>>> 
>>> Are there any internal (design) documents which can help understanding the 
>>> implementation details? Any references to the source will also be really 
>>> helpful.
>>> 
>>> Thanks in advance.
>>> 
>>> Shailesh
>>> 
>>> 
>> 
>> 
> 
> 



Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
I have a couple of concerns.

1. Your logs seem to be incomplete. For example, the configuration output at the 
beginning is missing (see attached example log). Also, the output file looks 
strange to me (like a duplicated log file). Please submit full logs.

2. If your heap size is 1.5GB, how is it possible that your screenshot shows 
memory usage of ~40GB, with one process using 10GB?

Please analyse what is actually consuming all of that memory and ensure that 
your machine does not use swap. If the memory consumption comes from Flink, 
please check the JVM's memory pools using jconsole; maybe something off-heap is 
using the memory. Pay special attention to your PermGen/Metaspace/Code 
pools, since they can cause class loading issues.

3. 1.5GB of heap is a very low value. From your screenshots I assumed that you 
had set the heap to some enormous value (the htop screenshot shows ~40GB memory 
usage on the machine). It might simply be too small and you should increase it, 
especially since you are trying to run multiple jobs at the same time with 
300 task slots. But increase it only after you solve the issue of other 
things eating up your memory.
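
If it is indeed too small, a rough flink-conf.yaml sketch (the numbers are only placeholders to illustrate, not recommendations for your setup):

# flink-conf.yaml
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 4096
# cap the Metaspace so that class loading leaks fail fast instead of growing unbounded
env.java.opts: "-XX:MaxMetaspaceSize=256m"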

Piotrek


flink-pnowojski-taskmanager-9-piotr-mbp.log
Description: Binary data



> On 10 Nov 2017, at 16:05, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr> wrote:
> 
> On 2017-11-10 18:01, ÇETİNKAYA EBRU ÇETİNKAYA EBRU wrote:
>> On 2017-11-10 17:50, Piotr Nowojski wrote:
>>> I do not see anything abnormal in the logs before this error :(
>>> What are your JVM settings and which java version are you running?
>>> What happens if you limit the heap size so that the swap is never
>>> used?
>>> Piotrek
>>>> On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>> On 2017-11-10 13:14, Piotr Nowojski wrote:
>>>>> jobmanager1.log and taskmanager2.log are the same. Can you also submit
>>>>> files containing std output?
>>>>> Piotrek
>>>>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>> On 2017-11-10 11:04, Piotr Nowojski wrote:
>>>>>>> Hi,
>>>>>>> Thanks for the logs, however I do not see before mentioned exceptions
>>>>>>> in it. It ends with java.lang.InterruptedException
>>>>>>> Is it the correct log file? Also, could you attach the std output file
>>>>>>> of the failing TaskManager?
>>>>>>> Piotrek
>>>>>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>>> On 2017-11-09 20:08, Piotr Nowojski wrote:
>>>>>>>>> Hi,
>>>>>>>>> Could you attach full logs from those task managers? At first glance I
>>>>>>>>> don’t see a connection between those exceptions and any memory issue
>>>>>>>>> that you might had. It looks like a dependency issue in one (some?
>>>>>>>>> All?) of your jobs.
>>>>>>>>> Did you build your jars with -Pbuild-jar profile as described here:
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>>>>>>>>> ?
>>>>>>>>> If that doesn’t help. Can you binary search which job is causing the
>>>>>>>>> problem? There might be some Flink incompatibility between different
>>>>>>>>> versions and rebuilding a job’s jar with a version matching to the
>>>>>>>>> cluster version might help.
>>>>>>>>> Piotrek
>>>>>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>>>>>>>>> Btw, Ebru:
>>>>>>>>>> I don’t agree that the main suspect is NetworkBufferPool. On your
>>>>>>>>>> screenshots it’s memory consumption was reasonable and stable:
>>>>>>>>>> 596MB
>>>>>>>>>> -> 602MB -> 597MB.
>>>>>>>>>> PoolThreadCache memory usage ~120MB is also reasonable.
>>>>>>>>>> Do you experience any problems, like Out Of Memory

Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
I do not see anything abnormal in the logs before this error :(

What are your JVM settings and which Java version are you running? What happens 
if you limit the heap size so that swap is never used? 

Piotrek

> On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr> wrote:
> 
> On 2017-11-10 13:14, Piotr Nowojski wrote:
>> jobmanager1.log and taskmanager2.log are the same. Can you also submit
>> files containing std output?
>> Piotrek
>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-10 11:04, Piotr Nowojski wrote:
>>>> Hi,
>>>> Thanks for the logs, however I do not see before mentioned exceptions
>>>> in it. It ends with java.lang.InterruptedException
>>>> Is it the correct log file? Also, could you attach the std output file
>>>> of the failing TaskManager?
>>>> Piotrek
>>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-09 20:08, Piotr Nowojski wrote:
>>>>>> Hi,
>>>>>> Could you attach full logs from those task managers? At first glance I
>>>>>> don’t see a connection between those exceptions and any memory issue
>>>>>> that you might had. It looks like a dependency issue in one (some?
>>>>>> All?) of your jobs.
>>>>>> Did you build your jars with -Pbuild-jar profile as described here:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>>>>>> ?
>>>>>> If that doesn’t help. Can you binary search which job is causing the
>>>>>> problem? There might be some Flink incompatibility between different
>>>>>> versions and rebuilding a job’s jar with a version matching to the
>>>>>> cluster version might help.
>>>>>> Piotrek
>>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>>>>>> Btw, Ebru:
>>>>>>> I don’t agree that the main suspect is NetworkBufferPool. On your
>>>>>>> screenshots it’s memory consumption was reasonable and stable:
>>>>>>> 596MB
>>>>>>> -> 602MB -> 597MB.
>>>>>>> PoolThreadCache memory usage ~120MB is also reasonable.
>>>>>>> Do you experience any problems, like Out Of Memory
>>>>>>> errors/crashes/long
>>>>>>> GC pauses? Or just JVM process is using more memory over time? You
>>>>>>> are
>>>>>>> aware that JVM doesn’t like to release memory back to OS once it
>>>>>>> was
>>>>>>> used? So increasing memory usage until hitting some limit (for
>>>>>>> example
>>>>>>> JVM max heap size) is expected behaviour.
>>>>>>> Piotrek
>>>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com>
>>>>>>> wrote:
>>>>>>> I don’t know if this is relevant to this issue, but I was
>>>>>>> constantly getting failures trying to reproduce this leak using your
>>>>>>> Job, because you were using non deterministic getKey function:
>>>>>>> @Override
>>>>>>> public Integer getKey(Integer event) {
>>>>>>> Random randomGen = new Random((new Date()).getTime());
>>>>>>> return randomGen.nextInt() % 8;
>>>>>>> }
>>>>>>> And quoting Java doc of KeySelector:
>>>>>>> "If invoked multiple times on the same object, the returned key must
>>>>>>> be the same.”
>>>>>>> I’m trying to reproduce this issue with following job:
>>>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>>>>>> Where IntegerSource is just an infinite source, DisardingSink is
>>>>>>> well just discarding incoming data. I’m cancelling the job every 5
>>>>>>> seconds and so far (after ~15 minutes) my memory consumption is
>>>>>>> stable, well below maximum java heap size.
>>>>>>> Piotrek
>>>>>>> On 8 Nov 20

Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
jobmanager1.log and taskmanager2.log are the same. Can you also submit files 
containing std output?

Piotrek

> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr> wrote:
> 
> On 2017-11-10 11:04, Piotr Nowojski wrote:
>> Hi,
>> Thanks for the logs, however I do not see before mentioned exceptions
>> in it. It ends with java.lang.InterruptedException
>> Is it the correct log file? Also, could you attach the std output file
>> of the failing TaskManager?
>> Piotrek
>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-09 20:08, Piotr Nowojski wrote:
>>>> Hi,
>>>> Could you attach full logs from those task managers? At first glance I
>>>> don’t see a connection between those exceptions and any memory issue
>>>> that you might had. It looks like a dependency issue in one (some?
>>>> All?) of your jobs.
>>>> Did you build your jars with -Pbuild-jar profile as described here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>>>> ?
>>>> If that doesn’t help. Can you binary search which job is causing the
>>>> problem? There might be some Flink incompatibility between different
>>>> versions and rebuilding a job’s jar with a version matching to the
>>>> cluster version might help.
>>>> Piotrek
>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>>>> Btw, Ebru:
>>>>> I don’t agree that the main suspect is NetworkBufferPool. On your
>>>>> screenshots it’s memory consumption was reasonable and stable:
>>>>> 596MB
>>>>> -> 602MB -> 597MB.
>>>>> PoolThreadCache memory usage ~120MB is also reasonable.
>>>>> Do you experience any problems, like Out Of Memory
>>>>> errors/crashes/long
>>>>> GC pauses? Or just JVM process is using more memory over time? You
>>>>> are
>>>>> aware that JVM doesn’t like to release memory back to OS once it
>>>>> was
>>>>> used? So increasing memory usage until hitting some limit (for
>>>>> example
>>>>> JVM max heap size) is expected behaviour.
>>>>> Piotrek
>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com>
>>>>> wrote:
>>>>> I don’t know if this is relevant to this issue, but I was
>>>>> constantly getting failures trying to reproduce this leak using your
>>>>> Job, because you were using non deterministic getKey function:
>>>>> @Override
>>>>> public Integer getKey(Integer event) {
>>>>> Random randomGen = new Random((new Date()).getTime());
>>>>> return randomGen.nextInt() % 8;
>>>>> }
>>>>> And quoting Java doc of KeySelector:
>>>>> "If invoked multiple times on the same object, the returned key must
>>>>> be the same.”
>>>>> I’m trying to reproduce this issue with following job:
>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>>>> Where IntegerSource is just an infinite source, DisardingSink is
>>>>> well just discarding incoming data. I’m cancelling the job every 5
>>>>> seconds and so far (after ~15 minutes) my memory consumption is
>>>>> stable, well below maximum java heap size.
>>>>> Piotrek
>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de>
>>>>> wrote:
>>>>> Yes, I tested with just printing the stream. But it could take a
>>>>> lot of time to fail.
>>>>> On Wednesday, 8 November 2017, Piotr Nowojski
>>>>> <pi...@data-artisans.com> wrote:
>>>>> Thanks for quick answer.
>>>>> So it will also fail after some time with `fromElements` source
>>>>> instead of Kafka, right?
>>>>> Did you try it also without a Kafka producer?
>>>>> Piotrek
>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de>
>>>>> wrote:
>>>>> Hi,
>>>>> You don't need data. With data it will die faster. I tested as
>>>>> well with a small data set, using the fromElements source,

Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
Hi,

Thanks for the logs; however, I do not see the previously mentioned exceptions in 
them. The log ends with a java.lang.InterruptedException.

Is it the correct log file? Also, could you attach the std output file of the 
failing TaskManager?

Piotrek

> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr> wrote:
> 
> On 2017-11-09 20:08, Piotr Nowojski wrote:
>> Hi,
>> Could you attach full logs from those task managers? At first glance I
>> don’t see a connection between those exceptions and any memory issue
>> that you might had. It looks like a dependency issue in one (some?
>> All?) of your jobs.
>> Did you build your jars with -Pbuild-jar profile as described here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>> ?
>> If that doesn’t help. Can you binary search which job is causing the
>> problem? There might be some Flink incompatibility between different
>> versions and rebuilding a job’s jar with a version matching to the
>> cluster version might help.
>> Piotrek
>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>> Btw, Ebru:
>>> I don’t agree that the main suspect is NetworkBufferPool. On your
>>> screenshots it’s memory consumption was reasonable and stable:
>>> 596MB
>>> -> 602MB -> 597MB.
>>> PoolThreadCache memory usage ~120MB is also reasonable.
>>> Do you experience any problems, like Out Of Memory
>>> errors/crashes/long
>>> GC pauses? Or just JVM process is using more memory over time? You
>>> are
>>> aware that JVM doesn’t like to release memory back to OS once it
>>> was
>>> used? So increasing memory usage until hitting some limit (for
>>> example
>>> JVM max heap size) is expected behaviour.
>>> Piotrek
>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>> I don’t know if this is relevant to this issue, but I was
>>> constantly getting failures trying to reproduce this leak using your
>>> Job, because you were using non deterministic getKey function:
>>> @Override
>>> public Integer getKey(Integer event) {
>>> Random randomGen = new Random((new Date()).getTime());
>>> return randomGen.nextInt() % 8;
>>> }
>>> And quoting Java doc of KeySelector:
>>> "If invoked multiple times on the same object, the returned key must
>>> be the same.”
>>> I’m trying to reproduce this issue with following job:
>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>> Where IntegerSource is just an infinite source, DisardingSink is
>>> well just discarding incoming data. I’m cancelling the job every 5
>>> seconds and so far (after ~15 minutes) my memory consumption is
>>> stable, well below maximum java heap size.
>>> Piotrek
>>> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de>
>>> wrote:
>>> Yes, I tested with just printing the stream. But it could take a
>>> lot of time to fail.
>>> On Wednesday, 8 November 2017, Piotr Nowojski
>>> <pi...@data-artisans.com> wrote:
>>> Thanks for quick answer.
>>> So it will also fail after some time with `fromElements` source
>>> instead of Kafka, right?
>>> Did you try it also without a Kafka producer?
>>> Piotrek
>>> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de>
>>> wrote:
>>> Hi,
>>> You don't need data. With data it will die faster. I tested as
>>> well with a small data set, using the fromElements source, but it
>>> will take some time to die. It's better with some data.
>>> On 8 November 2017 at 14:54, Piotr Nowojski
>>> <pi...@data-artisans.com> wrote:
>>> Hi,
>>> Thanks for sharing this job.
>>> Do I need to feed some data to the Kafka to reproduce this
>> issue with your script?
>>>> Does this OOM issue also happen when you are not using the
>> Kafka source/sink?
>>>> Piotrek
>>>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de>
>> wrote:
>>>> Hi,
>>>> This is the test flink job we created to trigger this leak
>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>>>> And this is the python script we are using to execute the job
>> thousands of times to get the OOM probl

Re: Flink memory leak

2017-11-09 Thread Piotr Nowojski
Hi,

Could you attach full logs from those task managers? At first glance I don’t 
see a connection between those exceptions and any memory issue that you might 
have had. It looks like a dependency issue in one (some? all?) of your jobs.

Did you build your jars with -Pbuild-jar profile as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project>
? 
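
For reference, with the quickstart pom that usually boils down to something like the following (the exact goal may differ in your setup):

mvn clean package -Pbuild-jar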

If that doesn’t help, can you binary-search which job is causing the problem? 
There might be some Flink incompatibility between different versions, and 
rebuilding a job’s jar with a version matching the cluster version might 
help.

Piotrek


> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr> wrote:
> 
> On 2017-11-08 18:30, Piotr Nowojski wrote:
>> Btw, Ebru:
>> I don’t agree that the main suspect is NetworkBufferPool. On your
>> screenshots it’s memory consumption was reasonable and stable: 596MB
>> -> 602MB -> 597MB.
>> PoolThreadCache memory usage ~120MB is also reasonable.
>> Do you experience any problems, like Out Of Memory errors/crashes/long
>> GC pauses? Or just JVM process is using more memory over time? You are
>> aware that JVM doesn’t like to release memory back to OS once it was
>> used? So increasing memory usage until hitting some limit (for example
>> JVM max heap size) is expected behaviour.
>> Piotrek
>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>> I don’t know if this is relevant to this issue, but I was
>>> constantly getting failures trying to reproduce this leak using your
>>> Job, because you were using non deterministic getKey function:
>>> @Override
>>> public Integer getKey(Integer event) {
>>> Random randomGen = new Random((new Date()).getTime());
>>> return randomGen.nextInt() % 8;
>>> }
>>> And quoting Java doc of KeySelector:
>>> "If invoked multiple times on the same object, the returned key must
>>> be the same.”
>>> I’m trying to reproduce this issue with following job:
>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>> Where IntegerSource is just an infinite source, DisardingSink is
>>> well just discarding incoming data. I’m cancelling the job every 5
>>> seconds and so far (after ~15 minutes) my memory consumption is
>>> stable, well below maximum java heap size.
>>> Piotrek
>>>> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de>
>>>> wrote:
>>>> Yes, I tested with just printing the stream. But it could take a
>>>> lot of time to fail.
>>>> On Wednesday, 8 November 2017, Piotr Nowojski
>>>> <pi...@data-artisans.com> wrote:
>>>>> Thanks for quick answer.
>>>>> So it will also fail after some time with `fromElements` source
>>>> instead of Kafka, right?
>>>>> Did you try it also without a Kafka producer?
>>>>> Piotrek
>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de>
>>>> wrote:
>>>>> Hi,
>>>>> You don't need data. With data it will die faster. I tested as
>>>> well with a small data set, using the fromElements source, but it
>>>> will take some time to die. It's better with some data.
>>>>> On 8 November 2017 at 14:54, Piotr Nowojski
>>>> <pi...@data-artisans.com> wrote:
>>>>>> Hi,
>>>>>> Thanks for sharing this job.
>>>>>> Do I need to feed some data to the Kafka to reproduce this
>>>> issue with your script?
>>>>>> Does this OOM issue also happen when you are not using the
>>>> Kafka source/sink?
>>>>>> Piotrek
>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de>
>>>> wrote:
>>>>>> Hi,
>>>>>> This is the test flink job we created to trigger this leak
>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>>>>>> And this is the python script we are using to execute the job
>>>> thousands of times to get the OOM problem
>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>>>>>> The cluster we used for this has this configuration:
>>>>>> Instance type: t2.large
>>>>>> Number of workers: 2

Re: How to best create a bounded session window ?

2017-11-09 Thread Piotr Nowojski
Indeed, you are unfortunately right. Triggers do not define/control the lifecycle of 
the window, so it could happen that each new event keeps pushing the 
leading boundary of the window forward, while your custom trigger keeps 
firing and purging single events (because the max window length is exceeded). 
So my example of events:
1 2 3 4 6 7 8 5
would probably result in the following fired windows:
[1 2 3 4 6] (window time from 1 to 6) +
[7] - (window time from 1 to 7) +
[8] - (window time from 1 to 8) +
[5] - (window time from 1 to 8)

I’m not sure if you can work around this issue. You would have to either 
implement your own WindowOperator that behaves differently, or you could copy 
the code and add a new TriggerResult - FIRE_PURGE_AND_DROP_WINDOW. The latter 
could maybe be contributed back into Flink (it should be discussed in a ticket 
first).

Piotrek

> On 9 Nov 2017, at 15:27, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> 
> Thanks you for the response. 
> 
> I would not mind the second scenario as in a second window, which 
> your illustration suggests with a custom trigger approach, I am not certain  
> though that triggers  define the lifecycle of a window, as in a trigger 
> firing does not necessarily imply a Garbage Collectable Window.  It should be 
> GCed only after the watermark exceeds a hypothetically ever increasing window 
> leading boundary by a lag. In a some case that might never happen as in the 
> leading boundary is forever increasing. We may decide to fire_and_purge. fire 
> etc but the window remains live.  Or did I get that part wrong ? 
> 
> 
> Vishal.
> 
> 
> 
> 
> On Thu, Nov 9, 2017 at 8:24 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> It might be more complicated if you want to take into account events coming 
> in out of order. For example you limit length of window to 5 and you get the 
> following events:
> 
> 1 2 3 4 6 7 8 5
> 
> Do you want to emit windows:
> 
> [1 2 3 4 5] (length limit exceeded) + [6 7 8] ?
> 
> Or are you fine with interleaving windows in case of out of order:
> 
> [1 2 3 4 6] + [5 7 8] 
> 
> If the latter one, some custom Trigger should be enough for you. If not, you 
> would need to implement hypothetical MergingAndSplitableWindowAssigner, that 
> after encountering late event “5” could split previously created windows. 
> Unfortunately such feature is not supported by a WindowOperator, so you would 
> have to implement your own operator for this.
> 
> Regardless of your option remember to write some integration tests:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing>
> 
> Piotrek
> 
>> On 8 Nov 2017, at 21:43, Vishal Santoshi <vishal.santo...@gmail.com 
>> <mailto:vishal.santo...@gmail.com>> wrote:
>> 
>> I am implementing a bounded session window but I require to short circuit 
>> the session if the session length ( in count of events or time ) go beyond a 
>> configured limit , a very reasonable scenario ( bot etc ) . I am using the 
>> approach as listed. I am not sure though if the Window itself is being 
>> terminated and if that is even feasible. Any other approach or advise ?  
>> 
>> public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
>> private static final long serialVersionUID = 1L;
>> long maxSessionTime;
>> 
>> ValueState doneState;
>> private final ValueStateDescriptor cleanupStateDescriptor =
>> new ValueStateDescriptor<>("done", Boolean.class );
>> 
>> private BoundedEventTimeTrigger(long maxSessionTime) {
>> this.maxSessionTime = maxSessionTime;
>> }
>> 
>> /**
>>  * Creates an event-time trigger that fires once the watermark passes 
>> the end of the window.
>>  * 
>>  * Once the trigger fires all elements are discarded. Elements that 
>> arrive late immediately
>>  * trigger window evaluation with just this one element.
>>  */
>> public static BoundedEventTimeTrigger create(long maxSessionLengh) {
>> return new BoundedEventTimeTrigger(maxSessionLengh);
>> }
>> 
>> @Override
>> public TriggerResult onElement(Object element, long timestamp, 
>> TimeWindow window, TriggerContext ctx) throws Exception {
>> if(cleanupState!=null && cleanupState.value()!=null && 
>> cleanupState.value()) {
>> return TriggerResu

Re: Correlation between data streams/operators and threads

2017-11-09 Thread Piotr Nowojski
Hi,

1. 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html 


The number of executing threads would be, roughly speaking, equal to the number of 
input data streams multiplied by the parallelism.

2. 
Yes, you could dynamically create more data streams at the job startup.

3.
Running 1 independent data streams on a small cluster (couple of nodes) 
will definitely be an issue, since even with parallelism set to 1, there would 
be quite a lot of unnecessary threads. 

It would be much better to treat your data as a single data input stream with 
multiple partitions. You could assign partitions between source instances based 
on parallelism. For example with parallelism 6:
- source 0 could get partitions 0, 6, 12, 18
- source 1, could get partitions 1, 7, …
…
- source 5, could get partitions 5, 11, ...
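
A minimal sketch of that assignment rule (plain Java, not tied to any particular source implementation):

import java.util.ArrayList;
import java.util.List;

public class PartitionAssignment {
    // Subtask i (0-based) out of p parallel source instances takes every
    // partition whose index modulo p equals i, matching the example above.
    public static List<Integer> assignedPartitions(int numPartitions, int parallelism, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            if (partition % parallelism == subtaskIndex) {
                assigned.add(partition);
            }
        }
        return assigned;
    }
}

Inside a RichParallelSourceFunction you can get the two runtime values from 
getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().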

Piotrek

> On 9 Nov 2017, at 10:18, Shailesh Jain  wrote:
> 
> Hi,
> 
> I'm trying to understand the runtime aspect of Flink when dealing with 
> multiple data streams and multiple operators per data stream.
> 
> Use case: N data streams in a single flink job (each data stream representing 
> 1 device - with different time latencies), and each of these data streams 
> gets split into two streams, of which one goes into a bunch of CEP operators, 
> and one into a process function.
> 
> Questions:
> 1. At runtime, will the engine create one thread per data stream? Or one 
> thread per operator?
> 2. Is it possible to dynamically create a data stream at runtime when the job 
> starts? (i.e. if N is read from a file when the job starts and corresponding 
> N streams need to be created)
> 3. Are there any specific performance impacts when a large number of streams 
> (N ~ 1) are created, as opposed to N partitions within a single stream?
> 
> Are there any internal (design) documents which can help understanding the 
> implementation details? Any references to the source will also be really 
> helpful.
> 
> Thanks in advance.
> 
> Shailesh
> 
> 



Re: Weird performance on custom Hashjoin w.r.t. parallelism

2017-11-09 Thread Piotr Nowojski
Hi,

Yes, as you correctly analysed, parallelism 1 was causing problems, because it 
meant that all of the records had to be gathered over the network from all of 
the task managers. Keep in mind that even if you increase parallelism to “p”, 
every change in parallelism can slow down your application, because events will 
have to be redistributed, which in most cases means network transfers. 

For measuring throughput you could use already defined metrics in Flink:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html
 


You can get the list of vertices of your job:
http://<jobmanager-host>:8081/jobs/<job-id>/vertices
Then the statistics:
http://<jobmanager-host>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics

For example
http://localhost:8081/jobs/34c6f7d00cf9b3ebfff4d94ad465eb23/vertices 

http://localhost:8081/jobs/34c6f7d00cf9b3ebfff4d94ad465eb23/vertices/3d144c2a0fc19115f5f075ba85deac26/metrics
 


You can also try to aggregate them:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#rest-api-integration
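
For example, a rough sketch of reading one of those metrics over the REST API (the job and vertex ids below are the ones from the example URLs above; the "0." prefix selecting subtask 0 and the metric name numRecordsOutPerSecond are my assumptions based on the docs linked above):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class FetchVertexMetrics {
    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081";
        String jobId = "34c6f7d00cf9b3ebfff4d94ad465eb23";
        String vertexId = "3d144c2a0fc19115f5f075ba85deac26";
        // Ask the per-vertex metrics endpoint for a single metric value.
        URL url = new URL(base + "/jobs/" + jobId + "/vertices/" + vertexId
                + "/metrics?get=0.numRecordsOutPerSecond");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // JSON array with the metric id and its current value
            }
        }
    }
}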
 


Piotrek

> On 9 Nov 2017, at 07:53, m@xi  wrote:
> 
> Hello!
> 
> I found out that the cause of the problem was the map that I have after the
> parallel join with parallelism 1.
> When I changed it to .map(new MyMapMeter).setParallelism(p) then when I
> increase the number of parallelism p the completion time decreases, which is
> reasonable. Somehow it was a bottleneck of my parallel execution plan, but I
> had it this way in order to measure a valid average throughput.
> 
> So, my question is the following: 
> 
> How can I measure the average throughput of my parallel join operation
> properly?
> 
> Best,
> Max
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: How to best create a bounded session window ?

2017-11-09 Thread Piotr Nowojski
It might be more complicated if you want to take into account events coming in 
out of order. For example, you limit the length of the window to 5 and you get the 
following events:

1 2 3 4 6 7 8 5

Do you want to emit windows:

[1 2 3 4 5] (length limit exceeded) + [6 7 8] ?

Or are you fine with interleaving windows in case of out of order:

[1 2 3 4 6] + [5 7 8] 

If the latter, some custom Trigger should be enough for you. If not, you 
would need to implement a hypothetical MergingAndSplitableWindowAssigner which, 
after encountering the late event “5”, could split previously created windows. 
Unfortunately such a feature is not supported by the WindowOperator, so you would 
have to implement your own operator for this.

Regardless of your option remember to write some integration tests:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing
 


Piotrek

> On 8 Nov 2017, at 21:43, Vishal Santoshi  wrote:
> 
> I am implementing a bounded session window but I require to short circuit the 
> session if the session length ( in count of events or time ) go beyond a 
> configured limit , a very reasonable scenario ( bot etc ) . I am using the 
> approach as listed. I am not sure though if the Window itself is being 
> terminated and if that is even feasible. Any other approach or advise ?  
> 
> public class BoundedEventTimeTrigger extends Trigger {
> private static final long serialVersionUID = 1L;
> long maxSessionTime;
> 
> ValueState doneState;
> private final ValueStateDescriptor cleanupStateDescriptor =
> new ValueStateDescriptor<>("done", Boolean.class );
> 
> private BoundedEventTimeTrigger(long maxSessionTime) {
> this.maxSessionTime = maxSessionTime;
> }
> 
> /**
>  * Creates an event-time trigger that fires once the watermark passes the 
> end of the window.
>  * 
>  * Once the trigger fires all elements are discarded. Elements that 
> arrive late immediately
>  * trigger window evaluation with just this one element.
>  */
> public static BoundedEventTimeTrigger create(long maxSessionLengh) {
> return new BoundedEventTimeTrigger(maxSessionLengh);
> }
> 
> @Override
> public TriggerResult onElement(Object element, long timestamp, TimeWindow 
> window, TriggerContext ctx) throws Exception {
> if(cleanupState!=null && cleanupState.value()!=null && 
> cleanupState.value()) {
> return TriggerResult.CONTINUE;
> }
> if(timestamp - window.getStart() > maxSessionTime){
> System.out.println(new Date(timestamp) + "\t" + new 
> Date(window.getStart()));
> try {
> doneState = ctx.getPartitionedState(cleanupStateDescriptor);
> doneState.update(true);
> return TriggerResult.FIRE_AND_PURGE;
> } catch (IOException e) {
> throw new RuntimeException("Failed to update state", e);
> }
> }
> 
> if (window.maxTimestamp() <= ctx.getCurrentWatermark() ) {
> // if the watermark is already past the window fire immediately
> return TriggerResult.FIRE;
> } else {
> ctx.registerEventTimeTimer(window.maxTimestamp());
> return TriggerResult.CONTINUE;
> }
> }
> 
> @Override
> public TriggerResult onEventTime(long time, TimeWindow window, 
> TriggerContext ctx) {
> return time == window.maxTimestamp() ?
> TriggerResult.FIRE :
> TriggerResult.CONTINUE;
> }
> 
> @Override
> public TriggerResult onProcessingTime(long time, TimeWindow window, 
> TriggerContext ctx) throws Exception {
> return TriggerResult.CONTINUE;
> }
> 
> @Override
> public void clear(TimeWindow window, TriggerContext ctx) throws Exception 
> {
> ctx.deleteEventTimeTimer(window.maxTimestamp());
> }
> 
> @Override
> public boolean canMerge() {
> return true;
> }
> 
> @Override
> public void onMerge(TimeWindow window,
> OnMergeContext ctx) {
> ctx.registerEventTimeTimer(window.maxTimestamp());
> }
> 
> @Override
> public String toString() {
> return "EventTimeTrigger()";
> }
> }
> 



Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Btw, Ebru:

I don’t agree that the main suspect is NetworkBufferPool. On your screenshots 
its memory consumption was reasonable and stable: 596MB -> 602MB -> 597MB. 

PoolThreadCache memory usage of ~120MB is also reasonable.

Do you experience any problems, like Out Of Memory errors/crashes/long GC 
pauses? Or is the JVM process just using more memory over time? Are you aware that 
the JVM doesn’t like to release memory back to the OS once it has been used? So increasing 
memory usage until hitting some limit (for example the JVM max heap size) is 
expected behaviour.

Piotrek

> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> I don’t know if this is relevant to this issue, but I was constantly getting 
> failures trying to reproduce this leak using your Job, because you were using 
> non deterministic getKey function:
> @Override
> public Integer getKey(Integer event) {
>Random randomGen = new Random((new Date()).getTime());
>return randomGen.nextInt() % 8;
> }
> And quoting Java doc of KeySelector:
> 
> "If invoked multiple times on the same object, the returned key must be the 
> same.”
> 
> I’m trying to reproduce this issue with following job:
> 
> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 
> <https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3>
> 
> Where IntegerSource is just an infinite source, DisardingSink is well just 
> discarding incoming data. I’m cancelling the job every 5 seconds and so far 
> (after ~15 minutes) my memory consumption is stable, well below maximum java 
> heap size.
> 
> Piotrek
> 
>> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de 
>> <mailto:javier.lo...@zalando.de>> wrote:
>> 
>> Yes, I tested with just printing the stream. But it could take a lot of time 
>> to fail. 
>> 
>> On Wednesday, 8 November 2017, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> > Thanks for quick answer. 
>> > So it will also fail after some time with `fromElements` source instead of 
>> > Kafka, right? 
>> > Did you try it also without a Kafka producer?
>> > Piotrek
>> >
>> > On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de 
>> > <mailto:javier.lo...@zalando.de>> wrote:
>> > Hi,
>> > You don't need data. With data it will die faster. I tested as well with a 
>> > small data set, using the fromElements source, but it will take some time 
>> > to die. It's better with some data.
>> > On 8 November 2017 at 14:54, Piotr Nowojski <pi...@data-artisans.com 
>> > <mailto:pi...@data-artisans.com>> wrote:
>> >>
>> >> Hi,
>> >> Thanks for sharing this job. 
>> >> Do I need to feed some data to the Kafka to reproduce this issue with 
>> >> your script?
>> >> Does this OOM issue also happen when you are not using the Kafka 
>> >> source/sink? 
>> >> Piotrek
>> >>
>> >> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de 
>> >> <mailto:javier.lo...@zalando.de>> wrote:
>> >> Hi,
>> >> This is the test flink job we created to trigger this leak 
>> >> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
>> >> <https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6>
>> >> And this is the python script we are using to execute the job thousands 
>> >> of times to get the OOM problem 
>> >> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
>> >> <https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107>
>> >> The cluster we used for this has this configuration:
>> >>
>> >> Instance type: t2.large
>> >> Number of workers: 2
>> >> HeapMemory: 5500
>> >> Number of task slots per node: 4
>> >> TaskMangMemFraction: 0.5
>> >> NumberOfNetworkBuffers: 2000
>> >>
>> >> We have tried several things, increasing the heap, reducing the heap, 
>> >> more memory fraction, changes this value in the taskmanager.sh 
>> >> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
>> >> Thanks for your help.
>> >> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>> >> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
>> >> wrote:
>> >>>
>> >>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>> >

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
I don’t know if this is relevant to this issue, but I was constantly getting 
failures trying to reproduce this leak using your job, because you were using a 
non-deterministic getKey function:
@Override
public Integer getKey(Integer event) {
   Random randomGen = new Random((new Date()).getTime());
   return randomGen.nextInt() % 8;
}
And quoting Java doc of KeySelector:

"If invoked multiple times on the same object, the returned key must be the 
same.”
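
A deterministic alternative (just a sketch) for the same 8-way partitioning could look like:

import org.apache.flink.api.java.functions.KeySelector;

public class ModuloKeySelector implements KeySelector<Integer, Integer> {
    @Override
    public Integer getKey(Integer event) {
        // The key depends only on the event itself, so repeated calls on the
        // same object always return the same key.
        return Math.floorMod(event, 8);
    }
}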

I’m trying to reproduce this issue with following job:

https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 
<https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3>

Here IntegerSource is just an infinite source and DisardingSink is, well, just 
discarding incoming data. I’m cancelling the job every 5 seconds and so far 
(after ~15 minutes) my memory consumption is stable, well below the maximum Java 
heap size.

Piotrek

> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de> wrote:
> 
> Yes, I tested with just printing the stream. But it could take a lot of time 
> to fail. 
> 
> On Wednesday, 8 November 2017, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> > Thanks for quick answer. 
> > So it will also fail after some time with `fromElements` source instead of 
> > Kafka, right? 
> > Did you try it also without a Kafka producer?
> > Piotrek
> >
> > On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de 
> > <mailto:javier.lo...@zalando.de>> wrote:
> > Hi,
> > You don't need data. With data it will die faster. I tested as well with a 
> > small data set, using the fromElements source, but it will take some time 
> > to die. It's better with some data.
> > On 8 November 2017 at 14:54, Piotr Nowojski <pi...@data-artisans.com 
> > <mailto:pi...@data-artisans.com>> wrote:
> >>
> >> Hi,
> >> Thanks for sharing this job. 
> >> Do I need to feed some data to the Kafka to reproduce this issue with your 
> >> script?
> >> Does this OOM issue also happen when you are not using the Kafka 
> >> source/sink? 
> >> Piotrek
> >>
> >> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de 
> >> <mailto:javier.lo...@zalando.de>> wrote:
> >> Hi,
> >> This is the test flink job we created to trigger this leak 
> >> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
> >> <https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6>
> >> And this is the python script we are using to execute the job thousands of 
> >> times to get the OOM problem 
> >> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
> >> <https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107>
> >> The cluster we used for this has this configuration:
> >>
> >> Instance type: t2.large
> >> Number of workers: 2
> >> HeapMemory: 5500
> >> Number of task slots per node: 4
> >> TaskMangMemFraction: 0.5
> >> NumberOfNetworkBuffers: 2000
> >>
> >> We have tried several things, increasing the heap, reducing the heap, more 
> >> memory fraction, changes this value in the taskmanager.sh 
> >> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
> >> Thanks for your help.
> >> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> >> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
> >> wrote:
> >>>
> >>> On 2017-11-08 15:20, Piotr Nowojski wrote:
> >>>>
> >>>> Hi Ebru and Javier,
> >>>>
> >>>> Yes, if you could share this example job it would be helpful.
> >>>>
> >>>> Ebru: could you explain in a little more details how does your Job(s)
> >>>> look like? Could you post some code? If you are just using maps and
> >>>> filters there shouldn’t be any network transfers involved, aside
> >>>> from Source and Sink functions.
> >>>>
> >>>> Piotrek
> >>>>
> >>>>> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr 
> >>>>> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> >>>>>
> >>>>> Hi Javier,
> >>>>>
> >>>>> It would be helpful if you share your test job with us.
> >>>>> Which configurations did you try?
> >>>>>
> >>>>> -Ebru
> >>>>>
>

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Thanks for the quick answer. 

So it will also fail after some time with the `fromElements` source instead of 
Kafka, right? 

Did you also try it without a Kafka producer?

Piotrek

> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de> wrote:
> 
> Hi,
> 
> You don't need data. With data it will die faster. I tested as well with a 
> small data set, using the fromElements source, but it will take some time to 
> die. It's better with some data.
> 
> On 8 November 2017 at 14:54, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Thanks for sharing this job. 
> 
> Do I need to feed some data to the Kafka to reproduce this issue with your 
> script?
> 
> Does this OOM issue also happen when you are not using the Kafka source/sink? 
> 
> Piotrek
> 
>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de 
>> <mailto:javier.lo...@zalando.de>> wrote:
>> 
>> Hi,
>> 
>> This is the test flink job we created to trigger this leak 
>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
>> <https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6>
>> And this is the python script we are using to execute the job thousands of 
>> times to get the OOM problem 
>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
>> <https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107>
>> 
>> The cluster we used for this has this configuration:
>> Instance type: t2.large
>> Number of workers: 2
>> HeapMemory: 5500
>> Number of task slots per node: 4
>> TaskMangMemFraction: 0.5
>> NumberOfNetworkBuffers: 2000
>> We have tried several things, increasing the heap, reducing the heap, more 
>> memory fraction, changes this value in the taskmanager.sh 
>> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
>> 
>> Thanks for your help.
>> 
>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>> Hi Ebru and Javier,
>> 
>> Yes, if you could share this example job it would be helpful.
>> 
>> Ebru: could you explain in a little more details how does your Job(s)
>> look like? Could you post some code? If you are just using maps and
>> filters there shouldn’t be any network transfers involved, aside
>> from Source and Sink functions.
>> 
>> Piotrek
>> 
>> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr 
>> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>> 
>> Hi Javier,
>> 
>> It would be helpful if you share your test job with us.
>> Which configurations did you try?
>> 
>> -Ebru
>> 
>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de 
>> <mailto:javier.lo...@zalando.de>>
>> wrote:
>> 
>> Hi,
>> 
>> We have been facing a similar problem. We have tried some different
>> configurations, as proposed in other email thread by Flavio and
>> Kien, but it didn't work. We have a workaround similar to the one
>> that Flavio has, we restart the taskmanagers once they reach a
>> memory threshold. We created a small test to remove all of our
>> dependencies and leave only flink native libraries. This test reads
>> data from a Kafka topic and writes it back to another topic in
>> Kafka. We cancel the job and start another every 5 seconds. After
>> ~30 minutes of doing this process, the cluster reaches the OS memory
>> limit and dies.
>> 
>> Currently, we have a test cluster with 8 workers and 8 task slots
>> per node. We have one job that uses 56 slots, and we cannot execute
>> that job 5 times in a row because the whole cluster dies. If you
>> want, we can publish our test job.
>> 
>> Regards,
>> 
>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>>
>> wrote:
>> 
>> @Nico & @Piotr Could you please have a look at this? You both
>> recently worked on the network stack and might be most familiar with
>> this.
>> 
>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it 
>> <mailto:pomperma...@okkam.it>>
>> wrote:
>> 
>> We also have the same problem in production. At the moment the
>> solution is to restart the entire Flink cluster after every job..
>> We've tried to reproduce this problem with a test (see

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Hi,

Thanks for sharing this job. 

Do I need to feed some data into Kafka to reproduce this issue with your 
script?

Does this OOM issue also happen when you are not using the Kafka source/sink? 

Piotrek

> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de> wrote:
> 
> Hi,
> 
> This is the test flink job we created to trigger this leak 
> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
> <https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6>
> And this is the python script we are using to execute the job thousands of 
> times to get the OOM problem 
> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
> <https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107>
> 
> The cluster we used for this has this configuration:
> Instance type: t2.large
> Number of workers: 2
> HeapMemory: 5500
> Number of task slots per node: 4
> TaskMangMemFraction: 0.5
> NumberOfNetworkBuffers: 2000
> We have tried several things, increasing the heap, reducing the heap, more 
> memory fraction, changes this value in the taskmanager.sh 
> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
> 
> Thanks for your help.
> 
> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> On 2017-11-08 15:20, Piotr Nowojski wrote:
> Hi Ebru and Javier,
> 
> Yes, if you could share this example job it would be helpful.
> 
> Ebru: could you explain in a little more details how does your Job(s)
> look like? Could you post some code? If you are just using maps and
> filters there shouldn’t be any network transfers involved, aside
> from Source and Sink functions.
> 
> Piotrek
> 
> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr 
> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> 
> Hi Javier,
> 
> It would be helpful if you share your test job with us.
> Which configurations did you try?
> 
> -Ebru
> 
> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de 
> <mailto:javier.lo...@zalando.de>>
> wrote:
> 
> Hi,
> 
> We have been facing a similar problem. We have tried some different
> configurations, as proposed in other email thread by Flavio and
> Kien, but it didn't work. We have a workaround similar to the one
> that Flavio has, we restart the taskmanagers once they reach a
> memory threshold. We created a small test to remove all of our
> dependencies and leave only flink native libraries. This test reads
> data from a Kafka topic and writes it back to another topic in
> Kafka. We cancel the job and start another every 5 seconds. After
> ~30 minutes of doing this process, the cluster reaches the OS memory
> limit and dies.
> 
> Currently, we have a test cluster with 8 workers and 8 task slots
> per node. We have one job that uses 56 slots, and we cannot execute
> that job 5 times in a row because the whole cluster dies. If you
> want, we can publish our test job.
> 
> Regards,
> 
> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>>
> wrote:
> 
> @Nico & @Piotr Could you please have a look at this? You both
> recently worked on the network stack and might be most familiar with
> this.
> 
> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it 
> <mailto:pomperma...@okkam.it>>
> wrote:
> 
> We also have the same problem in production. At the moment the
> solution is to restart the entire Flink cluster after every job..
> We've tried to reproduce this problem with a test (see
> https://issues.apache.org/jira/browse/FLINK-7845 
> <https://issues.apache.org/jira/browse/FLINK-7845> [1]) but we don't
> 
> know whether the error produced by the test and the leak are
> correlated..
> 
> Best,
> Flavio
> 
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> On 2017-11-07 16:53, Ufuk Celebi wrote:
> Do you use any windowing? If yes, could you please share that code?
> If
> there is no stateful operation at all, it's strange where the list
> state instances are coming from.
> 
> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr 
> <mailto:b20926...@cs.hacettepe.edu.tr>>
> wrote:
> Hi Ufuk,
> 
> We don’t explicitly define any state descriptor. We only use map
> and filters
> operator. We thought that gc handle clearing the flink’s internal
> states.
> So how can we manage the memory if it is always increasing?
> 
> - Ebru
&g

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Hi Ebru and Javier,

Yes, if you could share this example job it would be helpful.

Ebru: could you explain in a little more detail what your job(s) look like? Could 
you post some code? If you are just using maps and filters, there shouldn’t be 
any network transfers involved, aside from the Source and Sink functions.

Piotrek

> On 8 Nov 2017, at 12:54, ebru  wrote:
> 
> Hi Javier,
> 
> It would be helpful if you share your test job with us. 
> Which configurations did you try?
> 
> -Ebru
>> On 8 Nov 2017, at 14:43, Javier Lopez > > wrote:
>> 
>> Hi,
>> 
>> We have been facing a similar problem. We have tried some different 
>> configurations, as proposed in other email thread by Flavio and Kien, but it 
>> didn't work. We have a workaround similar to the one that Flavio has, we 
>> restart the taskmanagers once they reach a memory threshold. We created a 
>> small test to remove all of our dependencies and leave only flink native 
>> libraries. This test reads data from a Kafka topic and writes it back to 
>> another topic in Kafka. We cancel the job and start another every 5 seconds. 
>> After ~30 minutes of doing this process, the cluster reaches the OS memory 
>> limit and dies. 
>> 
>> Currently, we have a test cluster with 8 workers and 8 task slots per node. 
>> We have one job that uses 56 slots, and we cannot execute that job 5 times 
>> in a row because the whole cluster dies. If you want, we can publish our 
>> test job.
>> 
>> Regards,
>> 
>> On 8 November 2017 at 11:20, Aljoscha Krettek > > wrote:
>> @Nico & @Piotr Could you please have a look at this? You both recently 
>> worked on the network stack and might be most familiar with this.
>> 
>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >> > wrote:
>>> 
>>> We also have the same problem in production. At the moment the solution is 
>>> to restart the entire Flink cluster after every job..
>>> We've tried to reproduce this problem with a test (see 
>>> https://issues.apache.org/jira/browse/FLINK-7845 
>>> ) but we don't know 
>>> whether the error produced by the test and the leak are correlated..
>>> 
>>> Best,
>>> Flavio
>>> 
>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>> > 
>>> wrote:
>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>> Do you use any windowing? If yes, could you please share that code? If
>>> there is no stateful operation at all, it's strange where the list
>>> state instances are coming from.
>>> 
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >> > wrote:
>>> Hi Ufuk,
>>> 
>>> We don’t explicitly define any state descriptor. We only use map and filters
>>> operator. We thought that gc handle clearing the flink’s internal states.
>>> So how can we manage the memory if it is always increasing?
>>> 
>>> - Ebru
>>> 
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi >> > wrote:
>>> 
>>> Hey Ebru, the memory usage might be increasing as long as a job is running.
>>> This is expected (also in the case of multiple running jobs). The
>>> screenshots are not helpful in that regard. :-(
>>> 
>>> What kind of stateful operations are you using? Depending on your use case,
>>> you have to manually call `clear()` on the state instance in order to
>>> release the managed state.
>>> 
>>> Best,
>>> 
>>> Ufuk
>>> 
>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >> > wrote:
>>> 
>>> 
>>> 
>>> Begin forwarded message:
>>> 
>>> From: ebru >> >
>>> Subject: Re: Flink memory leak
>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>> To: Ufuk Celebi >
>>> 
>>> Hi Ufuk,
>>> 
>>> There are there snapshots of htop output.
>>> 1. snapshot is initial state.
>>> 2. snapshot is after submitted one job.
>>> 3. Snapshot is the output of the one job with 15000 EPS. And the memory
>>> usage is always increasing over time.
>>> 
>>> 
>>> 
>>> 
>>> <1.png><2.png><3.png>
>>> 
>>> On 7 Nov 2017, at 13:34, Ufuk Celebi >> > wrote:
>>> 
>>> Hey Ebru,
>>> 
>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>> 
>>> Since multiple jobs are running, it will be hard to understand to
>>> which job the state descriptors from the heap snapshot belong to.
>>> - Is it possible to isolate the problem and reproduce the behaviour
>>> with only a single job?
>>> 
>>> – Ufuk
>>> 
>>> 
>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>> 

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-06 Thread Piotr Nowojski
Till, is there somewhere a list of ports that need to be exposed that’s more up to 
date than the docker-flink README?

Piotrek

> On 3 Nov 2017, at 10:23, Vergilio, Thalita 
> <t.vergilio4...@student.leedsbeckett.ac.uk> wrote:
> 
> Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of 
> the JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I manged to 
> get the TaskManagers from different nodes and even different subnets to talk 
> to the JobManager.
> 
> This is how I created the services:
> 
> docker network create -d overlay overlay
> 
> docker service create --name jobmanager --env 
> JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 8081:8081 -p{{PUBLIC_IP}}:6123:6123 
> -p 48081:48081 -p 6124:6124 -p 6125:6125 --network overlay --constraint 
> 'node.hostname == ubuntu-swarm-manager' flink jobmanager
> 
> docker service create --name taskmanager --env 
> JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 6121:6121 -p 6122:6122  --network 
> overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager
> 
> However, I am still encountering errors further down the line. When I submit 
> a job using the Web UI, it fails because the JobManager can't talk to the 
> TaskManager on port 35033. I presume this is the taskmanager.data.port, which 
> needs to be set to a range and this range exposed when I create the service?
> 
> Are there any other ports that I need to open at service creation time?
> 
> Connecting the channel failed: Connecting to remote task manager + 
> '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the 
> remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>   at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> 
> 
> From: Piotr Nowojski <pi...@data-artisans.com>
> Sent: 02 November 2017 14:26:32
> To: Vergilio, Thalita
> Cc: user@flink.apache.org
> Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
> they are on different nodes
>  
> Did you try to expose required ports that are listed in the README when 
> starting the containers?
> 
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 
> <https://github.com/apache/flink/tree/master/flink-contrib/docker-flink>
> Ports:
> • The Web Client is on port 48081
> • JobManager RPC port 6123 (default, not exposed to host)
> • TaskManagers RPC port 6122 (default, not exposed to host)
> • TaskManagers Data port 6121 (default, not exposed to host)
> 
> Piotrek
> 
>> On 2 Nov 2017, at 14:44, javalass <t.vergilio4...@student.leedsbeckett.ac.uk 
>> <mailto:t.vergilio4...@student.leedsbeckett.ac.uk>> wrote:
>> 
>> I am using the Docker-Flink project in:
>> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 
>> <https://github.com/apache/flink/tree/master/flink-contrib/docker-flink> 
>> 
>> I am creating the services with the following commands:
>> docker network create -d overlay overlay
>> docker service create --name jobmanager --env
>> JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
>> --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
>> docker service create --name taskmanager --env
>

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-02 Thread Piotr Nowojski
Did you try to expose the required ports listed in the README when starting the 
containers?

https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 

Ports:
• The Web Client is on port 48081
• JobManager RPC port 6123 (default, not exposed to host)
• TaskManagers RPC port 6122 (default, not exposed to host)
• TaskManagers Data port 6121 (default, not exposed to host)
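
For example, when creating the services you could publish the relevant ports 
explicitly, along these lines (just a sketch based on the commands from your 
message; adjust the image name, constraints and the published ports to your setup):

docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS=jobmanager \
  -p 8081:8081 -p 48081:48081 -p 6123:6123 -p 6124:6124 -p 6125:6125 \
  --network overlay --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS=jobmanager \
  -p 6121:6121 -p 6122:6122 \
  --network overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager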

Piotrek

> On 2 Nov 2017, at 14:44, javalass  
> wrote:
> 
> I am using the Docker-Flink project in:
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink 
> 
> I am creating the services with the following commands:
> docker network create -d overlay overlay
> docker service create --name jobmanager --env
> JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
> --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
> docker service create --name taskmanager --env
> JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay --constraint
> 'node.hostname != ubuntu-swarm-manager' flink taskmanager
> 
> I wonder if there's any configuration I'm missing. This is the error I get:
> - Trying to register at JobManager akka.tcp://flink@jobmanager:6123/  
> user/jobmanager (attempt 4, timeout: 4000 milliseconds)
> 
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink flick cancel vs stop

2017-10-24 Thread Piotr Nowojski
I would propose implementations of NewSource to be non-blocking/asynchronous. 
For example, something like:

public abstract Future getCurrent();

This would allow us to perform certain actions while there is no data available 
to process (for example, flushing output buffers). Something like this came up 
recently when we were discussing possible future changes in the network stack. 
It wouldn’t complicate the API by a lot, since the default implementation could 
just be:

public Future getCurrent() {
  return completedFuture(getCurrentBlocking());
}

Another thing to consider: maybe we would like to leave the door open for 
fetching records in batches from the source’s input buffers? Source functions 
(like Kafka) have some internal buffers, and it would be more efficient to 
read/deserialise all data present in the input buffer at once, instead of paying 
synchronisation/virtual method call/etc. costs once per record.
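
To make the above a bit more concrete, here is a rough, self-contained sketch 
(all names are made up for illustration; this is not an existing Flink API):

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical sketch only, not an existing Flink interface.
public abstract class NewSource<T> {

    // Blocking read, the way source functions behave today.
    protected abstract T getCurrentBlocking();

    // Non-blocking variant: the future completes once the next record is available.
    public Future<T> getCurrent() {
        return CompletableFuture.completedFuture(getCurrentBlocking());
    }

    // Possible batching extension: drain whatever is already buffered in one call,
    // to avoid paying synchronisation/virtual call costs once per record.
    public Collection<T> getCurrentBatch() {
        return Collections.singletonList(getCurrentBlocking());
    }
}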

Piotrek

> On 22 Sep 2017, at 11:13, Aljoscha Krettek  wrote:
> 
> @Eron Yes, that would be the difference in characterisation. I think 
> technically all sources could be transformed by that by pushing data into a 
> (blocking) queue and having the "getElement()" method pull from that.
> 
>> On 15. Sep 2017, at 20:17, Elias Levy > > wrote:
>> 
>> On Fri, Sep 15, 2017 at 10:02 AM, Eron Wright > > wrote:
>> Aljoscha, would it be correct to characterize your idea as a 'pull' source 
>> rather than the current 'push'?  It would be interesting to look at the 
>> existing connectors to see how hard it would be to reverse their 
>> orientation.   e.g. the source might require a buffer pool.
>> 
>> The Kafka client works that way.  As does the QueueingConsumer used by the 
>> RabbitMQ source.  The Kinesis and NiFi sources also seems to poll. Those are 
>> all the bundled sources.
> 



Re: HBase config settings go missing within Yarn.

2017-10-23 Thread Piotr Nowojski
Till, do you have some idea what is going on? I do not see any meaningful 
difference between Niels’ code and HBaseWriteStreamExample.java. There is also a 
very similar issue on the mailing list: “Flink can't read hdfs namenode 
logical url”.

Piotrek

> On 22 Oct 2017, at 12:56, Niels Basjes <ni...@basjes.nl> wrote:
> 
> HI,
> 
> Yes, on all nodes the the same /etc/hbase/conf/hbase-site.xml that contains 
> the correct settings for hbase to find zookeeper.
> That is why adding that files as an additional resource to the configuration 
> works.
> I have created a very simple project that reproduces the problem on my setup:
> https://github.com/nielsbasjes/FlinkHBaseConnectProblem 
> <https://github.com/nielsbasjes/FlinkHBaseConnectProblem>
> 
> Niels Basjes
> 
> 
> On Fri, Oct 20, 2017 at 6:54 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Is this /etc/hbase/conf/hbase-site.xml file is present on all of the 
> machines? If yes, could you share your code?
> 
>> On 20 Oct 2017, at 16:29, Niels Basjes <ni...@basjes.nl 
>> <mailto:ni...@basjes.nl>> wrote:
>> 
>> I look at the logfiles from the Hadoop Yarn webinterface. I.e. actually 
>> looking in the jobmanager.log of the container running the Flink task.
>> That is where I was able to find these messages .
>> 
>> I do the
>>  hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>> in all places directly after the  HBaseConfiguration.create();
>> That way I simply force the task to look on the actual Hadoop node for the 
>> same file it already loaded locally.
>> 
>> The reason I'm suspecting Flink is because the clientside part of the Flink 
>> application does have the right setting and the task/job actually running in 
>> the cluster does not have the same settings.
>> So it seems in the transition into the cluster the application does not copy 
>> everything it has available locally for some reason.
>> 
>> There is a very high probability I did something wrong, I'm just not seeing 
>> it at this moment.
>> 
>> Niels
>> 
>> 
>> 
>> On Fri, Oct 20, 2017 at 2:53 PM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> What do you mean by saying:
>> 
>>> When I open the logfiles on the Hadoop cluster I see this:
>> 
>> 
>> The error doesn’t come from Flink? Where do you execute 
>> 
>> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>> 
>> ?
>> 
>> To me it seems like it is a problem with misconfigured HBase and not 
>> something related to Flink.
>> 
>> Piotrek
>> 
>>> On 20 Oct 2017, at 13:44, Niels Basjes <ni...@basjes.nl 
>>> <mailto:ni...@basjes.nl>> wrote:
>>> 
>>> To facilitate you guys helping me I put this test project on github:
>>> https://github.com/nielsbasjes/FlinkHBaseConnectProblem 
>>> <https://github.com/nielsbasjes/FlinkHBaseConnectProblem>
>>> 
>>> Niels Basjes
>>> 
>>> On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl 
>>> <mailto:ni...@basjes.nl>> wrote:
>>> Hi,
>>> 
>>> Ik have a Flink 1.3.2 application that I want to run on a Hadoop yarn 
>>> cluster where I need to connect to HBase.
>>> 
>>> What I have:
>>> 
>>> In my environment:
>>> HADOOP_CONF_DIR=/etc/hadoop/conf/
>>> HBASE_CONF_DIR=/etc/hbase/conf/
>>> HIVE_CONF_DIR=/etc/hive/conf/
>>> YARN_CONF_DIR=/etc/hadoop/conf/
>>> 
>>> In /etc/hbase/conf/hbase-site.xml I have correctly defined the zookeeper 
>>> hosts for HBase.
>>> 
>>> My test code is this:
>>> public class Main {
>>>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>>> 
>>>   public static void main(String[] args) throws Exception {
>>> printZookeeperConfig();
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>>> env.createInput(new HBaseSource()).print();
>>> env.execute("HBase config problem");
>>>   }
>>> 
>>>   public static void printZookeeperConfig() {
>>> String zookeeper = 
>>> HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>>> LOG.info("> Loading HBaseConfiguration: Zookeeper = {}", zoo

Re: SLF4j logging system gets clobbered?

2017-10-23 Thread Piotr Nowojski
Till, could you take a look at this?

Piotrek

> On 18 Oct 2017, at 20:32, Jared Stehler  
> wrote:
> 
> I’m having an issue where I’ve got logging setup and functioning for my 
> flink-mesos deployment, and works fine up to a point (the same point every 
> time) where it seems to fall back to “defaults” and loses all of my 
> configured filtering.
> 
> 2017-10-11 21:37:17.454 [flink-akka.actor.default-dispatcher-17] INFO  
> o.a.f.m.runtime.clusterframework.MesosFlinkResourceManager  - TaskManager 
> taskmanager-8 has started.
> 2017-10-11 21:37:17.454 [flink-akka.actor.default-dispatcher-16] INFO  
> org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager 
> at ip-10-80-54-201 
> (akka.tcp://fl...@ip-10-80-54-201.us-west-2.compute.internal:31014/user/taskmanager
>  
> )
>  as 697add78bd00fe7dc6a7aa60bc8d75fb. Current number of registered hosts is 
> 39. Current number of alive task slots is 39.
> 2017-10-11 21:37:18.820 [flink-akka.actor.default-dispatcher-17] INFO  
> org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager 
> at ip-10-80-54-201 
> (akka.tcp://fl...@ip-10-80-54-201.us-west-2.compute.internal:31018/user/taskmanager
>  
> )
>  as a6cff0f18d71aabfb3b112f5e2c36c2b. Current number of registered hosts is 
> 40. Current number of alive task slots is 40.
> 2017-10-11 21:37:18.821 [flink-akka.actor.default-dispatcher-17] INFO  
> o.a.f.m.runtime.clusterframework.MesosFlinkResourceManager  - TaskManager 
> taskmanager-00010 has started.
> 2017-10-11 
> 21:39:04,371:6171(0x7f67fe9cd700):ZOO_WARN@zookeeper_interest@1570: Exceeded 
> deadline by 13ms
> 
> — here is where it turns over into default pattern layout ---
> 21:39:05.616 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.blob.BlobClient 
> - Blob client connecting to akka://flink/user/jobmanager 
> 
> 
> 21:39:09.322 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.client.JobClient 
> - Checking and uploading JAR files
> 21:39:09.322 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.blob.BlobClient 
> - Blob client connecting to akka://flink/user/jobmanager 
> 
> 21:39:09.788 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Submitting job 005b570ff2866023aa905f2bc850f7a3 
> (Sa-As-2b-Submission-Join-V3 := demos-demo500--data-canvas-2-sa-qs-as-v3).
> 21:39:09.789 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Using restart strategy 
> FailureRateRestartStrategy(failuresInterval=12 msdelayInterval=1000 
> msmaxFailuresPerInterval=3) for 005b570ff2866023aa905f2bc850f7a3.
> 21:39:09.789 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.r.e.ExecutionGraph - Job recovers via failover strategy: full graph 
> restart
> 21:39:09.790 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Running initialization on master for job 
> Sa-As-2b-Submission-Join-V3 := demos-demo500--data-canvas-2-sa-qs-as-v3 
> (005b570ff2866023aa905f2bc850f7a3).
> 21:39:09.790 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Successfully ran initialization on master in 0 
> ms.
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] WARN  
> o.a.f.configuration.Configuration - Config uses deprecated configuration key 
> 'high-availability.zookeeper.storageDir' instead of proper key 
> 'high-availability.storageDir'
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.failover-timeout, 60
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.initial-tasks, 1
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.maximum-failed-tasks, -1
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.resourcemanager.framework.role, '*'
> 
> The reason this is a vexing issue is that the app master then proceeds to 
> dump megabytes of " o.a.f.c.GlobalConfiguration - Loading configuration 
> property:” messages into the log, and I’m unable to filter them out.
> 
> My logback config is:
> 
> 
> 
> 
> 
> %d{-MM-dd HH:mm:ss.SSS} [%thread] %-5level 
> %logger{60} %X{sourceThread} - %msg%n
> 
> 
> 
> 
> 
> ERROR
> 
> 
> 
>  level="OFF" />
> 
>  name="org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler" 
> level="OFF" />
>  name="org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase"
>  level="OFF" />
> 
>  level="WARN" />
>  level="WARN" />
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
> 
> 
> 



Re: flink can't read hdfs namenode logical url

2017-10-23 Thread Piotr Nowojski
Hi,

Why is there a different host in this new message? Previously the code was trying 
to connect to “master:8020” and now it is “startdt”. If you were able to change 
this host somehow between runs, I guess you should also be able to set it to the 
correct one.

Piotrek

> On 23 Oct 2017, at 09:11, 邓俊华 <dengjun...@startdt.com> wrote:
> 
> Hi,
> 
> Thanks for your replay! I have been block in this for several days.
> And I have double checked that there are 
> hdfs-site.xml,core-site.xml,yarn-site.xml  in YARN_CONF_DIR. But it is still 
> can't read hdfs namenode logical url.
> 
> 2017-10-23 14:35:17,750 DEBUG 
> org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils - Cannot find 
> hdfs-default configuration file
> 2017-10-23 14:35:17,750 DEBUG 
> org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils - Cannot find 
> hdfs-site configuration file
> 2017-10-23 14:35:17,751 DEBUG 
> org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils - Adding 
> /data/hadoop-2.7.3/etc/hadoop/core-site.xml to hadoop configuration
> 2017-10-23 14:35:17,751 DEBUG 
> org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils - Adding 
> /data/hadoop-2.7.3/etc/hadoop/hdfs-site.xml to hadoop configuration
> 
> 2017-10-23 14:35:19,887 DEBUG org.apache.hadoop.hdfs.BlockReaderLocal 
>   - dfs.domain.socket.path = 
> 2017-10-23 14:35:19,952 ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner - YARN 
> Application Master initialization failed
> java.lang.IllegalArgumentException: java.net.UnknownHostException: startdt
>   at 
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
>   at 
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>   at 
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>   at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:678)
>   at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:619)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
> --
> 发件人:Piotr Nowojski <pi...@data-artisans.com>
> 发送时间:2017年10月20日(星期五) 21:39
> 收件人:邓俊华 <dengjun...@startdt.com>
> 抄 送:user <user@flink.apache.org>
> 主 题:Re: flink can't read hdfs namenode logical url
> 
> Hi,
> 
> Please double check the content of config files in YARN_CONF_DIR and 
> HADOOP_CONF_DIR (the first one has a priority over the latter one) and that 
> they are pointing to correct files.
> 
> Also check logs (WARN and INFO) for any relevant entries.
> 
> Piotrek
> 
> On 20 Oct 2017, at 06:07, 邓俊华 <dengjun...@startdt.com 
> <mailto:dengjun...@startdt.com>> wrote:
> 
> hi,
> 
> I start yarn-ssession.sh on yarn, but it can't read hdfs logical url. It 
> always connect to hdfs://master:8020 , it should be 9000, 
> my hdfs defaultfs is hdfs://master .
> I have config the YARN_CONF_DIR and HADOOP_CONF_DIR, it didn't work.
> Is it a bug? i use flink-1.3.0-bin-hadoop27-scala_2.10
> 
> 2017-10-20 11:00:05,395 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1035144464) connection to 
> startdt/173.16.5.215:8020 from admin: closed2017-10-20 11:00:05,398 ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner - YARN 
> Application Master initialization failedjava.net.ConnectException: Call From 
> spark3/173.16.5.216 to master:8020 failed on connection exception: 
> java.net.ConnectException: Connection refused; For more details see:  
> http://wiki.apache.org/hadoop/ConnectionRefused 
> <http://wiki.apache.org/hadoop/ConnectionRefused>at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)  at 
> org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)at 
> org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)  at 
> org.apache.hadoop.ipc.Client.call(Client.java:1479)  at 
> org.apache.hadoop.ipc.Client.call(Client.java:1412)  at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
> su

Re: HBase config settings go missing within Yarn.

2017-10-20 Thread Piotr Nowojski
Is this /etc/hbase/conf/hbase-site.xml file present on all of the machines? 
If yes, could you share your code?

> On 20 Oct 2017, at 16:29, Niels Basjes <ni...@basjes.nl> wrote:
> 
> I look at the logfiles from the Hadoop Yarn webinterface. I.e. actually 
> looking in the jobmanager.log of the container running the Flink task.
> That is where I was able to find these messages .
> 
> I do the
>  hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
> in all places directly after the  HBaseConfiguration.create();
> That way I simply force the task to look on the actual Hadoop node for the 
> same file it already loaded locally.
> 
> The reason I'm suspecting Flink is because the clientside part of the Flink 
> application does have the right setting and the task/job actually running in 
> the cluster does not have the same settings.
> So it seems in the transition into the cluster the application does not copy 
> everything it has available locally for some reason.
> 
> There is a very high probability I did something wrong, I'm just not seeing 
> it at this moment.
> 
> Niels
> 
> 
> 
> On Fri, Oct 20, 2017 at 2:53 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> What do you mean by saying:
> 
>> When I open the logfiles on the Hadoop cluster I see this:
> 
> 
> The error doesn’t come from Flink? Where do you execute 
> 
> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
> 
> ?
> 
> To me it seems like it is a problem with misconfigured HBase and not 
> something related to Flink.
> 
> Piotrek
> 
>> On 20 Oct 2017, at 13:44, Niels Basjes <ni...@basjes.nl 
>> <mailto:ni...@basjes.nl>> wrote:
>> 
>> To facilitate you guys helping me I put this test project on github:
>> https://github.com/nielsbasjes/FlinkHBaseConnectProblem 
>> <https://github.com/nielsbasjes/FlinkHBaseConnectProblem>
>> 
>> Niels Basjes
>> 
>> On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl 
>> <mailto:ni...@basjes.nl>> wrote:
>> Hi,
>> 
>> Ik have a Flink 1.3.2 application that I want to run on a Hadoop yarn 
>> cluster where I need to connect to HBase.
>> 
>> What I have:
>> 
>> In my environment:
>> HADOOP_CONF_DIR=/etc/hadoop/conf/
>> HBASE_CONF_DIR=/etc/hbase/conf/
>> HIVE_CONF_DIR=/etc/hive/conf/
>> YARN_CONF_DIR=/etc/hadoop/conf/
>> 
>> In /etc/hbase/conf/hbase-site.xml I have correctly defined the zookeeper 
>> hosts for HBase.
>> 
>> My test code is this:
>> public class Main {
>>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>> 
>>   public static void main(String[] args) throws Exception {
>> printZookeeperConfig();
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>> env.createInput(new HBaseSource()).print();
>> env.execute("HBase config problem");
>>   }
>> 
>>   public static void printZookeeperConfig() {
>> String zookeeper = 
>> HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>> LOG.info("> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>>   }
>> 
>>   public static class HBaseSource extends AbstractTableInputFormat {
>> @Override
>> public void configure(org.apache.flink.configuration.Configuration 
>> parameters) {
>>   table = createTable();
>>   if (table != null) {
>> scan = getScanner();
>>   }
>> }
>> 
>> private HTable createTable() {
>>   LOG.info("Initializing HBaseConfiguration");
>>   // Uses files found in the classpath
>>   org.apache.hadoop.conf.Configuration hConf = 
>> HBaseConfiguration.create();
>>   printZookeeperConfig();
>> 
>>   try {
>> return new HTable(hConf, getTableName());
>>   } catch (Exception e) {
>> LOG.error("Error instantiating a new HTable instance", e);
>>   }
>>   return null;
>> }
>> 
>> @Override
>> public String getTableName() {
>>   return "bugs:flink";
>> }
>> 
>> @Override
>> protected String mapResultToOutType(Result result) {
>>   return new 
>> String(result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)));
>> }
>> 
>> 

Re:

2017-10-20 Thread Piotr Nowojski
Hi,

Only the batch API uses managed memory. If you are using the streaming API, you can 
do two things:
- estimate the max cache size based on, for example, a fraction of the max heap size
- use WeakReference to implement your cache

In the batch API, you could estimate the max cache size based on:
- a fraction of (heapSize - managedMemorySize)

managedMemorySize can be obtained, for example, from: 

getContainingTask().getEnvironment().getMemoryManager().getMemorySize();
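
For example, a rough way to derive a cache capacity from that (the 0.2 fraction 
and the variable names are just an illustrative assumption):

// Sketch: estimate a cache capacity inside an operator of a batch job.
// For a pure streaming job you could simply take a fraction of maxHeap instead.
long managedMemory = getContainingTask().getEnvironment()
        .getMemoryManager().getMemorySize();
long maxHeap = Runtime.getRuntime().maxMemory();

// Give the cache e.g. 20% of the heap that is not managed by Flink.
long cacheCapacityBytes = (long) (0.2 * (maxHeap - managedMemory));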

But keep in mind that any heap memory allocations in your code will compete for 
the same memory and, as mentioned before, outside of the batch API memory 
allocations are not accounted for anywhere (because it is difficult to calculate 
the memory usage of an operator :( )

Piotrek

> On 20 Oct 2017, at 06:04, Navneeth Krishnan  wrote:
> 
> Hello All,
> 
> I have an in-memory cache created inside a user function and I need to assign 
> the max capacity for it. Since the program can be run on any hardware, I'm 
> thinking if I cloud assign based on flink's allocated managed memory. 
> 
> Is there a way to get the flink managed memory size inside a user function? 
> If not are there any other options?
> 
> Thanks,
> Navneeth



Re: flink can't read hdfs namenode logical url

2017-10-20 Thread Piotr Nowojski
Hi,

Please double check the content of the config files in YARN_CONF_DIR and 
HADOOP_CONF_DIR (the first one has priority over the latter) and that those 
variables are pointing to the correct files.
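
For example, before starting yarn-session.sh you could quickly verify something 
along these lines (the paths are just an example; use whatever directory actually 
contains your core-site.xml/hdfs-site.xml):

export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
ls $YARN_CONF_DIR/core-site.xml $YARN_CONF_DIR/hdfs-site.xml $YARN_CONF_DIR/yarn-site.xml
grep -A 1 fs.defaultFS $HADOOP_CONF_DIR/core-site.xml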

Also check logs (WARN and INFO) for any relevant entries.

Piotrek

> On 20 Oct 2017, at 06:07, 邓俊华  wrote:
> 
> hi,
> 
> I start yarn-ssession.sh on yarn, but it can't read hdfs logical url. It 
> always connect to hdfs://master:8020, it should be 9000, my hdfs defaultfs is 
> hdfs://master.
> I have config the YARN_CONF_DIR and HADOOP_CONF_DIR, it didn't work.
> Is it a bug? i use flink-1.3.0-bin-hadoop27-scala_2.10
> 
> 2017-10-20 11:00:05,395 DEBUG org.apache.hadoop.ipc.Client
>   - IPC Client (1035144464) connection to 
> startdt/173.16.5.215:8020 from admin: closed
> 2017-10-20 11:00:05,398 ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner - YARN 
> Application Master initialization failed
> java.net.ConnectException: Call From spark3/173.16.5.216 to master:8020 
> failed on connection exception: java.net.ConnectException: Connection 
> refused; For more details see:  
> http://wiki.apache.org/hadoop/ConnectionRefused
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>   at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
>   at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>   at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
>   at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 
> 



Re: java.lang.NoSuchMethodError and dependencies problem

2017-10-20 Thread Piotr Nowojski
That’s good to hear :)

Unfortunately, at this moment dependencies can pollute the class path in both 
directions (Flink’s dependencies can mess with the user’s application and the 
other way around).

Cheers, Piotrek

> On 20 Oct 2017, at 15:11, r. r. <rob...@abv.bg> wrote:
> 
> By Ali Baba's beard and his forty bandits, Piotrek, this worked!
> My understanding was that I have to prevent Flink from loading the older 
> compress.jar and force the newer one.
> One I shade-relocated org.apache.commons.compress for my project the problem 
> went away
> 
> Many thanks!
> 
> 
> 
> 
> 
> 
>> ---- Оригинално писмо 
> 
>> От: Piotr Nowojski pi...@data-artisans.com
> 
>> Относно: Re: java.lang.NoSuchMethodError and dependencies problem
> 
>> До: "r. r." <rob...@abv.bg>
> 
>> Изпратено на: 20.10.2017 14:46
> 
> 
> 
> 
>> But you said
> 
>> 
> 
>>> this seems to work as mvn dependency:tree -Ddetail=true only shows 1.14
> 
>> 
> 
>> To avoid this error that you describe I think that you have to ensure, that 
>> no 1.14 commons-compress comes from your application, because it can 
>> conflict with 1.4.1 used by flink cluster.
> 
>> 
> 
>> By shading I meant that you could shade/relocate 1.14 usages in your 
>> application, so that they don’t collide with Flink’s 1.4.1.
> 
>> 
> 
>> Piotrek
> 
>> 
> 
>>> On 19 Oct 2017, at 19:58, r. r. <rob...@abv.bg> wrote:
> 
>>> 
> 
>>> Thanks, Piotr
> 
>>> but my app code is self-contained in a fat-jar with maven-shade, so why 
>>> would the class path affect this?
> 
>>> 
> 
>>> by shade commons-compress do you mean :
> 
>>> 
> 
>>> it doesn't have effect either
> 
>>> 
> 
>>> as a last resort i may try to rebuild Flink to use 1.14, but don't want to 
>>> go there yet =/
> 
>>> 
> 
>>> 
> 
>>> Best regards
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>>>  Оригинално писмо 
> 
>>> 
> 
>>>> От: Piotr Nowojski pi...@data-artisans.com
> 
>>> 
> 
>>>> Относно: Re: java.lang.NoSuchMethodError and dependencies problem
> 
>>> 
> 
>>>> До: "r. r." <rob...@abv.bg>
> 
>>> 
> 
>>>> Изпратено на: 19.10.2017 20:04
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>> 
> 
>>>> I’m not 100% sure, so treat my answer with a grain of salt.
> 
>>> 
> 
>>>> 
> 
>>> 
> 
>>>> I think when you start the cluster this way, dependencies (some? all?) are 
>>>> being loaded to the class path before loading user’s application. At that 
>>>> point, it doesn’t matter whether you have excluded commons-compress 1.4.1 
>>>> in yours application pom.xml. I’m not sure if this is solvable in some 
>>>> way, or not.
> 
>>> 
> 
>>>> 
> 
>>> 
> 
>>>> Maybe as a walk around, you could shade commons-compress usages in your 
>>>> pom.xml?
> 
>>> 
> 
>>>> 
> 
>>> 
> 
>>>> Piotr Nowojski
> 
>>> 
> 
>>>> 
> 
>>> 
> 
>>>>> On 19 Oct 2017, at 17:36, r. r. <rob...@abv.bg> wrote:
> 
>>> 
> 
>>>>> 
> 
>>> 
> 
>>>>> flink is started with bin/start-local.sh
> 
>>> 
> 
>>>>> 
> 
>>> 
> 
>>>>> there is no classpath variable in the environment; 
>>>>> flink/lib/flink-dist_2.11-1.3.2.jar contains commons-compress, still it 
>>>>> should be overridden by the dependencyManagement directive
> 
>>> 
> 
>>>>> 
> 
>>> 
> 
>>>>> here is the stacktrace:
> 
>>> 
> 
>>>>> 
> 
>>> 
> 
>>>>> The program finished with the following exception:
> 
>>> 
> 
>>>>> 
> 
>>> 
> 
>>>>> org.apache.flink.client.program.ProgramInvocationException: The program 
>>>>> execution failed: Job execution failed.
> 
>>> 
> 
>>>>>   at 
>>>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> 
>>> 
> 
>>>>>   at 
>>>>> org.apache.flink.client.program.StandaloneCl

Re: Does heap memory gets released after computing windows?

2017-10-20 Thread Piotr Nowojski
Hi,

Memory used by session windows should be released once the window is triggered 
(allowedLateness can prolong a window’s life). Unless your code introduces a 
memory leak (by not releasing references), everything should be garbage 
collected.

Keep in mind that session windows with a time gap of 10 minutes do not mean that 
you will have constant/fixed memory usage. As time goes by and you process more 
and more elements, your memory usage can increase indefinitely: events with new 
keys arrive, and older windows can still be kept alive (by the arrival of events 
with existing keys).

Also check what you are doing in your reduce function. If you collect incoming 
events in some collection, then your memory footprint can also grow over time.
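
For example, if you are currently buffering all elements, an incremental 
aggregation keeps only one partially reduced value per key and window in state 
(just a sketch; the Event type, getSessionKey() and mergeWith() are made-up 
placeholders):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// 'events' is assumed to be an existing DataStream<Event> in your job.
DataStream<Event> aggregatedSessions = events
    .keyBy((KeySelector<Event, String>) Event::getSessionKey)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    // Incremental aggregation: Flink keeps a single partially reduced Event
    // per key and window, instead of buffering every raw element.
    .reduce((a, b) -> a.mergeWith(b));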

Piotrek

> On 20 Oct 2017, at 12:55, Rahul Raj  wrote:
> 
> Hi All,
> 
> I have been facing "GC Overhead limit exceeded" exception for quite a time 
> with my flink program. In my flink program, I have created a session window 
> with time gap of 10 minutes and I am performing some custom reduce operation 
> using Window function. The current statebackend is File System. The program 
> works fine for 15-20 mins and then fails with "GC overhead limited exception" 
> . My understanding of flink is that, the task manager's memory will be 
> released as soon as operation under window function will be completed. But 
> then why I am getting this error? What should I do to avoid this?
> 
> Rahul Raj



Re: HBase config settings go missing within Yarn.

2017-10-20 Thread Piotr Nowojski
Hi,

What do you mean by saying:

> When I open the logfiles on the Hadoop cluster I see this:


The error doesn’t come from Flink? Where do you execute 

hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));

?

To me it seems like it is a problem with misconfigured HBase and not something 
related to Flink.

Piotrek

> On 20 Oct 2017, at 13:44, Niels Basjes  wrote:
> 
> To facilitate you guys helping me I put this test project on github:
> https://github.com/nielsbasjes/FlinkHBaseConnectProblem 
> 
> 
> Niels Basjes
> 
> On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes  > wrote:
> Hi,
> 
> Ik have a Flink 1.3.2 application that I want to run on a Hadoop yarn cluster 
> where I need to connect to HBase.
> 
> What I have:
> 
> In my environment:
> HADOOP_CONF_DIR=/etc/hadoop/conf/
> HBASE_CONF_DIR=/etc/hbase/conf/
> HIVE_CONF_DIR=/etc/hive/conf/
> YARN_CONF_DIR=/etc/hadoop/conf/
> 
> In /etc/hbase/conf/hbase-site.xml I have correctly defined the zookeeper 
> hosts for HBase.
> 
> My test code is this:
> public class Main {
>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
> 
>   public static void main(String[] args) throws Exception {
> printZookeeperConfig();
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
> env.createInput(new HBaseSource()).print();
> env.execute("HBase config problem");
>   }
> 
>   public static void printZookeeperConfig() {
> String zookeeper = 
> HBaseConfiguration.create().get("hbase.zookeeper.quorum");
> LOG.info("> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>   }
> 
>   public static class HBaseSource extends AbstractTableInputFormat {
> @Override
> public void configure(org.apache.flink.configuration.Configuration 
> parameters) {
>   table = createTable();
>   if (table != null) {
> scan = getScanner();
>   }
> }
> 
> private HTable createTable() {
>   LOG.info("Initializing HBaseConfiguration");
>   // Uses files found in the classpath
>   org.apache.hadoop.conf.Configuration hConf = 
> HBaseConfiguration.create();
>   printZookeeperConfig();
> 
>   try {
> return new HTable(hConf, getTableName());
>   } catch (Exception e) {
> LOG.error("Error instantiating a new HTable instance", e);
>   }
>   return null;
> }
> 
> @Override
> public String getTableName() {
>   return "bugs:flink";
> }
> 
> @Override
> protected String mapResultToOutType(Result result) {
>   return new 
> String(result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)));
> }
> 
> @Override
> protected Scan getScanner() {
>   return new Scan();
> }
>   }
> 
> }
> 
> I run this application with this command on my Yarn cluster (note: first 
> starting a yarn-cluster and then submitting the job yields the same result).
> 
> flink \
> run \
> -m yarn-cluster \
> --yarncontainer 1 \
> --yarnname "Flink on Yarn HBase problem" \
> --yarnslots 1 \
> --yarnjobManagerMemory  4000  \
> --yarntaskManagerMemory 4000  \
> --yarnstreaming   \
> target/flink-hbase-connect-1.0-SNAPSHOT.jar
> 
> Now in the client side logfile 
> /usr/local/flink-1.3.2/log/flink--client-80d2d21b10e0.log I see 
> 1) Classpath actually contains /etc/hbase/conf/ both near the start and at 
> the end.
> 2) The zookeeper settings of my experimental environent have been picked up 
> by the software
> 2017-10-20 11:17:23,973 INFO  com.bol.bugreports.Main 
>   - > Loading HBaseConfiguration: Zookeeper = 
> node1.kluster.local.nl.bol.com:2181 
> ,node2.kluster.local.nl.bol.com:2181
>  
> ,node3.kluster.local.nl.bol.com:2181
>  
> 
> When I open the logfiles on the Hadoop cluster I see this:
> 
> 2017-10-20 13:17:33,250 INFO  com.bol.bugreports.Main 
>   - > Loading HBaseConfiguration: Zookeeper = localhost
> 
> and as a consequence
> 
> 2017-10-20 13:17:33,368 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server 
> localhost.localdomain/127.0.0.1:2181 
> 2017-10-20 13:17:33,369 WARN  org.apache.zookeeper.ClientCnxn 
>   - Session 0x0 for server null, unexpected error, closing socket 
> connection and attempting reconnect
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 

Re: Flink BucketingSink, subscribe on moving of file into final state

2017-10-20 Thread Piotr Nowojski
You’re welcome. Unfortunately, I am not aware of anyone having implemented such a use case before.

Piotrek

> On 20 Oct 2017, at 13:47, Rinat <r.shari...@cleverdata.ru> wrote:
> 
> Piotrek, thanks for your reply.
> 
> Yes, now I’m looking for the most suitable way to extend BucketingSink 
> functionality, to handle moments of moving the file into final state.
> I thought, that maybe someone has already implemented such thing or knows any 
> other approaches that will help me to not copy/ paste existing sink impl ))
> 
> Thx !
> 
> 
>> On 20 Oct 2017, at 14:37, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> 
>> Piotrek
> 



Re: java.lang.NoSuchMethodError and dependencies problem

2017-10-20 Thread Piotr Nowojski
But you said

> this seems to work as mvn dependency:tree -Ddetail=true only shows 1.14

To avoid the error that you describe, I think you have to make sure that the 1.14 
commons-compress coming from your application cannot conflict with the 1.4.1 used 
by the Flink cluster.

By shading I meant that you could shade/relocate 1.14 usages in your 
application, so that they don’t collide with Flink’s 1.4.1.
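
For reference, such a relocation could look roughly like this in the 
maven-shade-plugin configuration (just a sketch; the shadedPattern prefix is an 
arbitrary choice):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <pattern>org.apache.commons.compress</pattern>
        <shadedPattern>myapp.shaded.org.apache.commons.compress</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>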

Piotrek

> On 19 Oct 2017, at 19:58, r. r. <rob...@abv.bg> wrote:
> 
> Thanks, Piotr
> but my app code is self-contained in a fat-jar with maven-shade, so why would 
> the class path affect this?
> 
> by shade commons-compress do you mean :
> 
> it doesn't have effect either
> 
> as a last resort i may try to rebuild Flink to use 1.14, but don't want to go 
> there yet =/
> 
> 
> Best regards
> 
> 
> 
> 
> 
> 
>>  Оригинално писмо 
> 
>> От: Piotr Nowojski pi...@data-artisans.com
> 
>> Относно: Re: java.lang.NoSuchMethodError and dependencies problem
> 
>> До: "r. r." <rob...@abv.bg>
> 
>> Изпратено на: 19.10.2017 20:04
> 
> 
> 
> 
>> I’m not 100% sure, so treat my answer with a grain of salt.
> 
>> 
> 
>> I think when you start the cluster this way, dependencies (some? all?) are 
>> being loaded to the class path before loading user’s application. At that 
>> point, it doesn’t matter whether you have excluded commons-compress 1.4.1 in 
>> yours application pom.xml. I’m not sure if this is solvable in some way, or 
>> not.
> 
>> 
> 
>> Maybe as a walk around, you could shade commons-compress usages in your 
>> pom.xml?
> 
>> 
> 
>> Piotr Nowojski
> 
>> 
> 
>>> On 19 Oct 2017, at 17:36, r. r. <rob...@abv.bg> wrote:
> 
>>> 
> 
>>> flink is started with bin/start-local.sh
> 
>>> 
> 
>>> there is no classpath variable in the environment; 
>>> flink/lib/flink-dist_2.11-1.3.2.jar contains commons-compress, still it 
>>> should be overridden by the dependencyManagement directive
> 
>>> 
> 
>>> here is the stacktrace:
> 
>>> 
> 
>>> The program finished with the following exception:
> 
>>> 
> 
>>> org.apache.flink.client.program.ProgramInvocationException: The program 
>>> execution failed: Job execution failed.
> 
>>>at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> 
>>>at 
>>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> 
>>>at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
> 
>>>at 
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
> 
>>>at com.foot.semantic.flink.PipelineJob.main(PipelineJob.java:73)
> 
>>>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 
>>>at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 
>>>at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 
>>>at java.lang.reflect.Method.invoke(Method.java:497)
> 
>>>at 
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> 
>>>at 
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> 
>>>at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
> 
>>>at 
>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
> 
>>>at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
> 
>>>at 
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
> 
>>>at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
> 
>>>at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
> 
>>>at 
>>> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> 
>>>at java.security.AccessController.doPrivileged(Native Method)
> 
>>>at javax.security.auth.Subject.doAs(Subject.java:422)
> 
>>>at 
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> 
>>>at 
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.

Re: java.lang.NoSuchMethodError and dependencies problem

2017-10-19 Thread Piotr Nowojski
I’m not 100% sure, so treat my answer with a grain of salt.

I think when you start the cluster this way, dependencies (some? all?) are 
being loaded onto the class path before the user’s application is loaded. At that 
point, it doesn’t matter whether you have excluded commons-compress 1.4.1 in 
your application’s pom.xml. I’m not sure whether this is solvable in some way or not.

Maybe as a workaround, you could shade commons-compress usages in your pom.xml?

Piotr Nowojski

> On 19 Oct 2017, at 17:36, r. r. <rob...@abv.bg> wrote:
> 
> flink is started with bin/start-local.sh
> 
> there is no classpath variable in the environment; 
> flink/lib/flink-dist_2.11-1.3.2.jar contains commons-compress, still it 
> should be overridden by the dependencyManagement directive
> 
> here is the stacktrace:
> 
> The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
> at com.foot.semantic.flink.PipelineJob.main(PipelineJob.java:73)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
> at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
> at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:933)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:876)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:876)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NoSuchMethodError: 
> org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;
> at 
> org.apache.tika.parser.pkg.ZipContainerDetector.detectArchiveFormat(ZipContainerDetector.java:114)
> at 
> org.apache.tika.parser.pkg.ZipContainerDetector.detect(ZipContainerDetector.java:85)
> at 
> org.apache.tika.detect.CompositeDetector.detect(CompositeDetector.java:77)
> at 
> org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:115)

Re: Watermark on connected stream

2017-10-19 Thread Piotr Nowojski
With aitozi we have a hat trick oO
 
> On 19 Oct 2017, at 17:08, Tzu-Li (Gordon) Tai  wrote:
> 
> Ah, sorry for the duplicate answer, I didn’t see Piotr’s reply. Slight delay 
> on the mail client.
> 
> 
> On 19 October 2017 at 11:05:01 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org 
> ) wrote:
> 
>> Hi Kien,
>> 
>> The watermark of an operator with multiple inputs will be determined by the 
>> current minimum watermark across all inputs.
>> 
>> Cheers,
>> Gordon
>> 
>> 
>> On 19 October 2017 at 8:06:11 PM, Kien Truong (duckientru...@gmail.com 
>> ) wrote:
>> 
>>> Hi, 
>>> 
>>> If I connect two stream with different watermark, how are the watermark 
>>> of the resulting stream determined ? 
>>> 
>>> 
>>> Best regards, 
>>> 
>>> Kien



Re: Watermark on connected stream

2017-10-19 Thread Piotr Nowojski
Hi,

As you can see in 
org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark1,
 it takes the minimum of both inputs.
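
Roughly, the logic boils down to something like this (a simplified sketch of that 
method, not a verbatim copy):

public void processWatermark1(Watermark mark) throws Exception {
    input1Watermark = mark.getTimestamp();
    long newMin = Math.min(input1Watermark, input2Watermark);
    if (newMin > combinedWatermark) {
        combinedWatermark = newMin;
        processWatermark(new Watermark(combinedWatermark));
    }
}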

Piotrek
 
> On 19 Oct 2017, at 14:06, Kien Truong  wrote:
> 
> Hi,
> 
> If I connect two stream with different watermark, how are the watermark of 
> the resulting stream determined ?
> 
> 
> Best regards,
> 
> Kien
> 



Re: java.lang.NoSuchMethodError and dependencies problem

2017-10-19 Thread Piotr Nowojski
Hi,

What is the full stack trace of the error?
Are you sure that there is no commons-compress somewhere in the classpath 
(like in the lib directory)? How are you running your Flink cluster?

Piotrek

> On 19 Oct 2017, at 13:34, r. r.  wrote:
> 
> Hello
> I have a job that runs an Apache Tika pipeline and it fails with "Caused by: 
> java.lang.NoSuchMethodError: 
> org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;"
> 
> Flink includes commons-compress 1.4.1, while Tika needs 1.14. 
> I also have Apache Avro in the project with commons-compress at 1.8.1, so I 
> force 1.14 with 
> 
> 
> 
> 
> org.apache.commons
> commons-compress
> 1.14
> 
> 
> 
> 
> this seems to work as mvn dependency:tree -Ddetail=true only shows 1.14 and 
> after purge, the local maven repo also only contains 1.14
> 
> yet, after i deploy the job and it reads an Avro package from kafka and 
> passes it to Tika, it fails with the error above, which leads me to think it 
> somehow uses commons-compress at a version prior to 1.14, because method 
> 'detect' is not present in older versions
> 
> I excluded/included it from the fat-jar
> org.apache.commons:commons-compress
> still the same problem
> 
> thanks for any hints!
> 
> 



Re: Set heap size

2017-10-19 Thread Piotr Nowojski
Hi,

Just log into the machine and check its memory consumption using htop or a 
similar tool under load. Remember to subtract Flink's memory usage and the 
file system cache.

Piotrek

> On 19 Oct 2017, at 10:15, AndreaKinn  wrote:
> 
> About task manager heap size Flink doc says:
> 
> ... If the cluster is exclusively running Flink, the total amount of
> available memory per machine minus some memory for the operating system
> (maybe 1-2 GB) is a good value
> 
> But my nodes have 2GB of ram each. There isn't an empirical count to set ram
> memory or a way to estimate the ram used by the OS?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: SLF4j logging system gets clobbered?

2017-10-19 Thread Piotr Nowojski
Hi,

What versions of Flink/logback are you using?

Have you read this: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#use-logback-when-running-flink-out-of-the-ide--from-a-java-application
 

 ?
Maybe this is an issue of having multiple logging tools and their 
configurations on the class path?

Piotrek

> On 18 Oct 2017, at 20:32, Jared Stehler  
> wrote:
> 
> I’m having an issue where I’ve got logging setup and functioning for my 
> flink-mesos deployment, and works fine up to a point (the same point every 
> time) where it seems to fall back to “defaults” and loses all of my 
> configured filtering.
> 
> 2017-10-11 21:37:17.454 [flink-akka.actor.default-dispatcher-17] INFO  
> o.a.f.m.runtime.clusterframework.MesosFlinkResourceManager  - TaskManager 
> taskmanager-8 has started.
> 2017-10-11 21:37:17.454 [flink-akka.actor.default-dispatcher-16] INFO  
> org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager 
> at ip-10-80-54-201 
> (akka.tcp://fl...@ip-10-80-54-201.us-west-2.compute.internal:31014/user/taskmanager
>  
> )
>  as 697add78bd00fe7dc6a7aa60bc8d75fb. Current number of registered hosts is 
> 39. Current number of alive task slots is 39.
> 2017-10-11 21:37:18.820 [flink-akka.actor.default-dispatcher-17] INFO  
> org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager 
> at ip-10-80-54-201 
> (akka.tcp://fl...@ip-10-80-54-201.us-west-2.compute.internal:31018/user/taskmanager
>  
> )
>  as a6cff0f18d71aabfb3b112f5e2c36c2b. Current number of registered hosts is 
> 40. Current number of alive task slots is 40.
> 2017-10-11 21:37:18.821 [flink-akka.actor.default-dispatcher-17] INFO  
> o.a.f.m.runtime.clusterframework.MesosFlinkResourceManager  - TaskManager 
> taskmanager-00010 has started.
> 2017-10-11 
> 21:39:04,371:6171(0x7f67fe9cd700):ZOO_WARN@zookeeper_interest@1570: Exceeded 
> deadline by 13ms
> 
> — here is where it turns over into default pattern layout ---
> 21:39:05.616 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.blob.BlobClient 
> - Blob client connecting to akka://flink/user/jobmanager 
> 
> 
> 21:39:09.322 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.client.JobClient 
> - Checking and uploading JAR files
> 21:39:09.322 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.blob.BlobClient 
> - Blob client connecting to akka://flink/user/jobmanager 
> 
> 21:39:09.788 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Submitting job 005b570ff2866023aa905f2bc850f7a3 
> (Sa-As-2b-Submission-Join-V3 := demos-demo500--data-canvas-2-sa-qs-as-v3).
> 21:39:09.789 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Using restart strategy 
> FailureRateRestartStrategy(failuresInterval=12 msdelayInterval=1000 
> msmaxFailuresPerInterval=3) for 005b570ff2866023aa905f2bc850f7a3.
> 21:39:09.789 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.r.e.ExecutionGraph - Job recovers via failover strategy: full graph 
> restart
> 21:39:09.790 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Running initialization on master for job 
> Sa-As-2b-Submission-Join-V3 := demos-demo500--data-canvas-2-sa-qs-as-v3 
> (005b570ff2866023aa905f2bc850f7a3).
> 21:39:09.790 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.m.r.c.MesosJobManager - Successfully ran initialization on master in 0 
> ms.
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] WARN  
> o.a.f.configuration.Configuration - Config uses deprecated configuration key 
> 'high-availability.zookeeper.storageDir' instead of proper key 
> 'high-availability.storageDir'
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.failover-timeout, 60
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.initial-tasks, 1
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.maximum-failed-tasks, -1
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
> o.a.f.c.GlobalConfiguration - Loading configuration property: 
> mesos.resourcemanager.framework.role, '*'
> 
> The reason this is a vexing issue is that the app master then proceeds to 
> dump megabytes of " o.a.f.c.GlobalConfiguration - Loading configuration 
> property:” messages into the log, and I’m unable to filter them out.
> 
> My logback config is:
> 
> 
> 
> 
> 
> %d{-MM-dd HH:mm:ss.SSS} [%thread] %-5level 
> %logger{60} %X{sourceThread} - %msg%n
> 
> 
> 
> 
> 
> ERROR
> 
> 
> 
>  level="OFF" />
> 
>  

Re: Monitoring job w/LocalStreamEnvironment

2017-10-16 Thread Piotr Nowojski
Hi,

Regarding metrics, please check the recent mailing list question "Writing an 
Integration test for flink-metrics". You can either use the JMXReporter or 
write some custom reporter for this purpose.

Piotrek

> On 13 Oct 2017, at 20:57, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi Piotr,
> 
> Thanks for responding, see below.
> 
>> On Oct 12, 2017, at 7:51 AM, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> Have you read the following doc?
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html>
>> 
>> There are some hints regarding testing your application. Especially take a 
>> look at the example with using static field to communicate with the running 
>> job.
> 
> Yes, I’d read those.
> 
> I already have a bunch of Flink metrics, I was hoping to leverage those to 
> know when my test can safely terminate my iteration;
> 
> I guess I could create a metrics wrapper that also logs to a static class 
> during tests.
> 
> Regards,
> 
> — Ken
> 
> 
>>> On 12 Oct 2017, at 16:33, Ken Krugler <kkrugler_li...@transpac.com 
>>> <mailto:kkrugler_li...@transpac.com>> wrote:
>>> 
>>> Hi all,
>>> 
>>> With an iteration-based workflow, it’s helpful to be able to monitor the 
>>> job counters and explicitly terminate when the test has completed.
>>> 
>>> I didn’t see support for async job creation, though.
>>> 
>>> So I extended LocalStreamEnvironment to add an executeAsync(), which 
>>> returns the LocalFlinkMiniCluster.submitJobDetached() result.
>>> 
>>> But it appears like I need to have a ClusterClient in order to actually 
>>> monitor this job.
>>> 
>>> And ClusterClient is bound in with a lot of CLI code, so I’m hesitant to 
>>> try to extract what I need.
>>> 
>>> Is there an easier/recommended approach to the above?
>>> 
>>> Thanks!
>>> 
>>> — Ken
>>> 
>>> 
>>> http://about.me/kkrugler <http://about.me/kkrugler>
>>> +1 530-210-6378
>>> 
>> 
> 
> 
> http://about.me/kkrugler <http://about.me/kkrugler>
> +1 530-210-6378



Re: Submitting a job via command line

2017-10-13 Thread Piotr Nowojski
Good to hear that :)


> On 13 Oct 2017, at 14:40, Alexander Smirnov <asmir...@five9.com> wrote:
> 
> Thank you so much, it helped!
> 
> From: Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>>
> Date: Thursday, October 12, 2017 at 6:00 PM
> To: Alexander Smirnov <asmir...@five9.com <mailto:asmir...@five9.com>>
> Cc: "user@flink.apache.org <mailto:user@flink.apache.org>" 
> <user@flink.apache.org <mailto:user@flink.apache.org>>
> Subject: Re: Submitting a job via command line
> 
> Have you tried this 
> http://mail-archives.apache.org/mod_mbox/flink-user/201705.mbox/%3ccagr9p8bxhljseexwzvxlk+drotyp1yxjy4n4_qgerdzxz8u...@mail.gmail.com%3E
>  
> <http://mail-archives.apache.org/mod_mbox/flink-user/201705.mbox/%3ccagr9p8bxhljseexwzvxlk+drotyp1yxjy4n4_qgerdzxz8u...@mail.gmail.com%3E>
> ?
> 
> Piotrek
> 
>> On 12 Oct 2017, at 16:30, Alexander Smirnov <asmir...@five9.com 
>> <mailto:asmir...@five9.com>> wrote:
>> 
>> Hello All,
>>  
>> I got the following error while attempting to execute a job via command line:
>> 
>> [root@flink01 bin]# ./flink run -c com.five9.stream.PrecomputeJob 
>> /vagrant/flink-precompute-1.0-SNAPSHOT.jar -Xmx2048m -Xms2048m
>> Cluster configuration: Standalone cluster with JobManager at 
>> flink01.pb.lx-draskin5.five9.com/10.11.132.110:6123 
>> <http://flink01.pb.lx-draskin5.five9.com/10.11.132.110:6123>
>> Using address flink01.pb.lx-draskin5.five9.com:6123 
>> <http://flink01.pb.lx-draskin5.five9.com:6123/> to connect to JobManager.
>> JobManager web interface address 
>> http://flink01.pb.lx-draskin5.five9.com:8081 
>> <http://flink01.pb.lx-draskin5.five9.com:8081/>
>> Starting execution of program
>> Submitting job with JobID: 222a9d44d2069ab3cc41866c8f3a. Waiting for job 
>> completion.
>> Connected to JobManager at 
>> Actor[akka.tcp://fl...@flink01.pb.lx-draskin5.five9.com 
>> <mailto://fl...@flink01.pb.lx-draskin5.five9.com>:6123/user/jobmanager#-1899708478]
>>  with leader session id ----.
>>  
>> 
>> The program finished with the following exception:
>>  
>> org.apache.flink.client.program.ProgramInvocationException: The program 
>> execution failed: Couldn't retrieve the JobExecutionResult from the 
>> JobManager.
>> at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>> at 
>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>> at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>> at 
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
>> at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1499)
>> at com.five9.stream.PrecomputeJob.execute(PrecomputeJob.java:137)
>> at 
>> com.five9.stream.PrecomputeJob.configureAndExecute(PrecomputeJob.java:78)
>> at com.five9.stream.PrecomputeJob.main(PrecomputeJob.java:65)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>> at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>> at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
>> at 
>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
>> at 
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.se

Re: Writing an Integration test for flink-metrics

2017-10-13 Thread Piotr Nowojski
For testing Flink applications in general you can read 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html>

However, as we said before, testing metrics would require using a custom or a JMX 
reporter.

Yes, please report this bug in Jira. 

Thanks, Piotrek

> On 13 Oct 2017, at 04:31, Colin Williams <colin.williams.seat...@gmail.com> 
> wrote:
> 
> Team wants an integration test, I'm not sure what unit test you had in mind. 
> Actually feel that I've been trying to avoid the reporter method but that 
> would be more end to end.
> 
> The documentation for metrics and Scala are missing with the exception of 
> Gauge: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html>
>  . Should I file a issue against that?
> 
> Then it leaves you guessing a little bit how to implement Counters. One 
> approach tried was using objects
> 
> object PointFilter extends RichMapFunction[...
>   @transient lazy val someCounter = 
> getRuntimeContext.getMetricGroup.counter(...)
> 
> This allowed access to the counter before and after execution . However 
> between the unit tests the Counter kept its value also and that's a no for 
> the test. Think that might be an issue with ScalaTest. 
> 
> I've tried to get at the counter from some other directions like trying to 
> find a way to inject a reporter to get it's state. But don't see a way to do 
> it. So probably the best thing to do is fire up something to collect the 
> metrics from the reporter.
> 
> On Thu, Oct 12, 2017 at 5:29 AM, Chesnay Schepler <ches...@apache.org 
> <mailto:ches...@apache.org>> wrote:
> Well damn, i should've read the second part of the initial mail.
> 
> I'm wondering though, could you not unit-test this behavior?
> 
> 
> On 12.10.2017 14:25, Chesnay Schepler wrote:
> You could also write a custom reporter that opens a socket or similar for 
> communication purposes.
> 
> You can then either query it for the metrics, or even just trigger the 
> verification in the reporter,
> and fail with an error if the reporter returns an error.
> 
> On 12.10.2017 14:02, Piotr Nowojski wrote:
> Hi,
> 
> Doing as you proposed using JMXReporter (or custom reporter) should work. I 
> think there is no easier way to do this at the moment.
> 
> Piotrek
> 
> On 12 Oct 2017, at 04:58, Colin Williams <colin.williams.seat...@gmail.com 
> <mailto:colin.williams.seat...@gmail.com>> wrote:
> 
> I have a RichMapFunction and I'd like to ensure Meter fields are properly 
> incremented. I've been trying to think of the best way to do this. Currently 
> I think that I'd need to either implement my own reporter (or use JMX) and 
> write to a socket, create a listener and wait for the reporter to send the 
> message.
> 
> Is this a good approach for writing the test, or should I be considering 
> something else?
> 
> 
> 
> 
> 



Re: Beam Application run on cluster setup (Kafka+Flink)

2017-10-13 Thread Piotr Nowojski
Hi,

What version of Flink are you using? In earlier 1.3.x releases there were some 
bugs in the Kafka Consumer code.
Could you change the log level in Flink to debug? 
Did you check the Kafka logs for some hint maybe?
I guess that metrics like bytes read/input records of this Flink application are 
not changing?

Piotrek

> On 13 Oct 2017, at 07:51, Shankara  wrote:
> 
> Hi,
> 
>I mean same code works fine in flink local setup. I can able to see
> "Received Message  from testkafka Topic : " on console when kafka
> receive some message (Kafka Producer is in other machine and sending some
> message frequently to testkafka topic).
> 
> *Submitted the Beam application to flink local by below command :*
> mvn compile exec:java
> -Dexec.mainClass=org.apache.beam.influxdb.KafkaRead  -Pflink-runner
> 
> *Output is :*
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#735957608]
> with leader session id d97c060d-bdf9-4215-8d7c-138f13cbff1e.
> 10/13/2017 11:09:09   Job execution switched to status RUNNING.
> 10/13/2017 11:09:09   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED 
> 10/13/2017 11:09:09   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING 
> 10/13/2017 11:09:09   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING 
> *Received in Deserilize..
> Received Message  from testkafka Topic : HELLOASA*
> 
> 
> 
>If I run same code in Flink Cluster I cannot see any message in
> log/stdout, But job is continuously running and Kafka Producer is in other
> machine and sending some message frequently to testkafka topic.
> 
>  * I started flink cluster by below command : *
>   bin/start-cluster.sh
> 
>   *Submitted the Beam application to flink cluster by below command :*
>  bin/flink run -c org.apache.beam.influxdb.KafkaRead
> /home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar
> --runner=FlinkRunner --flinkMaster=192.168.1.116
> --filesToStage=/home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar
> 
> 
>   In dashboard: [screenshot not included in the archive]
> 
>I cannot see any message in the dashboard: [screenshot not included in the archive]
> 
>   As per log Job execution is running :
> Cluster configuration: Standalone cluster with JobManager at
> /192.168.1.116:6123
> Using address 192.168.1.116:6123 to connect to JobManager.
> JobManager web interface address http://192.168.1.116:8081
> Starting execution of program
> Submitting job with JobID: 8d731f801d00268f951a98d093f21e0c. Waiting for job
> completion.
> Connected to JobManager at
> Actor[akka.tcp://flink@192.168.1.116:6123/user/jobmanager#422012792] with
> leader session id ----.
> 10/13/2017 11:10:57   Job execution switched to status RUNNING.
> 10/13/2017 11:10:57   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED 
> 10/13/2017 11:10:57   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING 
> 10/13/2017 11:11:05   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING 
> 
>   There is no exception in log. I suspect deployment of kafka having issue. 
> 
> Can you please help me to check it.
> 
> 
> 
> 
> public static void main(String[] args) { 
>Pipeline p = initializePipeline(args); 
>Map> intelliOmIms = new TreeMap<>(); 
> 
>PTransform>>
> reader; 
>reader = KafkaIO.read() 
>   .withBootstrapServers("192.168.1.116:9092")--->Kafka 
> zookeeper and server running 
>.withTopic("kafkatest") 
>.withKeyDeserializer(IntegerDeserializer.class) 
>.withValueDeserializer(IntelliOmImsKpiDataUtil.class) 
>.withoutMetadata(); 
> 
>PCollection> output = p.apply(reader); 
>output.apply(ParDo.of(new PrintMsg())); 
> 
>p.run().waitUntilFinish(); 
> } 
> 
> public static class PrintMsg extends DoFn, Void> {
> 
>@ProcessElement
>public void processElement(ProcessContext c) {
> 
>try {
>System.out.println("Received Message  from testkafka
> Topic : " + new String(c.element().getValue(), "UTF-8"));
>} catch (UnsupportedEncodingException e) {
>e.printStackTrace();
>}
>}
>}
> 
> 
> 
> --
> Sent from: 
> 

Re: Submitting a job via command line

2017-10-12 Thread Piotr Nowojski
Have you tried this 
http://mail-archives.apache.org/mod_mbox/flink-user/201705.mbox/%3ccagr9p8bxhljseexwzvxlk+drotyp1yxjy4n4_qgerdzxz8u...@mail.gmail.com%3E
 

?

Piotrek

> On 12 Oct 2017, at 16:30, Alexander Smirnov  wrote:
> 
> Hello All,
>  
> I got the following error while attempting to execute a job via command line:
> 
> [root@flink01 bin]# ./flink run -c com.five9.stream.PrecomputeJob 
> /vagrant/flink-precompute-1.0-SNAPSHOT.jar -Xmx2048m -Xms2048m
> Cluster configuration: Standalone cluster with JobManager at 
> flink01.pb.lx-draskin5.five9.com/10.11.132.110:6123
> Using address flink01.pb.lx-draskin5.five9.com:6123 to connect to JobManager.
> JobManager web interface address http://flink01.pb.lx-draskin5.five9.com:8081 
> 
> Starting execution of program
> Submitting job with JobID: 222a9d44d2069ab3cc41866c8f3a. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://fl...@flink01.pb.lx-draskin5.five9.com 
> :6123/user/jobmanager#-1899708478]
>  with leader session id ----.
>  
> 
> The program finished with the following exception:
>  
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Couldn't retrieve the JobExecutionResult from the 
> JobManager.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1499)
> at com.five9.stream.PrecomputeJob.execute(PrecomputeJob.java:137)
> at 
> com.five9.stream.PrecomputeJob.configureAndExecute(PrecomputeJob.java:78)
> at com.five9.stream.PrecomputeJob.main(PrecomputeJob.java:65)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
> at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
> at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't 
> retrieve the JobExecutionResult from the JobManager.
> at 
> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
> at 
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
> ... 25 more
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
> submission to the JobManager timed out. You may increase 
> 'akka.client.timeout' in case the JobManager needs more time to configure and 
> confirm the job submission.
> at 
> org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
> at 
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
> at 
> 

Re: Monitoring job w/LocalStreamEnvironment

2017-10-12 Thread Piotr Nowojski
Hi,

Have you read the following doc?
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html
 


There are some hints regarding testing your application. Especially take a look 
at the example of using a static field to communicate with the running job.
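
For reference, that pattern boils down to a sink that collects into a static 
collection, roughly like this (a sketch assuming a stream of Longs - adjust the 
element type to your job; it works because a local environment runs the sink in the 
same JVM as the test):

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Test-only sink: results end up in a static list that the test can inspect
// after env.execute() returns.
public class CollectSink implements SinkFunction<Long> {

    // static, because Flink serializes the sink instance before running it
    public static final List<Long> VALUES =
            Collections.synchronizedList(new ArrayList<Long>());

    @Override
    public void invoke(Long value) throws Exception {
        VALUES.add(value);
    }
}

In the test you then assert on CollectSink.VALUES after env.execute() and clear it 
between test cases. You could do something similar for your counters: update a 
static accumulator-like object next to the metric and check it once the job has 
finished.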

Piotrek

> On 12 Oct 2017, at 16:33, Ken Krugler  wrote:
> 
> Hi all,
> 
> With an iteration-based workflow, it’s helpful to be able to monitor the job 
> counters and explicitly terminate when the test has completed.
> 
> I didn’t see support for async job creation, though.
> 
> So I extended LocalStreamEnvironment to add an executeAsync(), which returns 
> the LocalFlinkMiniCluster.submitJobDetached() result.
> 
> But it appears like I need to have a ClusterClient in order to actually 
> monitor this job.
> 
> And ClusterClient is bound in with a lot of CLI code, so I’m hesitant to try 
> to extract what I need.
> 
> Is there an easier/recommended approach to the above?
> 
> Thanks!
> 
> — Ken
> 
> 
> http://about.me/kkrugler
> +1 530-210-6378
> 



Re: Beam Application run on cluster setup (Kafka+Flink)

2017-10-12 Thread Piotr Nowojski
Hi,

What do you mean by:

> With standalone beam application kafka can receive the message, But in
cluster setup it is not working.

In your example you are reading the data from Kafka and printing it to the 
console. There doesn't seem to be anything that writes back to Kafka, so what 
do you mean by "Kafka cannot receive the message"?

Did you check the output file of your application in the log directory? Did you 
check Flink logs if there are any errors?

Piotrek

> On 12 Oct 2017, at 15:49, Shankara  wrote:
> 
> Below is my setup 
>1. Kafka zookeeper and server in one machine (192.168.1.116) and
> producer (192.168.1.100) and consumer (192.168.1.117) in another machine.  
> --> This work fine no issue 
>2. Running standalone beam application with kafka consumer --> This
> work fine
>3. Running beam application in flink cluster with kafka consumer -->
> This is not working
>  Not receiving message from kafka producer.
> 
> Same program works fine with standalone with flink runner.
> Below is my code snippet.
> 
> public static void main(String[] args) {
>Pipeline p = initializePipeline(args);
>Map> intelliOmIms = new TreeMap<>();
> 
>PTransform>> reader;
>reader = KafkaIO.read()
>.withBootstrapServers("192.168.1.116:9092")--->Kafka
> zookeeper and server running
>.withTopic("kafkatest")
>.withKeyDeserializer(IntegerDeserializer.class)
>.withValueDeserializer(IntelliOmImsKpiDataUtil.class)
>.withoutMetadata();
> 
>PCollection> output = p.apply(reader);
>output.apply(ParDo.of(new PrintMsg()));
> 
>p.run().waitUntilFinish();
> }
> 
>  In IntelliOmImsKpiDataUtil deserializer I am just printing message saying
> that kafka is received the message.
> 
> public static class PrintMsg extends DoFn, Void> {
> 
>@ProcessElement
>public void processElement(ProcessContext c) {
>System.out.println("Received Message  from kafkatest Topic ");
>}
> }
> 
>  Started Zookeeper in 192.168.1.116 like below :
>bin/zookeeper-server-start.sh config/zookeeper.properties
> 
>  Started Server in 192.168.1.116 like below :
>bin/kafka-server-start.sh config/server.properties
> 
>  Started Producer in 192.168.1.100 like below :
>bin/kafka-console-producer.sh --broker-list 192.168.1.116:9092 --topic
> kafkatest
> 
>  Started Consumer in 192.168.1.117 like below :
>bin/kafka-console-consumer.sh --zookeeper 192.168.1.116:2181 --topic
> kafkatest --from-beginning
> 
>   With standalone beam application kafka can receive the message, But in
> cluster setup it is not working.
> 
> Can you please help me to check it. 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Writing to an HDFS file from a Flink stream job

2017-10-12 Thread Piotr Nowojski
I think the issue might be that writeAsText (TextOutputFormat) doesn't flush 
the data anywhere (only on close, which in streaming doesn't happen). You would 
need to use a custom output format, but as Aljoscha pointed out, BucketingSink 
makes more sense for streaming applications.
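
A minimal BucketingSink setup could look roughly like this (a sketch - it assumes 
the stream elements are Strings, the package names are from the 1.3/1.4 
flink-connector-filesystem module, and the path and sizes are placeholders):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// Attach a BucketingSink instead of writeAsText(): it flushes and rolls part
// files while the streaming job keeps running, instead of only writing on close().
BucketingSink<String> sink = new BucketingSink<>("hdfs://hadoop-master:9000/user/isuru/foo");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH")); // one bucket directory per hour
sink.setBatchSize(1024 * 1024 * 64);                              // roll a new part file at ~64 MB

stream.addSink(sink);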

Piotrek

> On 12 Oct 2017, at 14:58, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi Isuru,
> 
> What is the source in your job and is the job terminating at some point or 
> running continuously?
> 
> In general, the writeAsText()/writeAsCsv() methods should not be used because 
> they don't work well in an infinite streaming job that might have failures 
> and recovery. I.e. what does that mean for the file, if you have recovery. 
> For writing to files you would use the BucketingSink: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/filesystem_sink.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/filesystem_sink.html>
> 
> Best,
> Aljoscha
> 
>> On 12. Oct 2017, at 14:55, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> Maybe this is an access rights issue? Could you try to create and write to 
>> same file (same directory) in some other way (manually?), using the same 
>> user and the same machine as would Flink job do?
>> 
>> Maybe there will be some hint in hdfs logs?
>> 
>> Piotrek
>> 
>>> On 12 Oct 2017, at 00:19, Isuru Suriarachchi <isur...@gmail.com 
>>> <mailto:isur...@gmail.com>> wrote:
>>> 
>>> Hi all,
>>> 
>>> I'm just trying to use an HDFS file as the sink for my flink stream job. I 
>>> use the following line to do so.
>>> 
>>> stream.writeAsText("hdfs://hadoop-master:9000/user/isuru/foo 
>>> ");
>>> 
>>> I have not set "fs.hdfs.hadoopconf" in my flink configuration as it should 
>>> work with the full hdfs file name according to [1]. 
>>> 
>>> However, it doesn't work as expected. File foo is created on hdfs. But that 
>>> file is empty. But I don't see any error logs too on Flink side. When I 
>>> used a normal file sink using a "file:/// <file:///>.." url, it works fine 
>>> and data is there in the file.
>>> 
>>> Do I need any other configuration to get his working?
>>> 
>>> Thanks,
>>> Isuru
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#hdfs
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#hdfs>
> 



Re: Writing to an HDFS file from a Flink stream job

2017-10-12 Thread Piotr Nowojski
Hi,

Maybe this is an access rights issue? Could you try to create and write to same 
file (same directory) in some other way (manually?), using the same user and 
the same machine as would Flink job do?

Maybe there will be some hint in hdfs logs?

Piotrek

> On 12 Oct 2017, at 00:19, Isuru Suriarachchi  wrote:
> 
> Hi all,
> 
> I'm just trying to use an HDFS file as the sink for my flink stream job. I 
> use the following line to do so.
> 
> stream.writeAsText("hdfs://hadoop-master:9000/user/isuru/foo");
> 
> I have not set "fs.hdfs.hadoopconf" in my flink configuration as it should 
> work with the full hdfs file name according to [1]. 
> 
> However, it doesn't work as expected. File foo is created on hdfs. But that 
> file is empty. But I don't see any error logs too on Flink side. When I used 
> a normal file sink using a "file:///.." url, it works fine and data is there 
> in the file.
> 
> Do I need any other configuration to get his working?
> 
> Thanks,
> Isuru
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#hdfs
>  
> 


Re: Writing an Integration test for flink-metrics

2017-10-12 Thread Piotr Nowojski
Hi,

Doing as you proposed using JMXReporter (or custom reporter) should work. I 
think there is no easier way to do this at the moment.
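
A test-only reporter could be as simple as this (a sketch; it relies on the reporter 
running in the same JVM as the test, i.e. a local environment):

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Keeps every registered metric in a static map, so the test can look up the
// Meter by name and assert on its value after the job has run.
public class CollectingReporter implements MetricReporter {

    public static final Map<String, Metric> METRICS = new ConcurrentHashMap<>();

    @Override
    public void open(MetricConfig config) {}

    @Override
    public void close() {}

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        METRICS.put(metricName, metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        METRICS.remove(metricName);
    }
}

You would register it via the configuration used to create the (local) environment, 
e.g. metrics.reporters: test and metrics.reporter.test.class: <fully qualified name 
of CollectingReporter> - please double check the exact keys in the metrics 
documentation of your Flink version.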

Piotrek

> On 12 Oct 2017, at 04:58, Colin Williams  
> wrote:
> 
> I have a RichMapFunction and I'd like to ensure Meter fields are properly 
> incremented. I've been trying to think of the best way to do this. Currently 
> I think that I'd need to either implement my own reporter (or use JMX) and 
> write to a socket, create a listener and wait for the reporter to send the 
> message.
> 
> Is this a good approach for writing the test, or should I be considering 
> something else?



Re: Kafka 11 connector on Flink 1.3

2017-10-12 Thread Piotr Nowojski
Hi,

The Kafka 0.11 connector depends on some API changes in Flink 1.4, so without 
rebasing the code and solving some small issues it is not possible to use it with 
1.3.x.

We are about to finalize the timeframe for the 1.4 release; it would be great if 
you could come back with this question after the weekend.

If you do not need the exactly-once Kafka producer that will come with the Kafka 
0.11 connector, you should be able to use the 0.10 connector to read (exactly-once) 
and write (at-least-once) to Kafka 0.11.
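
For example, something along these lines (a sketch - broker addresses and topic 
names are placeholders, and the SimpleStringSchema package differs slightly between 
Flink versions):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

// Reading from and writing to Kafka 0.11 brokers using the 0.10 connector.
public class Kafka010ReadWrite {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "my-group");

        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props));

        // at-least-once writes; see the connector documentation for flushing on checkpoints
        input.addSink(
                new FlinkKafkaProducer010<>("broker:9092", "output-topic", new SimpleStringSchema()));

        env.execute("kafka-0.10-connector-example");
    }
}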

Piotrek

> On 12 Oct 2017, at 10:44, Syed Moizuddin  wrote:
> 
> Hi,
> 
> I was just wondering if I could use the Kafka 11 connector on 1.3.
> If there are dependencies, then what would be the timeframe for 1.4 release
> 
> Thanks
> Moiz



Re: Implement bunch of transformations applied to same source stream in Apache Flink in parallel and combine result

2017-10-12 Thread Piotr Nowojski
Hi,

What is the number of events per second that you wish to process? If it's high 
enough (~ number of machines * number of cores) you should be just fine: instead 
of scaling with the number of features, scale with the number of events. If you 
have a single data source you could still randomly shuffle the events before 
applying your transformations.

Another solution might be to (see the sketch below):
1. Assign a unique eventId and split the original event using flatMap into 
per-feature tuples (eventId, featureId, value)
2. keyBy featureId, eventId (or maybe do random partitioning with shuffle?)
3. perform transformation
4. keyBy eventId, ….
5. Window and reduce

But that would add more overhead compared to processing more events at the same 
time.
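
A rough sketch of that approach (Event, NUM_FEATURES and extractFeature() are 
placeholders for your own types and logic, and the per-feature transformation step 
is left out for brevity):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// 1. split each event into (eventId, featureId, value) tuples
DataStream<Tuple3<Long, Integer, Double>> features = source
        .flatMap(new FlatMapFunction<Event, Tuple3<Long, Integer, Double>>() {
            @Override
            public void flatMap(Event event, Collector<Tuple3<Long, Integer, Double>> out) {
                for (int featureId = 0; featureId < NUM_FEATURES; featureId++) {
                    out.collect(Tuple3.of(event.eventId, featureId, extractFeature(featureId, event)));
                }
            }
        });

// 2./3. optionally keyBy featureId here and apply the per-feature transformations

// 4./5. bring all features of one event back together and emit the assembled vector
DataStream<double[]> vectors = features
        .keyBy(0) // eventId
        .timeWindow(Time.minutes(10))
        .apply(new WindowFunction<Tuple3<Long, Integer, Double>, double[], Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window,
                              Iterable<Tuple3<Long, Integer, Double>> input,
                              Collector<double[]> out) {
                double[] vector = new double[NUM_FEATURES];
                for (Tuple3<Long, Integer, Double> t : input) {
                    vector[t.f1] = t.f2; // position in the vector = featureId
                }
                out.collect(vector);
            }
        });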

Piotrek

> On 11 Oct 2017, at 23:02, Andrey Salnikov  wrote:
> 
> Hi!
> 
> Could you please help me - I'm trying to use Apache Flink for machine 
> learning tasks with external ensemble/tree libs like XGBoost, so my workflow 
> will be like this:
> 
> receive single stream of data which atomic event looks like a simple vector 
> event=(X1, X2, X3...Xn) and it can be imagined as POJO fields so initially we 
> have DataStream source=...
> a lot of feature extractions code applied to the same event source: feature1 
> = source.map(X1...Xn) feature2 = source.map(X1...Xn) etc. For simplicity lets 
> DataStream feature(i) = source.map() for all features
> then I need to create a vector with extracted features (feature1, feature2, 
> ...featureK) for now it will be 40-50 features, but I'm sure it will contain 
> more items in future and easily can contains 100-500 features and more
> put these extracted features to dataset/table columns by 10 minutes window 
> and run final machine learning task on such 10 minutes data
> In simple words I need to apply several quite different map operations to the 
> same single event in stream and then combine result from all map functions in 
> single vector.
> 
> So for now I can't figure out how to implement final reduce step and run all 
> feature extraction mapjobs in parallel if possible. I spend several days on 
> flink docs site, youtube videos, googling, reading Flink's sources but it 
> seems I'm really stuck here.
> 
> The easy solution here will be to use single map operation and run each 
> feature extraction code sequentially one by one in huge map body, and then 
> return final vector (Feature1...FeatureK) for each input event. But it should 
> be crazy and non optimal.
> 
> Another solution for each two pair of features use join since all feature 
> DataStreams has same initial event and same key and only apply some 
> transformation code, but it looks ugly: write 50 joins code with some window. 
> And I think that joins and cogroups developed for joining different streams 
> from different sources and not for such map/reduce operations.
> 
> As for me for all map operations here should be a something simple which I'm 
> missing.
> 
> Could you please point me how you guys implement such tasks in Flink, and if 
> possible with example of code?
> 
> PS: I posted this question 
> 
>  to stackoverflow.
> PPS: If I will use feature1.union(feature2...featureK) I still need somehow 
> separate and combine features vector before sink, and preserve order of final 
> vectors.
> 
> Th​​anks,
> Andrey



Re: R/W traffic estimation between Flink and Zookeeper

2017-10-12 Thread Piotr Nowojski
Hi,

Are you asking how to measure records/s, or whether it is possible to achieve it? 
To measure it you can check the numRecordsInPerSecond metric.

As for whether 1000 records/s is possible, it depends on many things, like the 
state backend used, state size, complexity of your application, size of the 
records, number of machines, their hardware and the network. In the very simplest 
cases it is possible to achieve millions of records per second per machine. It 
would be best to try it out in your particular use case at some small scale.

Piotrek

> On 11 Oct 2017, at 19:58, Hao Sun  wrote:
> 
> Hi Is there a way to estimate read/write traffic between flink and zk?
> I am looking for something like 1000 reads/sec or 1000 writes/sec. And the 
> size of the message.
> 
> Thanks



Re: Write each group to its own file

2017-10-12 Thread Piotr Nowojski
Hi,

There is no straightforward way to do that. First of all, the error you are 
getting is because you are trying to start a new application 
( env.fromElements(items) ) inside your reduce function.

To do what you want, you have to hash partition the products based on category 
(instead of grouping by and reducing) and after that either:

1. Sort the hash partitioned products and implement a custom OutputFormat (maybe 
based on FileOutputFormat) that starts a new file when the key value changes.

Or

2. Implement a custom OutputFormat (maybe based on FileOutputFormat) that keeps 
multiple files open - one file per category - and writes records accordingly.

Note that both options require hash partitioning the products first. Option 1 will 
be more CPU and memory consuming (it has to sort the data); option 2 can exceed 
the maximum number of simultaneously open files if the number of categories is 
very high.
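
For example, option 2 could look roughly like this (a sketch for the local file 
system, assuming a Product POJO with name and category fields; for HDFS you would 
open the streams via Flink's FileSystem API instead of FileWriter):

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Keeps one open writer per category and routes every record to its category's file.
public class PerCategoryOutputFormat extends RichOutputFormat<Product> {

    private final String outputDir;
    private transient Map<String, BufferedWriter> writers;

    public PerCategoryOutputFormat(String outputDir) {
        this.outputDir = outputDir;
    }

    @Override
    public void configure(Configuration parameters) {}

    @Override
    public void open(int taskNumber, int numTasks) {
        writers = new HashMap<>();
    }

    @Override
    public void writeRecord(Product product) throws IOException {
        BufferedWriter writer = writers.get(product.category);
        if (writer == null) {
            // include the subtask index in the name to avoid clashes between parallel instances
            writer = new BufferedWriter(new FileWriter(
                    outputDir + "/" + product.category + "-"
                            + getRuntimeContext().getIndexOfThisSubtask() + ".csv"));
            writers.put(product.category, writer);
        }
        writer.write(product.name + "," + product.category);
        writer.newLine();
    }

    @Override
    public void close() throws IOException {
        for (BufferedWriter writer : writers.values()) {
            writer.close();
        }
    }
}

You would then use it roughly as: 
products.partitionByHash("category").output(new PerCategoryOutputFormat("/tmp/output"))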

Piotrek

> On 11 Oct 2017, at 17:47, rlazoti  wrote:
> 
> Hi,
> 
> Is there a way to write each group to its own file using the Dataset api
> (Batch)?
> 
> For example, lets use the following class:
> 
> case class Product(name: String, category: String)
> 
> And the following Dataset:
> 
> val products = env.fromElements(Product("i7", "cpu"), Product("R5", "cpu"),
> Product("gtx1080", "gpu"), Product("vega64", "gpu"), Product("evo250gb",
> "ssd"))
> 
> So in this example my output should be these 3 files:
> 
> - cpu.csv
> i7, cpu
> R5, cpu
> 
> - gpu.csv
> gtx1080, gpu
> vega64, gpu
> 
> - ssd.csv
> evo250gb, ssd
> 
> 
> I tried the following code, but got
> org.apache.flink.api.common.InvalidProgramException: Task not serializable.
> 
> products.groupBy("category").reduceGroup { group: Iterator[Product] =>
>  val items = group.toSeq
>  env.fromElements(items).writeAsCsv(s"${items.head.category}.csv")
>  items
> }
> 
> I welcome any of your inputs.
> 
> Thanks!
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Avoid duplicate messages while restarting a job for an application upgrade

2017-10-10 Thread Piotr Nowojski
Hi, 

That’s good to hear :)

I quickly went through the code and it seems reasonable. I think there might be a 
need to think a little bit more about how this cancel checkpoint should be 
exposed to the operators and what the default action should be - right now the 
cancel flag is ignored by default, and I would like to consider whether throwing an 
UnsupportedOperationException would be a better long term solution.

But at first glance I do not see any larger issues and it would great if you 
could make a pull request out of it.

Piotrek

> On 9 Oct 2017, at 15:56, Antoine Philippot <antoine.philip...@teads.tv> wrote:
> 
> Thanks for your advices Piotr.
> 
> Firstly, yes, we are aware that even with clean shutdown we can end up with 
> duplicated messages after a crash and it is acceptable as is it rare and 
> unintentional unlike deploying new business code or up/down scale.
> 
> I made a fork of the 1.2.1 version which we currently use and developed a 
> simple POC based on the solution to pass a boolean stopSourceSavepoint from 
> the job manager to the source when a cancel with savepoint is triggered.
> This is the altered code : 
> https://github.com/aphilippot/flink/compare/release-1.2.1...aphilippot:flink_1_2_1_POC_savepoint
>  
> <https://github.com/aphilippot/flink/compare/release-1.2.1...aphilippot:flink_1_2_1_POC_savepoint>
> 
> We test it with our production workload and there are no duplicated messages 
> any more while hundred of thousands were duplicated before.
> 
> I planned to reapply/adapt this patch for the 1.3.2 release when we migrate 
> to it and maybe later to the 1.4
> 
> I'm open to suggestion or to help/develop this feature upstream if you want.
> 
> 
> Le lun. 2 oct. 2017 à 19:09, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> a écrit :
> We are planning to work on this clean shut down after releasing Flink 1.4. 
> Implementing this properly would require some work, for example:
> - adding some checkpoint options to add information about “closing”/“shutting 
> down” event
> - add clean shutdown to source functions API
> - implement handling of this clean shutdown in desired sources
> 
> Those are not super complicated changes but also not trivial.
> 
> One thing that you could do, is to implement some super hacky filter function 
> just after source operator, that you would manually trigger. Normally it 
> would pass all of the messages. Once triggered, it would wait for next 
> checkpoint to happen. It would assume that it is a save point, and would 
> start filtering out all of the subsequent messages. When this checkpoint 
> completes, you could manually shutdown your Flink application. This could 
> guarantee that there are no duplicated writes after a restart. This might 
> work for clean shutdown, but it would be a very hacky solution. 
> 
> Btw, keep in mind that even with clean shutdown you can end up with 
> duplicated messages after a crash and there is no way around this with Kafka 
> 0.9.
> 
> Piotrek
> 
>> On Oct 2, 2017, at 5:30 PM, Antoine Philippot <antoine.philip...@teads.tv 
>> <mailto:antoine.philip...@teads.tv>> wrote:
>> 
>> Thanks Piotr for your answer, we sadly can't use kafka 0.11 for now (and 
>> until a while).
>> 
>> We can not afford tens of thousands of duplicated messages for each 
>> application upgrade, can I help by working on this feature ?
>> Do you have any hint or details on this part of that "todo list" ? 
>>  
>> 
>> Le lun. 2 oct. 2017 à 16:50, Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> a écrit :
>> Hi,
>> 
>> For failures recovery with Kafka 0.9 it is not possible to avoid duplicated 
>> messages. Using Flink 1.4 (unreleased yet) combined with Kafka 0.11 it will 
>> be possible to achieve exactly-once end to end semantic when writing to 
>> Kafka. However this still a work in progress:
>> 
>> https://issues.apache.org/jira/browse/FLINK-6988 
>> <https://issues.apache.org/jira/browse/FLINK-6988>
>> 
>> However this is a superset of functionality that you are asking for. 
>> Exactly-once just for clean shutdowns is also on our “TODO” list (it 
>> would/could support Kafka 0.9), but it is not currently being actively 
>> developed.
>> 
>> Piotr Nowojski
>> 
>>> On Oct 2, 2017, at 3:35 PM, Antoine Philippot <antoine.philip...@teads.tv 
>>> <mailto:antoine.philip...@teads.tv>> wrote:
>>> 
>>> Hi,
>>> 
>>> I'm working on a flink streaming app with a kafka09 to kafka09 use case 
>>> which handles around 100k messages p

Re: Sink buffering

2017-10-04 Thread Piotr Nowojski
What do you mean by "This always depends on checkpointing interval right?”?

In TwoPhaseCommitSinkFunction, transactions are committed on each Flink 
checkpoint. I guess the same applies to GenericWriteAheadSink. The first one just 
commits/pre-commits the data on checkpoint, the second rewrites the data from state.

If your sink supports transactions (and especially if it can handle committing the 
same transaction twice), then both should be able to provide exactly-once.

Piotrek

> On Oct 4, 2017, at 10:30 AM, nragon  
> wrote:
> 
> Thanks for you opinion on this.
> TwoPhaseCommitSinkFunction would probably be the best solution overall.
> Using this with something like Phoenix or Tephra would probably work. 
> This always depends on checkpointing interval right?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Sink buffering

2017-10-04 Thread Piotr Nowojski
Hi,

Do you mean buffering on state, and do you want to achieve an exactly-once HBase 
sink? If so, keep in mind that you will need some kind of transaction support in 
HBase to make it 100% reliable.

Without transactions, buffering messages on state only reduces the chance of 
duplicated records. How much it is reduced depends on the checkpointing interval 
and how long it takes to rewrite messages from the state buffer to HBase - if the 
checkpoint interval is 10s and rewriting takes 2s, your chance of duplicates is 
2/10 of what you would have without buffering, because you actively write to HBase 
only 20% of the time you would without buffering.

Having said that, you can take a look at the following classes, on which you could 
base your sink:
GenericWriteAheadSink - probably this is better for you
TwoPhaseCommitSinkFunction - this one is in master branch, but hasn’t yet been 
released

Piotrek

> On Sep 29, 2017, at 6:21 PM, nragon  
> wrote:
> 
> Hi,
> 
> Just like mentioned at Berlin FF17, Pravega talk, can we simulate, somehow,
> sink buffering(pravega transactions) and coordinate them with checkpoints?
> My intension is to buffer records before sending them to hbase.
> Any opinions or tips?
> 
> Thanks
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: How flink monitor source stream task(Time Trigger) is running?

2017-10-04 Thread Piotr Nowojski
You are welcome :)

Piotrek

> On Oct 2, 2017, at 1:19 PM, yunfan123  wrote:
> 
> Thank you. 
> "If SourceFunction.run methods returns without an exception Flink assumes
> that it has cleanly shutdown and that there were simply no more elements to
> collect/create by this task. "
> This sentence solve my confusion.
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Avoid duplicate messages while restarting a job for an application upgrade

2017-10-02 Thread Piotr Nowojski
We are planning to work on this clean shutdown after releasing Flink 1.4. 
Implementing this properly would require some work, for example:
- adding some checkpoint options to add information about “closing”/“shutting 
down” event
- add clean shutdown to source functions API
- implement handling of this clean shutdown in desired sources

Those are not super complicated changes but also not trivial.

One thing that you could do is to implement some super hacky filter function 
just after the source operator, which you would manually trigger. Normally it would 
pass all of the messages. Once triggered, it would wait for the next checkpoint to 
happen. It would assume that it is a savepoint, and would start filtering out 
all of the subsequent messages. When this checkpoint completes, you could 
manually shut down your Flink application. This could guarantee that there are 
no duplicated writes after a restart. This might work for a clean shutdown, but 
it would be a very hacky solution. 

Btw, keep in mind that even with clean shutdown you can end up with duplicated 
messages after a crash and there is no way around this with Kafka 0.9.

Piotrek

> On Oct 2, 2017, at 5:30 PM, Antoine Philippot <antoine.philip...@teads.tv> 
> wrote:
> 
> Thanks Piotr for your answer, we sadly can't use kafka 0.11 for now (and 
> until a while).
> 
> We can not afford tens of thousands of duplicated messages for each 
> application upgrade, can I help by working on this feature ?
> Do you have any hint or details on this part of that "todo list" ? 
>  
> 
> Le lun. 2 oct. 2017 à 16:50, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> a écrit :
> Hi,
> 
> For failures recovery with Kafka 0.9 it is not possible to avoid duplicated 
> messages. Using Flink 1.4 (unreleased yet) combined with Kafka 0.11 it will 
> be possible to achieve exactly-once end to end semantic when writing to 
> Kafka. However this still a work in progress:
> 
> https://issues.apache.org/jira/browse/FLINK-6988 
> <https://issues.apache.org/jira/browse/FLINK-6988>
> 
> However this is a superset of functionality that you are asking for. 
> Exactly-once just for clean shutdowns is also on our “TODO” list (it 
> would/could support Kafka 0.9), but it is not currently being actively 
> developed.
> 
> Piotr Nowojski
> 
>> On Oct 2, 2017, at 3:35 PM, Antoine Philippot <antoine.philip...@teads.tv 
>> <mailto:antoine.philip...@teads.tv>> wrote:
>> 
>> Hi,
>> 
>> I'm working on a flink streaming app with a kafka09 to kafka09 use case 
>> which handles around 100k messages per seconds.
>> 
>> To upgrade our application we used to run a flink cancel with savepoint 
>> command followed by a flink run with the previous saved savepoint and the 
>> new application fat jar as parameter. We notice that we can have more than 
>> 50k of duplicated messages in the kafka sink wich is not idempotent.
>> 
>> This behaviour is actually problematic for this project and I try to find a 
>> solution / workaround to avoid these duplicated messages.
>> 
>> The JobManager indicates clearly that the cancel call is triggered once the 
>> savepoint is finished, but during the savepoint execution, kafka source 
>> continue to poll new messages which will not be part of the savepoint and 
>> will be replayed on the next application start.
>> 
>> I try to find a solution with the stop command line argument but the kafka 
>> source doesn't implement StoppableFunction 
>> (https://issues.apache.org/jira/browse/FLINK-3404 
>> <https://issues.apache.org/jira/browse/FLINK-3404>) and the savepoint 
>> generation is not available with stop in contrary to cancel.
>> 
>> Is there an other solution to not process duplicated messages for each 
>> application upgrade or rescaling ?
>> 
>> If no, has someone planned to implement it? Otherwise, I can propose a pull 
>> request after some architecture advices.
>> 
>> The final goal is to stop polling source and trigger a savepoint once 
>> polling stopped.
>> 
>> Thanks
> 



Re: Avoid duplicate messages while restarting a job for an application upgrade

2017-10-02 Thread Piotr Nowojski
Hi,

For failure recovery with Kafka 0.9 it is not possible to avoid duplicated 
messages. Using Flink 1.4 (not yet released) combined with Kafka 0.11 it will be 
possible to achieve exactly-once end-to-end semantics when writing to Kafka. 
However, this is still a work in progress:

https://issues.apache.org/jira/browse/FLINK-6988 
<https://issues.apache.org/jira/browse/FLINK-6988>

However this is a superset of functionality that you are asking for. 
Exactly-once just for clean shutdowns is also on our “TODO” list (it 
would/could support Kafka 0.9), but it is not currently being actively 
developed.

Piotr Nowojski

> On Oct 2, 2017, at 3:35 PM, Antoine Philippot <antoine.philip...@teads.tv> 
> wrote:
> 
> Hi,
> 
> I'm working on a flink streaming app with a kafka09 to kafka09 use case which 
> handles around 100k messages per seconds.
> 
> To upgrade our application we used to run a flink cancel with savepoint 
> command followed by a flink run with the previous saved savepoint and the new 
> application fat jar as parameter. We notice that we can have more than 50k of 
> duplicated messages in the kafka sink wich is not idempotent.
> 
> This behaviour is actually problematic for this project and I try to find a 
> solution / workaround to avoid these duplicated messages.
> 
> The JobManager indicates clearly that the cancel call is triggered once the 
> savepoint is finished, but during the savepoint execution, kafka source 
> continue to poll new messages which will not be part of the savepoint and 
> will be replayed on the next application start.
> 
> I try to find a solution with the stop command line argument but the kafka 
> source doesn't implement StoppableFunction 
> (https://issues.apache.org/jira/browse/FLINK-3404 
> <https://issues.apache.org/jira/browse/FLINK-3404>) and the savepoint 
> generation is not available with stop in contrary to cancel.
> 
> Is there an other solution to not process duplicated messages for each 
> application upgrade or rescaling ?
> 
> If no, has someone planned to implement it? Otherwise, I can propose a pull 
> request after some architecture advices.
> 
> The final goal is to stop polling source and trigger a savepoint once polling 
> stopped.
> 
> Thanks



Re: starting query server when running flink embedded

2017-09-29 Thread Piotr Nowojski
Hi,

You can take a look at how is it done in the exercises here 
. There are example 
solutions that run on a local environment.

I Hope that helps :)

Piotrek

> On Sep 28, 2017, at 11:22 PM, Henri Heiskanen  
> wrote:
> 
> Hi,
> 
> I would like to test queryable state just by running the flink embedded from 
> my IDE. What is the easiest way to start it properly? If I run the below I 
> can not see the query server listening at the given port. I found something 
> about this, but it was about copying some base classes and post was from 2016 
> so maybe things have improved.
> 
> Configuration conf = new Configuration();
> conf.setBoolean("query.server.enable", true);
> conf.setInteger("query.server.port", 16122);
> 
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment(2, conf);
> 
> Br,
> Henkka



Re: state of parallel jobs when one task fails

2017-09-29 Thread Piotr Nowojski
Hi,

Yes, by default Flink will restart all of the tasks. I think that since Flink 
1.3, you can configure a FailoverStrategy to change this behavior.

Thanks, Piotrek

> On Sep 29, 2017, at 5:10 PM, r. r.  wrote:
> 
> Hello
> I have a simple job with a single map() processing which I want to run with 
> many documents in parallel in Flink.
> What will happen if one of the 'instances' of the job fails?
>  
> This statement in Flink docs confuses me:
> "In case of failures, a job switches first to failing where it cancels all 
> running tasks".
> So if I have 10 documents processed in parallel in the job's map() (each in a 
> different task slot, I presume) and one of them fails, does it mean that all 
> the rest will be failed/cancelled as well?
> 
> Thanks!
> 



Re: How flink monitor source stream task(Time Trigger) is running?

2017-09-29 Thread Piotr Nowojski
I am still not sure what you mean by "thread crash without throw".

If the SourceFunction.run method returns without an exception, Flink assumes that 
it has shut down cleanly and that there were simply no more elements to 
collect/create by this task. 
If it continues working without throwing an exception, but is in some corrupted 
state, then there is no way for Flink to know that anything is broken. 
If it crashes with a segfault, the whole TaskManager will crash, and that should 
be detected by Akka.

Piotrek

> On Sep 29, 2017, at 3:05 PM, yunfan123  wrote:
> 
> So my question is if this thread crash without throw any Exception.
> It seems flink can't handle this state.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: How flink monitor source stream task(Time Trigger) is running?

2017-09-29 Thread Piotr Nowojski
Any exception thrown by your SourceFunction will be caught by Flink and that 
will mark the task (that was executing this SourceFunction) as failed.

If you started some custom threads in your SourceFunction, you have to manually 
propagate their exceptions to the SourceFunction.
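
A common pattern is to remember the failure in a field and rethrow it from the main 
run() loop, e.g. (a sketch - the actual work done by the helper thread and the 
hand-over of records are placeholders):

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.concurrent.atomic.AtomicReference;

// The helper thread stores its failure instead of swallowing it; run() rethrows it,
// which makes Flink mark the task as failed and trigger recovery.
public class ThreadedSource extends RichSourceFunction<String> {

    private volatile boolean running = true;
    private final AtomicReference<Throwable> workerError = new AtomicReference<>();

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (running) {
                        // ... fetch/produce data, e.g. put it into a queue ...
                        Thread.sleep(10);
                    }
                } catch (Throwable t) {
                    workerError.set(t);
                }
            }
        });
        worker.start();

        while (running) {
            Throwable t = workerError.get();
            if (t != null) {
                throw new RuntimeException("Worker thread failed", t);
            }
            synchronized (ctx.getCheckpointLock()) {
                // ctx.collect(...) the records handed over by the worker thread
            }
            Thread.sleep(50);
        }
        worker.join();
    }

    @Override
    public void cancel() {
        running = false;
    }
}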

Piotrek

> On Sep 29, 2017, at 2:09 PM, yunfan123  wrote:
> 
> My source stream means the funciton implement the
> org.apache.flink.streaming.api.functions.source.SourceFunction.
> My question is how flink know all working thread is alive?
> If one working thread that execute the SourceFunction crash, how flink know
> this happenned? 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: How flink monitor source stream task(Time Trigger) is running?

2017-09-29 Thread Piotr Nowojski
We use Akka's DeathWatch mechanism to detect dead components.

TaskManager failure shouldn’t prevent recovering from state (as long as there 
are enough task slots).

I'm not sure if I understand what you mean by a "source stream thread" crash. If 
there was some error while performing a checkpoint so that it didn't complete, 
Flink will not be able to recover from such an incomplete checkpoint.

Could you share us the logs with your issue?

Thanks, Piotrek

> On Sep 29, 2017, at 7:30 AM, yunfan123  wrote:
> 
> In my understanding, flink just use task heartbeat to monitor taskManager is
> running.
> If source stream (Time Trigger for XXX)thread is crash, it seems flink can't
> recovery from this state?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Bucketing/Rolling Sink: New timestamp appeded to the part file name everytime a new part file is rolled

2017-09-01 Thread Piotr Nowojski
Hi,

BucketingSink doesn't support the feature that you are requesting; you cannot 
specify a dynamically generated prefix/suffix.

Piotrek

> On Aug 31, 2017, at 7:12 PM, Raja.Aravapalli  
> wrote:
> 
>  
> Hi,
>  
> I have a flink application that is streaming data into HDFS and I am using 
> Bucketing Sink for that. And, I want to know if is it possible to rename the 
> part files that is being created in the base hdfs directory.
>  
> Right now I am using the below code for including the timestamp into 
> part-file name, but the problem I am facing is the timestamp is not changing 
> for the new part file that is being rolled!
>  
>  
> BucketingSink HdfsSink = new BucketingSink (hdfsOutputPath);
> 
> HdfsSink.setBucketer(new BasePathBucketer());
> HdfsSink.setBatchSize(1024 * 1024 * hdfsOutputBatchSizeInMB); // this means 
> 'hdfsOutputBatchSizeInMB' MB
> HdfsSink.setPartPrefix("PART-FILE-" + 
> Long.toString(System.currentTimeMillis()));
>  
>  
> Can someone please suggest me, what code changes I can try so that I get a 
> new timestamp for every part file that is being rolled new?
>  
>  
> Thanks a lot. 
>  
> Regards,
> Raja.



<    1   2   3   4   5   6   7   >