Error while triggering checkpoint due to Kryo exception

2018-08-17 Thread Bruce Qiu
Hi Community,

I am using Flink 1.4.2 for stream processing. I fetch data from Kafka and 
write Parquet files to HDFS. In the previous environment, the Kafka topic had 192 
partitions and I set the source parallelism to 192, and the application worked fine. 
But recently we increased the Kafka partitions to 384, so I changed the 
source parallelism to 384. After I made this change, the application throws the 
exception below and checkpointing always fails. I also see very high 
backpressure in the ColFlatMap stage. My application’s DAG is shown 
below. Can someone help me with this exception? Thanks a lot.
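
For context, a minimal sketch of how such a source-parallelism change is typically wired up; the topic name, consumer group, broker address, and schema below are assumptions, not taken from the original job:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class KafkaSourceParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092"); // hypothetical broker address
        kafkaProps.setProperty("group.id", "example-group");       // hypothetical consumer group

        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), kafkaProps))
                .setParallelism(384); // raised from 192 to match the new partition count

        stream.print();
        env.execute("kafka-source-parallelism-sketch");
    }
}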

 

DAG stage (screenshot not included in the archive)

 

 

 

Exception stack trace:

 

java.lang.Exception: Error while triggering checkpoint 109 for Source: Custom Source (257/384)
  at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1210)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not perform checkpoint 109 for operator Source: Custom Source (257/384).
  at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:544)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:111)
  at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1199)
  ... 5 more
Caused by: java.lang.Exception: Could not complete snapshot 109 for operator Source: Custom Source (257/384).
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:378)
  at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
  at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:538)
  ... 7 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
  at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
  at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
  at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
  at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
  at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:448)
  at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:460)
  at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:220)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:363)
  ... 12 more

 

 

Regards,

Bruce



Re: Operator metrics do not get unregistered after job finishes

2018-08-17 Thread vino yang
Hi Helmut,

Are the metrics of all the subtask instances of a job left unregistered, or
only some of them? Is there any exception information available in the
logs?

Please feel free to create a JIRA issue and clearly describe your problem.

Thanks, vino.

Helmut Zechmann wrote on Fri, 17 Aug 2018 at 23:14:

> Hi all,
>
>
> we are using flink 1.5.2 in batch mode with prometheus monitoring.
>
> We noticed that a few metrics do not get unregistered after a job is
> finished:
>
> flink_taskmanager_job_task_operator_numRecordsIn
> flink_taskmanager_job_task_operator_numRecordsInPerSecond
> flink_taskmanager_job_task_operator_numRecordsOut
> flink_taskmanager_job_task_operator_numRecordsOutPerSecond
>
>
> Those metrics stay in the taskmanager metrics list until the task manager
> gets restarted.
>
> Our metrics config is:
>
> metrics.reporters: prom
> metrics.reporter.prom.class:
> org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port: 7000-7001
>
> metrics.scope.jm: flink..jobmanager
> metrics.scope.tm: flink..taskmanager.
> metrics.scope.jm.job: flink..jobmanager.
> metrics.scope.tm.job: flink..taskmanager..
> metrics.scope.task:
> flink..taskmanager
> metrics.scope.operator:
> flink..taskmanager
>
>
> Since we run many batch jobs, this makes prometheus monitoring unusable
> for us. Is this a known issue?
>
>
> Best,
>
> Helmut


Re: processWindowFunction

2018-08-17 Thread vino yang
Hi antonio,

Yes, ProcessWindowFunction is a low-level window function.
It gives you access to all the data in the window and lets you customize the
output of the window.
So while it gives you flexibility, you also have to take care of more things
yourself, which may require you to write more processing logic.

Generally speaking, sliding windows contain some repeated data by design, but
a common pattern is to apply a reduce function on the window to produce the
aggregated result.
If you simply emit every element, there will definitely be some duplication.
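
For illustration, a minimal sketch of that pattern (types, key, and threshold below are made up rather than taken from Antonio's job): the ReduceFunction pre-aggregates elements inside the window, and the ProcessWindowFunction decides what to emit when the window fires, so intermediate results are not sent repeatedly.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SlidingWindowSketch {

    public static DataStream<String> aggregate(DataStream<Tuple2<String, Long>> input) {
        return input
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1)))
                // Incremental aggregation: only one pre-aggregated element is kept per key and window.
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                }, new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    // Called once per window firing with the single pre-aggregated element.
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<Tuple2<String, Long>> elements, Collector<String> out) {
                        Tuple2<String, Long> agg = elements.iterator().next();
                        if (agg.f1 > 100) { // hypothetical alerting threshold
                            out.collect(key + " exceeded threshold in window ending at " + ctx.window().getEnd());
                        }
                    }
                });
    }
}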

Thanks, vino.

antonio saldivar wrote on Fri, 17 Aug 2018 at 12:01 PM:

> Hi Vino
> thank you for the information. Actually I am using a trigger alert and
> processWindowFunction to send my results, but when my window slides or ends
> it sends the objects again and I am getting duplicated data
>
> On Thu, 16 Aug 2018 at 22:05, vino yang ()
> wrote:
>
>> Hi Antonio,
>>
>> What results do you not want to get each time a window is created?
>> Examples of the use of ProcessWindowFunction are included in many test
>> files in Flink's project, such as SideOutputITCase.scala or
>> WindowTranslationTest.scala.
>>
>> For more information on ProcessWindowFunction, you can refer to the
>> official website.[1]
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction
>>
>> Thanks, vino.
>>
>> antonio saldivar wrote on Fri, 17 Aug 2018 at 6:24 AM:
>>
>>> Hello
>>>
>>> I am implementing a data stream where I use sliding windows, but I am
>>> stuck because I need to set values on my object based on some if statements
>>> in my process function and send the object to the next step, but I don't
>>> want results every time a window is created
>>>
>>> if anyone has a good example on this that can help me
>>>
>>


Error in KryoSerializer

2018-08-17 Thread Pankaj Chaudhary
Hi,

I am on Flink 1.4.2 and as part of my operator logic (i.e. RichFlatMapFunction) 
I am collecting the values in the Collector object.

But I am getting an error stating “Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator”

On debugging, it looks like the root cause of this exception is in the 
KryoSerializer, where it tries to do a copy operation. Can someone please let 
me know how I can get around this issue?

Below is the stack trace of the error

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
  at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
  ... 11 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -14
  at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
  at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
  at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
  ... 17 more

Regards,
Pankaj.

Re: Override CaseClassSerializer with custom serializer

2018-08-17 Thread Timo Walther

Hi Gerard,

you are correct, Kryo serializers are only used when no built-in Flink 
serializer is available.


Actually, the tuple and case class serializers are among the most 
performant serializers in Flink (due to their fixed length and lack of null 
support). If you really want to reduce the serialization overhead you 
could look into the object reuse mode. We had this topic on the mailing 
list recently, I will just copy it here:


If you want to improve the performance of a collect() between operators, 
you could also enable object reuse. You can read more about this here 
[1] (section "Issue 2: Object Reuse"), but make sure your implementation 
is correct because an operator could modify the objects of following 
operators.
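
As a minimal sketch (not part of the quoted mailing list text), object reuse is a single setting on the execution config; the caveat above applies, i.e. user functions must neither cache nor modify the records handed to them:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Avoids defensive copies between chained operators; only safe if functions
        // do not hold on to or mutate the records they receive.
        env.getConfig().enableObjectReuse();
        // ... build and execute the job as usual ...
    }
}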


I hope this helps.

Regards,
Timo

[1] 
https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime


On 17.08.18 at 17:29, gerardg wrote:

Hello,

I can't seem to be able to override the CaseClassSerializer with my custom
serializer. I'm using env.getConfig.addDefaultKryoSerializer() to add the
custom serializer but I don't see it being used. I guess it is because it
only uses Kryo based serializers if it can't find a Flink serializer?

Is it then worth it to replace the CaseClassSerializer with a custom
serializer? (When I profile, the CaseClassSerializer.(de)serialize method
appears as the most used, so I wanted to give it a try.) If so, how can I do
it?

Thanks,

Gerard



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Flink not rolling log files

2018-08-17 Thread Dominik Wosiński
I am using this *log4j.properties* file config for rolling files once per
day and it is working perfectly. Maybe this will give you some hint:

log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd
HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.appender.file.layout.DatePattern='.'yyyy-MM-dd

Best Regards,
Dominik.


Re: Flink Jobmanager Failover in HA mode

2018-08-17 Thread Dominik Wosiński
I have faced this issue, but in 1.4.0 IIRC. This seems to be related to
https://issues.apache.org/jira/browse/FLINK-10011. What was the status of
the jobs when the main Job Manager was stopped?

2018-08-17 17:08 GMT+02:00 Helmut Zechmann :

> Hi all,
>
> we have a problem with flink 1.5.2 high availability in standalone mode.
>
> We have two jobmanagers running. When I shut down the main job manager,
> the failover job manager encounters an error during failover.
>
> Logs:
>
>
> 2018-08-17 14:38:16,478 WARN  akka.remote.ReliableDeliverySupervisor
>   - Association with remote system [akka.tcp://
> fl...@seg-1.adjust.com:29095] has failed, address is now gated for [50]
> ms. Reason: [Disassociated]
> 2018-08-17 14:38:31,449 WARN  akka.remote.transport.netty.NettyTransport
>   - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused:
> seg-1.adjust.com/178.162.219.66:29095
> 2018-08-17 14:38:31,451 WARN  akka.remote.ReliableDeliverySupervisor
>   - Association with remote system [akka.tcp://
> fl...@seg-1.adjust.com:29095] has failed, address is now gated for [50]
> ms. Reason: [Association failed with [akka.tcp://flink@seg-1.
> adjust.com:29095]] Caused by: [Connection refused:
> seg-1.adjust.com/178.162.219.66:29095]
> 2018-08-17 14:38:41,379 ERROR org.apache.flink.runtime.rest.
> handler.legacy.files.StaticFileServerHandler  - Could not retrieve the
> redirect address.
> java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException:
> Ask timed out on [Actor[akka.tcp://fl...@seg-1.adjust.com:29095/user/
> dispatcher#-1599908403]] after [1 ms]. Sender[null] sent message of
> type "org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
> at java.util.concurrent.CompletableFuture.encodeThrowable(
> CompletableFuture.java:292)
> [... shortened ...]
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://fl...@seg-1.adjust.com:29095/user/dispatcher#-1599908403]]
> after [1 ms]. Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(
> AskSupport.scala:604)
> ... 9 more
> 2018-08-17 14:38:48,005 INFO  
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
>   - http://seg-2.adjust.com:8083 was granted leadership with
> leaderSessionID=708d1a64-c353-448b-9101-7eb3f910970e
> 2018-08-17 14:38:48,005 INFO  
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager
> - ResourceManager akka.tcp://flink@seg-2.adjust.
> com:30169/user/resourcemanager was granted leadership with fencing token
> 8de829de14876a367a80d37194b944ee
> 2018-08-17 14:38:48,006 INFO  org.apache.flink.runtime.
> resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
> 2018-08-17 14:38:48,007 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> - Dispatcher akka.tcp://fl...@seg-2.adjust.com:30169/user/dispatcher
> was granted leadership with fencing token 684f50f8-327c-47e1-a53c-
> 931c4f4ea3e5
> 2018-08-17 14:38:48,007 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> - Recovering all persisted jobs.
> 2018-08-17 14:38:48,021 INFO  org.apache.flink.runtime.jobmanager.
> ZooKeeperSubmittedJobGraphStore  - Recovered SubmittedJobGraph(
> b951bbf518bcf6cc031be6d2ccc441bb, null).
> 2018-08-17 14:38:48,028 INFO  org.apache.flink.runtime.jobmanager.
> ZooKeeperSubmittedJobGraphStore  - Recovered SubmittedJobGraph(
> 06ed64f48fa0a7cffde53b99cbaa073f, null).
> 2018-08-17 14:38:48,035 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint
>- Fatal error occurred in the cluster entrypoint.
> java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobExecutionException:
> Could not set up JobManager
> at org.apache.flink.util.ExceptionUtils.rethrow(
> ExceptionUtils.java:199)
> [... shortened ...]
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
> init>(JobManagerRunner.java:176)
> at org.apache.flink.runtime.dispatcher.Dispatcher$
> DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:936)
> at org.apache.flink.runtime.dispatcher.Dispatcher.
> createJobManagerRunner(Dispatcher.java:291)
> at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(
> Dispatcher.java:281)
> at org.apache.flink.util.function.ConsumerWithException.accept(
> ConsumerWithException.java:38)
> ... 21 more
> Caused by: java.lang.Exception: Cannot set up the user code libraries:
> /var/lib/flink/ceph/prod/1.5-batch/ha_state/1.5-batch/blob/job_
> b951bbf518bcf6cc031be6d2ccc441bb/blob_p-a26f62e3bbdcd8884dd18c42a3f6f2
> 02b9d2c6e7-0dc87a56862a1f799d515306ffeddfcb (No such file or directory)
> at 

Override CaseClassSerializer with custom serializer

2018-08-17 Thread gerardg
Hello,

I can't seem to be able to override the CaseClassSerializer with my custom
serializer. I'm using env.getConfig.addDefaultKryoSerializer() to add the
custom serializer but I don't see it being used. I guess it is because it
only uses Kryo based serializers if it can't find a Flink serializer? 

Is then worth it to replace the CaseClassSerializer with a custom
serializer? (when I profile the CaseClassSerializer.(de)serialize method
appears as the most used so I wanted to give it a try) If so, how can I do
it?

Thanks,

Gerard



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink not rolling log files

2018-08-17 Thread Gary Yao
Hello Navneet Kumar Pandey,

org.apache.log4j.rolling.RollingFileAppender is part of Apache Extras
Companion for Apache log4j [1]. Is that library in your classpath?

Are there hints in taskmanager.err?

Can you run:

cat /usr/lib/flink/conf/log4j.properties

on the EMR master node and show the output?

For troubleshooting, you can also try org.apache.log4j.RollingFileAppender
which can roll the file if a certain size is exceeded. An example
configuration can be found here (I have not tested it):

https://github.com/apache/flink/pull/5371/files
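
For reference, a size-based configuration could look roughly like the following untested sketch (not copied from the linked PR; adjust the file size and backup count to your needs):

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n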

Best,
Gary

[1] https://logging.apache.org/log4j/extras/

On Fri, Aug 17, 2018 at 4:09 PM, Navneet Kumar Pandey 
wrote:

> I am using Flink in EMR with the following configuration.
>
>  {
>   "Classification": "flink-log4j",
>   "Properties": {
> "log4j.logger.no":"DEBUG",
> "log4j.appender.file":"org.apache.log4j.rolling.
> RollingFileAppender",
> "log4j.appender.file.RollingPolicy.FileNamePattern"
> :"logs/log.%d{MMdd-HHmm}.log",
> "log4j.appender.file.RollingPolicy":"org.apache.log4j.rolling.
> TimeBasedRollingPolicy",
> "log4j.appender.file.append":"false",
> "log4j.appender.file.layout":"org.apache.log4j.PatternLayout",
> "log4j.appender.file.layout.ConversionPattern":"%d{-MM-dd
> HH:mm:ss,SSS} %-5p %-60c %x - %m%n"
>
>   }
> }
>
> FYI, this configuration gets written into Flink's log4j.properties. As you
> can see, even after this setting the taskmanager and jobmanager log files are
> not getting rolled.
>
> [hadoop@ip-XX ~]$ sudo ls -lh  /mnt/var/log/hadoop-yarn/
> containers/application_DDD_0002/container_
> _0002_01_02
> total 7.0G
> -rw-r--r-- 1 yarn yarn 770K Aug 17 14:02 taskmanager.err
> -rw-r--r-- 1 yarn yarn 6.0G Aug 17 14:02 taskmanager.log
> -rw-r--r-- 1 yarn yarn 526K Aug 17 13:54 taskmanager.out
>
> Can somebody give me a pointer on how to roll these log files?
> Note that these files are also being copied to S3.
>
>


Operator metrics do not get unregistered after job finishes

2018-08-17 Thread Helmut Zechmann
Hi all,


we are using flink 1.5.2 in batch mode with prometheus monitoring.

We noticed that a few metrics do not get unregistered after a job is finished:

flink_taskmanager_job_task_operator_numRecordsIn
flink_taskmanager_job_task_operator_numRecordsInPerSecond
flink_taskmanager_job_task_operator_numRecordsOut
flink_taskmanager_job_task_operator_numRecordsOutPerSecond


Those metrics stay in the taskmanager metrics list until the task manager gets 
restarted.

Our metrics config is:

metrics.reporters: prom
metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 7000-7001

metrics.scope.jm: flink..jobmanager
metrics.scope.tm: flink..taskmanager.
metrics.scope.jm.job: flink..jobmanager.
metrics.scope.tm.job: flink..taskmanager..
metrics.scope.task: 
flink..taskmanager
metrics.scope.operator: 
flink..taskmanager


Since we run many batch jobs, this makes prometheus monitoring unusable for us. 
Is this a known issue?


Best,

Helmut

Re: Looking for flink code example using flink-jpmml library over DataStream

2018-08-17 Thread sagar loke
Hi Hequn,

Thanks for pointing that out.

We were wondering if there is anything else, besides these examples, that
would help.

Thanks,

On Fri, Aug 17, 2018 at 5:33 AM Hequn Cheng  wrote:

> Hi sagar,
>
> There are some examples in flink-jpmml git library[1], for example[2].
> Hope it helps.
>
> Best, Hequn
>
> [1] https://github.com/FlinkML/flink-jpmml
> [2]
> https://github.com/FlinkML/flink-jpmml/tree/master/flink-jpmml-examples/src/main/scala/io/radicalbit/examples
>
> On Fri, Aug 17, 2018 at 10:09 AM, sagar loke  wrote:
>
>> Hi,
>>
>> We are planning to use flink to run jpmml models using flink-jpmml
>> library from (radicalbit) over DataStream in Flink.
>>
>> Is there any code example which we can refer to kick start the process ?
>>
>> Thanks,
>>
>
> --
Cheers,
Sagar


Flink Jobmanager Failover in HA mode

2018-08-17 Thread Helmut Zechmann
Hi all,

we have a problem with flink 1.5.2 high availability in standalone mode.

We have two jobmanagers running. When I shut down the main job manager, the 
failover job manager encounters an error during failover.

Logs:


2018-08-17 14:38:16,478 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system 
[akka.tcp://fl...@seg-1.adjust.com:29095] has failed, address is now gated for 
[50] ms. Reason: [Disassociated]
2018-08-17 14:38:31,449 WARN  akka.remote.transport.netty.NettyTransport
- Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: 
seg-1.adjust.com/178.162.219.66:29095
2018-08-17 14:38:31,451 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system 
[akka.tcp://fl...@seg-1.adjust.com:29095] has failed, address is now gated for 
[50] ms. Reason: [Association failed with 
[akka.tcp://fl...@seg-1.adjust.com:29095]] Caused by: [Connection refused: 
seg-1.adjust.com/178.162.219.66:29095]
2018-08-17 14:38:41,379 ERROR 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler  - 
Could not retrieve the redirect address.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask 
timed out on 
[Actor[akka.tcp://fl...@seg-1.adjust.com:29095/user/dispatcher#-1599908403]] 
after [1 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
[... shortened ...]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://fl...@seg-1.adjust.com:29095/user/dispatcher#-1599908403]] 
after [1 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
... 9 more
2018-08-17 14:38:48,005 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- 
http://seg-2.adjust.com:8083 was granted leadership with 
leaderSessionID=708d1a64-c353-448b-9101-7eb3f910970e
2018-08-17 14:38:48,005 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - 
ResourceManager akka.tcp://fl...@seg-2.adjust.com:30169/user/resourcemanager 
was granted leadership with fencing token 8de829de14876a367a80d37194b944ee
2018-08-17 14:38:48,006 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting 
the SlotManager.
2018-08-17 14:38:48,007 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher 
akka.tcp://fl...@seg-2.adjust.com:30169/user/dispatcher was granted leadership 
with fencing token 684f50f8-327c-47e1-a53c-931c4f4ea3e5
2018-08-17 14:38:48,007 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Recovering all 
persisted jobs.
2018-08-17 14:38:48,021 INFO  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovered SubmittedJobGraph(b951bbf518bcf6cc031be6d2ccc441bb, null).
2018-08-17 14:38:48,028 INFO  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovered SubmittedJobGraph(06ed64f48fa0a7cffde53b99cbaa073f, null).
2018-08-17 14:38:48,035 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error 
occurred in the cluster entrypoint.
java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobExecutionException: Could not set up 
JobManager
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
[... shortened ...]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set 
up JobManager
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:936)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:291)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:281)
at 
org.apache.flink.util.function.ConsumerWithException.accept(ConsumerWithException.java:38)
... 21 more
Caused by: java.lang.Exception: Cannot set up the user code libraries: 
/var/lib/flink/ceph/prod/1.5-batch/ha_state/1.5-batch/blob/job_b951bbf518bcf6cc031be6d2ccc441bb/blob_p-a26f62e3bbdcd8884dd18c42a3f6f202b9d2c6e7-0dc87a56862a1f799d515306ffeddfcb
 (No such file or directory)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:134)
... 25 more
Caused by: java.io.FileNotFoundException: 
/var/lib/flink/ceph/prod/1.5-batch/ha_state/1.5-batch/blob/job_b951bbf518bcf6cc031be6d2ccc441bb/blob_p-a26f62e3bbdcd8884dd18c42a3f6f202b9d2c6e7-0dc87a56862a1f799d515306ffeddfcb
 (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
[... shortened 

Flink not rolling log files

2018-08-17 Thread Navneet Kumar Pandey
I am using Flink in EMR with the following configuration.

 {
  "Classification": "flink-log4j",
  "Properties": {
"log4j.logger.no":"DEBUG",

"log4j.appender.file":"org.apache.log4j.rolling.RollingFileAppender",

"log4j.appender.file.RollingPolicy.FileNamePattern":"logs/log.%d{MMdd-HHmm}.log",

"log4j.appender.file.RollingPolicy":"org.apache.log4j.rolling.TimeBasedRollingPolicy",
"log4j.appender.file.append":"false",
"log4j.appender.file.layout":"org.apache.log4j.PatternLayout",
"log4j.appender.file.layout.ConversionPattern":"%d{-MM-dd
HH:mm:ss,SSS} %-5p %-60c %x - %m%n"

  }
}

FYI, this configuration gets written into Flink's log4j.properties. As you can
see, even after this setting the taskmanager and jobmanager log files are not
getting rolled.

[hadoop@ip-XX ~]$ sudo ls -lh
/mnt/var/log/hadoop-yarn/containers/application_DDD_0002/container__0002_01_02
total 7.0G
-rw-r--r-- 1 yarn yarn 770K Aug 17 14:02 taskmanager.err
-rw-r--r-- 1 yarn yarn 6.0G Aug 17 14:02 taskmanager.log
-rw-r--r-- 1 yarn yarn 526K Aug 17 13:54 taskmanager.out

Can somebody give me a pointer on how to roll these log files?
Note that these files are also being copied to S3.


Data loss when restoring from savepoint

2018-08-17 Thread Juho Autio
Some data is silently lost on my Flink stream job when state is restored
from a savepoint.

Do you have any debugging hints to find out where exactly the data gets
dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any
custom state management.

When I cancel the job with savepoint and restore from that savepoint, some
data is missed. It seems to be losing just a small amount of data. The
event time of lost data is probably around the time of savepoint. In other
words the rest of the time window is not entirely missed – collection works
correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it
doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller
parallelism and smaller data volumes. But in production volumes the job
seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created.
It was at first run on an older version of Flink 1.5-SNAPSHOT and it still
happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be for example some synchronization issue
between the kafka consumer offsets vs. what's been written by BucketingSink?

1. Job content, simplified

kafkaStream
.flatMap(new ExtractFieldsFunction())
.keyBy(new MapKeySelector(1, 2, 3, 4))
.timeWindow(Time.days(1))
.allowedLateness(allowedLateness)
.sideOutputLateData(lateDataTag)
.reduce(new DistinctFunction())
.addSink(sink)
// use a fixed number of output partitions
.setParallelism(8))

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements
        ReduceFunction<Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1,
            Map<String, String> value2) {
        return value1;
    }
}

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode Exactly Once
Interval 1m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 1m 0s
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink, I don't think there's anything special here, if not
the fact that we're writing to S3.

String outputPath = "s3://bucket/output";
BucketingSink<Map<String, String>> sink = new
BucketingSink<Map<String, String>>(outputPath)
.setBucketer(new ProcessdateBucketer())
.setBatchSize(batchSize)
.setInactiveBucketThreshold(inactiveBucketThreshold)

.setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My flink job reads the data from Kafka, using a
BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
synchronize watermarks across all kafka partitions. We also write late
data to side output, but nothing is written there – if it would, it could
explain missed data in the main output (I'm also sure that our late data
writing works, because we previously had some actual late data which ended
up there).

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness
with 1 minute of lateness on the 24-hour window.
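
A minimal sketch of that setting (reconstructed here for illustration, not copied from the actual job):

.timeWindow(Time.days(1))
.allowedLateness(Time.minutes(1))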

If that makes sense, I could try removing allowedLateness entirely. That
would just be to rule out a Flink bug related to restoring state in
combination with the allowedLateness feature. After all, all of our data
should be in a good enough order not to be late, given the max out-of-orderness
used on the kafka consumer timestamp extractor.

Thank you in advance!


Re: Looking for flink code example using flink-jpmml library over DataStream

2018-08-17 Thread Hequn Cheng
Hi sagar,

There are some examples in flink-jpmml git library[1], for example[2]. Hope
it helps.

Best, Hequn

[1] https://github.com/FlinkML/flink-jpmml
[2]
https://github.com/FlinkML/flink-jpmml/tree/master/flink-jpmml-examples/src/main/scala/io/radicalbit/examples

On Fri, Aug 17, 2018 at 10:09 AM, sagar loke  wrote:

> Hi,
>
> We are planning to use flink to run jpmml models using flink-jpmml library
> from (radicalbit) over DataStream in Flink.
>
> Is there any code example which we can refer to kick start the process ?
>
> Thanks,
>


High CPU usage

2018-08-17 Thread Alexander Smirnov
Hello,

I noticed that CPU utilization went high and took a thread dump on the task
manager node. Why would the RocksDBMapState.entries() / seek0 call consume CPU?

It is Flink 1.4.2

"Co-Flat Map (3/4)" #16129 prio=5 os_prio=0 tid=0x7fefac029000
nid=0x338f runnable [0x7feed2002000]
   java.lang.Thread.State: RUNNABLE
at org.rocksdb.RocksIterator.seek0(Native Method)
at
org.rocksdb.AbstractRocksIterator.seek(AbstractRocksIterator.java:58)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.loadCache(RocksDBMapState.java:489)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.hasNext(RocksDBMapState.java:433)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState.entries(RocksDBMapState.java:147)
at
org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)

Thank you,
Alex


ProcessingTimeSessionWindows and many other windowing pieces are built around Object

2018-08-17 Thread Andrew Roberts
I’m exploring moving some “manual” state management into Flink-managed state 
via Flink’s windowing paradigms, and I’m running into the surprise that many 
pieces of the windowing architecture require the stream to be upcast to Object 
(AnyRef in Scala). Is there a technical reason for this? I’m currently working 
on re-implementing the necessary pieces (for my problem) with the type 
parameters cascaded out. Did someone start doing this and then hit some wall? 
Would Flink be interested in these changes as a contribution if I get it 
working?

Thanks,

Andrew
-- 
*Confidentiality Notice: The information contained in this e-mail and any 
attachments may be confidential. If you are not an intended recipient, you 
are hereby notified that any dissemination, distribution or copying of this 
e-mail is strictly prohibited. If you have received this e-mail in error, 
please notify the sender and permanently delete the e-mail and any 
attachments immediately. You should not retain, copy or use this e-mail or 
any attachment for any purpose, nor disclose all or any part of the 
contents to any other person. Thank you.*


Re: Initial value of ValueStates of primitive types

2018-08-17 Thread Averell
Thank you Dominik.

So there's an implicit conversion, which means that getState().value() would
always give a deterministic result (i.e. a Boolean value would always be false,
an Int value would always be 0).

Another funny thing I found is that even with a reference type like Integer,
there is also that implicit conversion:

val y:Integer = getRuntimeContext.getState(new
ValueStateDescriptor[Int]("Triggered", classOf[Int])).value()
>> y = {Integer@5795} "0" 

Thanks for your time.
Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Initial value of ValueStates of primitive types

2018-08-17 Thread Dominik Wosiński
Hey,

By default values, do you mean the values right after you call:

getRuntimeContext.getState()

If so, the default value of the state's *value()* will be null, as described
in:

/**
 * Returns the current value for the state. When the state is not
 * partitioned the returned value is the same for all inputs in a given
 * operator instance. If state partitioning is applied, the value returned
 * depends on the current operator input, as the operator maintains an
 * independent state for each partition.
 *
 * If you didn't specify a default value when creating the {@link
ValueStateDescriptor}
 * this will return {@code null} when no value was previously set
using {@link #update(Object)}.
 *
 * @return The state value corresponding to the current input.
 *
 * @throws IOException Thrown if the system cannot access the state.
 */
T value() throws IOException;

For the *MapState* it should be an empty map with no keys present.

The funny thing is that there is an implicit conversion of the null value
returned by the state, so assume you have defined:

private lazy val *test*: ValueState[Boolean] =
getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("test",
classOf[Boolean]))

If you now do:

print(test.value())

It will indeed print the *null*.
But if you do:

val myTest = test.value()
print(myTest)

It will now print *false* instead, because assigning the value to a Scala Boolean forces the unboxing of null.

Best Regards,
Dominik.

2018-08-17 11:13 GMT+02:00 Averell :

> Hi,
>
> In Flink's documents, I couldn't find any example that uses primitive type
> when working with States. What would be the initial value of a ValueState
> of
> type Int/Boolean/...? The same question applies for MapValueState like
> [String, Int]
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Need a clarification about removing a stateful operator

2018-08-17 Thread Tony Wei
Hi Stefan,

Thanks for your detailed explanation.

Best,
Tony Wei

2018-08-17 15:56 GMT+08:00 Stefan Richter :

> Hi,
>
> it will not be transported. The JM does the state assignment to create the
> deployment information for all tasks. It will just exclude the state for
> operators that are not present. So in your next checkpoints they will no
> longer be contained.
>
> Best,
> Stefan
>
>
> On 17.08.2018 at 09:26, Tony Wei wrote:
>
> Hi Chesnay,
>
> Thanks for your quick reply. I have another question. Will the state,
> which is ignored, be transported
> to TMs from DFS? Or will it be detected by JM's checkpoint coordinator and
> only those states required
> by operators be transported to each TM?
>
> Best,
> Tony Wei
>
> 2018-08-17 14:38 GMT+08:00 Chesnay Schepler :
>
>> The state won't exist in the snapshot.
>>
>>
>> On 17.08.2018 04:38, Tony Wei wrote:
>>
>> Hi all,
>>
>> I'm confused about the description in documentation. [1]
>>
>>
>>- *Removing a stateful operator:* The state of the removed operator
>>is lost unless
>>another operator takes it over. When starting the upgraded
>>application, you have
>>to explicitly agree to discard the state.
>>
>> Does that mean if I take a full snapshot (e.g. savepoint) after restoring
>> by explicitly agreeing to
>> discard the state, then the state won't exist in that snapshot? Or does
>> it just mean ignore the
>> state but the state still exist forever, unless I explicitly purge that
>> state by using state operator?
>>
>> And could this behavior differ between different state backend (Memory,
>> FS, RocksDB) ?
>>
>> Many thanks,
>> Tony Wei
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/
>> ops/upgrading.html#application-topology
>>
>>
>>
>
>


Initial value of ValueStates of primitive types

2018-08-17 Thread Averell
Hi,

In Flink's documents, I couldn't find any example that uses primitive type
when working with States. What would be the initial value of a ValueState of
type Int/Boolean/...? The same question applies for MapValueState like
[String, Int]

Thanks and regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: InvalidTypesException: Type of TypeVariable 'K' in 'class X' could not be determined

2018-08-17 Thread Timo Walther

Hi Miguel,

the issue that you are observing is due to Java's type erasure.

"new MyClass()" is always erasured to "new MyClass()" by the 
Java compiler so it is impossible for Flink to extract something.


For classes in declarations like

class MyClass extends ... {
   ...
}

the compiler adds the actual generic and Flink can extract it. So for 
classes the generics remain, but generics passed to objects are erased.
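
As a minimal sketch of the two usual workarounds (class and field names below are made up), the type can either be supplied explicitly as a TypeInformation constructor argument, or captured through an anonymous subclass such as a TypeHint, since subclass declarations keep their generic parameters:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TypeErasureSketch {

    // A generic class like the X in the question (simplified, hypothetical).
    static class X<K> {
        final TypeInformation<K> keyType;

        // The caller supplies the type information explicitly, so no extraction is needed.
        X(TypeInformation<K> keyType) {
            this.keyType = keyType;
        }
    }

    public static void main(String[] args) {
        // Works: the TypeHint is an anonymous subclass, and subclass declarations
        // keep their generic parameters (they are not erased).
        TypeInformation<Long> longInfo = TypeInformation.of(new TypeHint<Long>() {});

        // Works: the generic constructor receives the type information directly,
        // instead of trying to extract it from the erased "new X<Long>()" call.
        X<Long> x = new X<>(longInfo);

        System.out.println(x.keyType);
    }
}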


Regards,
Timo

On 16.08.18 at 22:28, Miguel Coimbra wrote:

Hello,

I have some code which compiles correctly (Flink 1.4) under Java 8.
It uses generic types.
While it compiles correctly, the execution fails with the error:

org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'K' in 'class X' could not be determined.


This is my main:

public static void main(final String[] args) {
    X<Long> x = new X<>();
}

This is my class X:

public class X<K> {
    public X() { TypeInformation<K> keySelector = TypeInformation.of(new TypeHint<K>(){}); }
}


Perhaps I'm lacking knowledge on the way Java's generics work, but why 
can't Flink determine the TypeVariable of 'K'?
As I am instantiating X parameterized as a Long, that information 
should eventually reach Flink and the constructor of X would be 
equivalent to this:


public X() { TypeInformation<Long> keySelector = 
TypeInformation.of(new TypeHint<Long>(){}); }

During execution, however, this error pops up.
What am I missing here, and what is the best way to achieve this 
generic behavior in a Flink-idiomatic way?


Thank you very much for your time.





Re: Need a clarification about removing a stateful operator

2018-08-17 Thread Stefan Richter
Hi,

it will not be transported. The JM does the state assignment to create the 
deployment information for all tasks. It will just exclude the state for 
operators that are not present. So in your next checkpoints it will no longer 
be contained.
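
For reference, the "explicitly agree to discard the state" part is typically done with the CLI's --allowNonRestoredState flag when resuming from the savepoint; a sketch (the savepoint path and jar name are made up):

flink run -s hdfs:///flink/savepoints/savepoint-abc123 --allowNonRestoredState my-upgraded-job.jar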

Best,
Stefan

> On 17.08.2018 at 09:26, Tony Wei wrote:
> 
> Hi Chesnay,
> 
> Thanks for your quick reply. I have another question. Will the state, which 
> is ignored, be transported
> to TMs from DFS? Or will it be detected by JM's checkpoint coordinator and 
> only those states required
> by operators be transported to each TM?
> 
> Best,
> Tony Wei
> 
> 2018-08-17 14:38 GMT+08:00 Chesnay Schepler  >:
> The state won't exist in the snapshot.
> 
> 
> On 17.08.2018 04:38, Tony Wei wrote:
>> Hi all,
>> 
>> I'm confused about the description in documentation. [1]
>> 
>> Removing a stateful operator: The state of the removed operator is lost 
>> unless
>> another operator takes it over. When starting the upgraded application, you 
>> have
>> to explicitly agree to discard the state.
>> Does that mean if I take a full snapshot (e.g. savepoint) after restoring by 
>> explicitly agreeing to
>> discard the state, then the state won't exist in that snapshot? Or does it 
>> just mean ignore the
>> state but the state still exist forever, unless I explicitly purge that 
>> state by using state operator?
>> 
>> And could this behavior differ between different state backend (Memory, FS, 
>> RocksDB) ?
>> 
>> Many thanks,
>> Tony Wei
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#application-topology
>>  
>> 
> 



Re: Need a clarification about removing a stateful operator

2018-08-17 Thread Tony Wei
Hi Chesnay,

Thanks for your quick reply. I have another question. Will the state, which
is ignored, be transported
to TMs from DFS? Or will it be detected by JM's checkpoint coordinator and
only those states required
by operators be transported to each TM?

Best,
Tony Wei

2018-08-17 14:38 GMT+08:00 Chesnay Schepler :

> The state won't exist in the snapshot.
>
>
> On 17.08.2018 04:38, Tony Wei wrote:
>
> Hi all,
>
> I'm confused about the description in documentation. [1]
>
>
>- *Removing a stateful operator:* The state of the removed operator is
>lost unless
>another operator takes it over. When starting the upgraded
>application, you have
>to explicitly agree to discard the state.
>
> Does that mean if I take a full snapshot (e.g. savepoint) after restoring
> by explicitly agreeing to
> discard the state, then the state won't exist in that snapshot? Or does it
> just mean ignore the
> state but the state still exist forever, unless I explicitly purge that
> state by using state operator?
>
> And could this behavior differ between different state backend (Memory,
> FS, RocksDB) ?
>
> Many thanks,
> Tony Wei
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/ops/upgrading.html#application-topology
>
>
>


Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

2018-08-17 Thread Piotr Nowojski
No problem :) You motivated me to do a fix for that, since I stumbled across 
this bug/issue myself before, and it also took me some time in the debugger to find 
the cause.

Piotrek

> On 16 Aug 2018, at 20:05, Ken Krugler  wrote:
> 
> Hi Piotr,
> 
> Thanks, and darn it that’s something I should have noticed.
> 
> — Ken
> 
> 
>> On Aug 16, 2018, at 4:37 AM, Piotr Nowojski wrote:
>> 
>> Hi,
>> 
>> You made a small mistake when restoring state using the test harness, one that 
>> I myself have also made in the past. The problem is the ordering of these 
>> calls:
>> 
>> result.open();
>> if (savedState != null) {
>> result.initializeState(savedState);
>> }
>> 
>> open() is supposed to be called after initializeState(), and if you look into 
>> the code of AbstractStreamOperatorTestHarness#open, you will see that if it is 
>> called before initializeState, it will initialize the harness without any state.
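
A minimal sketch of the corrected ordering, mirroring the snippet above (not Ken's actual code):

// Restore the snapshot first, then open the operator/harness.
if (savedState != null) {
    result.initializeState(savedState);
}
result.open();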
>> 
>> Unfortunately, this is implicit behaviour that doesn’t throw any error 
>> (the test harness is not part of Flink’s public API). I will try to fix this: 
>> https://issues.apache.org/jira/browse/FLINK-10159 
>> 
>> 
>> Piotrek
>> 
>>> On 16 Aug 2018, at 00:24, Ken Krugler wrote:
>>> 
>>> Hi all,
>>> 
>>> It looks to me like the OperatorSubtaskState returned from 
>>> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that 
>>> had been registered via registerProcessingTimeTimer but had not yet fired 
>>> when the snapshot was saved.
>>> 
>>> Is this a known limitation of OneInputStreamOperatorTestHarness?
>>> 
>>> If not, is there anything special I need to do when setting up the test 
>>> harness to ensure that timers are saved?
>>> 
>>> Below is the unit test, which shows how the test harness is being set up 
>>> and run.
>>> 
>>> The TimerFunction used in this test does seem to be doing the right thing, 
>>> as using it in a simple job on a local Flink cluster works as expected when 
>>> creating & then restarting from a savepoint.
>>> 
>>> Thanks,
>>> 
>>> — Ken
>>> 
>>> ==
>>> TimerTest.java
>>> ==
>>> package com.scaleunlimited.flinkcrawler.functions;
>>> 
>>> import static org.junit.Assert.assertTrue;
>>> 
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> 
>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
>>> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
>>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>>> import 
>>> org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
>>> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
>>> import org.junit.Before;
>>> import org.junit.Test;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> 
>>> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
>>> 
>>> public class TimerTest {
>>> public static final Logger LOGGER = 
>>> LoggerFactory.getLogger(TimerTest.class);
>>> 
>>> private List _firedTimers = new ArrayList();
>>> 
>>> @Before
>>> public void setUp() throws Exception {
>>> }
>>> 
>>> @Test
>>> public void testTimerSaving() throws Throwable {
>>> 
>>> // This operator doesn't really do much at all, but the first 
>>> element
>>> // it processes will create a timer for (timestamp+1).
>>> // Whenever that timer fires, it will create another timer for 
>>> // (timestamp+1).
>>> KeyedProcessOperator operator = 
>>> new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>>> 
>>> // Create a test harness from scratch
>>> OneInputStreamOperatorTestHarness testHarness = 
>>> makeTestHarness(operator, null);
>>> 
>>> // We begin at time zero
>>> testHarness.setProcessingTime(0);
>>> 
>>> // Process some elements, which should also create a timer for time 
>>> 1.
>>> int inputs[] = new int[] {1, 2, 3};
>>> for (int input : inputs) {
>>> testHarness.processElement(new StreamRecord<>(input));
>>> }
>>> 
>>> // Force some time to pass, which should keep moving the timer 
>>> ahead,
>>> // finally leaving it set for time 10.
>>> for (long i = 1; i < 10; i++) {
>>> testHarness.setProcessingTime(i);
>>> }
>>> 
>>> // Save the state, which we assume should include the timer we set 
>>> for
>>> // time 10.
>>> OperatorSubtaskState savedState = 
>>> testHarness.snapshot(0L, testHarness.getProcessingTime());
>>> 
>>> // 

Re: Need a clarification about removing a stateful operator

2018-08-17 Thread Chesnay Schepler

The state won't exist in the snapshot.

On 17.08.2018 04:38, Tony Wei wrote:

Hi all,

I'm confused about the description in documentation. [1]

  * *Removing a stateful operator:* The state of the removed operator is lost
    unless another operator takes it over. When starting the upgraded
    application, you have to explicitly agree to discard the state.

Does that mean if I take a full snapshot (e.g. savepoint) after 
restoring by explicitly agreeing to
discard the state, then the state won't exist in that snapshot? Or 
does it just mean ignore the
state but the state still exist forever, unless I explicitly purge 
that state by using state operator?


And could this behavior differ between different state backend 
(Memory, FS, RocksDB) ?


Many thanks,
Tony Wei

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#application-topology