How to customize triggering of checkpoints?

2018-04-10 Thread syed
I am new to the Flink environment and looking to analyze the triggering of
checkpoints. I am looking to trigger non-periodic checkpoints such that
checkpoint intervals are not of equal length, but I am not sure how I can do this
in Flink.

My specific query is;

(1) How can I trigger non-periodic checkpoints in Flink? I am looking to
trigger the first checkpoint after, say, 10 seconds, the next checkpoint at say 25
seconds, the third at 45 seconds, and so on. Can I define my own function which
triggers non-periodic checkpoints and generates non-uniform checkpoint
intervals?
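
For reference, the only configuration I have found is the single fixed interval
below -- a minimal sketch of the standard periodic setup (the 10-second value is
just an example), which is exactly what I would like to replace with non-uniform
intervals:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// one fixed interval for the whole job; there is no per-checkpoint schedule here
env.enableCheckpointing(10000L);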
Thanks.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: java.lang.Exception: TaskManager was lost/killed

2018-04-10 Thread 周思华
Hi Lasse,


I ran into that before. I think the non-heap memory trend in the graph you 
attached may be the "expected" result ... because RocksDB keeps a filter 
(bloom filter) in memory for every opened SST file by default, and the number of 
SST files increases over time, so it looks like a leak. There is an 
issue (https://issues.apache.org/jira/browse/FLINK-7289) that Stefan created to track 
this, and the 
page (https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB) from 
RocksDB's wiki can give you a better understanding of the memory used by 
RocksDB. Stefan, please correct me if anything above is wrong.
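
One knob that can help in similar cases (not something discussed above, just a
rough sketch assuming Flink's RocksDBStateBackend, with a placeholder checkpoint
path and an arbitrary 256 MB cache size) is bounding the filter/index memory via
the OptionsFactory hook:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

RocksDBStateBackend backend = new RocksDBStateBackend("s3://my-bucket/checkpoints");
backend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        BlockBasedTableConfig table = new BlockBasedTableConfig();
        table.setBlockCacheSize(256 * 1024 * 1024);   // placeholder: bound the block cache to 256 MB
        table.setCacheIndexAndFilterBlocks(true);     // make index/filter blocks count against that cache
        return currentOptions.setTableFormatConfig(table);
    }
});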


Best Regards,
Sihua Zhou
On 04/11/2018 09:55, Ted Yu wrote:
Please see the last comment on this issue:


https://github.com/facebook/rocksdb/issues/3216



FYI


On Tue, Apr 10, 2018 at 12:25 AM, Lasse Nedergaard  
wrote:


This graph shows non-heap memory. If the same pattern exists, it makes sense that it 
will try to allocate more memory and then exceed the limit. I can see the trend 
for all the other containers that have been killed. So my question is now: what is 
using non-heap memory?
From 
http://mail-archives.apache.org/mod_mbox/flink-user/201707.mbox/%3ccanc1h_u0dqqvbysdaollbemewaxiimtmfjjcribpfpo0idl...@mail.gmail.com%3E
it looks like RocksDB could be the culprit.


I have jobs using incremental checkpointing and some without, some optimised for 
FLASH_SSD. All show the same pattern.


Lasse 






2018-04-10 8:52 GMT+02:00 Lasse Nedergaard :

Hi.


I found the exception attached below, for our simple job. It states that our 
task manager was killed due to exceeding the memory limit of 2.7 GB. But when I look at 
the Flink metrics just 30 seconds before, it used 1.3 GB heap and 712 MB non-heap, around 
2 GB in total. 
So something else is also using memory inside the container. Any idea how to 
figure out what?
As a side note, we use RocksDBStateBackend with this configuration


env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
        (long) (config.checkPointInterval * 0.75));
env.enableCheckpointing(config.checkPointInterval, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(new RocksDBStateBackend(config.checkpointDataUri));

where checkpointDataUri points to S3.


Lasse Nedergaard



2018-04-09 16:52:01,239 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
- Diagnostics for container container_1522921976871_0001_01_79 
in state COMPLETE : exitStatus=Pmem limit exceeded (-104) diagnostics=Container 
[pid=30118,containerID=container_1522921976871_0001_01_79] is running 
beyond physical memory limits. Current usage: 2.7 GB of 2.7 GB physical memory 
used; 4.9 GB of 13.4 GB virtual memory used. Killing container.

Dump of the process-tree for container_1522921976871_0001_01_79 :

|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

|- 30136 30118 30118 30118 (java) 245173 68463 5193723904 703845 
/usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m 
-Dlog.file=/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.log
 -Dlogback.configurationFile=file:./logback.xml 
-Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskManager --configDir . 

|- 30118 30116 30118 30118 (bash) 0 0 115818496 674 /bin/bash -c 
/usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m  
-Dlog.file=/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.log
 -Dlogback.configurationFile=file:./logback.xml 
-Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskManager --configDir . 1> 
/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.out
 2> 
/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.err
 




2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter 
- 
gauge.ip-10-1-1-181.taskmanager.container_1522921976871_0001_01_79.Status.JVM.Memory.Heap.Used=1398739496




2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter 
- 
gauge.ip-10-1-1-181.taskmanager.container_1522921976871_0001_01_79.Status.JVM.Memory.NonHeap.Used=746869520







 


2018-04-09 23:52 GMT+02:00 Ken Krugler :

Hi Chesnay,


Don’t know if this helps, but I’d run into this as well, though I haven’t 
hooked up YourKit to analyze exactly what’s causing the memory problem.


E.g. after about 3.5 hours running locally, it failed with memory issues.



In the TaskManager logs, I start seeing exceptions in my code….


java.lang.OutOfMemoryError: GC overhead limit exceeded


And then eventually...


2018-04-07 21:55:25,686 WARN  
org.apache.flink.runtime.accumulators.AccumulatorRegistry - 

Re: java.lang.Exception: TaskManager was lost/killed

2018-04-10 Thread Ted Yu
Please see the last comment on this issue:

https://github.com/facebook/rocksdb/issues/3216

FYI

On Tue, Apr 10, 2018 at 12:25 AM, Lasse Nedergaard <
lassenederga...@gmail.com> wrote:

>
> This graph shows Non-Heap . If the same pattern exists it make sense that
> it will try to allocate more memory and then exceed the limit. I can see
> the trend for all other containers that has been killed. So my question is
> now, what is using non-heap memory?
> From http://mail-archives.apache.org/mod_mbox/flink-
> user/201707.mbox/%3CCANC1h_u0dQQvbysDAoLLbEmeWaxiimTMFjJC
> ribpfpo0idl...@mail.gmail.com%3E it look like RockDb could be guilty.
>
> I have job using incremental checkpointing and some without, some
> optimised for FLASH_SSD. all have same pattern
>
> Lasse
>
>
>
> 2018-04-10 8:52 GMT+02:00 Lasse Nedergaard :
>
>> Hi.
>>
>> I found the exception attached below, for our simple job. It states that
>> our task-manager was killed du to exceed memory limit on 2.7GB. But when I
>> look at Flink metricts just 30 sec before it use 1.3 GB heap and 712 MB
>> Non-Heap around 2 GB.
>> So something else are also using memory inside the conatianer any idea
>> how to figure out what?
>> As a side note we use RockDBStateBackend with this configuration
>>
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints((long)(config.checkPointInterval
>>  * 0.75));
>> env.enableCheckpointing(config.checkPointInterval, 
>> CheckpointingMode.AT_LEAST_ONCE);
>> env.setStateBackend(new RocksDBStateBackend(config.checkpointDataUri));
>>
>> Where checkpointDataUri point to S3
>>
>> Lasse Nedergaard
>>
>> 2018-04-09 16:52:01,239 INFO  org.apache.flink.yarn.YarnFlin
>> kResourceManager- Diagnostics for container
>> container_1522921976871_0001_01_79 in state COMPLETE :
>> exitStatus=Pmem limit exceeded (-104) diagnostics=Container
>> [pid=30118,containerID=container_1522921976871_0001_01_79] is
>> running beyond physical memory limits. Current usage: 2.7 GB of 2.7 GB
>> physical memory used; 4.9 GB of 13.4 GB virtual memory used. Killing
>> container.
>>
>> Dump of the process-tree for container_1522921976871_0001_01_79 :
>>
>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>>
>> |- 30136 30118 30118 30118 (java) 245173 68463 5193723904 703845
>> /usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m
>> -Dlog.file=/var/log/hadoop-yarn/containers/application_15229
>> 21976871_0001/container_1522921976871_0001_01_79/taskmanager.log
>> -Dlogback.configurationFile=file:./logback.xml
>> -Dlog4j.configuration=file:./log4j.properties
>> org.apache.flink.yarn.YarnTaskManager --configDir .
>>
>> |- 30118 30116 30118 30118 (bash) 0 0 115818496 674 /bin/bash -c
>> /usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m
>> -Dlog.file=/var/log/hadoop-yarn/containers/application_15229
>> 21976871_0001/container_1522921976871_0001_01_79/taskmanager.log
>> -Dlogback.configurationFile=file:./logback.xml
>> -Dlog4j.configuration=file:./log4j.properties
>> org.apache.flink.yarn.YarnTaskManager --configDir . 1>
>> /var/log/hadoop-yarn/containers/application_1522921976871_
>> 0001/container_1522921976871_0001_01_79/taskmanager.out 2>
>> /var/log/hadoop-yarn/containers/application_1522921976871_
>> 0001/container_1522921976871_0001_01_79/taskmanager.err
>>
>>
>> 2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter
>> - gauge.ip-10-1-1-181.taskmanage
>> r.container_1522921976871_0001_01_79.Status.JVM.
>> Memory.Heap.Used=1398739496
>>
>>
>> 2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter
>> - gauge.ip-10-1-1-181.taskmanage
>> r.container_1522921976871_0001_01_79.Status.JVM.
>> Memory.NonHeap.Used=746869520
>>
>>
>>
>>
>>
>> 2018-04-09 23:52 GMT+02:00 Ken Krugler :
>>
>>> Hi Chesnay,
>>>
>>> Don’t know if this helps, but I’d run into this as well, though I
>>> haven’t hooked up YourKit to analyze exactly what’s causing the memory
>>> problem.
>>>
>>> E.g. after about 3.5 hours running locally, it failed with memory issues.
>>>
>>> In the TaskManager logs, I start seeing exceptions in my code….
>>>
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>
>>> And then eventually...
>>>
>>> 2018-04-07 21:55:25,686 WARN  
>>> org.apache.flink.runtime.accumulators.AccumulatorRegistry
>>> - Failed to serialize accumulators for task.
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>
>>> Immediately after this, one of my custom functions gets a close() call,
>>> and I see a log msg about it "switched from RUNNING to FAILED”.
>>>
>>> After this, I see messages that the job is being restarted, but the
>>> TaskManager log output abruptly ends.
>>>
>>> In the Job Manager log, this is what is output following the time of the
>>> last TaskManager logging output:
>>>
>>> 2018-04-07 21:57:33,702 INFO  

Is Flink able to do real time stock market analysis?

2018-04-10 Thread Ivan Wang
Hi all,

I've spent nearly 2 weeks trying to figure out a solution to my requirement
below. If anyone can advise, that would be great.

1. There are going to be 2000 transactions per second as StreamRaw. I'm
going to tumbleWindow them as StreamA, let's say every 15 seconds. Then I'm
going to countWindow StreamA as StreamB, let's say every 20 events (a rough
sketch follows after this list).

2. For every event E in StreamRaw, I need to find exactly one event in
StreamB which is earlier than E and closest to E. Then some comparison will
be performed. For example, if the timestamp of E is 9:46:38, there should be an
event in StreamB with timestamp 9:46:30, because I use a 15-second interval.
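
A rough sketch of step 1, assuming StreamRaw is a DataStream<Tuple2<String, Double>>
named streamRaw of (symbol, price) records, and using a placeholder keep-last reduce
instead of my real aggregation:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// note: the *All windows below run with parallelism 1
DataStream<Tuple2<String, Double>> streamA = streamRaw
        .timeWindowAll(Time.seconds(15))   // tumble every 15 seconds -> StreamA
        .reduce((a, b) -> b);              // placeholder aggregation: keep the last record

DataStream<Tuple2<String, Double>> streamB = streamA
        .countWindowAll(20)                // every 20 StreamA events -> StreamB
        .reduce((a, b) -> b);              // placeholder aggregation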

I tried CEP on StreamRaw; however, I couldn't figure out how to involve
StreamB and get that exact event in the condition method.

I tried the Table API and SQL, but it throws a time attribute error for the second
window method.

*window(Tumble).group().select().window(Slide).group().select()*

There seems to be no way to tell Flink the time attribute after the first
window().group(). I then tried to convert the streams into tables first and then
leftOuterJoin them, but Flink tells me that's not supported.

Is Flink able to do this? If not, I'll go for other alternatives. Thanks
again if someone can help.


Watermark and multiple streams

2018-04-10 Thread Filipe Couto
Hello.

I'm joining several data streams, using ConnectedStreams. Let's say something 
like A connect B which outputs AB, and then I join AB with C, which outputs ABC.

However, the relationship between A and B, or between AB and C, may be 1 to many or 
1 to 1, depending on the case. For the 1 to 1 case, it's expected to produce an 
output as soon as I obtain both records that match the same key, but the 1 
to many case is not so simple: there's no event that guarantees that I have 
obtained all of my records. For this, I've searched and found out that I 
have to implement an onTimer method in a ProcessFunction. I also read that an 
event-time timer is registered in a queue, and when a new watermark arrives, the 
event timers whose time has passed will be triggered.
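
Roughly what I mean by the timer part -- a sketch only, with String records and an
arbitrary 10-second delay as placeholders, and assuming the stream has been keyed
and has event-time timestamps assigned:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CollectUntilTimer extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // the timer fires once the watermark passes this event-time instant
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired at " + timestamp);
    }
}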

Given this, I'm running into trouble when generating new watermarks like in the 
example: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks
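
For concreteness, a periodic assigner in the style of that example looks roughly
like this (Tuple2<String, Long> with the timestamp in f1 and the 10-second bound
are placeholders, not my actual setup):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RecordTimestamps extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>> {

    public RecordTimestamps() {
        super(Time.seconds(10));   // tolerate up to 10 seconds of out-of-orderness
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> element) {
        return element.f1;         // the event-time timestamp carried in the record
    }
}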

The solution I've reached basically processes (in the onTimer method) all the 
records that I obtained previously when the first watermark arrives, but after 
that, it stops triggering.

I read that watermarks are global, so how can I create a new watermark when 
working with several streams?

Thank you


Recovering snapshot state saved with TypeInformation generated by the implicit macro from the scala API

2018-04-10 Thread Petter Arvidsson
Hello everyone,

We are trying to recover state from a snapshot which we can no longer load.
When it is loaded we receive the following exception:
java.lang.ClassNotFoundException: io.relayr.counter.FttCounter$$
anon$71$$anon$33
This, via a couple more exceptions, leads to:
java.io.IOException: Unloadable class for type serializer.

The cause of this behavior is an implicit macro that is part of the scala
API package, https://github.com/apache/flink/blob/release-1.4/flink-scala
/src/main/scala/org/apache/flink/api/scala/package.scala#L46. This macro
generates an anonymous class implementing TypeInformation for classes that
lack it. In our case it seems to have generated
"io.relayr.counter.FttCounter$$anon$71$$anon$33" which does not have a
stable name. When we change the class implementing the job, the name of
this anonymous class changes and we can no longer load the snapshot.

To solve the problem we introduced an explicit TypeInformation instance
instead, which makes new instances of the job work properly. The problem is
that this new version is no longer compatible with the old state (loading
it generates the same exception), since the original TypeInformation is no
longer generated. This is due to the explicitly provided instance
preventing the macro from being executed.

Did anyone else experience this or a similar problem? Is there a good way
to get out of this situation, i.e. how could we migrate the snapshot to one
where the state points to a TypeInformation instance with a stable class
name and not the macro generated one without losing the state?

We are using Flink 1.4.2.

Regards,
Petter


Re: Kafka Consumers Partition Discovery doesn't work

2018-04-10 Thread Juho Autio
Ahhh looks like I had simply misunderstood where that property should go.

The docs correctly say:
> To enable it, set a non-negative value for
flink.partition-discovery.interval-millis in the __provided properties
config__

So it should be set in the Properties that are passed in the constructor of
FlinkKafkaConsumer!

I had somehow assumed that this should go to flink-conf.yaml (maybe because
it starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't
read that.

Sorry for the trouble. If anything, I guess a piece of example code
might've helped me avoid this mistake. The docs are clear though, I just
had become blind to this detail as I thought I had already read it.
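
A minimal sketch of the corrected setup, with placeholder topic and server names
(not my actual config):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder
props.setProperty("group.id", "my-group");              // placeholder
// goes into the consumer properties, not flink-conf.yaml:
props.setProperty("flink.partition-discovery.interval-millis", "60000");

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my_topic", new SimpleStringSchema(), props);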

On Thu, Apr 5, 2018 at 10:26 AM, Juho Autio  wrote:

> Still not working after I had a fresh build from https://github.com/
> apache/flink/tree/release-1.5.
>
> When the job starts this is logged:
>
> 2018-04-05 09:29:38,157 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: 
> flink.partition-discovery.interval-millis,
> 60000
>
> So that's 1 minute.
>
> As before, I added one more partition to a topic that is being consumed.
> Secor started consuming it as expected, but Flink didn't – or at least it
> isn't reporting anything about doing so. The new partition is not shown in
> Flink task metrics or consumer offsets committed by Flink.
>
> How could I investigate this further? How about that additional logging
> for partition discovery?
>
> On Thu, Mar 22, 2018 at 3:09 PM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> I think you’ve made a good point: there is currently no logs that tell
>> anything about discovering a new partition. We should probably add this.
>>
>> And yes, it would be great if you can report back on this using either
>> the latest master, release-1.5 or release-1.4 branches.
>>
>> On 22 March 2018 at 10:24:09 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>
>> Thanks, that sounds promising. I don't know how to check if it's
>> consuming all partitions? For example I couldn't find any logs about
>> discovering a new partition. However, did I understand correctly that this
>> is also fixed in Flink dev? If yes, I could rebuild my 1.5-SNAPSHOT and try
>> again.
>>
>> On Thu, Mar 22, 2018 at 4:18 PM, Tzu-Li (Gordon) Tai > > wrote:
>>
>>> Hi Juho,
>>>
>>> Can you confirm that the new partition is consumed, but only that
>>> Flink’s reported metrics do not include them?
>>> If yes, then I think your observations can be explained by this issue:
>>> https://issues.apache.org/jira/browse/FLINK-8419
>>>
>>> 
>>> This issue should have been fixed in the recently released 1.4.2 version.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 22 March 2018 at 8:02:40 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>>
>>> According to the docs*, flink.partition-discovery.interval-millis can
>>> be set to enable automatic partition discovery.
>>>
>>> I'm testing this, apparently it doesn't work.
>>>
>>> I'm using Flink Version: 1.5-SNAPSHOT Commit: 8395508
>>> and FlinkKafkaConsumer010.
>>>
>>> I had my flink stream running, consuming an existing topic with 3
>>> partitions, among some other topics.
>>> I modified partitions of an existing topic: 3 -> 4**.
>>> I checked consumer offsets by secor: it's now consuming all 4 partitions.
>>> I checked consumer offset by my flink stream: it's still consuming only
>>> the 3 original partitions.
>>>
>>> I also checked the Task Metrics of this job from Flink UI and it only
>>> offers Kafka related metrics to be added for 3 partitions (0,1 & 2).
>>>
>>> According to Flink UI > Job Manager > Configuration:
>>> flink.partition-discovery.interval-millis=60000
>>> – so that's just 1 minute. It's already more than 20 minutes since I
>>> added the new partition, so Flink should've picked it up.
>>>
>>> How to debug?
>>>
>>>
>>> Btw, this job has external checkpoints enabled, done once per minute.
>>> Those are also succeeding.
>>>
>>> *) https://ci.apache.org/projects/flink/flink-docs-master/dev/c
>>> onnectors/kafka.html#kafka-consumers-topic-and-partition-discovery
>>>
>>> **)
>>>
>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --describe
>>> --topic my_topic
>>> Topic:my_topic PartitionCount:3 ReplicationFactor:1 Configs:
>>>
>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic
>>>  my_topic --partitions 4
>>> Adding partitions succeeded!
>>>
>>>
>>>
>>
>


Re: RocksDBMapState example?

2018-04-10 Thread Ted Yu
For KeyedState, apart from https://ci.apache.org/projects/flink/flink-docs-
release-1.4/dev/stream/state/state.html#keyed-state-and-operator-state ,
you can refer to docs/dev/migration.md :

public void initializeState(FunctionInitializationContext context) throws Exception {
    counter = context.getKeyedStateStore().getState(
        new ValueStateDescriptor<>("counter", Integer.class, 0));
}

FYI


On Tue, Apr 10, 2018 at 7:24 AM, NEKRASSOV, ALEXEI  wrote:

> Yes, I've read the documentation on working with state.
> It talks about MapState. When I looked at Javadoc, I learned that
> MapState is an interface, with RocksDBMapState as one of the implementing
> classes.
>
> I'm not sure what you mean by KeyedState; I don't see a class with that
> name.
>
> I'm not clear how ValueState can be used to store key-value mapping. Can
> you please clarify?
>
> Thanks,
> Alex
>
> -Original Message-
> From: Dawid Wysakowicz [mailto:wysakowicz.da...@gmail.com]
> Sent: Tuesday, April 10, 2018 8:54 AM
> To: NEKRASSOV, ALEXEI 
> Cc: user@flink.apache.org
> Subject: Re: RocksDBMapState example?
>
> Hi Alexei,
>
> You should not use RocksDBMapState directly. Have you went through the doc
> page regarding working with state[1]?
> I think you want to use KeyedState, assuming the size of your keyspace.
> Probably a way to go would be to key your stream and then even ValueState
> (which will be scoped to that key) might be sufficient.
> You can configure flink further to use RocksDB as the underlying state
> backend[2]
>
> Regards,
> Dawid
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/stream/state/state.html#working-with-state
> [2] https://ci.apache.org/projects/flink/flink-docs-
> master/ops/state/state_backends.html#state-backends
>
> > On 9 Apr 2018, at 17:41, NEKRASSOV, ALEXEI  wrote:
> >
> > Hi,
> >
> > I’d like to use RocksDB to store a key-value mapping table (with 45
> million keys).
> > Can someone please point me to an example of RocksDBMapState()
> constructor invocation? Or an explanation of constructor arguments?..
> >
> > Thanks,
> > Alex Nekrassov
> > nekras...@att.com
>
>


RE: RocksDBMapState example?

2018-04-10 Thread NEKRASSOV, ALEXEI
Yes, I've read the documentation on working with state.
It talks about MapState. When I looked at Javadoc, I learned that 
MapState is an interface, with RocksDBMapState as one of the implementing 
classes.

I'm not sure what you mean by KeyedState; I don't see a class with that name.

I'm not clear how ValueState can be used to store key-value mapping. Can you 
please clarify?

Thanks,
Alex

-Original Message-
From: Dawid Wysakowicz [mailto:wysakowicz.da...@gmail.com] 
Sent: Tuesday, April 10, 2018 8:54 AM
To: NEKRASSOV, ALEXEI 
Cc: user@flink.apache.org
Subject: Re: RocksDBMapState example?

Hi Alexei,

You should not use RocksDBMapState directly. Have you went through the doc page 
regarding working with state[1]?
I think you want to use KeyedState, assuming the size of your keyspace. 
Probably a way to go would be to key your stream and then even ValueState 
(which will be scoped to that key) might be sufficient.
You can configure flink further to use RocksDB as the underlying state 
backend[2]

Regards,
Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#working-with-state
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html#state-backends

> On 9 Apr 2018, at 17:41, NEKRASSOV, ALEXEI  wrote:
> 
> Hi,
> 
> I’d like to use RocksDB to store a key-value mapping table (with 45 million 
> keys).
> Can someone please point me to an example of RocksDBMapState() constructor 
> invocation? Or an explanation of constructor arguments?..
> 
> Thanks,
> Alex Nekrassov
> nekras...@att.com



Re: RocksDBMapState example?

2018-04-10 Thread Dawid Wysakowicz
Hi Alexei,

You should not use RocksDBMapState directly. Have you gone through the doc page 
on working with state [1]?
I think you want to use keyed state, given the size of your keyspace. 
Probably the way to go would be to key your stream; then even ValueState 
(which will be scoped to that key) might be sufficient.
You can further configure Flink to use RocksDB as the underlying state 
backend [2].

Regards,
Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#working-with-state
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html#state-backends

> On 9 Apr 2018, at 17:41, NEKRASSOV, ALEXEI  wrote:
> 
> Hi,
> 
> I’d like to use RocksDB to store a key-value mapping table (with 45 million 
> keys).
> Can someone please point me to an example of RocksDBMapState() constructor 
> invocation? Or an explanation of constructor arguments?..
> 
> Thanks,
> Alex Nekrassov
> nekras...@att.com





RE: RocksDBMapState example?

2018-04-10 Thread NEKRASSOV, ALEXEI
I looked at that code, but I’m still not clear.

new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this);

columnFamily is determined by a 50-line function; is this necessary for a simple 
use case like mine? What should I use as the state descriptor in that function?..
The last argument is set to “this”; does this mean I need to implement 
AbstractKeyedStateBackend, like RocksDBKeyedStateBackend does?

Again, I’m looking for a simple equivalent to
new HashMap();

or to
JedisPool pool = new JedisPool(new JedisPoolConfig(), redisHost);
Jedis jedis = pool.getResource();
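
For comparison, a hypothetical sketch of what the keyed-state route would look like
(not the RocksDBMapState constructor; the Tuple2 types and names are placeholders):
the MapStateDescriptor plays roughly the role of new HashMap<>(), and the configured
state backend (e.g. RocksDB) does the storage.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// must be applied after keyBy(...); the MapState is scoped to the current key
public class MappingLookup extends RichFlatMapFunction<Tuple2<String, String>, String> {

    private transient MapState<String, String> mapping;

    @Override
    public void open(Configuration parameters) throws Exception {
        mapping = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("mapping", String.class, String.class));
    }

    @Override
    public void flatMap(Tuple2<String, String> in, Collector<String> out) throws Exception {
        mapping.put(in.f0, in.f1);        // behaves like map.put(key, value)
        out.collect(mapping.get(in.f0));  // behaves like map.get(key)
    }
}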

Thanks,
Alex

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Monday, April 09, 2018 11:48 AM
To: user 
Subject: Re: RocksDBMapState example?

Hi,
Have you looked at the ctor call in :
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

around line 1261 ?

Cheers


Re: Record timestamp from kafka

2018-04-10 Thread Ben Yan


> On Apr 10, 2018, at 7:32 PM, Ben Yan  wrote:
> 
> Hi Chesnay:
> 
> I think it would be better without such a limitation.I want to 
> consult another problem. When I use BucketingSink(I use aws s3), the filename 
> of a few files after checkpoint still hasn't changed, resulting in the 
> underline prefix of the final generation of a small number of files. After 
> analysis, it is found that it is due to the eventually consistent  of S3.Are 
> there any better solutions available?thanks
> See : 
> https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22
>  
> 
>   
> Best
> Ben
> 
>> On Apr 10, 2018, at 6:29 PM, Ben Yan > > wrote:
>> 
>> Hi Fabian.
>> 
>>  If I use ProcessFunction , I can get it! But I want to know  that how 
>> to get Kafka timestamp in like flatmap and map methods of datastream using 
>> scala programming language.
>> Thanks!
>> 
>> Best
>> Ben
>> 
>>> On Apr 4, 2018, at 7:00 PM, Fabian Hueske >> > wrote:
>>> 
>>> Hi Navneeth,
>>> 
>>> Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if 
>>> you configure EventTime for an application [1].
>>> Since Flink treats record timestamps as meta data, they are not directly 
>>> accessible by most functions. You can implement a ProcessFunction [2] to 
>>> access the timestamp of a record via the ProcessFunction's Context object.
>>> 
>>> Best, Fabian
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>>  
>>> 
>>> [2] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
>>>  
>>> 
>>> 
>>> 2018-03-30 7:45 GMT+02:00 Ben Yan >> >:
>>> hi,
>>> Is that what you mean?
>>> See : 
>>> https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=16377145#comment-16377145
>>>  
>>> 
>>>  
>>> 
>>> Best
>>> Ben
>>> 
 On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan > wrote:
 
 Hi,
 
 Is there way to get the kafka timestamp in deserialization schema? All 
 records are written to kafka with timestamp and I would like to set that 
 timestamp to every record that is ingested. Thanks.
>>> 
>>> 
>> 
> 



Re: Record timestamp from kafka

2018-04-10 Thread Ben Yan
Hi Fabian:

I think it would be better without such a limitation. I want to ask about 
another problem. When I use BucketingSink (I use AWS S3), the filenames of a few 
files after a checkpoint still haven't changed, so a small number of the final files 
keep the underscore prefix. After analysis, it turns out that this is due to the 
eventual consistency of S3. Are there any better 
solutions available? Thanks.

Best
Ben


https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22
 

 

> On Apr 10, 2018, at 6:29 PM, Ben Yan  wrote:
> 
> Hi Fabian.
> 
>   If I use ProcessFunction , I can get it! But I want to know  that how 
> to get Kafka timestamp in like flatmap and map methods of datastream using 
> scala programming language.
> Thanks!
> 
> Best
> Ben
> 
>> On Apr 4, 2018, at 7:00 PM, Fabian Hueske > > wrote:
>> 
>> Hi Navneeth,
>> 
>> Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if 
>> you configure EventTime for an application [1].
>> Since Flink treats record timestamps as meta data, they are not directly 
>> accessible by most functions. You can implement a ProcessFunction [2] to 
>> access the timestamp of a record via the ProcessFunction's Context object.
>> 
>> Best, Fabian
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>  
>> 
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
>>  
>> 
>> 
>> 2018-03-30 7:45 GMT+02:00 Ben Yan > >:
>> hi,
>> Is that what you mean?
>> See : 
>> https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=16377145#comment-16377145
>>  
>> 
>>  
>> 
>> Best
>> Ben
>> 
>>> On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan >> > wrote:
>>> 
>>> Hi,
>>> 
>>> Is there way to get the kafka timestamp in deserialization schema? All 
>>> records are written to kafka with timestamp and I would like to set that 
>>> timestamp to every record that is ingested. Thanks.
>> 
>> 
> 



Re: Record timestamp from kafka

2018-04-10 Thread Chesnay Schepler
You must use a ProcessFunction for this; the timestamps are not exposed 
in any way to map/flatMap functions.
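
A minimal sketch of that, with String records as a placeholder element type:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TimestampedEcho extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // the record timestamp attached by the source (may be null without event time)
        Long ts = ctx.timestamp();
        out.collect(ts + ": " + value);
    }
}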


On 10.04.2018 12:29, Ben Yan wrote:

Hi Fabian.

If I use ProcessFunction , I can get it! But I want to know  that how 
to get Kafka timestamp in like flatmap and map methods of datastream 
using scala programming language.

Thanks!

Best
Ben

On Apr 4, 2018, at 7:00 PM, Fabian Hueske > wrote:


Hi Navneeth,

Flink's KafkaConsumer automatically attaches Kafka's ingestion 
timestamp if you configure EventTime for an application [1].
Since Flink treats record timestamps as meta data, they are not 
directly accessible by most functions. You can implement a 
ProcessFunction [2] to access the timestamp of a record via the 
ProcessFunction's Context object.


Best, Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction


2018-03-30 7:45 GMT+02:00 Ben Yan >:


hi,
Is that what you mean?
See :

https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=16377145#comment-16377145




Best
Ben


On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan
> wrote:

Hi,

Is there way to get the kafka timestamp in deserialization
schema? All records are written to kafka with timestamp and I
would like to set that timestamp to every record that is
ingested. Thanks.









Re: Record timestamp from kafka

2018-04-10 Thread Ben Yan
Hi Fabian.

If I use a ProcessFunction, I can get it! But I want to know how 
to get the Kafka timestamp in the flatMap and map methods of a DataStream using the 
Scala programming language.
Thanks!

Best
Ben

> On Apr 4, 2018, at 7:00 PM, Fabian Hueske  wrote:
> 
> Hi Navneeth,
> 
> Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if 
> you configure EventTime for an application [1].
> Since Flink treats record timestamps as meta data, they are not directly 
> accessible by most functions. You can implement a ProcessFunction [2] to 
> access the timestamp of a record via the ProcessFunction's Context object.
> 
> Best, Fabian
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>  
> 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
>  
> 
> 
> 2018-03-30 7:45 GMT+02:00 Ben Yan  >:
> hi,
> Is that what you mean?
> See : 
> https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=16377145#comment-16377145
>  
> 
>  
> 
> Best
> Ben
> 
>> On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan > > wrote:
>> 
>> Hi,
>> 
>> Is there way to get the kafka timestamp in deserialization schema? All 
>> records are written to kafka with timestamp and I would like to set that 
>> timestamp to every record that is ingested. Thanks.
> 
> 



Re: java.lang.Exception: TaskManager was lost/killed

2018-04-10 Thread Lasse Nedergaard
This time attached.



2018-04-10 10:41 GMT+02:00 Ted Yu :

> Can you use third party site for the graph ?
>
> I cannot view it.
>
> Thanks
>
>  Original message 
> From: Lasse Nedergaard 
> Date: 4/10/18 12:25 AM (GMT-08:00)
> To: Ken Krugler 
> Cc: user , Chesnay Schepler 
> Subject: Re: java.lang.Exception: TaskManager was lost/killed
>
>
> This graph shows Non-Heap . If the same pattern exists it make sense that
> it will try to allocate more memory and then exceed the limit. I can see
> the trend for all other containers that has been killed. So my question is
> now, what is using non-heap memory?
> From http://mail-archives.apache.org/mod_mbox/flink-
> user/201707.mbox/%3CCANC1h_u0dQQvbysDAoLLbEmeWaxiimTMFjJC
> ribpfpo0idl...@mail.gmail.com%3E it look like RockDb could be guilty.
>
> I have job using incremental checkpointing and some without, some
> optimised for FLASH_SSD. all have same pattern
>
> Lasse
>
>
>
> 2018-04-10 8:52 GMT+02:00 Lasse Nedergaard :
>
>> Hi.
>>
>> I found the exception attached below, for our simple job. It states that
>> our task-manager was killed du to exceed memory limit on 2.7GB. But when I
>> look at Flink metricts just 30 sec before it use 1.3 GB heap and 712 MB
>> Non-Heap around 2 GB.
>> So something else are also using memory inside the conatianer any idea
>> how to figure out what?
>> As a side note we use RockDBStateBackend with this configuration
>>
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints((long)(config.checkPointInterval
>>  * 0.75));
>> env.enableCheckpointing(config.checkPointInterval, 
>> CheckpointingMode.AT_LEAST_ONCE);
>> env.setStateBackend(new RocksDBStateBackend(config.checkpointDataUri));
>>
>> Where checkpointDataUri point to S3
>>
>> Lasse Nedergaard
>>
>> 2018-04-09 16:52:01,239 INFO  org.apache.flink.yarn.YarnFlin
>> kResourceManager- Diagnostics for container
>> container_1522921976871_0001_01_79 in state COMPLETE :
>> exitStatus=Pmem limit exceeded (-104) diagnostics=Container
>> [pid=30118,containerID=container_1522921976871_0001_01_79] is
>> running beyond physical memory limits. Current usage: 2.7 GB of 2.7 GB
>> physical memory used; 4.9 GB of 13.4 GB virtual memory used. Killing
>> container.
>>
>> Dump of the process-tree for container_1522921976871_0001_01_79 :
>>
>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>>
>> |- 30136 30118 30118 30118 (java) 245173 68463 5193723904 703845
>> /usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m
>> -Dlog.file=/var/log/hadoop-yarn/containers/application_15229
>> 21976871_0001/container_1522921976871_0001_01_79/taskmanager.log
>> -Dlogback.configurationFile=file:./logback.xml
>> -Dlog4j.configuration=file:./log4j.properties
>> org.apache.flink.yarn.YarnTaskManager --configDir .
>>
>> |- 30118 30116 30118 30118 (bash) 0 0 115818496 674 /bin/bash -c
>> /usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m
>> -Dlog.file=/var/log/hadoop-yarn/containers/application_15229
>> 21976871_0001/container_1522921976871_0001_01_79/taskmanager.log
>> -Dlogback.configurationFile=file:./logback.xml
>> -Dlog4j.configuration=file:./log4j.properties
>> org.apache.flink.yarn.YarnTaskManager --configDir . 1>
>> /var/log/hadoop-yarn/containers/application_1522921976871_
>> 0001/container_1522921976871_0001_01_79/taskmanager.out 2>
>> /var/log/hadoop-yarn/containers/application_1522921976871_
>> 0001/container_1522921976871_0001_01_79/taskmanager.err
>>
>>
>> 2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter
>> - gauge.ip-10-1-1-181.taskmanage
>> r.container_1522921976871_0001_01_79.Status.JVM.
>> Memory.Heap.Used=1398739496
>>
>>
>> 2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter
>> - gauge.ip-10-1-1-181.taskmanage
>> r.container_1522921976871_0001_01_79.Status.JVM.
>> Memory.NonHeap.Used=746869520
>>
>>
>>
>>
>>
>> 2018-04-09 23:52 GMT+02:00 Ken Krugler :
>>
>>> Hi Chesnay,
>>>
>>> Don’t know if this helps, but I’d run into this as well, though I
>>> haven’t hooked up YourKit to analyze exactly what’s causing the memory
>>> problem.
>>>
>>> E.g. after about 3.5 hours running locally, it failed with memory issues.
>>>
>>> In the TaskManager logs, I start seeing exceptions in my code….
>>>
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>
>>> And then eventually...
>>>
>>> 2018-04-07 21:55:25,686 WARN  
>>> org.apache.flink.runtime.accumulators.AccumulatorRegistry
>>> - Failed to serialize accumulators for task.
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>
>>> Immediately after this, one of my custom functions gets a close() call,
>>> and I see a log msg about it "switched from 

Re: java.lang.Exception: TaskManager was lost/killed

2018-04-10 Thread Ted Yu
Can you use a third-party site for the graph?
I cannot view it.

Thanks
 Original message 
From: Lasse Nedergaard 
Date: 4/10/18 12:25 AM (GMT-08:00)
To: Ken Krugler 
Cc: user , Chesnay Schepler 
Subject: Re: java.lang.Exception: TaskManager was lost/killed 

This graph shows non-heap memory. If the same pattern exists, it makes sense that it 
will try to allocate more memory and then exceed the limit. I can see the trend 
for all the other containers that have been killed. So my question is now: what is 
using non-heap memory? From 
http://mail-archives.apache.org/mod_mbox/flink-user/201707.mbox/%3ccanc1h_u0dqqvbysdaollbemewaxiimtmfjjcribpfpo0idl...@mail.gmail.com%3E
it looks like RocksDB could be the culprit.
I have jobs using incremental checkpointing and some without, some optimised for 
FLASH_SSD. All show the same pattern.
Lasse 


2018-04-10 8:52 GMT+02:00 Lasse Nedergaard :
Hi.
I found the exception attached below, for our simple job. It states that our 
task manager was killed due to exceeding the memory limit of 2.7 GB. But when I look at 
the Flink metrics just 30 seconds before, it used 1.3 GB heap and 712 MB non-heap, around 
2 GB in total. So something else is also using memory inside the container. Any idea 
how to figure out what? As a side note, we use RocksDBStateBackend with this 
configuration:
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
        (long) (config.checkPointInterval * 0.75));
env.enableCheckpointing(config.checkPointInterval, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(new RocksDBStateBackend(config.checkpointDataUri));
where checkpointDataUri points to S3.
Lasse Nedergaard











2018-04-09 16:52:01,239 INFO  org.apache.flink.yarn.YarnFlinkResourceManager    
            - Diagnostics for container container_1522921976871_0001_01_79 
in state COMPLETE : exitStatus=Pmem limit exceeded (-104) diagnostics=Container 
[pid=30118,containerID=container_1522921976871_0001_01_79] is running 
beyond physical memory limits. Current usage: 2.7 GB of 2.7 GB physical memory 
used; 4.9 GB of 13.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_1522921976871_0001_01_79 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 30136 30118 30118 30118 (java) 245173 68463 5193723904 703845 
/usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m 
-Dlog.file=/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.log
 -Dlogback.configurationFile=file:./logback.xml 
-Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskManager --configDir . 
|- 30118 30116 30118 30118 (bash) 0 0 115818496 674 /bin/bash -c 
/usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m  
-Dlog.file=/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.log
 -Dlogback.configurationFile=file:./logback.xml 
-Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskManager --configDir . 1> 
/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.out
 2> 
/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.err
 


2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter                     
            - 
gauge.ip-10-1-1-181.taskmanager.container_1522921976871_0001_01_79.Status.JVM.Memory.Heap.Used=1398739496


2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter                     
            - 
gauge.ip-10-1-1-181.taskmanager.container_1522921976871_0001_01_79.Status.JVM.Memory.NonHeap.Used=746869520






 
2018-04-09 23:52 GMT+02:00 Ken Krugler :
Hi Chesnay,
Don’t know if this helps, but I’d run into this as well, though I haven’t 
hooked up YourKit to analyze exactly what’s causing the memory problem.
E.g. after about 3.5 hours running locally, it failed with memory issues.

In the TaskManager logs, I start seeing exceptions in my code….
java.lang.OutOfMemoryError: GC overhead limit exceeded
And then eventually...
2018-04-07 21:55:25,686 WARN  
org.apache.flink.runtime.accumulators.AccumulatorRegistry     - Failed to 
serialize accumulators for task.
java.lang.OutOfMemoryError: GC overhead limit exceeded

Immediately after this, one of my custom functions gets a close() call, and I 
see a log msg about it "switched from RUNNING to FAILED”.
After this, I see messages that the job is being restarted, but the TaskManager 
log output abruptly ends.
In the Job Manager log, this is what is output following the time of the last 
TaskManager logging output:
2018-04-07 21:57:33,702 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - 

Re: java.lang.Exception: TaskManager was lost/killed

2018-04-10 Thread Lasse Nedergaard
This graph shows non-heap memory. If the same pattern exists, it makes sense that
it will try to allocate more memory and then exceed the limit. I can see
the trend for all the other containers that have been killed. So my question is
now: what is using non-heap memory?
From
http://mail-archives.apache.org/mod_mbox/flink-user/201707.mbox/%3ccanc1h_u0dqqvbysdaollbemewaxiimtmfjjcribpfpo0idl...@mail.gmail.com%3E
it looks like RocksDB could be the culprit.

I have jobs using incremental checkpointing and some without, some optimised
for FLASH_SSD. All show the same pattern.

Lasse



2018-04-10 8:52 GMT+02:00 Lasse Nedergaard :

> Hi.
>
> I found the exception attached below, for our simple job. It states that
> our task-manager was killed du to exceed memory limit on 2.7GB. But when I
> look at Flink metricts just 30 sec before it use 1.3 GB heap and 712 MB
> Non-Heap around 2 GB.
> So something else are also using memory inside the conatianer any idea how
> to figure out what?
> As a side note we use RockDBStateBackend with this configuration
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints((long)(config.checkPointInterval
>  * 0.75));
> env.enableCheckpointing(config.checkPointInterval, 
> CheckpointingMode.AT_LEAST_ONCE);
> env.setStateBackend(new RocksDBStateBackend(config.checkpointDataUri));
>
> Where checkpointDataUri point to S3
>
> Lasse Nedergaard
>
> 2018-04-09 16:52:01,239 INFO  org.apache.flink.yarn.
> YarnFlinkResourceManager- Diagnostics for container
> container_1522921976871_0001_01_79 in state COMPLETE :
> exitStatus=Pmem limit exceeded (-104) diagnostics=Container
> [pid=30118,containerID=container_1522921976871_0001_01_79] is running
> beyond physical memory limits. Current usage: 2.7 GB of 2.7 GB physical
> memory used; 4.9 GB of 13.4 GB virtual memory used. Killing container.
>
> Dump of the process-tree for container_1522921976871_0001_01_79 :
>
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>
> |- 30136 30118 30118 30118 (java) 245173 68463 5193723904 703845
> /usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m
> -Dlog.file=/var/log/hadoop-yarn/containers/application_
> 1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.log
> -Dlogback.configurationFile=file:./logback.xml
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskManager
> --configDir .
>
> |- 30118 30116 30118 30118 (bash) 0 0 115818496 674 /bin/bash -c
> /usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m
> -Dlog.file=/var/log/hadoop-yarn/containers/application_
> 1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.log
> -Dlogback.configurationFile=file:./logback.xml
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskManager
> --configDir . 1> /var/log/hadoop-yarn/containers/application_
> 1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.out
> 2> /var/log/hadoop-yarn/containers/application_
> 1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.err
>
>
> 2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter
>   - gauge.ip-10-1-1-181.taskmanager.container_
> 1522921976871_0001_01_79.Status.JVM.Memory.Heap.Used=1398739496
>
>
> 2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter
>   - gauge.ip-10-1-1-181.taskmanager.container_
> 1522921976871_0001_01_79.Status.JVM.Memory.NonHeap.Used=746869520
>
>
>
>
>
> 2018-04-09 23:52 GMT+02:00 Ken Krugler :
>
>> Hi Chesnay,
>>
>> Don’t know if this helps, but I’d run into this as well, though I haven’t
>> hooked up YourKit to analyze exactly what’s causing the memory problem.
>>
>> E.g. after about 3.5 hours running locally, it failed with memory issues.
>>
>> In the TaskManager logs, I start seeing exceptions in my code….
>>
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>
>> And then eventually...
>>
>> 2018-04-07 21:55:25,686 WARN  
>> org.apache.flink.runtime.accumulators.AccumulatorRegistry
>> - Failed to serialize accumulators for task.
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>
>> Immediately after this, one of my custom functions gets a close() call,
>> and I see a log msg about it "switched from RUNNING to FAILED”.
>>
>> After this, I see messages that the job is being restarted, but the
>> TaskManager log output abruptly ends.
>>
>> In the Job Manager log, this is what is output following the time of the
>> last TaskManager logging output:
>>
>> 2018-04-07 21:57:33,702 INFO  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 129 @ 1523163453702
>> 2018-04-07 21:58:43,916 WARN  akka.remote.ReliableDeliverySupervisor
>>- Association with remote system [
>> akka.tcp://fl...@kens-mbp.hsd1.ca.comcast.net:63780] has failed, address
>> is now 

Re: java.lang.Exception: TaskManager was lost/killed

2018-04-10 Thread Lasse Nedergaard
Hi.

I found the exception attached below, for our simple job. It states that
our task manager was killed due to exceeding the memory limit of 2.7 GB. But when I
look at the Flink metrics just 30 seconds before, it used 1.3 GB heap and 712 MB
non-heap, around 2 GB in total.
So something else is also using memory inside the container. Any idea how
to figure out what?
As a side note, we use RocksDBStateBackend with this configuration:

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
        (long) (config.checkPointInterval * 0.75));
env.enableCheckpointing(config.checkPointInterval, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(new RocksDBStateBackend(config.checkpointDataUri));

where checkpointDataUri points to S3.

Lasse Nedergaard

2018-04-09 16:52:01,239 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
  - Diagnostics for container
container_1522921976871_0001_01_79 in state COMPLETE : exitStatus=Pmem
limit exceeded (-104) diagnostics=Container
[pid=30118,containerID=container_1522921976871_0001_01_79] is running
beyond physical memory limits. Current usage: 2.7 GB of 2.7 GB physical
memory used; 4.9 GB of 13.4 GB virtual memory used. Killing container.

Dump of the process-tree for container_1522921976871_0001_01_79 :

|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

|- 30136 30118 30118 30118 (java) 245173 68463 5193723904 703845
/usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m
-Dlog.file=/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.log
-Dlogback.configurationFile=file:./logback.xml
-Dlog4j.configuration=file:./log4j.properties
org.apache.flink.yarn.YarnTaskManager --configDir .

|- 30118 30116 30118 30118 (bash) 0 0 115818496 674 /bin/bash -c
/usr/lib/jvm/java-openjdk/bin/java -Xms2063m -Xmx2063m
-Dlog.file=/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.log
-Dlogback.configurationFile=file:./logback.xml
-Dlog4j.configuration=file:./log4j.properties
org.apache.flink.yarn.YarnTaskManager --configDir . 1>
/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.out
2>
/var/log/hadoop-yarn/containers/application_1522921976871_0001/container_1522921976871_0001_01_79/taskmanager.err



2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter
-
gauge.ip-10-1-1-181.taskmanager.container_1522921976871_0001_01_79.Status.JVM.Memory.Heap.Used=1398739496


2018-04-09 16:51:26,659 DEBUG org.trackunit.tm2.LogReporter
-
gauge.ip-10-1-1-181.taskmanager.container_1522921976871_0001_01_79.Status.JVM.Memory.NonHeap.Used=746869520





2018-04-09 23:52 GMT+02:00 Ken Krugler :

> Hi Chesnay,
>
> Don’t know if this helps, but I’d run into this as well, though I haven’t
> hooked up YourKit to analyze exactly what’s causing the memory problem.
>
> E.g. after about 3.5 hours running locally, it failed with memory issues.
>
> In the TaskManager logs, I start seeing exceptions in my code….
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> And then eventually...
>
> 2018-04-07 21:55:25,686 WARN  
> org.apache.flink.runtime.accumulators.AccumulatorRegistry
> - Failed to serialize accumulators for task.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> Immediately after this, one of my custom functions gets a close() call,
> and I see a log msg about it "switched from RUNNING to FAILED”.
>
> After this, I see messages that the job is being restarted, but the
> TaskManager log output abruptly ends.
>
> In the Job Manager log, this is what is output following the time of the
> last TaskManager logging output:
>
> 2018-04-07 21:57:33,702 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 129 @ 1523163453702
> 2018-04-07 21:58:43,916 WARN  akka.remote.ReliableDeliverySupervisor
>- Association with remote system [
> akka.tcp://fl...@kens-mbp.hsd1.ca.comcast.net:63780] has failed, address
> is now gated for [5000] ms. Reason: [Disassociated]
> 2018-04-07 21:58:51,084 WARN  akka.remote.transport.netty.NettyTransport
>- Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: kens-
> mbp.hsd1.ca.comcast.net/192.168.3.177:63780
> 2018-04-07 21:58:51,086 WARN  akka.remote.ReliableDeliverySupervisor
>- Association with remote system [akka.tcp://
> fl...@kens-mbp.hsd1.ca.comcast.net:63780] has failed, address is now
> gated for [5000] ms. Reason: [Association failed with [akka.tcp://
> fl...@kens-mbp.hsd1.ca.comcast.net:63780]] Caused by: [Connection
> refused: kens-mbp.hsd1.ca.comcast.net/192.168.3.177:63780]
> 2018-04-07 21:59:01,047 WARN  akka.remote.transport.netty.NettyTransport
>- Remote connection 

Re: Flink 1.4.2 in Zeppelin Notebook

2018-04-10 Thread Rico Bergmann
FYI: I finally managed to get the new Flink version running in Zeppelin.
Besides adding the parameters mentioned below, you have to build Zeppelin
with the scala-2.11 profile and the new Flink version 1.4.2.


Best,

Rico.


Am 09.04.2018 um 14:43 schrieb Rico Bergmann:
>
> The error message is:
>
> org.apache.flink.client.program.ProgramInvocationException: The
> program execution failed: Communication with JobManager failed: Lost
> connection to the JobManager.
>
>
>
> Am 09.04.2018 um 14:12 schrieb kedar mhaswade:
>> Hmm. What error do you see on the Zeppelin console when you click the
>> run (flink code) button after making these changes for flink
>> interpreter config (I assume you restart the interpreter)?
>>
>> Regards,
>> Kedar
>>
>> On Mon, Apr 9, 2018 at 12:50 AM, Rico Bergmann > > wrote:
>>
>> Hi. 
>>
>> Thanks for your reply. But this also didn’t work for me. 
>>
>> In the JM log I get an akka Error („dropping message for
>> non-local recipient“). 
>>
>> My setup: I have Flink running on Kubernetes cluster, version
>> 1.4.2. zeppelin is version 0.8 using the flink interpreter
>> compiled against flink 1.1.3. 
>> When submitting a job with the CLI tool everything is working
>> fine. The CLI tool is version 1.4.2 ...
>>
>> Any other suggestions?
>>
>> Thanks a lot. 
>> Best,
>> Rico. 
>>
>>
>> Am 06.04.2018 um 18:44 schrieb kedar mhaswade
>> >:
>>
>>> Yes. You need to add the two properties for the job manager (I
>>> agree, it is confusing because the properties named "host" and
>>> "port" already available, but the names of the useful properties
>>> are different):
>>>
>>> Could you please try this and let us know if it works for you?
>>>
>>> Regards,
>>> Kedar
>>>
>>>
>>> On Fri, Apr 6, 2018 at 5:51 AM, Dipl.-Inf. Rico Bergmann
>>> > wrote:
>>>
>>> Hi!
>>>
>>> Has someone successfully integrated Flink 1.4.2 into
>>> Zeppelin notebook (using Flink in cluster mode, not local mode)?
>>>
>>> Best,
>>>
>>> Rico.
>>>
>>>
>>
>