Re: Serializing NULLs

2016-12-20 Thread Anirudh Mallem
If you are using Avro generated classes then you cannot have null values unless the schema declares them nullable. See:
https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-Whyisn'teveryvalueinAvronullable?

From: Stephan Ewen
Reply-To: "user@flink.apache.org"
Date: Tuesday, December 20, 2016 at 8:17 AM
To: "user@flink.apache.org"
Subject: Re: Serializing NULLs

Thanks for sharing the stack trace.

This does not seem to be Flink related; it is part of the Avro encoding logic.
The Avro GenericRecord type apparently does not allow the map value to be null.
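For the archives: Avro only permits null where the schema declares a union with "null". A minimal schema sketch matching the error in this thread (the record and field names are taken from the stack trace below; everything else is illustrative, not Matt's actual schema):

```json
{
  "type": "record",
  "name": "Sector",
  "namespace": "com.entities",
  "fields": [
    {
      "name": "properties",
      "type": {
        "type": "map",
        "values": ["null", "double"]
      },
      "doc": "Map values declared as a union with null, so null entries can serialize"
    }
  ]
}
```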



On Tue, Dec 20, 2016 at 4:55 PM, Matt wrote:
Here is the back trace: https://gist.github.com/56af4818bcf5dee6b97c248fd9233c67

In the meantime I've solved the issue by creating a POJO class where null is 
represented as Long.MIN_VALUE; that, together with a custom equals(), did the 
trick. I guess it's not as fast as de/serializing Double though.
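The workaround Matt describes can be sketched as follows: a POJO that encodes "no value" as Long.MIN_VALUE instead of an actual null reference, so the serializer never sees a null. Class and field names here are illustrative, not from the thread:

```java
// A POJO encoding "null" as a sentinel value, per the workaround above.
public class NullableValue {
    public static final long NULL_SENTINEL = Long.MIN_VALUE;

    // Flink POJO rules: public no-arg constructor and public field.
    public long value;

    public NullableValue() { this.value = NULL_SENTINEL; }

    public NullableValue(long value) { this.value = value; }

    public boolean isNull() {
        return value == NULL_SENTINEL;
    }

    // Custom equals() so that two "null" instances compare equal,
    // which is what makes the sentinel trick work downstream.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof NullableValue)) return false;
        return this.value == ((NullableValue) o).value;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(value);
    }
}
```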

If you need any other information let me know.

Regards,
Matt

On Tue, Dec 20, 2016 at 6:46 AM, Stephan Ewen wrote:
The "null" support in some types is not fully developed. However in that case I 
am wondering why it does not work. Can you share the stack trace, so we can 
take a look at the serializer?



On Mon, Dec 19, 2016 at 9:56 PM, Matt wrote:
Hello list,

I'm getting this error:

java.lang.RuntimeException: Could not forward element to next operator
...
Caused by: java.lang.NullPointerException: in com.entities.Sector in map in 
double null of double of map in field properties of com.entities.Sector
...
Caused by: java.lang.NullPointerException

The field mentioned is a HashMap, and some keys are mapped to 
null values.

Why isn't it possible to forward/serialize those elements with null values?
What do you do when your elements may contain nulls?

Regards,
Matt





Re: Reg Checkpoint size using RocksDb

2016-12-19 Thread Anirudh Mallem
Hi Stephan,
Thanks for your response. I shall try switching to the fully Async mode and see.
On another note, is there any option available to configure compaction 
capabilities using the default checkpointing mode? Thanks.

From: Stephan Ewen
Reply-To: "user@flink.apache.org"
Date: Monday, December 19, 2016 at 11:51 AM
To: "user@flink.apache.org"
Subject: Re: Reg Checkpoint size using RocksDb

Hi!

If you use the default checkpoint mode, Flink will checkpoint the current 
RocksDB instance. It may be that there simply has not been a compaction in 
RocksDB when checkpointing, so the checkpoint contains some "old data" as well.

If you switch to the "fully async" mode, it should always only checkpoint the 
latest state of RocksDB.
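If it helps others searching the archives: in the RocksDB backend of that era, fully async snapshots were switched on via the backend object. A sketch under that assumption (checkpoint URI is illustrative; verify the API against the docs of your Flink version, as it changed in later releases):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: switch the RocksDB backend to fully asynchronous snapshots,
// so checkpoints reflect only the latest state rather than the raw files.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
backend.enableFullyAsyncSnapshots();
env.setStateBackend(backend);
```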

Best,
Stephan


On Mon, Dec 19, 2016 at 10:47 AM, Anirudh Mallem 
<anirudh.mal...@247-inc.com> wrote:
Hi,
I was experimenting with RocksDB as the state backend for my job, and to test 
its behavior I modified the socket word count program to store state. I also 
wrote a RichMapFunction which stores the states as a ValueState with default 
value null.
What the job basically does: for every word received, if the current state is 
null it updates the state to a fixed value, say “abc”; if the state is 
non-null, it is cleared.
So ideally, if my input stream has the word “foo” twice, the corresponding 
state is first set to “abc” and then cleared at the second “foo”. I see this 
behavior occurring as expected, but the checkpointed size keeps increasing! Is 
this expected? I believe the checkpointed size shown on the dashboard should 
decrease when some of the states are cleared, right?
If each “foo” comes in successive checkpointing intervals, we should observe 
one rise and one fall in the checkpointed size, right? Yet I see the 
checkpointed size increasing in both cases!
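The toggle logic described above can be sketched outside Flink in plain Java (names hypothetical). If checkpoint size tracked the live state exactly, it would rise and fall with the size of this map, which is the behavior being asked about:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the per-word toggle state: the first occurrence of
// a word sets its state to "abc", the second occurrence clears it.
public class WordToggle {
    private final Map<String, String> state = new HashMap<>();

    public void onWord(String word) {
        if (state.get(word) == null) {
            state.put(word, "abc");  // first "foo": set the fixed value
        } else {
            state.remove(word);      // second "foo": clear the state
        }
    }

    public int liveEntries() {
        return state.size();
    }
}
```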

Any ideas as to what is happening here? My checkpoint duration is 5 secs. 
Thanks.

Regards,
Anirudh









Re: Query regarding state backend for Custom Map Function

2016-12-01 Thread Anirudh Mallem
Thanks a lot Stefan. I got what I was looking for. Is the MapState 
functionality coming as a part of the 1.2 release?

From: Stefan Richter
Reply-To: "user@flink.apache.org"
Date: Thursday, December 1, 2016 at 2:53 AM
To: "user@flink.apache.org"
Subject: Re: Query regarding state backend for Custom Map Function

Hi,

using ValueState with RocksDB to store a map inside the value state means 
that you will have a different map for each key, automatically swapped in on a 
per-record basis depending on the record’s key. If you are using a plain map 
and Checkpointed, there is only one map, and your code is responsible for 
dispatching state between different keys.

If you use a map and Checkpointed, the map will be on the heap and the 
checkpoint will go directly against the filesystem; this is independent of the 
chosen backend, so no RocksDB is involved.

On a further note, we are working on an alternative to ValueState that is like 
a MapState. In contrast to ValueState, MapState does not deserialize the whole 
map on each access, but can access individual key/value pairs. This might be 
what you are looking for.
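For readers of the archives: this MapState API eventually shipped in Flink 1.3. A usage sketch under that assumption (names per that release; verify against the docs of your version). Unlike a ValueState holding a whole map, individual entries are read and written without deserializing the entire map:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Illustrative function keeping a per-key map in MapState.
public class CountPerSubKey extends RichFlatMapFunction<Tuple2<String, Long>, Long> {

    private transient MapState<String, Long> perKeyMap;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>("perKeyMap", String.class, Long.class);
        perKeyMap = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> input, Collector<Long> out) throws Exception {
        Long previous = perKeyMap.get(input.f0);   // reads one entry, not the whole map
        long updated = (previous == null ? 0L : previous) + input.f1;
        perKeyMap.put(input.f0, updated);
        out.collect(updated);
    }
}
```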

Best,
Stefan


On 01.12.2016 at 09:35, Anirudh Mallem 
<anirudh.mal...@247-inc.com> wrote:

Hi Everyone,
I am trying to understand the Working With State page of the Flink 
documentation.
My question: if I use a ValueState in my CustomMap class to store my states, 
with RocksDB as the state backend, then it is clear that every state value is 
stored in RocksDB.
Now, if instead of a ValueState I use a normal Java HashMap to store my states 
and implement the Checkpointed interface, will the entire HashMap reside in 
the RocksDB backend, or will the HashMap stay in memory with just the 
snapshots sent to RocksDB? I am trying to see what I would lose/gain by using 
my own data structure for state maintenance. Thanks.

Regards,
Anirudh



Re: Issue while restarting from SavePoint

2016-10-19 Thread Anirudh Mallem
Hi Ufuk,
Thank you for looking into the issue. Please find your answers below :

(1) In detached mode the configuration seems to be not picked up
correctly. That should be independent of the savepoints. Can you
confirm this?
—> I tried starting a new job in detached mode and the job started on the 
cluster.


(2) The program was changed in a non-compatible way after the
savepoint. Did you change the program and if yes in which way?
—> No, I did not make any change to the existing job. I tried restarting the 
same job. 


However, I think I have found the problem. I was not specifying the parallelism 
when restarting the job from the savepoint; I assumed that this information was 
also captured in the savepoint. So the non-detached mode was actually throwing 
the right error, but the detached mode was not picking up the config. I guess 
the detached mode should also have thrown the same exception, right? Thanks a 
lot for helping.



On 10/19/16, 1:19 AM, "Ufuk Celebi" <u...@apache.org> wrote:

>Hey Anirudh!
>
>As you say, this looks like two issues:
>
>(1) In detached mode the configuration seems to be not picked up
>correctly. That should be independent of the savepoints. Can you
>confirm this?
>
>(2) The program was changed in a non-compatible way after the
>savepoint. Did you change the program and if yes in which way?
>
>– Ufuk
>
>
>On Wed, Oct 19, 2016 at 12:01 AM, Anirudh Mallem
><anirudh.mal...@247-inc.com> wrote:
>> Hi,
>> The issue seems to be connected with trying to restart the job in the
>> detached mode. The stack trace is as follows:
>>
>> -bash-3.2$ bin/flink run -d -s jobmanager://savepoints/1 -c
>> com.tfs.rtdp.precompute.Flink.FlinkTest
>> /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar
>> file:///home/amallem/capitalone.properties
>> Cluster configuration: Standalone cluster with JobManager at
>> /10.64.119.90:33167
>> Using address 10.64.119.90:33167 to connect to JobManager.
>> JobManager web interface address http://10.64.119.90:8081
>> Starting execution of program
>> Submitting Job with JobID: 6c5596772627d3c9366deaa0c47ab0ad. Returning after
>> job submission.
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: JobManager did not respond within 6 milliseconds
>> at
>> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:432)
>> at
>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:93)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>> at
>> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:323)
>> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>> Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager
>> did not respond within 6 milliseconds
>> at
>> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:227)
>> at
>> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:429)
>> ... 8 more
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [6 milliseconds]
>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> at
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>> at scala.concurrent.Await$.result(package.scala:107)
>> at scala.concurrent.Await.result(package.scala)
>> at
>> org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:223)
>> ... 9 more
>>
>> When running it without the detached mode, the job manager is responding but
>> the job is failing as it thinks that the structure of the job is modified.
>>
>> -bash-3.2$ bin/flink run -s jobmanager://savepoints/1 -c
>> com.tfs.rtdp.precompute.Flink.FlinkTest
>> /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289

Re: Issue while restarting from SavePoint

2016-10-18 Thread Anirudh Mallem
 to rollback to savepoint jobmanager://savepoints/1. Cannot map old state for 
task 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that 
the program has been changed in a non-compatible way after the savepoint.
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
... 10 more

Thanks.

From: Anirudh Mallem
Date: Tuesday, October 18, 2016 at 2:07 PM
To: "user@flink.apache.org<mailto:user@flink.apache.org>"
Subject: Issue while restarting from SavePoint

Hi,
I am relatively new to Flink and I was experimenting with the save points 
feature. I have an HA cluster running with 1 Master and 4 Workers. The 
flink-config.yaml is as follows :

#==
# Common
#==
jobmanager.rpc.address: stable-stream-master01.app.shared.int.sv2.247-inc.net

# The port where the JobManager's main actor system listens for messages.
jobmanager.rpc.port: 6123

# The heap size for the JobManager JVM
jobmanager.heap.mb: 512

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 2048

# The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
taskmanager.numberOfTaskSlots: 8

# Specify whether TaskManager memory should be allocated when starting up 
(true) or when
# memory is required in the memory manager (false)
taskmanager.memory.preallocate: false

# The parallelism used for programs that did not specify any other parallelism.
parallelism.default: 1

env.java.home: /usr/local/java

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 10 s
#==
# Web Frontend
#==

# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.

jobmanager.web.port: 8081

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.

#jobmanager.web.submit.enable: false

#==
# Streaming state checkpointing
#==

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends: jobmanager, filesystem, 
#
state.backend: filesystem


# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all 
TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file 
systems,
# (or any local file system under Windows), or "S3://" for S3 file system.
#
 state.backend.fs.checkpointdir: file:///home/amallem/
 state.savepoints.dir: file:///home/amallem/save/

#==
# Master High Availability (required configuration)
#==

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
#
 recovery.mode: zookeeper
#
 recovery.zookeeper.quorum: 
stable-stream-zookeeper01.app.shared.int.net:2181,stable-stream-zookeeper02.app.shared.int.net:2181
#
# Note: You need to set the state backend to 'filesystem' and the checkpoint
# directory (see above) before configuring the storageDir.
#
 recovery.zookeeper.storageDir: file:///home/amallem/recovery
 recovery.zookeeper.path.root: /flink
 recovery.zookeeper.path.namespace: /cluster_one

Query: I am able to take a savepoint and then cancel my current job, but when 
I try to start the job again using the savepoint I get the error stating 
“ProgramInvocationException: JobManager did not respond within 6ms”. I 
checked ZooKeeper and everything seems fine. I also followed the 
recommendations in the following post: 
http://stackoverflow.com/questions/36625742/cant-deploy-flow-to-ha-cluster-of-apache-flink-using-flink-cli
Is there any configuration I am missing to enable restarting the job? Any 
help will be appreciated. Thanks.

Regards,
Anirudh

