Re: Some question about Flink stream sql

2017-01-05 Thread Hongyuhong
Thanks Jark

Do you have any design documents or instructions about row windows? We are 
interested in adding support for windows on the stream SQL side, and it would be 
great if we could have some reference.
Thanks very much.

Best
Yuhong


From: Jark Wu [mailto:wuchong...@alibaba-inc.com]
Sent: January 6, 2017 11:01
To: user@flink.apache.org
Subject: Re: Some question about Flink stream sql

Hi Yuhong,

I have assigned an issue for the tumbling row-window, but it is still under 
design and discussion. The implementation of row windows is more complex than 
that of group windows.
I will push this issue forward in the next few days.

- Jark Wu

On Jan 5, 2017, at 7:00 PM, Hongyuhong wrote:

Hi Fabian,

Thanks for the reply.
As you noticed, row windows are already supported by Calcite and are planned in 
FLIP-11.
Can you tell us something about the progress of row windows in the Table API?

Regards.
Yuhong




From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: January 5, 2017 17:43
To: user@flink.apache.org
Subject: Re: Some question about Flink stream sql

Hi Yuhong,
as you noticed, FLIP-11 is about the window operations on the Table API and 
does not include SQL.
The reason is that the Table API is completely in Flink's domain, i.e., we can 
design and implement the API. For SQL we have a dependency on Calcite.
You are right, that Calcite's JIRA issue for group windows [1] seems to be 
stale. I don't have more information on that. You could ping there and ask what 
the status is.
Row windows are already supported by Calcite (SQL's OVER and WINDOW clauses), 
so these could be implemented for SQL as well.
Maybe it's even easier to start from the SQL side and add the Table API later 
to ensure a compatible compilation process.
Best, Fabian


2017-01-05 4:40 GMT+01:00 Hongyuhong:
Hi,
We are currently exploring Flink stream SQL.
I see that group windows have already been implemented in the Table API, and row 
windows are also planned in FLIP-11. The row-window grammar seems closer to 
Calcite's OVER clause.
I'm curious about the detailed plan and roadmap for stream SQL, since FLIP-11 
only mentions the Table API.
Will stream SQL implement row windows first? Or, if group windows are considered, 
what is the status of the Calcite integration?
The Calcite JIRA issue was raised in August; what is its status right now?
Thanks in advance!

Regards
Yuhong



Re: Does Flink cluster security work in the Flink 1.1.4 release?

2017-01-05 Thread Zhangrucong
Hi Stephan:
  Thanks for your reply.
You mean the CLI, JM, TM, and WebUI support Kerberos authentication only in 
YARN cluster mode in the 1.1.x releases?

From: Stephan Ewen [mailto:se...@apache.org]
Sent: January 4, 2017 17:23
To: user@flink.apache.org
Subject: Re: Does Flink cluster security work in the Flink 1.1.4 release?

Hi!

Flink 1.1.x supports Kerberos for Hadoop (HDFS, YARN, HBase) via Hadoop's 
ticket system. It should work via kinit, in the same way when submitting a 
secure MapReduce job.

Kerberos for ZooKeeper, Kafka, etc, is only part of the 1.2 release.

Greetings,
Stephan


On Wed, Jan 4, 2017 at 7:25 AM, Zhangrucong wrote:
Hi:
 Now I use the Flink 1.1.4 release in standalone cluster mode. I want to do 
Kerberos authentication between the Flink CLI and the JobManager. But in 
flink-conf.yaml, there is no Flink cluster security configuration.
Does Kerberos authentication work in the Flink 1.1.4 release?
Thanks in advance!




Re: Some question about Flink stream sql

2017-01-05 Thread Jark Wu
Hi Yuhong, 

I have assigned an issue for the tumbling row-window, but it is still under 
design and discussion. The implementation of row windows is more complex than 
that of group windows.
I will push this issue forward in the next few days.

- Jark Wu 

> On Jan 5, 2017, at 7:00 PM, Hongyuhong wrote:
> 
> Hi Fabian,
>  
> Thanks for the reply.
> As you noticed, row windows are already supported by Calcite and are planned 
> in FLIP-11.
> Can you tell us something about the progress of row windows in the Table API?
>  
> Regards.
> Yuhong
>  
>  
>  
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: January 5, 2017 17:43
> To: user@flink.apache.org
> Subject: Re: Some question about Flink stream sql
>  
> Hi Yuhong,
> 
> as you noticed, FLIP-11 is about the window operations on the Table API and 
> does not include SQL.
> The reason is that the Table API is completely Flink domain, i.e., we can 
> design and implement the API. For SQL we have a dependency on Calcite.
> 
> You are right, that Calcite's JIRA issue for group windows [1] seems to be 
> stale. I don't have more information on that. You could ping there and ask 
> what the status is.
> Row windows are already supported by Calcite (SQL's OVER and WINDOW clauses), 
> so these could be implemented for SQL as well.
> Maybe it's even easier to start from the SQL side and add the Table API later 
> to ensure a compatible compilation process.
> 
> Best, Fabian
>  
> 
>  
> 2017-01-05 4:40 GMT+01:00 Hongyuhong:
> Hi,
> We are currently exploring Flink stream SQL.
> I see that group windows have already been implemented in the Table API, and row 
> windows are also planned in FLIP-11. The row-window grammar seems closer to 
> Calcite's OVER clause.
> I'm curious about the detailed plan and roadmap for stream SQL, since FLIP-11 
> only mentions the Table API.
> Will stream SQL implement row windows first? Or, if group windows are considered, 
> what is the status of the Calcite integration?
> The Calcite JIRA issue was raised in August; what is its status right now?
> Thanks in advance!
>  
> Regards
> Yuhong



Re: Regarding ordering of events

2017-01-05 Thread Kostas Kloudas
Hi Abdul,

Every window is handled by a single machine, if this is what you mean by 
“partition”.

Kostas

> On Jan 5, 2017, at 9:21 PM, Abdul Salam Shaikh  
> wrote:
> 
> Thanks Fabian and Kostas, 
> 
> How can I put to use the power of flink as a distributed system ? 
> 
> In cases where we have multiple windows, is one single window handled by one 
> partition entirely or is it spread across several partitions ? 
> 
> On Thu, Jan 5, 2017 at 12:25 PM, Fabian Hueske wrote:
> Flink is a distributed system and does not preserve order across partitions.
> The number prefix (e.g., 1>, 2>, ...) tells you the parallel instance of the 
> printing operator.
> 
> You can set the parallelism to 1 to have the stream in order.
> 
> Fabian
> 
> 2017-01-05 12:16 GMT+01:00 Kostas Kloudas:
> Hi Abdul,
> 
> Flink provides no ordering guarantees on the elements within a window.
> The only “order” it guarantees is that the results referring to window-1 are
> going to be emitted before those of window-2 (assuming that window-1 precedes 
> window-2).
> 
> Thanks,
> Kostas
> 
>> On Jan 5, 2017, at 11:57 AM, Abdul Salam Shaikh wrote:
>> 
>> Hi, 
>> 
>> I am using a JSON file as the source for the streaming (in the ascending 
>> order of the field Umlaufsekunde)which has events as follows: 
>> 
>> {"event":[{"Umlaufsekunde":115}]}
>> {"event":[{"Umlaufsekunde":135}]}
>> {"event":[{"Umlaufsekunde":135}]}
>> {"event":[{"Umlaufsekunde":145}]}
>> {"event":[{"Umlaufsekunde":155}]}
>> {"event":[{"Umlaufsekunde":155}]}
>> {"event":[{"Umlaufsekunde":185}]}
>> {"event":[{"Umlaufsekunde":195}]}
>> {"event":[{"Umlaufsekunde":195}]}
>> {"event":[{"Umlaufsekunde":205}]}
>> {"event":[{"Umlaufsekunde":245}]}
>> 
>> However, when I try to print the stream, it is unordered as given below: 
>> 1> (115,null,1483517983252,1190)  -- The first value indicating Umlaufsekunde
>> 2> (135,null,1483517984877,1190)
>> 2> (155,null,1483517986861,1190)
>> 4> (145,null,1483517985752,1190)
>> 3> (135,null,1483517985424,1190)
>> 4> (195,null,1483517990736,1190)
>> 4> (255,null,1483517997424,1190)
>> 2> (205,null,1483517991518,1190)
>> 2> (275,null,1483517999330,1190)
>> 2> (385,null,1483518865371,1190)
>> 2> (395,null,1483518866840,1190)
>> 1> (155,null,1483517986533,1190)
>> 4> (285,null,1483518000189,1190)
>> 4> (395,null,1483518866231,1190)
>> 
>> I have also tried using the Timestamps and Watermarks but no luck as 
>> follows: 
>> 
>> public class TimestampExtractor implements
>> AssignerWithPeriodicWatermarks<Tuple5<Long, Long, List, Long, Long>> {
>>
>> private long currentMaxTimestamp;
>>
>> @Override
>> public Watermark getCurrentWatermark() {
>>     return new Watermark(currentMaxTimestamp);
>> }
>>
>> @Override
>> public long extractTimestamp(Tuple5<Long, Long, List, Long, Long> element,
>>         long previousElementTimestamp) {
>>     long timestamp = element.getField(1);
>>     currentMaxTimestamp = timestamp;
>>     return currentMaxTimestamp;
>> }
>>
>> }
>> 
>> Could anyone suggest how I can handle this problem so that events arrive
>> in order?
>> 
>> Thanks!
>> 
>> 
> 
> 
> 
> 
> 
> -- 
> Thanks & Regards,
> 
> Abdul Salam Shaikh
> 



Re: Regarding ordering of events

2017-01-05 Thread Abdul Salam Shaikh
Thanks Fabian and Kostas,

How can I put the power of Flink as a distributed system to use?

In cases where we have multiple windows, is a single window handled entirely by
one partition, or is it spread across several partitions?

On Thu, Jan 5, 2017 at 12:25 PM, Fabian Hueske  wrote:

> Flink is a distributed system and does not preserve order across
> partitions.
> The number prefix (e.g., 1>, 2>, ...) tells you the parallel instance of
> the printing operator.
>
> You can set the parallelism to 1 to have the stream in order.
>
> Fabian
>
> 2017-01-05 12:16 GMT+01:00 Kostas Kloudas :
>
>> Hi Abdul,
>>
>> Flink provides no ordering guarantees on the elements within a window.
>> The only “order” it guarantees is that the results referring to window-1
>> are
>> going to be emitted before those of window-2 (assuming that window-1
>> precedes window-2).
>>
>> Thanks,
>> Kostas
>>
>> On Jan 5, 2017, at 11:57 AM, Abdul Salam Shaikh <
>> abd.salam.sha...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am using a JSON file as the source for the streaming (in the ascending
>> order of the field Umlaufsekunde)which has events as follows:
>>
>> {"event":[{"*Umlaufsekunde*":115}]}
>> {"event":[{"*Umlaufsekunde*":135}]}
>> {"event":[{"*Umlaufsekunde*":135}]}
>> {"event":[{"*Umlaufsekunde*":145}]}
>> {"event":[{"*Umlaufsekunde*":155}]}
>> {"event":[{"*Umlaufsekunde*":155}]}
>> {"event":[{"*Umlaufsekunde*":185}]}
>> {"event":[{"*Umlaufsekunde*":195}]}
>> {"event":[{"*Umlaufsekunde*":195}]}
>> {"event":[{"*Umlaufsekunde*":205}]}
>> {"event":[{"*Umlaufsekunde*":245}]}
>>
>> However, when I try to print the stream, it is unordered as given below:
>> 1> (*115*,null,1483517983252,1190)  -- The first value indicating
>> Umlaufsekunde
>> 2> (135,null,1483517984877,1190)
>> 2> (155,null,1483517986861,1190)
>> 4> (145,null,1483517985752,1190)
>> 3> (135,null,1483517985424,1190)
>> 4> (195,null,1483517990736,1190)
>> 4> (255,null,1483517997424,1190)
>> 2> (205,null,1483517991518,1190)
>> 2> (275,null,1483517999330,1190)
>> 2> (385,null,1483518865371,1190)
>> 2> (395,null,1483518866840,1190)
>> 1> (155,null,1483517986533,1190)
>> 4> (285,null,1483518000189,1190)
>> 4> (395,null,1483518866231,1190)
>>
>> I have also tried using the Timestamps and Watermarks but no luck as
>> follows:
>>
>> public class TimestampExtractor implements
>> AssignerWithPeriodicWatermarks<Tuple5<Long, Long, List, Long, Long>> {
>>
>> private long currentMaxTimestamp;
>>
>> @Override
>> public Watermark getCurrentWatermark() {
>>     return new Watermark(currentMaxTimestamp);
>> }
>>
>> @Override
>> public long extractTimestamp(Tuple5<Long, Long, List, Long, Long> element,
>>         long previousElementTimestamp) {
>>     long timestamp = element.getField(1);
>>     currentMaxTimestamp = timestamp;
>>     return currentMaxTimestamp;
>> }
>>
>> }
>>
>> Could anyone suggest how I can handle this problem so that events arrive
>> in order?
>>
>> Thanks!
>>
>>
>>
>>
>


-- 
Thanks & Regards,

*Abdul Salam Shaikh*


failure-rate restart strategy not working?

2017-01-05 Thread Shannon Carey
I recently updated my cluster with the following config:

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

I see the settings inside the JobManager web UI, as expected. I am not setting 
the restart-strategy programmatically, but the job does have checkpointing 
enabled.
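
(For reference, the equivalent programmatic setup would look roughly like the sketch 
below; it is only a sketch, assuming the failure-rate strategy is exposed via 
RestartStrategies in the Flink version in use, and it mirrors the flink-conf.yaml 
values above.)

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,                                // max failures per interval
        Time.of(5, TimeUnit.MINUTES),     // failure rate measurement interval
        Time.of(10, TimeUnit.SECONDS)));  // delay between restart attempts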

However, if I launch a job that (intentionally) fails every 10 seconds by 
throwing a RuntimeException, it continues to restart beyond the limit of 3 
failures.

Does anyone know why this might be happening? Any ideas of things I could check?

Thanks!
Shannon


Re: Speedup of Flink Applications

2017-01-05 Thread Hanna Prinz
Hey Fabian and Timur,

Thank you for your helpful answers.
Especially because I'm aware that there is no simple answer to that. Also, I've 
just started to work with Flink, so I might not have understood everything yet :)

From the documentation on task scheduling, I assumed that the JobManager might 
be a bottleneck under extreme parallelization. As for the skewness of the data: 
I suppose that's a problem every large-scale data processing framework has, and 
there is not much to do about it besides improving the partitioning where 
possible.

I will look into the configuration you mentioned @Fabian and might get back to 
you with further questions later.

Cheers
Hanna

> Am 05.01.2017 um 11:32 schrieb Fabian Hueske :
> 
> Hi Hanna,
> 
> I assume you are asking about the possible speed up of batch analysis 
> programs and not about streaming applications (please correct me if I'm 
> wrong).
> 
> Timur raised very good points about data size and skew.
> 
> Given evenly distributed data (no skewed key distribution for a grouping or 
> join operation) and sufficiently large data sets, Flink scales quite well.
> Flink uses by default pipelined shuffles. Depending on the parallelism, you 
> need to adapt the number of network buffers (see 
> taskmanager.network.numberOfBuffers [1]). When the required number of network 
> buffers becomes too large, you can switch to batched shuffles (set 
> ExecutionMode.BATCH on ExecutionConfig of ExecutionEnvironment).
> Apart from that, really large parallelism of complex programs might suffer 
> from the scheduling overhead of individual tasks.  Here the JobManager might 
> become a bottleneck when assigning tasks to workers so that there might be an 
> initial delay before a job starts processing.
> 
> If your data is skewed or too small, scaling out doesn't help because a 
> single worker will be busy while all others are waiting for it, or the 
> overhead of distributing the work becomes too large.
> 
> Hope this helps,
> Fabian
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#jobmanager-amp-taskmanager
> 
> 
> 2017-01-03 15:44 GMT+01:00 Timur Shenkao :
>> Hi, 
>> It seems your questions are too abstract & theoretical. The answer is : it 
>> depends on several factors. Skewness in data, data volume, reliability 
>> requirements, "fatness" of servers, whether one performs look-up in other 
>> data sources, etc.
>> The papers you mentioned mean the following: under concrete & specific 
>> conditions, researchers achieved their results. If they had changed some 
>> parameters slightly (increase network's throughput, for example, or change 
>> garbage collector's options) , the results would have been completely 
>> different.
>> 
>> 
>>> On Tuesday, January 3, 2017, Hanna Prinz  wrote:
>>> Happy new year everyone :)
>>> 
>>> I’m currently working on a paper about Flink. I already got some 
>>> recommendations on general papers with details about Flink, which helped me 
>>> a lot already. But now that I read them, I’m further interested is the 
>>> speedup capabilities, provided by the Flink Framework: How „far“ can it 
>>> scale efficiently?
>>> 
>>> Amdahl's law states that parallelization is only efficient as long as the 
>>> non-parallelizable part of the processing (time for the communication 
>>> between the nodes etc.) doesn’t „eat up“ the speed gains of parallelization 
>>> (= parallel slowdown). 
>>> Of course, the communication overhead is mostly caused by the 
>>> implementation, but the frameworks specific solution for the communication 
>>> between the nodes has a reasonable effect as well.
>>> 
>>> After studying these papers, it looks like, although Flink's performance is 
>>> better in many cases, the possible speedup is equal to the possible speedup 
>>> of Spark.
>>> 1. Spark versus Flink - Understanding Performance in Big Data Analytics 
>>> Frameworks | https://hal.inria.fr/hal-01347638/document
>>> 2. Big Data Analytics on Cray XC Series DataWarp using Hadoop, Spark and 
>>> Flink | 
>>> https://cug.org/proceedings/cug2016_proceedings/includes/files/pap141.pdf
>>> 3. Thrill - High-Performance Algorithmic Distributed Batch Data Processing 
>>> with C++ | 
>>> https://panthema.net/2016/0816-Thrill-High-Performance-Algorithmic-Distributed-Batch-Data-Processing-with-CPP/1608.05634v1.pdf
>>> 
>>> Does someone have …
>>> … more information (or data) on speedup of Flink applications? 
>>> … experience (or data) with Flink in an extremely parallelized environment?
>>> … detailed information on how the nodes communicate, especially when they 
>>> are waiting for task results of one another?
>>> 
>>> Thank you very much for your time & answers!
>>> Hanna
> 


Kafka KeyedStream source

2017-01-05 Thread Niels Basjes
Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the
same sessionId into the same Kafka partition. That way I already have all
events of a visitor in a single kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream on top of which
I have to do a keyBy before my processing can continue. Such a keyBy will
redistribute the data again to later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that
immediately produces a keyed data stream?
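
For context, a rough sketch of the current (non-keyed-source) pipeline described
above; the topic name, broker address, schema, and the extractSessionId() helper
are assumptions for illustration only:

import java.util.Properties;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "clickstream");

DataStream<String> clicks =
    env.addSource(new FlinkKafkaConsumer09<>("clicks", new SimpleStringSchema(), props));

// This keyBy is the redistribution step in question: Flink shuffles the records
// again, even though Kafka already grouped them by sessionId within each partition.
clicks.keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String record) {
        return extractSessionId(record); // hypothetical helper that parses the sessionId
    }
});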


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Consistency guarantees on multiple sinks

2017-01-05 Thread Stephan Ewen
Extending on what Paris said:

If you have an exactly-once sink (like the Rolling/Bucketing file sink or
the Cassandra write-ahead sink), then all of them are correctly adjusted to
preserve the exactly once semantics. That is regardless or one, two, or n
sinks.

On Thu, Jan 5, 2017 at 2:47 PM, Paris Carbone  wrote:

> Hi Nancy,
>
> Flink’s vanilla rollback recovery mechanism restarts computation from a
> global checkpoint thus sink duplicates (job output) can occur no matter how
> many sinks are declared;  the whole computation in the failed execution
> graph will roll back.
>
> cheers
> Paris
>
>
> > On 5 Jan 2017, at 14:24, Nancy Estrada 
> wrote:
> >
> > Hi,
> >
> > If in a Job there is more than one sink declared, what happens when a
> > failure occurs? all the sink operations get aborted? (atomically as in a
> > transactional environment), or the exactly-once-processing consistency
> > guarantees are provided just when one sink is declared per job? Is it
> > recommended to have more than one sink per job?
> >
> > Thank you!
> > Nancy Estrada
> >
> >
> >
> > --
> > View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Consistency-
> guarantees-on-multiple-sinks-tp10877.html
> > Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
>
>


Re: Sequential/ordered map

2017-01-05 Thread Fabian Hueske
Please avoid collecting the data to the client using collect(). This
operation looks convenient but is only meant for very small data sets and would
be a lot slower and less robust even if it worked for large data sets.
Rather, set the parallelism of the operator to 1.
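
A minimal sketch of that suggestion in the DataSet API (the paths and the
SequentialAlgorithm MapPartitionFunction are assumptions for illustration):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> texts = env.readTextFile("hdfs:///input/texts");

texts.mapPartition(new SequentialAlgorithm())  // hypothetical MapPartitionFunction
     .setParallelism(1)                        // a single instance processes all records of this operator
     .writeAsText("hdfs:///output/results");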

Fabian

2017-01-05 13:18 GMT+01:00 Sebastian Neef :

> Hi Chesnay,
>
> thanks for the input. Finding a word's first occurrence is part of the
> algorithm.
>
> To be exact I'm trying to implement Adler's Text authorship tracking in
> flink (http://www2007.org/papers/paper692.pdf, page 266).
>
> Thanks,
> Sebastian
>


Re: High virtual memory usage

2017-01-05 Thread Stephan Ewen
Happy to hear that!



On Thu, Jan 5, 2017 at 1:34 PM, Paulo Cezar  wrote:

> Hi Stephan, thanks for your support.
>
> I was able to track the problem a few days ago. Unirest was the one to
> blame; I was using it in some map functions to connect to external services
> and for some reason it was using insane amounts of virtual memory.
>
> Paulo Cezar
>
> On Mon, Dec 19, 2016 at 11:30 AM Stephan Ewen  wrote:
>
>> Hi Paulo!
>>
>> Hmm, interesting. The high discrepancy between virtual and physical
>> memory usually means that the process either maps large files into memory,
>> or that it pre-allocates a lot of memory without immediately using it.
>> Neither of these things are done by Flink.
>>
>> Could this be an effect of either the Docker environment (mapping certain
>> kernel spaces / libraries / whatever) or a result of one of the libraries
>> (gRPC or so)?
>>
>> Stephan
>>
>>
>> On Mon, Dec 19, 2016 at 12:32 PM, Paulo Cezar 
>> wrote:
>>
>>   - Are you using RocksDB?
>>
>> No.
>>
>>
>>   - What is your flink configuration, especially around memory settings?
>>
>> I'm using default config with 2GB for jobmanager and 5GB for
>> taskmanagers. I'm starting flink via "./bin/yarn-session.sh -d -n 5 -jm
>> 2048 -tm 5120 -s 4 -nm 'Flink'"
>>
>>   - What do you use for TaskManager heap size? Any manual value, or do
>> you let Flink/Yarn set it automatically based on container size?
>>
>> No manual values here. YARN config is pretty much default with maximum
>> allocation of 12GB of physical memory and ratio between virtual memory to
>> physical memory 2.1 (via yarn.nodemanager.vmem-pmem-ratio).
>>
>>
>>   - Do you use any libraries or connectors in your program?
>>
>> I'm using  flink-connector-kafka-0.10_2.11, a MongoDB client, a gRPC
>> client and some http libraries like unirest and Apache HttpClient.
>>
>>   - Also, can you tell us what OS you are running on?
>>
>> My YARN cluster runs on Docker containers (docker version 1.12) with
>> images based on Ubuntu 14.04. Host OS is Ubuntu 14.04.4 LTS (GNU/Linux
>> 3.19.0-65-generic x86_64).
>>
>>
>>


Re: Rapidly failing job eventually causes "Not enough free slots"

2017-01-05 Thread Stephan Ewen
Another thought on the container failure:

In 1.1, the user code is loaded dynamically whenever a Task is started.
That means that on every task restart the code is reloaded. For that to
work properly, class unloading needs to happen, or the permgen will
eventually overflow.

It can happen that class unloading is prevented if the user functions
leave references around as "GC roots", which may be threads, references
in registries, etc.

In Flink 1.2, YARN will put the user code onto the application classpath,
so the code need not be reloaded on every restart. That should solve the
issue.
To "simulate" that behavior in Flink 1.1, put your application code jars
into the "lib" folder.

Best,
Stephan


On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin  wrote:

> Hi,
>
> I've faced a similar issue recently. Hope sharing my findings will help.
> The problem can be split into 2 parts:
>
> *Source of container failures*
> The logs you provided indicate that YARN kills its containers for
> exceeding memory limits. Important point here is that memory limit = JVM
> heap memory + off-heap memory. So if off-heap memory usage is high, YARN
> may kill containers even though JVM heap consumption is fine. To solve this
> issue, Flink reserves a share of container memory for off-heap memory. How
> much will be reserved is controlled by yarn.heap-cutoff-ratio and
> yarn.heap-cutoff-min configuration. By default 25% of the requested
> container memory will be reserved for off-heap. This seems to be a good
> start, but one should experiment and tune to meet their job specifics.
>
> It's also worthwhile to figure out who consumes off-heap memory. Is it
> Flink managed memory moved off heap (taskmanager.memory.off-heap = true)?
> Is it some external library allocating something off heap? Is it your own
> code?
>
> *How Flink handles task manager failures*
> Whenever a task manager fails, the Flink jobmanager decides whether it
> should:
> - reallocate failed task manager container
> - fail application entirely
> These decisions can be guided by certain configuration (
> https://ci.apache.org/projects/flink/flink-docs-release-1.
> 1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn). With default
> settings, job manager does reallocate task manager containers up to the
> point when N failures have been observed, where N is the number of
> requested task managers. After that the application is stopped.
>
> According to the logs, you have a finite number in yarn.maximum-failed-
> containers (11, as I can see from the logs - this may be set by Flink if
> not provided explicitly). On 12th container restart, jobmanager gives up
> and the application stops. I'm not sure why it keeps reporting not enough
> slots after that point. In my experience this may happen when job eats up
> all the available slots, so that after container failure its tasks cannot
> be restarted in other (live) containers. But I believe once the decision to
> stop the application is made, there should not be any further attempts to
> restart the job, hence no logs like those. Hopefully, someone else will
> explain this to us :)
>
> In my case I made jobmanager restart containers infinitely by setting 
> yarn.maximum-failed-containers
> = -1, so that taskmanager failure never results in application
> death. Note this is unlikely a good choice for a batch job.
>
> Regards,
> Yury
>
> 2017-01-05 3:21 GMT+03:00 Shannon Carey :
>
>> In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem twice
>> and I'm wondering if anyone has some insight about it.
>>
>> In both cases, we deployed a job that fails very frequently (within
>> 15s-1m of launch). Eventually, the Flink cluster dies.
>>
>> The sequence of events looks something like this:
>>
>>- bad job is launched
>>- bad job fails & is restarted many times (I didn't have the
>>"failure-rate" restart strategy configuration right)
>>- Task manager logs: org.apache.flink.yarn.YarnTaskManagerRunner
>>(SIGTERM handler): RECEIVED SIGNAL 15: SIGTERM. Shutting down as 
>> requested.
>>- At this point, the YARN resource manager also logs the container
>>failure
>>- More logs: Container ResourceID{resourceId='contain
>>er_1481658997383_0003_01_13'} failed. Exit status: Pmem limit
>>exceeded (-104)
>>- Diagnostics for container ResourceID{resourceId='contain
>>er_1481658997383_0003_01_13'} in state COMPLETE : exitStatus=Pmem
>>limit exceeded (-104) diagnostics=Container [pid=21246,containerID=contain
>>er_1481658997383_0003_01_13] is running beyond physical memory
>>limits. Current usage: 5.6 GB of 5.6 GB physical memory used; 9.6 GB of
>>28.1 GB virtual memory used. Killing container.
>>Container killed on request. Exit code is 143
>>Container exited with a non-zero exit code 143
>>Total number of failed containers so far: 12
>>Stopping YARN session because the number of failed 

Re: Consistency guarantees on multiple sinks

2017-01-05 Thread Paris Carbone
Hi Nancy,

Flink’s vanilla rollback recovery mechanism restarts computation from a global 
checkpoint, so sink duplicates (in the job output) can occur no matter how many 
sinks are declared; the whole computation in the failed execution graph will roll 
back.

cheers
Paris


> On 5 Jan 2017, at 14:24, Nancy Estrada  wrote:
> 
> Hi,
> 
> If in a Job there is more than one sink declared, what happens when a
> failure occurs? all the sink operations get aborted? (atomically as in a
> transactional environment), or the exactly-once-processing consistency
> guarantees are provided just when one sink is declared per job? Is it
> recommended to have more than one sink per job?
> 
> Thank you!
> Nancy Estrada
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Consistency-guarantees-on-multiple-sinks-tp10877.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Consistency guarantees on multiple sinks

2017-01-05 Thread Nancy Estrada
Hi,
 
If more than one sink is declared in a job, what happens when a failure
occurs? Do all the sink operations get aborted (atomically, as in a
transactional environment), or are the exactly-once processing consistency
guarantees only provided when a single sink is declared per job? Is it
recommended to have more than one sink per job?

Thank you!
Nancy Estrada



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Consistency-guarantees-on-multiple-sinks-tp10877.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Running into memory issues while running on Yarn

2017-01-05 Thread Yury Ruchin
Hi,

Your containers got killed by YARN for exceeding virtual memory limits. For
some reason your containers intensively allocate virtual memory while still
having free physical memory.

There are some known gotchas around this on CentOS, caused by OS-specific
aggressive virtual memory allocation: [1], [2]. They disable the YARN virtual
memory checker to work around that.
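
That workaround is a NodeManager setting in yarn-site.xml, roughly (shown here
only as an illustration):

<!-- yarn-site.xml on the NodeManagers: turn off the virtual memory checker -->
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>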

Also in this mailing list people recently reported that high virtual memory
consumption may be caused by some libraries.

Links:
[1]
http://blog.cloudera.com/blog/2014/04/apache-hadoop-yarn-avoiding-6-time-consuming-gotchas/,
section "Killing of Tasks Due to Virtual Memory Usage"
[2] https://www.mapr.com/blog/best-practices-yarn-resource-management,
section "3. Virtual/physical memory checker".

Regards,
Yury

2017-01-05 11:54 GMT+03:00 Sachin Goel :

> Hey!
>
> I'm running locally under this configuration(copied from nodemanager logs):
> physical-memory=8192 virtual-memory=17204 virtual-cores=8
>
> Before starting a flink deployment, memory usage stats show 3.7 GB used on
> system, indicating lots of free memory for flink containers.
> However, after I submit using minimal resource requirements,
> ./yarn-session.sh -n 1 -tm 768, the cluster deploys successfully but then
> every application on system receives a sigterm and it basically kills the
> current user session, logging out of the system.
>
> The job manager and task manager logs contain just the information that a
> SIGTERM was received and shut down gracefully.
> All yarn and dfs process contain the log information showing the receipt
> of a sigterm.
>
> Here's the relevant log from nodemanager:
>
> 2017-01-05 17:00:06,089 INFO 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
>  Container container_1483603191971_0002_01_02 transitioned from LOCALIZED 
> to RUNNING
> 2017-01-05 17:00:06,092 INFO 
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: 
> launchContainer: [bash, 
> /opt/hadoop-2.7.3/tmp/nm-local-dir/usercache/kirk/appcache/application_1483603191971_0002/container_1483603191971_0002_01_02/default_container_executor.sh]
> 2017-01-05 17:00:08,731 INFO 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>  Starting resource-monitoring for container_1483603191971_0002_01_02
> 2017-01-05 17:00:08,744 INFO 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>  Memory usage of ProcessTree 17872 for container-id 
> container_1483603191971_0002_01_01: 282.7 MB of 1 GB physical memory 
> used; 2.1 GB of 2.1 GB virtual memory used
> 2017-01-05 17:00:08,744 WARN 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>  Process tree for container: container_1483603191971_0002_01_01 has 
> processes older than 1 iteration running over the configured limit. 
> Limit=2254857728, current usage = 2255896576
> 2017-01-05 17:00:08,745 WARN 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>  Container [pid=17872,containerID=container_1483603191971_0002_01_01] is 
> running beyond virtual memory limits. Current usage: 282.7 MB of 1 GB 
> physical memory used; 2.1 GB of 2.1 GB virtual memory used. Killing container.
> Dump of the process-tree for container_1483603191971_0002_01_01 :
>   |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>   |- 17872 17870 17872 17872 (bash) 0 0 21409792 812 /bin/bash -c 
> /usr/lib/jvm/java-8-openjdk-amd64//bin/java -Xmx424M  
> -Dlog.file=/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.log
>  -Dlogback.configurationFile=file:logback.xml 
> -Dlog4j.configuration=file:log4j.properties 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> 1>/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.out
>  
> 2>/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.err
>   |- 17879 17872 17872 17872 (java) 748 20 2234486784 71553 
> /usr/lib/jvm/java-8-openjdk-amd64//bin/java -Xmx424M 
> -Dlog.file=/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.log
>  -Dlogback.configurationFile=file:logback.xml 
> -Dlog4j.configuration=file:log4j.properties 
> org.apache.flink.yarn.YarnApplicationMasterRunner
>
> 2017-01-05 17:00:08,745 INFO 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>  Removed ProcessTree with root 17872
> 2017-01-05 17:00:08,746 INFO 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
>  Container container_1483603191971_0002_01_01 transitioned from RUNNING 
> to KILLING
> 

Increasing parallelism skews/increases overall job processing time linearly

2017-01-05 Thread Chakravarthy varaga
Hi All,

I have a job as attached.

I have a 16 Core blade running RHEL 7. The taskmanager default number of
slots is set to 1. The source is a kafka stream and each of the 2
sources(topic) have 2 partitions each.


*What I notice is that when I deploy a job to run with #parallelism=2 the
total processing time doubles the time it took when the same job was
deployed with #parallelism=1. It linearly increases with the parallelism.*
Since the number of slots is set to 1 per TM, I would assume that the job
would be processed in parallel in 2 different TMs and that each consumer in
each TM is connected to 1 partition of the topic. This therefore should
have kept the overall processing time the same or lower!

The co-flatmap connects the 2 streams and uses ValueState (checkpointed in
FS). I think this is distributed among the TMs. My understanding is that
looking up value state could be costly across TMs. Do you sense
something wrong here?

Best Regards
CVP


Re: High virtual memory usage

2017-01-05 Thread Paulo Cezar
Hi Stephan, thanks for your support.

I was able to track the problem a few days ago. Unirest was the one to
blame; I was using it in some map functions to connect to external services
and for some reason it was using insane amounts of virtual memory.

Paulo Cezar

On Mon, Dec 19, 2016 at 11:30 AM Stephan Ewen  wrote:

> Hi Paulo!
>
> Hmm, interesting. The high discrepancy between virtual and physical memory
> usually means that the process either maps large files into memory, or that
> it pre-allocates a lot of memory without immediately using it.
> Neither of these things are done by Flink.
>
> Could this be an effect of either the Docker environment (mapping certain
> kernel spaces / libraries / whatever) or a result of one of the libraries
> (gRPC or so)?
>
> Stephan
>
>
> On Mon, Dec 19, 2016 at 12:32 PM, Paulo Cezar 
> wrote:
>
>   - Are you using RocksDB?
>
> No.
>
>
>   - What is your flink configuration, especially around memory settings?
>
> I'm using default config with 2GB for jobmanager and 5GB for taskmanagers.
> I'm starting flink via "./bin/yarn-session.sh -d -n 5 -jm 2048 -tm 5120 -s
> 4 -nm 'Flink'"
>
>   - What do you use for TaskManager heap size? Any manual value, or do you
> let Flink/Yarn set it automatically based on container size?
>
> No manual values here. YARN config is pretty much default with maximum
> allocation of 12GB of physical memory and ratio between virtual memory to
> physical memory 2.1 (via yarn.nodemanager.vmem-pmem-ratio).
>
>
>   - Do you use any libraries or connectors in your program?
>
> I'm using  flink-connector-kafka-0.10_2.11, a MongoDB client, a gRPC
> client and some http libraries like unirest and Apache HttpClient.
>
>   - Also, can you tell us what OS you are running on?
>
> My YARN cluster runs on Docker containers (docker version 1.12) with
> images based on Ubuntu 14.04. Host OS is Ubuntu 14.04.4 LTS (GNU/Linux
> 3.19.0-65-generic x86_64).
>
>
>


Re: Sequential/ordered map

2017-01-05 Thread Sebastian Neef
Hi Chesnay,

thanks for the input. Finding a word's first occurrence is part of the
algorithm.

To be exact I'm trying to implement Adler's Text authorship tracking in
flink (http://www2007.org/papers/paper692.pdf, page 266).

Thanks,
Sebastian


Re: Rapidly failing job eventually causes "Not enough free slots"

2017-01-05 Thread Yury Ruchin
Hi,

I've faced a similar issue recently. Hope sharing my findings will help.
The problem can be split into 2 parts:

*Source of container failures*
The logs you provided indicate that YARN kills its containers for exceeding
memory limits. Important point here is that memory limit = JVM heap memory
+ off-heap memory. So if off-heap memory usage is high, YARN may kill
containers even though JVM heap consumption is fine. To solve this issue, Flink
reserves a share of container memory for off-heap memory. How much will be
reserved is controlled by yarn.heap-cutoff-ratio and
yarn.heap-cutoff-min configuration.
By default 25% of the requested container memory will be reserved for
off-heap. This seems to be a good start, but one should experiment and
tune to meet their job specifics.

It's also worthwhile to figure out who consumes off-heap memory. Is it
Flink managed memory moved off heap (taskmanager.memory.off-heap = true)?
Is it some external library allocating something off heap? Is it your own
code?

*How Flink handles task manager failures*
Whenever a task manager fails, the Flink jobmanager decides whether it
should:
- reallocate failed task manager container
- fail application entirely
These decisions can be guided by certain configuration (
https://ci.apache.org/projects/flink/flink-docs-
release-1.1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn). With
default settings, job manager does reallocate task manager containers up to
the point when N failures have been observed, where N is the number of
requested task managers. After that the application is stopped.

According to the logs, you have a finite number in
yarn.maximum-failed-containers (11, as I can see from the logs - this may
be set by Flink if not provided explicitly). On 12th container restart,
jobmanager gives up and the application stops. I'm not sure why it keeps
reporting not enough slots after that point. In my experience this may
happen when the job eats up all the available slots, so that after a container
failure its tasks cannot be restarted in other (live) containers. But I
believe once the decision to stop the application is made, there should not
be any further attempts to restart the job, hence no logs like those.
Hopefully, someone else will explain this to us :)

In my case I made jobmanager restart containers infinitely by setting
yarn.maximum-failed-containers
= -1, so that a taskmanager failure never results in application death. Note
that this is unlikely to be a good choice for a batch job.
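
For illustration, the knobs mentioned above go into flink-conf.yaml; the values
below are examples only, not recommendations:

yarn.heap-cutoff-ratio: 0.3          # reserve 30% of each container for off-heap memory
yarn.heap-cutoff-min: 768            # but never reserve less than 768 MB
yarn.maximum-failed-containers: -1   # keep reallocating failed TaskManager containers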

Regards,
Yury

2017-01-05 3:21 GMT+03:00 Shannon Carey :

> In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem twice
> and I'm wondering if anyone has some insight about it.
>
> In both cases, we deployed a job that fails very frequently (within 15s-1m
> of launch). Eventually, the Flink cluster dies.
>
> The sequence of events looks something like this:
>
>- bad job is launched
>- bad job fails & is restarted many times (I didn't have the
>"failure-rate" restart strategy configuration right)
>- Task manager logs: org.apache.flink.yarn.YarnTaskManagerRunner
>(SIGTERM handler): RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>- At this point, the YARN resource manager also logs the container
>failure
>- More logs: Container ResourceID{resourceId='
>container_1481658997383_0003_01_13'} failed. Exit status: Pmem
>limit exceeded (-104)
>- Diagnostics for container ResourceID{resourceId='
>container_1481658997383_0003_01_13'} in state COMPLETE :
>exitStatus=Pmem limit exceeded (-104) diagnostics=Container
>[pid=21246,containerID=container_1481658997383_0003_01_13] is
>running beyond physical memory limits. Current usage: 5.6 GB of 5.6 GB
>physical memory used; 9.6 GB of 28.1 GB virtual memory used. Killing
>container.
>Container killed on request. Exit code is 143
>Container exited with a non-zero exit code 143
>Total number of failed containers so far: 12
>Stopping YARN session because the number of failed containers (12)
>exceeded the maximum failed containers (11). This number is controlled by
>the 'yarn.maximum-failed-containers' configuration setting. By default
>its the number of requested containers.
>- From here onward, the logs repeatedly show that jobs fail to restart
>due to 
> "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>Not enough free slots available to run the job. You can decrease the
>operator parallelism or increase the number of slots per TaskManager in the
>configuration. Task to schedule: < Attempt #68 (Source: …) @ (unassigned) -
>[SCHEDULED] > with groupID < 73191c171abfff61fb5102c161274145 > in
>sharing group < SlotSharingGroup [73191c171abfff61fb5102c161274145,
>19596f7834805c8409c419f0edab1f1b] >. Resources available to scheduler:
>Number of instances=0, total number of slots=0, available slots=0"
>- 

Re: Sequential/ordered map

2017-01-05 Thread Chesnay Schepler
So given an ordered list of texts, for each word find the earliest text 
it appears in?


As Kostas said, when splitting the text into words, wrap them in a Tuple2 
containing the word and the text index, and group them by the word.

As far as I can tell, the next step would be a simple reduce that finds the 
smallest index; for this there is a convenience minBy() transformation.
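
A minimal sketch of that approach (the input shape, a DataSet<Tuple2<Integer, String>>
of (textIndex, textContent), and the tokenization are assumptions):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

DataSet<Tuple2<String, Integer>> wordsWithTextIndex = texts.flatMap(
    new FlatMapFunction<Tuple2<Integer, String>, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(Tuple2<Integer, String> text, Collector<Tuple2<String, Integer>> out) {
            for (String word : text.f1.toLowerCase().split("\\W+")) {
                out.collect(new Tuple2<>(word, text.f0)); // (word, index of the text it appears in)
            }
        }
    });

DataSet<Tuple2<String, Integer>> firstOccurrence = wordsWithTextIndex
    .groupBy(0)   // all occurrences of the same word end up in the same group
    .minBy(1);    // keep the smallest text index per word, i.e. the earliest text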

Regards,
Chesnay

On 05.01.2017 12:25, Kostas Kloudas wrote:

Hi Sebastian,

If T_1 must be processed before T_i, i>1, then you cannot parallelize the 
algorithm.

If this is not a restriction, then you could:
1) split the text into words and also attach the id of the text they appear in,
2) do a groupBy that will send all the same words to the same node,
3) keep a “per-word” state with the word and the index of the text,
4) when a new word arrives you should check if the word already exists in the 
state.

Regards,
Kostas


On Jan 5, 2017, at 11:51 AM, Sebastian Neef  
wrote:

Hello,

I'd like to implement an algorithm which doesn't really look
parallelizable to me, but maybe there's a way around it:

In general the algorithm looks like this:

1. Take a list of texts T_1 ... T_n
2. For every text T_i (i > 1) do
2.1: Split text into a list of words W_1 ... W_m
2.2: For every word W_j do:
2.2.1.: Check if word already existed in a prior text T_k ( i > k )
2.2.2.: If so, mark word W_j with k
2.2.3.: Else mark word W_j with i
3. Do something with texts based on marked words...

I have a DataSet with all texts T_1...T_n.

As far as I understand, I cannot simply .map(...) the DataSet, because
T_i's calculation is based on previous results (i.e. T_(i-1)).

My current solution would be to set the  parallelism to 1.

- Is there an elegant way to parallelize this algorithm?
- Does setting parallelism=1 guarantee a specific order of the DataSet?
- Is there a way to check if an element exists in a DataSet? E.g.
DataSet<>.contains(elem)?

Best regards,
Sebastian






Re: Sequential/ordered map

2017-01-05 Thread Sebastian Neef
Hi Kostas,

thanks for the quick reply.

> If T_1 must be processed before T_i, i>1, then you cannot parallelize the 
> algorithm.

What would be the best way to process it anyway?

DataSet.collect() -> loop over List -> env.fromCollection(...) ?
Or with a parallelism of 1 and a .map(...) ?

However, this approach would collect all data at one node and wouldn't
scale, correct?

Regards,
Sebastian


Re: Regarding ordering of events

2017-01-05 Thread Fabian Hueske
Flink is a distributed system and does not preserve order across partitions.
The number prefix (e.g., 1>, 2>, ...) tells you the parallel instance of
the printing operator.

You can set the parallelism to 1 to have the stream in order.
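
A minimal illustration (a sketch assuming the standard streaming setup): with a
job-wide parallelism of 1 there is a single instance of every operator, so the
printed output keeps the order of its single input.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);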

Fabian

2017-01-05 12:16 GMT+01:00 Kostas Kloudas :

> Hi Abdul,
>
> Flink provides no ordering guarantees on the elements within a window.
> The only “order” it guarantees is that the results referring to window-1
> are
> going to be emitted before those of window-2 (assuming that window-1
> precedes window-2).
>
> Thanks,
> Kostas
>
> On Jan 5, 2017, at 11:57 AM, Abdul Salam Shaikh <
> abd.salam.sha...@gmail.com> wrote:
>
> Hi,
>
> I am using a JSON file as the source for the streaming (in the ascending
> order of the field Umlaufsekunde)which has events as follows:
>
> {"event":[{"*Umlaufsekunde*":115}]}
> {"event":[{"*Umlaufsekunde*":135}]}
> {"event":[{"*Umlaufsekunde*":135}]}
> {"event":[{"*Umlaufsekunde*":145}]}
> {"event":[{"*Umlaufsekunde*":155}]}
> {"event":[{"*Umlaufsekunde*":155}]}
> {"event":[{"*Umlaufsekunde*":185}]}
> {"event":[{"*Umlaufsekunde*":195}]}
> {"event":[{"*Umlaufsekunde*":195}]}
> {"event":[{"*Umlaufsekunde*":205}]}
> {"event":[{"*Umlaufsekunde*":245}]}
>
> However, when I try to print the stream, it is unordered as given below:
> 1> (*115*,null,1483517983252,1190)  -- The first value indicating
> Umlaufsekunde
> 2> (135,null,1483517984877,1190)
> 2> (155,null,1483517986861,1190)
> 4> (145,null,1483517985752,1190)
> 3> (135,null,1483517985424,1190)
> 4> (195,null,1483517990736,1190)
> 4> (255,null,1483517997424,1190)
> 2> (205,null,1483517991518,1190)
> 2> (275,null,1483517999330,1190)
> 2> (385,null,1483518865371,1190)
> 2> (395,null,1483518866840,1190)
> 1> (155,null,1483517986533,1190)
> 4> (285,null,1483518000189,1190)
> 4> (395,null,1483518866231,1190)
>
> I have also tried using the Timestamps and Watermarks but no luck as
> follows:
>
> public class TimestampExtractor implements
> AssignerWithPeriodicWatermarks<Tuple5<Long, Long, List, Long, Long>> {
>
> private long currentMaxTimestamp;
>
> @Override
> public Watermark getCurrentWatermark() {
>     return new Watermark(currentMaxTimestamp);
> }
>
> @Override
> public long extractTimestamp(Tuple5<Long, Long, List, Long, Long> element,
>         long previousElementTimestamp) {
>     long timestamp = element.getField(1);
>     currentMaxTimestamp = timestamp;
>     return currentMaxTimestamp;
> }
>
> }
>
> Could anyone suggest how I can handle this problem so that events arrive
> in order?
>
> Thanks!
>
>
>
>


Re: Flink Checkpoint runs slow for low load stream

2017-01-05 Thread Chakravarthy varaga
BRILLIANT !!!

Checkpoint times are consistent with 1.1.4...

Thanks for your formidable support !

Best Regards
CVP

On Wed, Jan 4, 2017 at 5:33 PM, Fabian Hueske  wrote:

> Hi CVP,
>
> we recently release Flink 1.1.4, i.e., the next bugfix release of the
> 1.1.x series with major robustness improvements [1].
> You might want to give 1.1.4 a try as well.
>
> Best, Fabian
>
> [1] http://flink.apache.org/news/2016/12/21/release-1.1.4.html
>
> 2017-01-04 16:51 GMT+01:00 Chakravarthy varaga :
>
>> Hi Stephan, All,
>>
>>  I just got a chance to try if 1.1.3 fixes slow check pointing on FS
>> backend. It seemed to have been fixed. Thanks for the fix.
>>
>>  While testing this with varying checkpoint intervals, there seem
>> to be spikes of slow checkpoints every 30/40 seconds for an interval of 15
>> secs. The checkpoint time lasts for about 300 ms as opposed to 10/20 ms.
>>  Basically, 15 secs seems to be the nominal value so far; anything
>> below this interval triggers the spikes too often. For us, living with a 15-sec
>> recovery is doable, and we eventually catch up on recovery!
>>
>> Best Regards
>> CVP
>>
>> On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>> Thanks for your prompt response Stephan.
>>>
>>> I'd wait for Flink 1.1.3 !!!
>>>
>>> Best Regards
>>> Varaga
>>>
>>> On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen  wrote:
>>>
 The plan to release 1.1.3 is asap ;-)

 Waiting for last backported patched to get in, then release testing and
 release.

 If you want to test it today, you would need to manually build the
 release-1.1 branch.

 Best,
 Stephan


 On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
 chakravarth...@gmail.com> wrote:

> Hi Gordon,
>
>  Do I need to clone and build release-1.1 branch to test this?
>  I currently use flinlk 1.1.2 runtime. When is the plan to release
> it in 1.1.3?
>
> Best Regards
> Varaga
>
> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <
> tzuli...@apache.org> wrote:
>
>> Hi,
>>
>> Helping out here: this is the PR for async Kafka offset committing -
>> https://github.com/apache/flink/pull/2574.
>> It has already been merged into the master and release-1.1 branches,
>> so you can try out the changes now if you’d like.
>> The change should also be included in the 1.1.3 release, which the
>> Flink community is discussing to release soon.
>>
>> Will definitely be helpful if you can provide feedback afterwards!
>>
>> Best Regards,
>> Gordon
>>
>>
>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>> chakravarth...@gmail.com) wrote:
>>
>> Hi Stephan,
>>
>> Is the Async kafka offset commit released in 1.3.1?
>>
>> Varaga
>>
>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>> Hi Stephan,
>>>
>>>  That should be great. Let me know once the fix is done and the
>>> snapshot version to use, I'll check and revert then.
>>>  Can you also share the JIRA that tracks the issue?
>>>
>>>  With regards to offset commit issue, I'm not sure as to how to
>>> proceed here. Probably I'll use your fix first and see if the problem
>>> reoccurs.
>>>
>>> Thanks much
>>> Varaga
>>>
>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen 
>>> wrote:
>>>
 @CVP

 Flink stores in checkpoints in your case only the Kafka offsets
 (few bytes) and the custom state (e).

 Here is an illustration of the checkpoint and what is stored (from
 the Flink docs).
 https://ci.apache.org/projects/flink/flink-docs-master/inter
 nals/stream_checkpointing.html


 I am quite puzzled why the offset committing problem occurs only
 for one input, and not for the other.
 I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
 Could you try out a snapshot version to see if that fixes your
 problem?

 Greetings,
 Stephan



 On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
 chakravarth...@gmail.com> wrote:

> Hi Stefan,
>
>  Thanks a million for your detailed explanation. I appreciate
> it.
>
>  -  The *zookeeper bundled with kafka 0.9.0.1* was used to
> start zookeeper. There is only 1 instance (standalone) of zookeeper 
> running
> on my localhost (ubuntu 14.04)
>  -  There is only 1 Kafka broker (*version: 0.9.0.1* )
>
>  With regards to Flink cluster there's only 1 JM & 2 TMs
> started with no 

Re: Sequential/ordered map

2017-01-05 Thread Kostas Kloudas
Hi Sebastian,

If T_1 must be processed before T_i, i>1, then you cannot parallelize the 
algorithm.

If this is not a restriction, then you could:
1) split the text into words and also attach the id of the text they appear in,
2) do a groupBy that will send all the same words to the same node,
3) keep a “per-word” state with the word and the index of the text,
4) when a new word arrives you should check if the word already exists in the 
state.

Regards,
Kostas

> On Jan 5, 2017, at 11:51 AM, Sebastian Neef  
> wrote:
> 
> Hello,
> 
> I'd like to implement an algorithm which doesn't really look
> parallelizable to me, but maybe there's a way around it:
> 
> In general the algorithm looks like this:
> 
> 1. Take a list of texts T_1 ... T_n
> 2. For every text T_i (i > 1) do
> 2.1: Split text into a list of words W_1 ... W_m
> 2.2: For every word W_j do:
> 2.2.1.: Check if word already existed in a prior text T_k ( i > k )
> 2.2.2.: If so, mark word W_j with k
> 2.2.3.: Else mark word W_j with i
> 3. Do something with texts based on marked words...
> 
> I have a DataSet with all texts T_1...T_n.
> 
> As far as I understand, I cannot simply .map(...) the DataSet, because
> T_i's calculation is based on previous results (i.e. T_(i-1)).
> 
> My current solution would be to set the  parallelism to 1.
> 
> - Is there an elegant way to parallelize this algorithm?
> - Does setting parallelism=1 guarantee a specific order of the DataSet?
> - Is there a way to check if an element exists in a DataSet? E.g.
> DataSet<>.contains(elem)?
> 
> Best regards,
> Sebastian



Re: Debugging Python-Api fails with NoClassDefFoundError

2017-01-05 Thread Chesnay Schepler

Hello,

all Flink dependencies of the Python API are marked as *provided* in the 
pom.xml, similar to most connectors.
By removing the provided tags in the pom.xml you should be able to run 
the PythonPlanBinder from the IDE.
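
Illustrative only, the dependency entries in question have roughly this shape;
dropping the <scope>provided</scope> line (or changing it to compile) puts the
Flink jars on the IDE's runtime classpath:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${project.version}</version>
    <scope>provided</scope>
</dependency>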


This was done to exclude these dependencies from the flink-python jar; since we 
copy this jar into the /lib folder of flink-dist, we would otherwise include some 
parts of Flink twice.

Maybe we could use the shade plugin to exclude all Flink dependencies from the 
jar; then we could remove the provided tags and it should work from the IDE.

Feel free to open a JIRA for this.

Regards,
Chesnay


On 05.01.2017 08:25, Mathias Peters wrote:


Yes, it is. Also, the project import in Idea has worked so far.

Cheers


On 04.01.2017 21:52, Ted Yu wrote:

This class is in flink-core jar.

Have you verified that the jar is on classpath ?

Cheers

On Wed, Jan 4, 2017 at 12:16 PM, Mathias Peters 
> wrote:


Hi,

I just wanted to debug a custom python script using your python dataset
api. Running the PythonPlanBinder in Intellij IDEA gives me the
subjected error. I took a fresh clone, built it with mvn clean install
-DskipTest, and imported everything in idea. Using an older version this
worked fine, so assume no(t the usual noob) errors on my side

Submitting a python script via console works.

The full stack trace looks like this:

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/flink/api/java/typeutils/TupleTypeInfoBase
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:264)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:122)
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 3 more


Thanks for the help.

best

Mathias



Re: Regarding ordering of events

2017-01-05 Thread Kostas Kloudas
Hi Abdul,

Flink provides no ordering guarantees on the elements within a window.
The only “order” it guarantees is that the results referring to window-1 are
going to be emitted before those of window-2 (assuming that window-1 precedes 
window-2).

Thanks,
Kostas

> On Jan 5, 2017, at 11:57 AM, Abdul Salam Shaikh  
> wrote:
> 
> Hi, 
> 
> I am using a JSON file as the source for the streaming (in the ascending 
> order of the field Umlaufsekunde)which has events as follows: 
> 
> {"event":[{"Umlaufsekunde":115}]}
> {"event":[{"Umlaufsekunde":135}]}
> {"event":[{"Umlaufsekunde":135}]}
> {"event":[{"Umlaufsekunde":145}]}
> {"event":[{"Umlaufsekunde":155}]}
> {"event":[{"Umlaufsekunde":155}]}
> {"event":[{"Umlaufsekunde":185}]}
> {"event":[{"Umlaufsekunde":195}]}
> {"event":[{"Umlaufsekunde":195}]}
> {"event":[{"Umlaufsekunde":205}]}
> {"event":[{"Umlaufsekunde":245}]}
> 
> However, when I try to print the stream, it is unordered as given below: 
> 1> (115,null,1483517983252,1190)  -- The first value indicating Umlaufsekunde
> 2> (135,null,1483517984877,1190)
> 2> (155,null,1483517986861,1190)
> 4> (145,null,1483517985752,1190)
> 3> (135,null,1483517985424,1190)
> 4> (195,null,1483517990736,1190)
> 4> (255,null,1483517997424,1190)
> 2> (205,null,1483517991518,1190)
> 2> (275,null,1483517999330,1190)
> 2> (385,null,1483518865371,1190)
> 2> (395,null,1483518866840,1190)
> 1> (155,null,1483517986533,1190)
> 4> (285,null,1483518000189,1190)
> 4> (395,null,1483518866231,1190)
> 
> I have also tried using the Timestamps and Watermarks but no luck as follows: 
> 
> public class TimestampExtractor implements 
> AssignerWithPeriodicWatermarks>{
> 
> private long currentMaxTimestamp;
> 
> @Override
> public Watermark getCurrentWatermark() {
> return new Watermark(currentMaxTimestamp);   
> }
> 
> @Override
> public long extractTimestamp(Tuple5 element, long 
> previousElementTimestamp) {
> long timestamp = element.getField(1);
> currentMaxTimestamp = timestamp;
> return currentMaxTimestamp;
>   }
> 
> }
> 
> Could anyone suggest how do I handle this problem for the arrival of events 
> in order ? 
> 
> Thanks!
> 
> 



Re: Reply: Som question about Flink stream sql

2017-01-05 Thread Fabian Hueske
There are several JIRA issues for row windows; some of them might be
assigned.
I haven't seen any pull requests, and nothing has been merged yet.

I would ask on the JIRA issue about the current status.

Best, Fabian

2017-01-05 12:00 GMT+01:00 Hongyuhong :

> Hi Fabian,
>
>
>
> Thanks for the reply.
>
> As you noticed, row windows are already supported by Calcite and FLIP-11
> has planned,
>
> Can you tell something about the progress of the row windows in Table API?
>
>
>
> Regards.
>
> Yuhong
>
>
>
>
>
>
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* January 5, 2017 17:43
> *To:* user@flink.apache.org
> *Subject:* Re: Som question about Flink stream sql
>
>
>
> Hi Yuhong,
>
> as you noticed, FLIP-11 is about the window operations on the Table API
> and does not include SQL.
>
> The reason is that the Table API is completely Flink domain, i.e., we can
> design and implement the API. For SQL we have a dependency on Calcite.
>
> You are right, that Calcite's JIRA issue for group windows [1] seems to be
> stale. I don't have more information on that. You could ping there and ask
> what the status is.
>
> Row windows are already supported by Calcite (SQL's OVER and WINDOW
> clauses), so these could be implemented for SQL as well.
>
> Maybe it's even easier to start from the SQL side and add the Table API
> later to ensure a compatible compilation process.
>
> Best, Fabian
>
>
>
>
>
> 2017-01-05 4:40 GMT+01:00 Hongyuhong :
>
> Hi,
>
> We are currently exploring on Flink streamsql ,
>
> And I see the group-window has been implemented in Table API, and
> row-window is also planning in FLIP-11. It seems that row-window grammar is
> more similar to calcite over clause.
>
> I’m curious about the detail plan and roadmap of stream sql, cause
> FLIP-11 just mentioned Table API.
>
> And is that streamsql priority implement row-window? Or if group-window is
> considered, what is the status on calcite integration?
>
> The issue on calcite jira was raised in August, what’s the status right
> now?
>
> Thanks in advance!
>
>
>
> Regards
>
> Yuhong
>
>
>
>
>
>
>
>
>


Reply: Som question about Flink stream sql

2017-01-05 Thread Hongyuhong
Hi Fabian,

Thanks for the reply.
As you noticed, row windows are already supported by Calcite and planned in 
FLIP-11.
Can you tell us something about the progress of row windows in the Table API?

Regards.
Yuhong




From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: January 5, 2017 17:43
To: user@flink.apache.org
Subject: Re: Som question about Flink stream sql

Hi Yuhong,
as you noticed, FLIP-11 is about the window operations on the Table API and 
does not include SQL.
The reason is that the Table API is completely Flink domain, i.e., we can 
design and implement the API. For SQL we have a dependency on Calcite.
You are right, that Calcite's JIRA issue for group windows [1] seems to be 
stale. I don't have more information on that. You could ping there and ask what 
the status is.
Row windows are already supported by Calcite (SQL's OVER and WINDOW clauses), 
so these could be implemented for SQL as well.
Maybe it's even easier to start from the SQL side and add the Table API later 
to ensure a compatible compilation process.
Best, Fabian


2017-01-05 4:40 GMT+01:00 Hongyuhong:
Hi,
We are currently exploring on Flink streamsql ,
And I see the group-window has been implemented in Table API, and row-window is 
also planning in FLIP-11. It seems that row-window grammar is more similar to 
calcite over clause.
I’m curious about the detail plan and roadmap of stream sql, cause FLIP-11 just 
mentioned Table API.
And is that streamsql priority implement row-window? Or if group-window is 
considered, what is the status on calcite integration?
The issue on calcite jira was raised in August, what’s the status right now?
Thanks in advance!

Regards
Yuhong






Regarding ordering of events

2017-01-05 Thread Abdul Salam Shaikh
Hi,

I am using a JSON file as the source for the stream (in the ascending
order of the field Umlaufsekunde), which has events as follows:

{"event":[{"*Umlaufsekunde*":115}]}
{"event":[{"*Umlaufsekunde*":135}]}
{"event":[{"*Umlaufsekunde*":135}]}
{"event":[{"*Umlaufsekunde*":145}]}
{"event":[{"*Umlaufsekunde*":155}]}
{"event":[{"*Umlaufsekunde*":155}]}
{"event":[{"*Umlaufsekunde*":185}]}
{"event":[{"*Umlaufsekunde*":195}]}
{"event":[{"*Umlaufsekunde*":195}]}
{"event":[{"*Umlaufsekunde*":205}]}
{"event":[{"*Umlaufsekunde*":245}]}

However, when I try to print the stream, it is unordered as given below:
1> (*115*,null,1483517983252,1190)  -- The first value indicating
Umlaufsekunde
2> (135,null,1483517984877,1190)
2> (155,null,1483517986861,1190)
4> (145,null,1483517985752,1190)
3> (135,null,1483517985424,1190)
4> (195,null,1483517990736,1190)
4> (255,null,1483517997424,1190)
2> (205,null,1483517991518,1190)
2> (275,null,1483517999330,1190)
2> (385,null,1483518865371,1190)
2> (395,null,1483518866840,1190)
1> (155,null,1483517986533,1190)
4> (285,null,1483518000189,1190)
4> (395,null,1483518866231,1190)

I have also tried using timestamps and watermarks as follows, but with no
luck:

public class TimestampExtractor implements
AssignerWithPeriodicWatermarks>{

private long currentMaxTimestamp;

@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp);
}

@Override
public long extractTimestamp(Tuple5 element, long
previousElementTimestamp) {
long timestamp = element.getField(1);
currentMaxTimestamp = timestamp;
return currentMaxTimestamp;
  }

}

Could anyone suggest how I can handle this problem so that events are
processed in order?

Thanks!
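
For reference, a periodic watermark assigner that tolerates out-of-order events
usually tracks the maximum timestamp seen so far and emits a watermark that lags
behind it. A sketch under the assumption that the elements are (timestamp, payload)
tuples; the class name and the 10-second bound are made up:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical element type: Tuple2<Long timestamp, String payload>.
public class BoundedLatenessTimestampExtractor
        implements AssignerWithPeriodicWatermarks<Tuple2<Long, String>> {

    // How far elements may arrive out of order (an assumption, tune as needed).
    private static final long MAX_OUT_OF_ORDERNESS_MS = 10_000L;

    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    @Override
    public long extractTimestamp(Tuple2<Long, String> element, long previousElementTimestamp) {
        long timestamp = element.f0;
        // Keep the maximum seen so far instead of overwriting it with every element,
        // otherwise a late element would pull the watermark backwards.
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Lag the watermark behind the maximum timestamp to give late elements time to arrive.
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS);
    }
}

Note that watermarks on their own do not reorder the stream; they only let
event-time windows (or a later sorting step, e.g. inside a window function) wait
long enough before producing results.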


Sequential/ordered map

2017-01-05 Thread Sebastian Neef
Hello,

I'd like to implement an algorithm which doesn't really look
parallelizable to me, but maybe there's a way around it:

In general the algorithm looks like this:

1. Take a list of texts T_1 ... T_n
2. For every text T_i (i > 1) do
2.1: Split text into a list of words W_1 ... W_m
2.2: For every word W_j do:
2.2.1.: Check if word already existed in a prior text T_k ( i > k )
2.2.2.: If so, mark word W_j with k
2.2.3.: Else mark word W_j with i
3. Do something with texts based on marked words...

I have a DataSet with all texts T_1...T_n.

As far as I understand, I cannot simply .map(...) the DataSet, because
T_i's calculation is based on previous results (i.e. T_(i-1)).

My current solution would be to set the parallelism to 1.

- Is there an elegant way to parallelize this algorithm?
- Does setting parallelism=1 guarantee a specific order of the DataSet?
- Is there a way to check if an element exists in a DataSet? E.g.
DataSet<>.contains(elem)?

Best regards,
Sebastian
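
For what it's worth, if step 2 only needs to know, for each word, the first text
it occurs in, that part can be phrased as a parallel group-by instead of a
sequential scan. A rough sketch with the DataSet API; the (index, text) input
format and the toy data are assumptions:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FirstOccurrenceSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Assumed input: (textIndex, textContent)
        DataSet<Tuple2<Integer, String>> texts = env.fromElements(
                Tuple2.of(1, "a b c"),
                Tuple2.of(2, "b c d"),
                Tuple2.of(3, "a d e"));

        // Emit one (word, textIndex) pair per word occurrence ...
        DataSet<Tuple2<String, Integer>> wordOccurrences = texts
                .flatMap(new FlatMapFunction<Tuple2<Integer, String>, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(Tuple2<Integer, String> text,
                                        Collector<Tuple2<String, Integer>> out) {
                        for (String word : text.f1.split("\\s+")) {
                            out.collect(Tuple2.of(word, text.f0));
                        }
                    }
                });

        // ... and keep the smallest text index per word, i.e. the text that "owns" it.
        DataSet<Tuple2<String, Integer>> firstOccurrence = wordOccurrences
                .groupBy(0)
                .min(1);

        firstOccurrence.print();
    }
}

Step 3 could then join the per-word minima back against the word occurrences (or
the texts) on the word field. Whether this helps depends on whether the algorithm
really needs the strictly sequential pass or just the first occurrence per word.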


Re: Speedup of Flink Applications

2017-01-05 Thread Fabian Hueske
Hi Hanna,

I assume you are asking about the possible speed up of batch analysis
programs and not about streaming applications (please correct me if I'm
wrong).

Timur raised very good points about data size and skew.

Given evenly distributed data (no skewed key distribution for a grouping or
join operation) and sufficiently large data sets, Flink scales quite well.
Flink uses by default pipelined shuffles. Depending on the parallelism, you
need to adapt the number of network buffers (see
taskmanager.network.numberOfBuffers [1]). When the required number of
network buffers becomes too large, you can switch to batched shuffles (set
ExecutionMode.BATCH on ExecutionConfig of ExecutionEnvironment).
Apart from that, really large parallelism of complex programs might suffer
from the scheduling overhead of individual tasks.  Here the JobManager
might become a bottleneck when assigning tasks to workers so that there
might be an initial delay before a job starts processing.

If your data is skewed or too small, scaling out doesn't help, because a
single worker will be busy while all others are waiting for it, or because
the overhead of distributing the work becomes too large.

Hope this helps,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#jobmanager-amp-taskmanager
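
For example, switching to batched shuffles is a one-line change on the
ExecutionConfig; a minimal sketch (the class and the toy job are made up, and the
number of network buffers itself is configured in flink-conf.yaml, not in code):

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchShuffleExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Ship intermediate results in a blocking, batched fashion instead of
        // pipelining them, which lowers the number of network buffers needed
        // at high parallelism.
        env.getConfig().setExecutionMode(ExecutionMode.BATCH);

        env.fromElements(1, 2, 3, 4, 5)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .print();
    }
}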


2017-01-03 15:44 GMT+01:00 Timur Shenkao :

> Hi,
> It seems your questions are too abstract & theoretical. The answer is : it
> depends on several factors. Skewness in data, data volume, reliability
> requirements, "fatness" of servers, whether one performs look-up in other
> data sources, etc.
> The papers you mentioned mean the following: under concrete & specific
> conditions, researchers achieved their results. If they had changed some
> parameters slightly (increase network's throughput, for example, or change
> garbage collector's options) , the results would have been completely
> different.
>
>
> On Tuesday, January 3, 2017, Hanna Prinz  wrote:
>
>> Happy new year everyone :)
>>
>> I’m currently working on a paper about Flink. I already got some
>> recommendations on general papers with details about Flink, which helped me
>> a lot already. But now that I read them, I’m further interested in the
>> speedup capabilities provided by the Flink Framework: How „far“ can it
>> scale efficiently?
>>
>> Amdahl’s law states that parallelization is only efficient as long as
>> the non-parallelizable part of the processing (time for the communication
>> between the nodes etc.) doesn’t „eat up“ the speed gains of parallelization
>> (= parallel slowdown).
>> Of course, the communication overhead is mostly caused by the
>> implementation, but the frameworks specific solution for the communication
>> between the nodes has a reasonable effect as well.
>>
>> After studying these papers, it looks like, although Flinks performance
>> is better in many cases, the possible speedup is equal to the possible
>> speedup of Spark.
>>
>> 1. Spark versus Flink - Understanding Performance in Big Data Analytics
>> Frameworks | https://hal.inria.fr/hal-01347638/document
>> 2. Big Data Analytics on Cray XC Series DataWarp using Hadoop, Spark and
>> Flink | https://cug.org/proceedings/cug2016_proceedings/includes/files/pap141.pdf
>> 3. Thrill - High-Performance Algorithmic Distributed Batch Data
>> Processing with C++ | https://panthema.net/2016/0816-Thrill-High-Performance-Algorithmic-Distributed-Batch-Data-Processing-with-CPP/1608.05634v1.pdf
>>
>>
>> Does someone have …
>> … more information (or data) on speedup of Flink applications?
>> … experience (or data) with Flink in an extremely paralellized
>> environment?
>> … detailed information on how the nodes communicate, especially when they
>> are waiting for task results of one another?
>>
>> Thank you very much for your time & answers!
>> Hanna
>>
>


Re: Caching collected objects in .apply()

2017-01-05 Thread Fabian Hueske
Hi Matt,

I think your approach should be fine.
Although the second keyBy is logically a shuffle, the data will not be sent
over the wire to a different machine if the parallelism of the first and
second window operators is identical.
It only costs one serialization / deserialization step.

I would be careful about putting the result of the first window into
operator state. I think it is not well defined how function objects are
reused. This might be an internal implementation detail which might change
in the future.
Aljoscha (in CC) should know more about how the window function objects are
used.

Best, Fabian
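
For reference, one alternative to relying on how function objects are reused is
to keep the previous window's result in keyed state inside a RichWindowFunction.
A sketch against the Flink 1.2-era keyed-state API; the String input, the Double
result standing in for "A", and the per-element computation are all made up:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// Builds a Double result from the window's String elements and remembers it,
// per key, so the next firing of the window can start from the previous result.
public class StatefulWindowFunction
        extends RichWindowFunction<String, Double, String, GlobalWindow> {

    private transient ValueState<Double> previous;

    @Override
    public void open(Configuration parameters) {
        previous = getRuntimeContext().getState(
                new ValueStateDescriptor<>("previous-result", Double.class, null));
    }

    @Override
    public void apply(String key, GlobalWindow window,
                      Iterable<String> input, Collector<Double> out) throws Exception {
        Double prev = previous.value();          // null on the first firing for this key
        double current = (prev != null ? prev : 0.0);
        for (String element : input) {
            current += element.length();         // placeholder for the real computation
        }
        previous.update(current);                // remembered for the next window firing
        out.collect(current);
    }
}

With something like this, the second keyBy / countWindow(2, 1) stage would not be
needed, because each firing of the first window can read and update the previous
result for its key.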

2017-01-05 10:06 GMT+01:00 Matt :

> I'm still looking for an answer to this question. Hope you can give me
> some insight!
>
> On Thu, Dec 22, 2016 at 6:17 PM, Matt  wrote:
>
>> Just to be clear, the stream is of String elements. The first part of the
>> pipeline (up to the first .apply) receives those strings, and returns
>> objects of another class ("A" let's say).
>>
>> On Thu, Dec 22, 2016 at 6:04 PM, Matt  wrote:
>>
>>> Hello,
>>>
>>> I have a window processing 10 objects at a time, and creating 1 as a
>>> result. The problem is in order to create that object I need the object
>>> from the previous window.
>>>
>>> I'm doing this:
>>>
>>> stream
>>>   .keyBy(...some key...)
>>>   .countWindow(10, 1)
>>>   .apply(...creates an element A...)
>>>   .keyBy(...same key as above...)
>>>   .countWindow(2, 1)
>>>   .apply(...updates A with the value of the previous element A...)
>>>   .addSink(...)
>>>
>>> Probably there is a way to retrieve the last collected object inside the
>>> first .apply(), or to cache it somehow.
>>>
>>> Is there a better way to achieve the same? How inefficient is this?
>>>
>>> Regards,
>>> Matt
>>>
>>
>>
>


Re: Som question about Flink stream sql

2017-01-05 Thread Fabian Hueske
Hi Yuhong,

as you noticed, FLIP-11 is about the window operations on the Table API and
does not include SQL.
The reason is that the Table API is completely Flink domain, i.e., we can
design and implement the API. For SQL we have a dependency on Calcite.

You are right that Calcite's JIRA issue for group windows [1] seems to be
stale. I don't have more information on that. You could ping there and ask
what the status is.
Row windows are already supported by Calcite (SQL's OVER and WINDOW
clauses), so these could be implemented for SQL as well.
Maybe it's even easier to start from the SQL side and add the Table API
later to ensure a compatible compilation process.

Best, Fabian



2017-01-05 4:40 GMT+01:00 Hongyuhong :

> Hi,
>
> We are currently exploring on Flink streamsql ,
>
> And I see the group-window has been implemented in Table API, and
> row-window is also planning in FLIP-11. It seems that row-window grammar is
> more similar to calcite over clause.
>
> I’m curious about the detail plan and roadmap of stream sql, cause FLIP-11
> just mentioned Table API.
>
> And is that streamsql priority implement row-window? Or if group-window is
> considered, what is the status on calcite integration?
>
> The issue on calcite jira was raised in August, what’s the status right
> now?
>
> Thanks in advance!
>
>
>
> Regards
>
> Yuhong
>
>
>
>
>
>
>


Re: Caching collected objects in .apply()

2017-01-05 Thread Matt
I'm still looking for an answer to this question. Hope you can give me some
insight!

On Thu, Dec 22, 2016 at 6:17 PM, Matt  wrote:

> Just to be clear, the stream is of String elements. The first part of the
> pipeline (up to the first .apply) receives those strings, and returns
> objects of another class ("A" let's say).
>
> On Thu, Dec 22, 2016 at 6:04 PM, Matt  wrote:
>
>> Hello,
>>
>> I have a window processing 10 objects at a time, and creating 1 as a
>> result. The problem is in order to create that object I need the object
>> from the previous window.
>>
>> I'm doing this:
>>
>> stream
>>   .keyBy(...some key...)
>>   .countWindow(10, 1)
>>   .apply(...creates an element A...)
>>   .keyBy(...same key as above...)
>>   .countWindow(2, 1)
>>   .apply(...updates A with the value of the previous element A...)
>>   .addSink(...)
>>
>> Probably there is a way to retrieve the last collected object inside the
>> first .apply(), or to cache it somehow.
>>
>> Is there a better way to achieve the same? How inefficient is this?
>>
>> Regards,
>> Matt
>>
>
>


Running into memory issues while running on Yarn

2017-01-05 Thread Sachin Goel
Hey!

I'm running locally under this configuration (copied from the nodemanager logs):
physical-memory=8192 virtual-memory=17204 virtual-cores=8

Before starting a Flink deployment, memory usage stats show 3.7 GB used on
the system, indicating lots of free memory for Flink containers.
However, after I submit with minimal resource requirements,
./yarn-session.sh -n 1 -tm 768, the cluster deploys successfully, but then
every application on the system receives a SIGTERM and it basically kills the
current user session, logging me out of the system.

The JobManager and TaskManager logs contain just the information that a
SIGTERM was received and they shut down gracefully.
The logs of all YARN and DFS processes also show the receipt of a SIGTERM.

Here's the relevant log from nodemanager:

2017-01-05 17:00:06,089 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
Container container_1483603191971_0002_01_02 transitioned from
LOCALIZED to RUNNING
2017-01-05 17:00:06,092 INFO
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor:
launchContainer: [bash,
/opt/hadoop-2.7.3/tmp/nm-local-dir/usercache/kirk/appcache/application_1483603191971_0002/container_1483603191971_0002_01_02/default_container_executor.sh]
2017-01-05 17:00:08,731 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
Starting resource-monitoring for
container_1483603191971_0002_01_02
2017-01-05 17:00:08,744 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
Memory usage of ProcessTree 17872 for container-id
container_1483603191971_0002_01_01: 282.7 MB of 1 GB physical
memory used; 2.1 GB of 2.1 GB virtual memory used
2017-01-05 17:00:08,744 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
Process tree for container: container_1483603191971_0002_01_01 has
processes older than 1 iteration running over the configured limit.
Limit=2254857728, current usage = 2255896576
2017-01-05 17:00:08,745 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
Container [pid=17872,containerID=container_1483603191971_0002_01_01]
is running beyond virtual memory limits. Current usage: 282.7 MB of 1
GB physical memory used; 2.1 GB of 2.1 GB virtual memory used. Killing
container.
Dump of the process-tree for container_1483603191971_0002_01_01 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
FULL_CMD_LINE
|- 17872 17870 17872 17872 (bash) 0 0 21409792 812 /bin/bash -c
/usr/lib/jvm/java-8-openjdk-amd64//bin/java -Xmx424M
-Dlog.file=/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.log
-Dlogback.configurationFile=file:logback.xml
-Dlog4j.configuration=file:log4j.properties
org.apache.flink.yarn.YarnApplicationMasterRunner
1>/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.out
2>/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.err
|- 17879 17872 17872 17872 (java) 748 20 2234486784 71553
/usr/lib/jvm/java-8-openjdk-amd64//bin/java -Xmx424M
-Dlog.file=/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.log
-Dlogback.configurationFile=file:logback.xml
-Dlog4j.configuration=file:log4j.properties
org.apache.flink.yarn.YarnApplicationMasterRunner

2017-01-05 17:00:08,745 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
Removed ProcessTree with root 17872
2017-01-05 17:00:08,746 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
Container container_1483603191971_0002_01_01 transitioned from
RUNNING to KILLING
2017-01-05 17:00:08,746 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
Cleaning up container container_1483603191971_0002_01_01
2017-01-05 17:00:08,779 ERROR
org.apache.hadoop.yarn.server.nodemanager.NodeManager: RECEIVED SIGNAL
15: SIGTERM
2017-01-05 17:00:08,822 WARN
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor:
Exit code from container container_1483603191971_0002_01_01 is :
143
2017-01-05 17:00:08,825 WARN
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor:
Exit code from container container_1483603191971_0002_01_02 is :
143


Is the memory available on my PC not enough, or are there any known issues
that might lead to this?

Also, this doesn't occur every time I start a Flink session.

Thanks
Sachin
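
For reference, the numbers in the dump look consistent with YARN's virtual-memory
check rather than with a shortage of physical RAM, assuming the default
yarn.nodemanager.vmem-pmem-ratio of 2.1 and vmem-check-enabled=true:

1 GiB physical container limit * 2.1 (vmem-pmem ratio) = 1073741824 B * 2.1 ≈ 2254857728 B ≈ 2.1 GiB

which matches the Limit=2254857728 in the log. In that case the JobManager
container is killed as soon as the JVM's virtual footprint crosses roughly
2.1 GiB, independent of how much memory is free on the machine.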