Re: Error due to Hadoop version mismatch

2016-01-04 Thread Kashmar, Ali
Hi Robert,

On the ubuntu host, the port 9000 is up:

ubuntu@ubuntu-171:~$ netstat -tulpn

tcp0  0 10.13.182.171:9000  0.0.0.0:*   LISTEN
 8849/java


And the process using this port is:

ubuntu@ubuntu-171:~$ ps -ef | grep 8849
ubuntu8849 1  0  2015 ?00:25:00
/usr/lib/jvm/java-8-oracle/bin/java -Dproc_namenode -Xmx1000m
-Djava.net.preferIPv4Stack=true
-Dhadoop.log.dir=/home/ubuntu/Documents/hadoop-2.6.0/logs
-Dhadoop.log.file=hadoop.log
-Dhadoop.home.dir=/home/ubuntu/Documents/hadoop-2.6.0
-Dhadoop.id.str=ubuntu -Dhadoop.root.logger=INFO,console
-Djava.library.path=/home/ubuntu/Documents/hadoop-2.6.0/lib/native
-Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true
-Djava.net.preferIPv4Stack=true -Djava.net.preferIPv4Stack=true
-Dhadoop.log.dir=/home/ubuntu/Documents/hadoop-2.6.0/logs
-Dhadoop.log.file=hadoop-ubuntu-namenode-ubuntu-171.log
-Dhadoop.home.dir=/home/ubuntu/Documents/hadoop-2.6.0
-Dhadoop.id.str=ubuntu -Dhadoop.root.logger=INFO,RFA
-Djava.library.path=/home/ubuntu/Documents/hadoop-2.6.0/lib/native
-Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true
-Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender
-Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender
-Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender
-Dhadoop.security.logger=INFO,RFAS
org.apache.hadoop.hdfs.server.namenode.NameNode

And the hadoop version is:

ubuntu@ubuntu-171:~$ Documents/hadoop-2.6.0/bin/hadoop version
Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using
/home/ubuntu/Documents/hadoop-2.6.0/share/hadoop/common/hadoop-common-2.6.0
.jar



-Ali



On 2016-01-04, 2:20 PM, "Robert Metzger" <rmetz...@apache.org> wrote:

>Your Flink installation has Hadoop 2.6.0 included; on the other machine,
>there is a Hadoop version installed, which is most likely a 1.x or even a
>0.x version.
>Are you sure that the host "ubuntu-171" has the ip 10.13.182.171 and that
>the hadoop installation in the "/home/ubuntu/Documents/hadoop-2.6.0/"
>directory is listening on port 9000 ?
>
>On Mon, Jan 4, 2016 at 3:03 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Max,
>>
>> Both commands return nothing. Those variables aren’t set.
>>
>> The only software I installed on these machines is Flink and Java.
>>
>> -Ali
>>
>> On 2015-12-28, 6:42 AM, "Maximilian Michels" <m...@apache.org> wrote:
>>
>> >Hi Ali,
>> >
>> >The warning about the native Hadoop libraries is nothing to worry
>> >about. The native modules are platform-optimized modules which may be
>> >used to improve performance. They are not necessary for Hadoop to
>> >function correctly.
>> >
>> >The exception message implies that you are using a very old version
>> >of Hadoop. Do you have other Hadoop versions installed on the same
>> >machine? We have had people using Flink 0.10.0 with Hadoop 2.6.0
>> >without any problems.
>> >
>> >On the cluster machines, what is the output of these commands?
>> >
>> >echo $HADOOP_CLASSPATH
>> >echo $HADOOP_CONF_DIR
>> >
>> >
>> >Thanks,
>> >Max
>> >
>> >On Wed, Dec 23, 2015 at 3:53 PM, Kashmar, Ali <ali.kash...@emc.com>
>> wrote:
>> >> Hi Max,
>> >>
>> >> I have the same output for the Task Manager:
>> >>
>> >> 11:25:04,274 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>> >>   -  Hadoop version: 2.6.0
>> >>
>> >> I do get this line at the beginning of both job and task manager log
>> >>files:
>> >>
>> >> 11:25:04,100 WARN  org.apache.hadoop.util.NativeCodeLoader
>> >>   - Unable to load native-hadoop library for your platform...
>>using
>> >> builtin-java classes where applicable
>> >>
>> >> Do you think it has anything to do with it?
>> >>
>> >> Thanks,
>> >> Ali
>> >>
>> >> On 2015-12-23, 7:30 AM, "Maximilian Michels" <m...@apache.org> wrote:
>> >>
>> >>>Hi Ali,
>> >>>
>> >>>Could you please also post the Hadoop version output of the task
>> >>>manager log files? It looks like the task managers are running a
>> >>>different Hadoop version.
>> >>>

Re: Error due to Hadoop version mismatch

2016-01-04 Thread Kashmar, Ali
Hi Max,

Both commands return nothing. Those variables aren’t set.

The only software I installed on these machines is Flink and Java.

-Ali

On 2015-12-28, 6:42 AM, "Maximilian Michels" <m...@apache.org> wrote:

>Hi Ali,
>
>The warning about the native Hadoop libraries is nothing to worry
>about. The native modules are platform-optimized modules which may be
>used to improve performance. They are not necessary for Hadoop to
>function correctly.
>
>The exception message implies that you are using a very old version
>of Hadoop. Do you have other Hadoop versions installed on the same
>machine? We have had people using Flink 0.10.0 with Hadoop 2.6.0
>without any problems.
>
>On the cluster machines, what is the output of these commands?
>
>echo $HADOOP_CLASSPATH
>echo $HADOOP_CONF_DIR
>
>
>Thanks,
>Max
>
>On Wed, Dec 23, 2015 at 3:53 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> Hi Max,
>>
>> I have the same output for the Task Manager:
>>
>> 11:25:04,274 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>   -  Hadoop version: 2.6.0
>>
>> I do get this line at the beginning of both job and task manager log
>>files:
>>
>> 11:25:04,100 WARN  org.apache.hadoop.util.NativeCodeLoader
>>   - Unable to load native-hadoop library for your platform... using
>> builtin-java classes where applicable
>>
>> Do you think it has anything to do with it?
>>
>> Thanks,
>> Ali
>>
>> On 2015-12-23, 7:30 AM, "Maximilian Michels" <m...@apache.org> wrote:
>>
>>>Hi Ali,
>>>
>>>Could you please also post the Hadoop version output of the task
>>>manager log files? It looks like the task managers are running a
>>>different Hadoop version.
>>>
>>>Thanks,
>>>Max
>>>
>>>On Tue, Dec 22, 2015 at 4:28 PM, Kashmar, Ali <ali.kash...@emc.com>
>>>wrote:
>>>> Hi Robert,
>>>>
>>>> I found the version in the job manager log file:
>>>>
>>>> 17:33:49,636 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>>>   -  Hadoop version: 2.6.0
>>>>
>>>> But the Hadoop installation I have is saying this:
>>>>
>>>> ubuntu@ubuntu-171:~/Documents/hadoop-2.6.0$ bin/hadoop version
>>>> Hadoop 2.6.0
>>>> Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
>>>> e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
>>>> Compiled by jenkins on 2014-11-13T21:10Z
>>>> Compiled with protoc 2.5.0
>>>> From source with checksum 18e43357c8f927c0695f1e9522859d6a
>>>> This command was run using
>>>>
>>>>/home/ubuntu/Documents/hadoop-2.6.0/share/hadoop/common/hadoop-common-2
>>>>.6
>>>>.0
>>>> .jar
>>>>
>>>>
>>>> So one of them is lying to me? :)
>>>>
>>>> Ali
>>>>
>>>> On 2015-12-22, 10:16 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
>>>>
>>>>>Hi Ali,
>>>>>
>>>>>the TaskManagers and the JobManager is logging the Hadoop version on
>>>>>startup.
>>>>>
>>>>>On Tue, Dec 22, 2015 at 4:10 PM, Kashmar, Ali <ali.kash...@emc.com>
>>>>>wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I’m trying to use HDFS as a store for Flink checkpoints so I
>>>>>>downloaded
>>>>>>the
>>>>>> Hadoop 2.6.0/Scala 2.10 version of Flink and installed it. I also
>>>>>> downloaded Hadoop 2.6.0 separately from the Hadoop website and set
>>>>>>up
>>>>>>HDFS
>>>>>> on a separate machine. When I start Flink I get the following error:
>>>>>>
>>>>>> 17:34:13,047 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>>>>>   - Status of job 9ba32a08bc0ec02810bf5d2710842f72 (Protocol
>>>>>>Event
>>>>>> Processing) changed to FAILED.
>>>>>> java.lang.Exception: Call to registerInputOutput() of invokable
>>>>>>failed
>>>>>> at
>>>>>>org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.io.IOException: The given file URI (hdfs://
>>>>>> 10.13.182.171:9000/user/flink/checkpoints) points to the

Re: Error due to Hadoop version mismatch

2016-01-04 Thread Kashmar, Ali
Looking at the lib folder revealed the problem. The lib folder on one of
the nodes had libraries for both hadoop 1 and 2. I’m not sure how I ended
up with that but it must have happened while I was copying the dependency
jars to each node. I removed all the jars and started with a fresh copy
and reran the test. It worked this time.
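
For anyone who wants to rule this out quickly: a small sketch (the class name is
made up and nothing here is Flink-specific) that prints every classpath entry
containing "hadoop". Run with the same classpath as the Flink daemons, a mix of
Hadoop 1.x and 2.x jars like the one described above becomes easy to spot:

import java.io.File;

public class FindHadoopJars {
    public static void main(String[] args) {
        // Print every classpath entry that looks Hadoop-related; for a Hadoop 2.6.0
        // Flink build only 2.x artifacts should show up, never a hadoop-core-1.x jar.
        for (String entry : System.getProperty("java.class.path").split(File.pathSeparator)) {
            if (entry.toLowerCase().contains("hadoop")) {
                System.out.println(entry);
            }
        }
    }
}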

Thank you so much for being patient with me and sorry if I wasted your
time.

-Ali

On 2016-01-04, 3:17 PM, "Robert Metzger" <rmetz...@apache.org> wrote:

>Okay, then, my previous statement is false. From the stack trace, it seems
>that Flink is using an older Hadoop version. The DFSClient and RPC classes
>look different in Hadoop 2.6.0.
>Max checked already for some environment variables. Is $CLASSPATH set? Did
>you install Hadoop only by downloading the binaries or did you use
>something like the ubuntu package manager? (those packages sometimes place
>jar files in some lib/ folders)
>Did you put something additionally into the lib/ folder of Flink?
>
>I think when Flink is starting up, it's also logging the classpath of the
>JVM. Can you post it here? (Maybe you need to start with DEBUG log level)
>
>
>On Mon, Jan 4, 2016 at 8:46 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Robert,
>>
>> On the ubuntu host, the port 9000 is up:
>>
>> ubuntu@ubuntu-171:~$ netstat -tulpn
>> 
>> tcp0  0 10.13.182.171:9000  0.0.0.0:*
>>LISTEN
>>  8849/java
>> 
>>
>> And the process using this port is:
>>
>> ubuntu@ubuntu-171:~$ ps -ef | grep 8849
>> ubuntu8849 1  0  2015 ?00:25:00
>> /usr/lib/jvm/java-8-oracle/bin/java -Dproc_namenode -Xmx1000m
>> -Djava.net.preferIPv4Stack=true
>> -Dhadoop.log.dir=/home/ubuntu/Documents/hadoop-2.6.0/logs
>> -Dhadoop.log.file=hadoop.log
>> -Dhadoop.home.dir=/home/ubuntu/Documents/hadoop-2.6.0
>> -Dhadoop.id.str=ubuntu -Dhadoop.root.logger=INFO,console
>> -Djava.library.path=/home/ubuntu/Documents/hadoop-2.6.0/lib/native
>> -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true
>> -Djava.net.preferIPv4Stack=true -Djava.net.preferIPv4Stack=true
>> -Dhadoop.log.dir=/home/ubuntu/Documents/hadoop-2.6.0/logs
>> -Dhadoop.log.file=hadoop-ubuntu-namenode-ubuntu-171.log
>> -Dhadoop.home.dir=/home/ubuntu/Documents/hadoop-2.6.0
>> -Dhadoop.id.str=ubuntu -Dhadoop.root.logger=INFO,RFA
>> -Djava.library.path=/home/ubuntu/Documents/hadoop-2.6.0/lib/native
>> -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true
>> -Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender
>> -Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender
>> -Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender
>> -Dhadoop.security.logger=INFO,RFAS
>> org.apache.hadoop.hdfs.server.namenode.NameNode
>>
>> And the hadoop version is:
>>
>> ubuntu@ubuntu-171:~$ Documents/hadoop-2.6.0/bin/hadoop version
>> Hadoop 2.6.0
>> Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
>> e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
>> Compiled by jenkins on 2014-11-13T21:10Z
>> Compiled with protoc 2.5.0
>> From source with checksum 18e43357c8f927c0695f1e9522859d6a
>> This command was run using
>> 
>>/home/ubuntu/Documents/hadoop-2.6.0/share/hadoop/common/hadoop-common-2.6
>>.0
>> .jar
>>
>>
>>
>> -Ali
>>
>>
>>
>> On 2016-01-04, 2:20 PM, "Robert Metzger" <rmetz...@apache.org> wrote:
>>
>> >Your Flink installation has Hadoop 2.6.0 included; on the other
>>machine,
>> >there is a Hadoop version installed, which is most likely a 1.x or
>>even a
>> >0.x version.
>> >Are you sure that the host "ubuntu-171" has the ip 10.13.182.171 and
>>that
>> >the hadoop installation in the "/home/ubuntu/Documents/hadoop-2.6.0/"
>> >directory is listening on port 9000 ?
>> >
>> >On Mon, Jan 4, 2016 at 3:03 PM, Kashmar, Ali <ali.kash...@emc.com>
>>wrote:
>> >
>> >> Hi Max,
>> >>
>> >> Both commands return nothing. Those variables aren’t set.
>> >>
>> >> The only software I installed on these machines is Flink and Java.
>> >>
>> >> -Ali
>> >>
>> >> On 2015-12-28, 6:42 AM, "Maximilian Michels" <m...@apache.org> wrote:
>> >>
>> >> >Hi Ali,
>> >> >
>> >> >The warning about the native Hadoop libraries is nothing to worry
>> >> >about. The native module

Re: Error due to Hadoop version mismatch

2015-12-23 Thread Kashmar, Ali
Hi Max,

I have the same output for the Task Manager:

11:25:04,274 INFO  org.apache.flink.runtime.taskmanager.TaskManager
  -  Hadoop version: 2.6.0

I do get this line at the beginning of both job and task manager log files:

11:25:04,100 WARN  org.apache.hadoop.util.NativeCodeLoader
  - Unable to load native-hadoop library for your platform... using
builtin-java classes where applicable

Do you think it has anything to do with it?

Thanks,
Ali

On 2015-12-23, 7:30 AM, "Maximilian Michels" <m...@apache.org> wrote:

>Hi Ali,
>
>Could you please also post the Hadoop version output of the task
>manager log files? It looks like the task managers are running a
>different Hadoop version.
>
>Thanks,
>Max
>
>On Tue, Dec 22, 2015 at 4:28 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> Hi Robert,
>>
>> I found the version in the job manager log file:
>>
>> 17:33:49,636 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>   -  Hadoop version: 2.6.0
>>
>> But the Hadoop installation I have is saying this:
>>
>> ubuntu@ubuntu-171:~/Documents/hadoop-2.6.0$ bin/hadoop version
>> Hadoop 2.6.0
>> Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
>> e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
>> Compiled by jenkins on 2014-11-13T21:10Z
>> Compiled with protoc 2.5.0
>> From source with checksum 18e43357c8f927c0695f1e9522859d6a
>> This command was run using
>> 
>>/home/ubuntu/Documents/hadoop-2.6.0/share/hadoop/common/hadoop-common-2.6
>>.0
>> .jar
>>
>>
>> So one of them is lying to me? :)
>>
>> Ali
>>
>> On 2015-12-22, 10:16 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
>>
>>>Hi Ali,
>>>
>>>the TaskManagers and the JobManager is logging the Hadoop version on
>>>startup.
>>>
>>>On Tue, Dec 22, 2015 at 4:10 PM, Kashmar, Ali <ali.kash...@emc.com>
>>>wrote:
>>>
>>>> Hello,
>>>>
>>>> I’m trying to use HDFS as a store for Flink checkpoints so I downloaded
>>>>the
>>>> Hadoop 2.6.0/Scala 2.10 version of Flink and installed it. I also
>>>> downloaded Hadoop 2.6.0 separately from the Hadoop website and set up
>>>>HDFS
>>>> on a separate machine. When I start Flink I get the following error:
>>>>
>>>> 17:34:13,047 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>>>   - Status of job 9ba32a08bc0ec02810bf5d2710842f72 (Protocol Event
>>>> Processing) changed to FAILED.
>>>> java.lang.Exception: Call to registerInputOutput() of invokable failed
>>>> at 
>>>>org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.io.IOException: The given file URI (hdfs://
>>>> 10.13.182.171:9000/user/flink/checkpoints) points to the HDFS NameNode
>>>>at
>>>> 10.13.182.171:9000, but the File System could not be initialized with
>>>> that address: Server IPC version 9 cannot communicate with client
>>>>version 4
>>>> at
>>>>
>>>>org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFile
>>>>Sy
>>>>stem.java:337)
>>>> at 
>>>>org.apache.flink.core.fs.FileSystem.get(FileSystem.java:253)
>>>> at
>>>>
>>>>org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsState
>>>>Ba
>>>>ckend.java:142)
>>>> at
>>>>
>>>>org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsState
>>>>Ba
>>>>ckend.java:101)
>>>> at
>>>>
>>>>org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createF
>>>>ro
>>>>mConfig(FsStateBackendFactory.java:48)
>>>> at
>>>>
>>>>org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(
>>>>St
>>>>reamTask.java:517)
>>>> at
>>>>
>>>>org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput
>>>>(S
>>>>treamTask.java:171)
>>>> at 
>>>>org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
>>>> ... 1 more
>>>> Caused by:
>>>>
>>>>org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RPC$Version
>>>>Mi

Error due to Hadoop version mismatch

2015-12-22 Thread Kashmar, Ali
Hello,

I’m trying to use HDFS as a store for Flink checkpoints so I downloaded the 
Hadoop 2.6.0/Scala 2.10 version of Flink and installed it. I also downloaded 
Hadoop 2.6.0 separately from the Hadoop website and set up HDFS on a separate 
machine. When I start Flink I get the following error:

17:34:13,047 INFO  org.apache.flink.runtime.jobmanager.JobManager   
 - Status of job 9ba32a08bc0ec02810bf5d2710842f72 (Protocol Event Processing) 
changed to FAILED.
java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: The given file URI 
(hdfs://10.13.182.171:9000/user/flink/checkpoints) points to the HDFS NameNode 
at 10.13.182.171:9000, but the File System could not be initialized with that 
address: Server IPC version 9 cannot communicate with client version 4
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:337)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:253)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:142)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:101)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:48)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:517)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:171)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
... 1 more
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RPC$VersionMismatch):
 Server IPC version 9 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1113)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)
at com.sun.proxy.$Proxy6.getProtocolVersion(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62)
at com.sun.proxy.$Proxy6.getProtocolVersion(Unknown Source)
at org.apache.hadoop.ipc.RPC.checkVersion(RPC.java:422)
at org.apache.hadoop.hdfs.DFSClient.createNamenode(DFSClient.java:183)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:281)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:245)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:100)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
... 8 more

I searched for this error online and it indicates that the client which is 
Flink in this case is at a much lower version. Is there a way to check the 
version of Hadoop packaged with my Flink installation?
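
One way to check this programmatically, as a minimal sketch:
org.apache.hadoop.util.VersionInfo ships with hadoop-common in both the 1.x and
2.x lines, so running something like the following with the same classpath as
the Flink JobManager/TaskManager shows which Hadoop client version Flink would
actually load (the class name below is made up):

public class PrintHadoopClientVersion {
    public static void main(String[] args) {
        // Prints the version of whatever Hadoop client classes are on this classpath.
        System.out.println("Hadoop client version: "
                + org.apache.hadoop.util.VersionInfo.getVersion());
    }
}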

Thanks,
Ali


Re: Error due to Hadoop version mismatch

2015-12-22 Thread Kashmar, Ali
Hi Robert,

I found the version in the job manager log file:

17:33:49,636 INFO  org.apache.flink.runtime.jobmanager.JobManager
  -  Hadoop version: 2.6.0

But the Hadoop installation I have is saying this:

ubuntu@ubuntu-171:~/Documents/hadoop-2.6.0$ bin/hadoop version
Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using
/home/ubuntu/Documents/hadoop-2.6.0/share/hadoop/common/hadoop-common-2.6.0
.jar


So one of them is lying to me? :)

Ali

On 2015-12-22, 10:16 AM, "Robert Metzger" <rmetz...@apache.org> wrote:

>Hi Ali,
>
>the TaskManagers and the JobManager is logging the Hadoop version on
>startup.
>
>On Tue, Dec 22, 2015 at 4:10 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hello,
>>
>> I’m trying to use HDFS as a store for Flink checkpoints so I downloaded
>>the
>> Hadoop 2.6.0/Scala 2.10 version of Flink and installed it. I also
>> downloaded Hadoop 2.6.0 separately from the Hadoop website and set up
>>HDFS
>> on a separate machine. When I start Flink I get the following error:
>>
>> 17:34:13,047 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>   - Status of job 9ba32a08bc0ec02810bf5d2710842f72 (Protocol Event
>> Processing) changed to FAILED.
>> java.lang.Exception: Call to registerInputOutput() of invokable failed
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: The given file URI (hdfs://
>> 10.13.182.171:9000/user/flink/checkpoints) points to the HDFS NameNode
>>at
>> 10.13.182.171:9000, but the File System could not be initialized with
>> that address: Server IPC version 9 cannot communicate with client
>>version 4
>> at
>> 
>>org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSy
>>stem.java:337)
>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:253)
>> at
>> 
>>org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBa
>>ckend.java:142)
>> at
>> 
>>org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBa
>>ckend.java:101)
>> at
>> 
>>org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFro
>>mConfig(FsStateBackendFactory.java:48)
>> at
>> 
>>org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(St
>>reamTask.java:517)
>> at
>> 
>>org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(S
>>treamTask.java:171)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
>> ... 1 more
>> Caused by:
>> 
>>org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RPC$VersionMi
>>smatch):
>> Server IPC version 9 cannot communicate with client version 4
>> at org.apache.hadoop.ipc.Client.call(Client.java:1113)
>> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)
>> at com.sun.proxy.$Proxy6.getProtocolVersion(Unknown Source)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> 
>>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java
>>:62)
>> at
>> 
>>sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorI
>>mpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> 
>>org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvoc
>>ationHandler.java:85)
>> at
>> 
>>org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationH
>>andler.java:62)
>> at com.sun.proxy.$Proxy6.getProtocolVersion(Unknown Source)
>> at org.apache.hadoop.ipc.RPC.checkVersion(RPC.java:422)
>> at
>> org.apache.hadoop.hdfs.DFSClient.createNamenode(DFSClient.java:183)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:281)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:245)
>> at
>> 
>>org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSy
>>stem.java:100)
>> at
>> 
>>org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSy
>>stem.java:321)
>> ... 8 more
>>
>> I searched for this error online and it indicates that the client which
>>is
>> Flink in this case is at a much lower version. Is there a way to check
>>the
>> version of Hadoop packaged with my Flink installation?
>>
>> Thanks,
>> Ali
>>



No job recovery after job manager failure

2015-12-16 Thread Kashmar, Ali
Hi,

I’m trying to test HA on a 3-node Flink cluster (task slots = 48). So I started 
a job with parallelism = 32 and waited for a few seconds so that all nodes are 
doing work. I then shut down the node that had the leader job manager, and by 
shut down I mean I powered off the virtual machine running it. I monitored the 
logs to see what was going on and I saw that zookeeper has elected a new 
leader. I also saw a log for recovering jobs, but nothing actually happens. 
Here’s the job manager log from the node that became the leader:

11:06:43,448 INFO  org.apache.flink.runtime.jobmanager.JobManager   
 - JobManager akka.tcp://flink@192.168.200.174:56023/user/jobmanager was 
granted leadership with leader session ID 
Some(16eb0d0a-2cae-473e-aa41-679a87d3669b).
11:06:45,912 INFO  org.apache.flink.runtime.webmonitor.JobManagerRetriever  
 - New leader reachable under 
akka.tcp://flink@192.168.200.174:56023/user/jobmanager:16eb0d0a-2cae-473e-aa41-679a87d3669b.
11:06:45,963 INFO  org.apache.flink.runtime.instance.InstanceManager
 - Registered TaskManager at 192.168.200.174 
(akka.tcp://flink@192.168.200.174:52324/user/taskmanager) as 
e8720b15c63d508e8dc19b19e70d4c88. Current number of registered hosts is 1. 
Current number of alive task slots is 16.
11:06:45,975 INFO  org.apache.flink.runtime.instance.InstanceManager
 - Registered TaskManager at 192.168.200.175 
(akka.tcp://flink@192.168.200.175:46612/user/taskmanager) as 
766a7938746c2d41e817e2ceb42a9a64. Current number of registered hosts is 2. 
Current number of alive task slots is 32.
11:08:25,925 INFO  org.apache.flink.runtime.jobmanager.JobManager   
 - Recovering all jobs.


I waited 10 minutes after that last log and there was no change. And here’s the 
task-manager log from the same node:


11:06:45,914 INFO  org.apache.flink.runtime.taskmanager.TaskManager 
 - Trying to register at JobManager 
akka.tcp://flink@192.168.200.174:56023/user/jobmanager (attempt 1, timeout: 500 
milliseconds)
11:06:45,983 INFO  org.apache.flink.runtime.taskmanager.TaskManager 
 - Successful registration at JobManager 
(akka.tcp://flink@192.168.200.174:56023/user/jobmanager), starting network 
stack and library cache.
11:06:45,988 INFO  org.apache.flink.runtime.io.network.netty.NettyClient
 - Successful initialization (took 4 ms).
11:06:45,994 INFO  org.apache.flink.runtime.io.network.netty.NettyServer
 - Successful initialization (took 6 ms). Listening on SocketAddress 
/192.168.200.174:39322.
11:06:45,994 INFO  org.apache.flink.runtime.taskmanager.TaskManager 
 - Determined BLOB server address to be /192.168.200.174:48746. Starting BLOB 
cache.
11:06:45,995 INFO  org.apache.flink.runtime.blob.BlobCache  
 - Created BLOB cache storage directory 
/tmp/blobStore-4d4e4cc2-c161-4df1-acea-abda2b28d39e


Is this a bug?

Thanks,
Ali


Re: Task Parallelism in a Cluster

2015-12-11 Thread Kashmar, Ali
192.168.200.174/192.168.200.174:2181, sessionid = 0x25181a544860094,
negotiated timeout = 4
03:46:09,469 INFO  
org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateM
anager  - State change: RECONNECTED
03:46:09,475 INFO  
com.emc.ngen.analytics.flink.source.ParallelSocketSource  - It took
86212 to read a 1000 lines
03:46:09,523 INFO  
com.emc.ngen.analytics.flink.source.ParallelSocketSource  - It took
86217 to read a 1000 lines



You’ll notice that at some point it takes 254 milliseconds to process
1,000 lines of input, and then it jumps to 86 seconds! And I also see some
zookeeper exceptions that lead me to believe that it’s a networking
problem. I have 4 VMs running on 4 different hosts, and connected via a
10G NIC.

Thanks,
Ali


On 2015-12-11, 11:23 AM, "Stephan Ewen" <se...@apache.org> wrote:

>Hi Ali!
>
>I see, so the tasks 192.168.200.174 and 192.168.200.175 apparently do not
>make progress, even do not recognize the end-of-stream point.
>
>I expect that the streams on 192.168.200.174 and 192.168.200.175 are
>back-pressured to a stand-still. Since no network is involved, the reason
>for the back pressure are probably the sinks.
>
>What kind of data sink are you using (in the addSink()) function?
>Can you check if that one starts to fully block on machines
>192.168.200.174 and 192.168.200.175 ?
>
>Greetings,
>Stephan
>
>
>
>On Fri, Dec 11, 2015 at 4:50 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Stephan,
>>
>> I got a request to share the image with someone and I assume it was you.
>> You should be able to see it now. This seems to be the main issue I have
>> at this time. I've tried running the job on the cluster with a
>>parallelism
>> of 16, 24, 36, and even went up to 48. I see all the parallel pipelines
>> working for a bit and then some of them just stop, I’m not sure if
>>they’re
>> stuck or not. Here’s another screenshot:
>> http://postimg.org/image/gr6ogxqjj/
>>
>> Two things you’ll notice:
>> 1. Pipelines on 192.168.200.174 and 192.168.200.175 have stopped doing
>> anything at one point and only 192.168.200.173 is doing all the work.
>> 2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end
>>time
>> even though the job should be finished (the screenshot was taken after
>>the
>> source was closed).
>>
>> I’m not sure if this helps or not, but here are some properties from the
>> flink-conf.yaml:
>>
>> jobmanager.heap.mb: 8192
>> taskmanager.heap.mb: 49152
>> taskmanager.numberOfTaskSlots: 16
>> parallelism.default: 1
>>
>> state.backend: filesystem
>> state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints
>>
>> taskmanager.network.numberOfBuffers: 3072
>>
>> recovery.mode: zookeeper
>> recovery.zookeeper.quorum:
>> 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
>> recovery.zookeeper.storageDir: file:///tmp/zk-recovery
>> recovery.zookeeper.path.root: /opt/flink-0.10.0
>>
>> I appreciate all the help.
>>
>>
>> Thanks,
>> Ali
>>
>>
>> On 2015-12-10, 10:16 AM, "Stephan Ewen" <se...@apache.org> wrote:
>>
>> >Hi Ali!
>> >
>> >Seems like the Google Doc has restricted access, it tells me I have no
>> >permission to view it...
>> >
>> >Stephan
>> >
>> >
>> >On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <ali.kash...@emc.com>
>>wrote:
>> >
>> >> Hi Stephan,
>> >>
>> >> Here’s a link to the screenshot I tried to attach earlier:
>> >>
>> >> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
>> >>
>> >> It looks to me like the distribution is fairly skewed across the
>>nodes,
>> >> even though they’re executing the same pipeline.
>> >>
>> >> Thanks,
>> >> Ali
>> >>
>> >>
>> >> On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:
>> >>
>> >> >Hi!
>> >> >
>> >> >The parallel socket source looks good.
>> >> >I think you forgot to attach the screenshot, or the mailing list
>> >>dropped
>> >> >the attachment...
>> >> >
>> >> >Not sure if I can diagnose that without more details. The sources
>>all
>> >>do
>> >> >the same. Assuming that the server distributes the data evenly
>>across
>> >>all
>> >> >connected sockets, and that the network bandwidth en

Re: Task Parallelism in a Cluster

2015-12-11 Thread Kashmar, Ali
Hi Stephan,

I got a request to share the image with someone and I assume it was you.
You should be able to see it now. This seems to be the main issue I have
at this time. I've tried running the job on the cluster with a parallelism
of 16, 24, 36, and even went up to 48. I see all the parallel pipelines
working for a bit and then some of them just stop, I’m not sure if they’re
stuck or not. Here’s another screenshot:
http://postimg.org/image/gr6ogxqjj/

Two things you’ll notice:
1. Pipelines on 192.168.200.174 and 192.168.200.175 have stopped doing
anything at one point and only 192.168.200.173 is doing all the work.
2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end time
even though the job should be finished (the screenshot was taken after the
source was closed).

I’m not sure if this helps or not, but here are some properties from the
flink-conf.yaml:

jobmanager.heap.mb: 8192
taskmanager.heap.mb: 49152
taskmanager.numberOfTaskSlots: 16
parallelism.default: 1

state.backend: filesystem
state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints

taskmanager.network.numberOfBuffers: 3072

recovery.mode: zookeeper
recovery.zookeeper.quorum:
192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
recovery.zookeeper.storageDir: file:///tmp/zk-recovery
recovery.zookeeper.path.root: /opt/flink-0.10.0

I appreciate all the help.


Thanks,
Ali


On 2015-12-10, 10:16 AM, "Stephan Ewen" <se...@apache.org> wrote:

>Hi Ali!
>
>Seems like the Google Doc has restricted access, it tells me I have no
>permission to view it...
>
>Stephan
>
>
>On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Stephan,
>>
>> Here’s a link to the screenshot I tried to attach earlier:
>>
>> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
>>
>> It looks to me like the distribution is fairly skewed across the nodes,
>> even though they’re executing the same pipeline.
>>
>> Thanks,
>> Ali
>>
>>
>> On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>
>> >Hi!
>> >
>> >The parallel socket source looks good.
>> >I think you forgot to attach the screenshot, or the mailing list
>>dropped
>> >the attachment...
>> >
>> >Not sure if I can diagnose that without more details. The sources all
>>do
>> >the same. Assuming that the server distributes the data evenly across
>>all
>> >connected sockets, and that the network bandwidth ends up being
>>divided in
>> >a fair way, all pipelines should be similarly "eager".
>> >
>> >Greetings,
>> >Stephan
>> >
>> >
>> >On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com>
>>wrote:
>> >
>> >> Hi Stephan,
>> >>
>> >> That was my original understanding, until I realized that I was not
>> >>using
>> >> a parallel socket source. I had a custom source that extended
>> >> SourceFunction which always runs with parallelism = 1. I looked
>>through
>> >> the API and found the ParallelSourceFunction interface so I
>>implemented
>> >> that and voila, now all 3 nodes in the cluster are actually receiving
>> >> traffic on socket connections.
>> >>
>> >> Now that I’m running it successfully end to end, I’m trying to
>>improve
>> >>the
>> >> performance. Can you take a look at the attached screen shot and
>>tell me
>> >> if the distribution of work amongst the pipelines is normal? I feel
>>like
>> >> some pipelines are lot lazier than others, even though the cluster
>>nodes
>> >> are exactly the same.
>> >>
>> >> By the way, here’s the class I wrote. It would be useful to have this
>> >> available in Flink distro:
>> >>
>> >> public class ParallelSocketSource implements
>> >> ParallelSourceFunction<String> {
>> >>
>> >> private static final long serialVersionUID =
>> >>-271094428915640892L;
>> >> private static final Logger LOG =
>> >> LoggerFactory.getLogger(ParallelSocketSource.class);
>> >>
>> >> private volatile boolean running = true;
>> >> private String host;
>> >> private int port;
>> >>
>> >> public ParallelSocketSource(String host, int port) {
>> >> this.host = host;
>> >> this.port = port;
>> >> }
>> >>

Re: Task Parallelism in a Cluster

2015-12-09 Thread Kashmar, Ali
Hi Stephan,

That was my original understanding, until I realized that I was not using
a parallel socket source. I had a custom source that extended
SourceFunction which always runs with parallelism = 1. I looked through
the API and found the ParallelSourceFunction interface so I implemented
that and voila, now all 3 nodes in the cluster are actually receiving
traffic on socket connections.

Now that I’m running it successfully end to end, I’m trying to improve the
performance. Can you take a look at the attached screen shot and tell me
if the distribution of work amongst the pipelines is normal? I feel like
some pipelines are lot lazier than others, even though the cluster nodes
are exactly the same.

By the way, here’s the class I wrote. It would be useful to have this
available in Flink distro:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Imports restored here assume the Flink 0.10.x streaming API package layout.
public class ParallelSocketSource implements ParallelSourceFunction<String> {

    private static final long serialVersionUID = -271094428915640892L;
    private static final Logger LOG =
            LoggerFactory.getLogger(ParallelSocketSource.class);

    private volatile boolean running = true;
    private String host;
    private int port;

    public ParallelSocketSource(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Each parallel instance opens its own connection and emits one record per line.
        try (Socket socket = new Socket(host, port);
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(socket.getInputStream()))) {
            String line = null;
            while (running && ((line = reader.readLine()) != null)) {
                ctx.collect(line);
            }
        } catch (IOException ex) {
            LOG.error("error reading from socket", ex);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
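
A possible usage sketch (class name, host, port and parallelism are
illustrative, not from the thread); addSource() plus an explicit source
parallelism is what lets every node open its own socket connection:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelSocketSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each of the 16 parallel source instances connects to the server independently.
        DataStream<String> lines = env
                .addSource(new ParallelSocketSource("server-host", 9999))
                .setParallelism(16);

        lines.print();
        env.execute("parallel socket source example");
    }
}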

Regards,
Ali
 

On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:

>Hi Ali!
>
>In the case you have, the sequence of source-map-filter ... forms a
>pipeline.
>
>You mentioned that you set the parallelism to 16, so there should be 16
>pipelines. These pipelines should be completely independent.
>
>Looking at the way the scheduler is implemented, independent pipelines
>should be spread across machines. But when you execute that in parallel,
>you say all 16 pipelines end up on the same machine?
>
>Can you share with us the rough code of your program? Or a Screenshot from
>the runtime dashboard that shows the program graph?
>
>
>If your cluster is basically for that one job only, you could try and set
>the number of slots to 4 for each machine. Then you have 16 slots in total
>and each node would run one of the 16 pipelines.
>
>
>Greetings,
>Stephan
>
>
>On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> There is no shuffle operation in my flow. Mine actually looks like this:
>>
>> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map
>>->
>> Map, Filter)
>>
>>
>> Maybe it’s treating this whole flow as one pipeline and assigning it to
>>a
>> slot. What I really wanted was to have the custom source I built to have
>> running instances on all nodes. I’m not really sure if that’s the right
>> approach, but if we could add this as a feature that’d be great, since
>> having more than one node running the same pipeline guarantees the
>> pipeline is never offline.
>>
>> -Ali
>>
>> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>>
>> >If I'm not mistaken, then the scheduler has already a preference to
>>spread
>> >independent pipelines out across the cluster. At least he uses a queue
>>of
>> >instances from which it pops the first element if it allocates a new
>>slot.
>> >This instance is then appended to the queue again, if it has some
>> >resources
>> >(slots) left.
>> >
>> >I would assume that you have a shuffle operation involved in your job
>>such
>> >that it makes sense for the scheduler to deploy all pipelines to the
>>same
>> >machine.
>> >
>> >Cheers,
>> >Till
>> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <se...@apache.org> wrote:
>> >
>> >> Slots are like "resource groups" which execute entire pipelines. They
>> >> frequently have more than one operator.
>> >>
>> >> What you can try as a workaround is decrease the number of slots per
>> >> machine to cause 

Re: Task Parallelism in a Cluster

2015-12-09 Thread Kashmar, Ali
Hi Stephan,

Here’s a link to the screenshot I tried to attach earlier:

https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28

It looks to me like the distribution is fairly skewed across the nodes,
even though they’re executing the same pipeline.

Thanks,
Ali


On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:

>Hi!
>
>The parallel socket source looks good.
>I think you forgot to attach the screenshot, or the mailing list dropped
>the attachment...
>
>Not sure if I can diagnose that without more details. The sources all do
>the same. Assuming that the server distributes the data evenly across all
>connected sockets, and that the network bandwidth ends up being divided in
>a fair way, all pipelines should be similarly "eager".
>
>Greetings,
>Stephan
>
>
>On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Stephan,
>>
>> That was my original understanding, until I realized that I was not
>>using
>> a parallel socket source. I had a custom source that extended
>> SourceFunction which always runs with parallelism = 1. I looked through
>> the API and found the ParallelSourceFunction interface so I implemented
>> that and voila, now all 3 nodes in the cluster are actually receiving
>> traffic on socket connections.
>>
>> Now that I’m running it successfully end to end, I’m trying to improve
>>the
>> performance. Can you take a look at the attached screen shot and tell me
>> if the distribution of work amongst the pipelines is normal? I feel like
>> some pipelines are lot lazier than others, even though the cluster nodes
>> are exactly the same.
>>
>> By the way, here’s the class I wrote. It would be useful to have this
>> available in Flink distro:
>>
>> public class ParallelSocketSource implements
>> ParallelSourceFunction<String> {
>>
>> private static final long serialVersionUID =
>>-271094428915640892L;
>> private static final Logger LOG =
>> LoggerFactory.getLogger(ParallelSocketSource.class);
>>
>> private volatile boolean running = true;
>> private String host;
>> private int port;
>>
>> public ParallelSocketSource(String host, int port) {
>> this.host = host;
>> this.port = port;
>> }
>>
>> @Override
>> public void run(SourceContext<String> ctx) throws Exception {
>> try (Socket socket = new Socket(host, port);
>> BufferedReader reader = new BufferedReader(new
>> InputStreamReader(socket.getInputStream()))) {
>> String line  = null;
>> while(running && ((line = reader.readLine()) !=
>> null)) {
>> ctx.collect(line);
>> }
>> } catch(IOException ex) {
>> LOG.error("error reading from socket", ex);
>> }
>> }
>>
>> @Override
>> public void cancel() {
>> running = false;
>> }
>> }
>>
>> Regards,
>> Ali
>>
>>
>> On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>
>> >Hi Ali!
>> >
>> >In the case you have, the sequence of source-map-filter ... forms a
>> >pipeline.
>> >
>> >You mentioned that you set the parallelism to 16, so there should be 16
>> >pipelines. These pipelines should be completely independent.
>> >
>> >Looking at the way the scheduler is implemented, independent pipelines
>> >should be spread across machines. But when you execute that in
>>parallel,
>> >you say all 16 pipelines end up on the same machine?
>> >
>> >Can you share with us the rough code of your program? Or a Screenshot
>>from
>> >the runtime dashboard that shows the program graph?
>> >
>> >
>> >If your cluster is basically for that one job only, you could try and
>>set
>> >the number of slots to 4 for each machine. Then you have 16 slots in
>>total
>> >and each node would run one of the 16 pipelines.
>> >
>> >
>> >Greetings,
>> >Stephan
>> >
>> >
>> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com>
>>wrote:
>> >
>> >> There is no shuffle operation in my flow. Mine actually looks like
>>this:
>> >>
>> >

Re: Task Parallelism in a Cluster

2015-12-01 Thread Kashmar, Ali
Is there a way to make a task cluster-parallelizable? I.e. Make sure the
parallel instances of the task are distributed across the cluster. When I
run my flink job with a parallelism of 16, all the parallel tasks are
assigned to the first task manager.

- Ali

On 2015-11-30, 2:18 PM, "Ufuk Celebi" <u...@apache.org> wrote:

>
>> On 30 Nov 2015, at 17:47, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> Do the parallel instances of each task get distributed across the
>>cluster or is it possible that they all run on the same node?
>
>Yes, slots are requested from all nodes of the cluster. But keep in mind
>that multiple tasks (forming a local pipeline) can be scheduled to the
>same slot (1 slot can hold many tasks).
>
>Have you seen this?
>https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job
>_scheduling.html
>
>> If they can all run on the same node, what happens when that node
>>crashes? Does the job manager recreate them using the remaining open
>>slots?
>
>What happens: The job manager tries to restart the program with the same
>parallelism. Thus if you have enough free slots available in your
>cluster, this works smoothly (so yes, the remaining/available slots are
>used)
>
>With a YARN cluster the task manager containers are restarted
>automatically. In standalone mode, you have to take care of this yourself.
>
>
>Does this help?
>
>– Ufuk
>



Task Parallelism in a Cluster

2015-11-30 Thread Kashmar, Ali
Hello,

I’m trying to wrap my head around task parallelism in a Flink cluster. Let’s 
say I have a cluster of 3 nodes, each node offering 16 task slots, so in total 
I’d have 48 slots for processing. Do the parallel instances of each task get 
distributed across the cluster or is it possible that they all run on the same 
node? If they can all run on the same node, what happens when that node 
crashes? Does the job manager recreate them using the remaining open slots?

Thanks,
Ali


Re: How to use static data with streams?

2015-11-16 Thread Kashmar, Ali
Hi Robert,

Thanks for the help! I’ve managed to implement my use case using your
suggested approach of combining the streams.

Just a follow up on 2b) below, I’m not clear on this statement "partition
(split) the data stream so that the right protocol packets end up at the
right machine”. How do I know which machine the data is ending up at? My
understanding is that the Flink program is agnostic of the cluster nodes.

Maybe it would help if I explained this use case:
1. Load a CSV file and split it equally, using the ID in the CSV record,
across the Flink cluster to be stored in memory (operator’s memory
maybe?). This is basically an initialization step.
2. Once 1) is done, read events from a socket (for now) and use the ID in
the event to add attributes from the matching CSV record to the event.
Store the updated events in a file.

Based on those two requirements, what can be accomplished using Flink and
what can’t be? Is the stuff that can’t be done in Flink’s roadmap?
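
For reference, a rough sketch of the connect/co-map enrichment pattern described
above. It is only an illustration under simplifying assumptions (both streams are
plain comma-separated strings with the ID as the first field; the class and helper
names are made up), not the actual code from this thread. Keying both streams by
the ID is what determines which parallel instance, and therefore which machine, a
given record ends up on:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class CsvEnrichmentSketch {

    // Key selector shared by both streams: the ID is assumed to be the first
    // comma-separated field of each line (purely an assumption for this sketch).
    private static final KeySelector<String, String> BY_ID =
            new KeySelector<String, String>() {
                @Override
                public String getKey(String line) {
                    return line.split(",", 2)[0];
                }
            };

    public static DataStream<String> enrich(DataStream<String> csvRecords, DataStream<String> events) {
        return csvRecords.keyBy(BY_ID)
                .connect(events.keyBy(BY_ID))
                .flatMap(new CoFlatMapFunction<String, String, String>() {

                    // Per-parallel-instance lookup table: ID -> CSV attributes seen so far.
                    private final Map<String, String> attributesById = new HashMap<String, String>();

                    @Override
                    public void flatMap1(String csvRecord, Collector<String> out) {
                        attributesById.put(csvRecord.split(",", 2)[0], csvRecord);
                    }

                    @Override
                    public void flatMap2(String event, Collector<String> out) {
                        String attributes = attributesById.get(event.split(",", 2)[0]);
                        if (attributes != null) {
                            out.collect(event + "|" + attributes);
                        }
                    }
                });
    }
}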

Thanks,
Ali


On 2015-11-05, 5:29 PM, "Robert Metzger" <rmetz...@apache.org> wrote:

>Hi Ali,
>
>1. You can connect two streams and then use the co-map operator to consume
>data from both streams. I'm not sure how much data arrives from one or the
>other stream, but maybe you can store (update) the data in memory.
>Read more here
>https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guid
>e.html#datastream-abstraction
>
>2 a) No, I think all the taskmanager nodes are listening to data. For
>making this highly available, I would recommend to let the system which is
>producing the data write it to Apache Kafka. Then, consume the data from
>Kafka using Flink.
>This way you get very good high availability and throughput and you don't
>have to worry about the sockets.
>
>2 b) Sure, you can implement the splitting yourself (each mapper reads N
>lines of the file) and then partition (split) the data stream so that the
>right protocol packets end up at the right machine.
>However, if the entire CSV file fits into the memory of one
>machine,
>its probably faster to not split the stream and use each machine to join
>the data locally.
>
>It's really no problem that you're asking questions, that's what the
>mailing
>list is made for.
>I'm looking forward to the next set of questions ;)
>
>Regards,
>Robert
>
>
>
>On Thu, Nov 5, 2015 at 9:56 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Robert,
>>
>> I tried the approach you suggested and it works nicely. Thanks!
>>
>> I have a few more questions if you don’t mind:
>>
>> 1. Is there a way to retrieve in one stream data that's stored in
>>another
>> stream? I have a location stream that I can use to store the latest
>> subscriber location. I have another stream that needs access to the
>>latest
>> subscriber location processed by the location stream. I read a bit on
>> broadcast variables but they’re only available for DataSets, not
>> DataStreams. Did I miss a way in Flink to do this?
>>
>> 2. We are planning to test this on a Flink cluster of 3 nodes (1 master
>> and 2 slaves).
>>
>>a. If I use a socket stream, does each node listen for data on its
>> socket or is it only the job manager node? I assume it’s the latter.
>>This
>> is   important because I have to figure out how to make the
>> system highly
>> available.
>>b. Is there a way to split the afore-mentioned CSV file across the
>> three nodes in the cluster?
>>
>> Sorry for bombarding you with questions.
>>
>> Thanks,
>> Ali
>>
>>
>> On 2015-11-05, 10:47 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
>>
>> >Hi Ali,
>> >
>> >great, the start-local-streaming.sh script sounds right.
>> >
>> >I can explain why your first approach didn't work:
>> >
>> >You were trying to send the CSV files from the Flink client to the
>>cluster
>> >using our RPC system (Akka). When you submit a job to Flink, we
>>serialize
>> >all the objects the user created (mappers, sources, ...) and send it to
>> >the
>> >cluster.
>> >There is a method StreamExecutionEnvironment.fromElements(..) which
>>allows
>> >users to serialize a few objects along with the job submission. But the
>> >amount of data you can transfer like this is limited by the Akka frame
>> >size. In our case I think the default is 10 megabytes.
>> >After that, Akka will probably just drop or reject the deployment
>>message.
>> >
>> >I'm pretty sure the approach I've suggested will resolve the issue.
>> 

Re: Core Memory Error

2015-11-12 Thread Kashmar, Ali
So the problem wasn’t in Flink after all. It turns out the data I was
receiving at the socket was not complete. So I went back and looked at the
way I’m sending data to the socket and realized that the socket is closed
before sending all data. I just needed to flush the stream before closing
the socket. I don’t see any more serialization errors.
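
For anyone hitting the same symptom, a minimal sketch of that sender-side fix
(host, port and the data source are illustrative): flush the writer before the
socket goes away so the last buffered lines actually reach the Flink source.

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class LineSender {
    public static void send(String host, int port, List<String> lines) throws IOException {
        try (Socket socket = new Socket(host, port);
                BufferedWriter writer = new BufferedWriter(
                        new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                writer.write(line);
                writer.newLine();
            }
            // Flush explicitly so no line is still sitting in the buffer when the
            // socket is closed; this was the missing step described above.
            writer.flush();
        }
    }
}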

Thanks everyone for the help and I apologize if I wasted your time with
this. I will stick with 0.9.1 for now but I’ll download and use 0.10 as
soon as it’s released.

Cheers,
Ali

On 2015-11-11, 6:00 PM, "Fabian Hueske" <fhue...@gmail.com> wrote:

>Hi Ali,
>
>Flink uses different serializers for different data types. For example,
>(boxed) primitives are serialized using dedicated serializers
>(IntSerializer, StringSerializer, etc.) and the ProtocolEvent class is
>recognized as a Pojo type and therefore serialized using Flink's
>PojoSerializer.
>Types that cannot be (fully) analyzed are handled as GenericTypes and
>serialized using Flink's KryoSerializer.
>
>By forcing Kryo serialization as I suggested before, Pojo types (such as
>ProtocolEvent) will be serialized with Kryo instead of Flink's
>PojoSerializer.
>Hence, forcing Kryo only affects Pojo types. GenericTypes (such as
>ProtocolAttributeMap and ProtocolDetailMap) are always handled by Kryo
>(also without forcing it).
>
>The exceptions you are facing might be caused by a bug in the
>KryoSerializer that we recently fixed (see FLINK-2800 [1]). This bug
>basically corrupts the stream of serialized data and might very well also
>be responsible for the original exception you posted. As you see from the
>JIRA issue, a bug fix was merged to all active branches however it is not
>yet contained in an official release.
>
>I would recommend you to try the latest candidate of the upcoming 0.10
>release [2] or build Flink from the 0.9-release branch [3].
>
>Please let me know if you have any questions or still facing problems when
>switching to version with a fix for FLINK-2800.
>
>Best, Fabian
>
>[1] https://issues.apache.org/jira/browse/FLINK-2800
>[2] http://people.apache.org/~mxm/flink-0.10.0-rc8/
>[3] https://github.com/apache/flink/tree/release-0.9
>
>2015-11-11 17:20 GMT+01:00 Kashmar, Ali <ali.kash...@emc.com>:
>
>> Fabian,
>>
>> I tried running it again and I noticed there were some more exceptions
>>in
>> the log. I fixed those and I don’t see the original error but I do see
>> other ArrayIndexOutofBoundExceptions in the Kryo serializer code (I
>>didn’t
>> even enable that yet like you suggested). Examples:
>>
>> 1)
>>
>> 10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput
>>   - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException:
>>255
>> at
>> 
>>com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectI
>>nt
>> Map.java:364)
>> at
>> 
>>com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceRes
>>ol
>> ver.java:47)
>> at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
>> at
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
>> at
>> 
>>com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.j
>>av
>> a:95)
>> at
>> 
>>com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.j
>>av
>> a:21)
>> at
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>> at
>> 
>>org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize
>>(K
>> ryoSerializer.java:186)
>> at
>> 
>>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(Pojo
>>Se
>> rializer.java:372)
>> at
>> 
>>org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.se
>>ri
>> alize(StreamRecordSerializer.java:89)
>> at
>> 
>>org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.se
>>ri
>> alize(StreamRecordSerializer.java:29)
>> at
>> 
>>org.apache.flink.runtime.plugable.SerializationDelegate.write(Serializati
>>on
>> Delegate.java:51)
>> at
>> 
>>org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSeria
>>li
>> zer.addRecord(SpanningRecordSerializer.java:76)
>> at
>> 
>>org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWr
>>it
>> er.java:83)
>> at
>> 
>>org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(Str

Re: Core Memory Error

2015-11-11 Thread Kashmar, Ali
em occurs when deserializing ProtocolEvent objects.
>Is it possible that you share this class with me?
>
>If it is not possible to share the class, it would be good, to know the
>field types of the Pojo and the associated TypeInformation.
>For that you can run the code in this gist [1] which will recursively
>print
>the field types and their TypeInformation.
>
>As a temporal workaround, you can try to use Kryo to serialize and
>deserialize your Pojos as follows:
>ExecutionEnvironment env = ...
>env.getConfig().enableForceKryo();
>
>Best,
>Fabian
>
>[1] https://gist.github.com/fhueske/6c5aa386fc79ab69712b
>
>2015-11-11 10:38 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Ali,
>>
>> one more thing. Did that error occur once or is it reproducable?
>>
>> Thanks for your help,
>> Fabian
>>
>> 2015-11-11 9:50 GMT+01:00 Ufuk Celebi <u...@apache.org>:
>>
>>> Hey Ali,
>>>
>>> thanks for sharing the code. I assume that the custom
>>> ProtocolEvent, ProtocolDetailMap, Subscriber types are all POJOs. They
>>> should not be a problem. I think this is a bug in Flink 0.9.1.
>>>
>>> Is it possible to re-run your program with the upcoming 0.10.0 (RC8)
>>> version and report back?
>>>
>>> 1) Add
>>> https://repository.apache.org/content/repositories/orgapacheflink-1055
>>> as a
>>> snapshot repository
>>>
>>> <repositories>
>>> <repository>
>>> <id>apache.snapshots</id>
>>> <name>Apache Development Snapshot Repository</name>
>>> <url>
>>> https://repository.apache.org/content/repositories/orgapacheflink-1055
>>> </url>
>>> <releases>
>>> <enabled>false</enabled>
>>> </releases>
>>> <snapshots>
>>> <enabled>true</enabled>
>>> </snapshots>
>>> </repository>
>>> </repositories>
>>>
>>> 2) Set the Flink dependency version to 0.10.0
>>>
>>> 3) Use the Flink binary matching your Hadoop installation from here:
>>> http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java,
>>>you
>>> can go with the Scala 2.10 builds)
>>>
>>> Sorry for the inconvenience! The release is about to be finished (the
>>> voting process is already going on).
>>>
>>> – Ufuk
>>>
>>>
>>> On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali <ali.kash...@emc.com>
>>> wrote:
>>>
>>> > Thanks for the quick reply guys! A lot of interest in this one. I’ve
>>> > attached the source code is attached. There are other supporting
>>> > modules/classes but the main flink component is in the included zip
>>> file.
>>> >
>>> > In answer to Fabian’s question: I’m using the 0.9.1 release right off
>>> the
>>> > website (flink-0.9.1-bin-hadoop1.tgz).
>>> >
>>> > In answer to Ufuk’s question: Yes I’m using custom data types.
>>> >
>>> > Thanks,
>>> > Ali
>>> >
>>> >
>>> >
>>> > On 2015-11-10, 3:01 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>>> >
>>> > >Thanks for reporting this. Are you using any custom data types?
>>> > >
>>> > >If you can share your code, it would be very helpful in order to
>>>debug
>>> > >this.
>>> > >
>>> > >– Ufuk
>>> > >
>>> > >On Tuesday, 10 November 2015, Fabian Hueske <fhue...@gmail.com>
>>>wrote:
>>> > >
>>> > >> I agree with Robert. Looks like a bug in Flink.
>>> > >> Maybe an off-by-one issue (violating index is 32768 and the
>>>default
>>> > >>memory
>>> > >> segment size is 32KB).
>>> > >>
>>> > >> Which Flink version are you using?
>>> > >> In case you are using a custom build, can you share the commit ID
>>>(is
>>> > >> reported in the first lines of the JobManager log file)?
>>> > >>
>>> > >> Thanks, Fabian
>>> > >>
>>> > >> 2015-11-10 18:29 GMT+01:00 Robert Metzger <rmetz...@apache.org
>>> > >> <javascript:;>>:
>>> > >>
>>> > >> > Hi Ali,
>>> > >> >
>>> > >> > this could be a bug in Flink.
>>> > >> > Can you share the code of your program with us to debug the
>>>issue?
>>> > >> >
>>> > >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali
>>><ali.kash...@emc.com
>>&

Core Memory Error

2015-11-10 Thread Kashmar, Ali
Hello,

I’m getting this error while running a streaming module on a cluster of 3 nodes:


java.lang.ArrayIndexOutOfBoundsException: 32768

at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)

at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:214)

at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:219)

at org.apache.flink.types.StringValue.readString(StringValue.java:764)

at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)

at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)

at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)

at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)

at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)

at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)

at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)

at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)

at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)

at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)

at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)

at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)

at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)

at java.lang.Thread.run(Thread.java:745)


Here’s the configuration for each node:


jobmanager.heap.mb: 2048

taskmanager.heap.mb: 4096

taskmanager.numberOfTaskSlots: 5


I’m not even sure where to start with this one so any help is appreciated.


Thanks,

Ali


Re: How to use static data with streams?

2015-11-05 Thread Kashmar, Ali
I did not load the CSV file using the approach you suggested. I was
loading it outside the operators (at the beginning of the main method of
my class), since the file will be needed by multiple operators for sure.
When the file was small, I saw the job registered and started, but when I
used a big CSV file, the job never got registered with the task manager (I
tried the ‘list’ command and got nothing).

Here’s what I saw with the small(ish) file:

# flink run analytics-flink.jar 19001 minisubs.csv output.csv
loaded 20 subscribers from csv file
11/02/2015 16:36:59 Job execution switched to status RUNNING.
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
Sink(1/1) switched to SCHEDULED
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
Sink(1/1) switched to DEPLOYING
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
Sink(1/1) switched to RUNNING


And here’s what I saw with the big file:

# flink run analytics-flink.jar 19001 subs.csv output.csv
loaded 1173547 subscribers from csv file


I’m already using the streaming mode. I’m running a single Flink node
right now on Centos 7 using the ‘start-local-streaming.sh’ script.

Thanks,
Ali

On 2015-11-05, 10:22 AM, "Robert Metzger" <rmetz...@apache.org> wrote:

>Okay.
>
>you should be able to implement it as you described initially. I would do
>the transformation in a map() operator of Flink. The RichMapFunction
>provides you with an open() method which is called before the first record
>arrives.
>In the open() method, I would read the csv file(s) from HDFS or another
>file system accessible by all nodes.
>
>Then, you can access the data from the files in the map operator.
>
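A minimal sketch of what such a RichMapFunction could look like, assuming one "imsi,group" pair per CSV line and plain String records standing in for the real protocol packets. The class and field names below are placeholders, not the production code:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class SubscriberGroupEnricher extends RichMapFunction<String, String> {

    private final String csvPath;                      // must be readable on every TaskManager
    private transient Map<String, String> imsiToGroup; // built per parallel instance in open()

    public SubscriberGroupEnricher(String csvPath) {
        this.csvPath = csvPath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once before the first record arrives, so the lookup table
        // is loaded on the worker instead of being shipped with the job.
        imsiToGroup = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(csvPath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                imsiToGroup.put(fields[0].trim(), fields[1].trim());
            }
        }
    }

    @Override
    public String map(String imsi) {
        // Enrich the record with the subscriber group (null if unknown).
        return imsi + "," + imsiToGroup.get(imsi);
    }
}

It would be attached with something like stream.map(new SubscriberGroupEnricher("/shared/path/subs.csv")), where the path is only a placeholder for a location visible to all nodes.
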
>In order to utilize the memory best, I would recommend to start Flink in
>the "streaming" mode. (-st argument on YARN). With that enabled, we
>provide
>more memory to streaming operators.
>Also, I would only expose one processing slot per TaskManager, this way we
>ensure that the files are only read once per TaskManager. (make sure you
>have only one TaskManager per machine).
>
>Why did your previous approach fail? Do you still have the error message?
>
>Regards,
>Robert
>
>On Thu, Nov 5, 2015 at 3:02 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Robert,
>>
>> The CSV file (or files as there will definitely be more than one) can be
>> large (let's say 1 GB). Memory is not an issue though. Each node has at
>> least 64 GB RAM mounted. The CSV files should easily fit in the memory
>>of
>> each node.
>>
>> Regards,
>> Ali
>>
>>
>>
>> On 2015-11-05, 6:30 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
>>
>> >Hi Ali,
>> >
>> >I'm excited to hear that EMC is looking into Apache Flink. I think the
>> >solution to this problem depends on one question: What is the size of
>>the
>> >data in the CSV file compared to the memory you have available in the
>> >cluster?
>> >Would the mapping table from the file fit into the memory of all nodes
>> >running Flink?
>> >
>> >Regards,
>> >Robert
>> >
>> >PS: Did you subscribe to the mailing list? I've CCed you in case you're
>> >not
>> >subscribed yet
>> >
>> >On Wed, Nov 4, 2015 at 4:54 PM, Kashmar, Ali <ali.kash...@emc.com>
>>wrote:
>> >
>> >> Hi there,
>> >>
>> >> I'm trying to design and implement a use case in Flink where I'm
>> >>receiving
>> >> protocol packets over a socket. Each packet has the subscriber IMSI
>>in
>> >>it
>> >> and a bunch of more data. At the same time, I have a csv file with a
>> >> mapping from IMSI -> subscriber group. I need to inject the group
>>into
>> >> packet and then send it to the sink.
>> >>
>> >> I've tried loading the CSV into a memory map and then accessing the
>>map
>> >> from within the Flink operators but that only works when the CSV is
>>very
>> >> small (a few hundred subscribers). I've tried creating another stream
>> >>for
>> >> the CSV and connecting the streams but that doesn't yield anything
>>as I
>> >> can't have access to objects from both streams at the same time.
>> >>
>> >> How would you guys approach this?
>> >>
>> >> Thanks,
>> >> Ali
>> >>
>>
>>



Re: How to use static data with streams?

2015-11-05 Thread Kashmar, Ali
Hi Robert,

I tried the approach you suggested and it works nicely. Thanks!

I have a few more questions if you don’t mind:

1. Is there a way to retrieve in one stream data that's stored in another
stream? I have a location stream that I can use to store the latest
subscriber location. I have another stream that needs access to the latest
subscriber location processed by the location stream. I read a bit on
broadcast variables but they’re only available for DataSets, not
DataStreams. Did I miss a way in Flink to do this?

2. We are planning to test this on a Flink cluster of 3 nodes (1 master
and 2 slaves).

   a. If I use a socket stream, does each node listen for data on its
socket or is it only the job manager node? I assume it’s the latter. This
is important because I have to figure out how to make the system highly
available.
   b. Is there a way to split the afore-mentioned CSV file across the
three nodes in the cluster?

Sorry for bombarding you with questions.

Thanks,
Ali


On 2015-11-05, 10:47 AM, "Robert Metzger" <rmetz...@apache.org> wrote:

>Hi Ali,
>
>great, the start-local-streaming.sh script sounds right.
>
>I can explain why your first approach didn't work:
>
>You were trying to send the CSV files from the Flink client to the cluster
>using our RPC system (Akka). When you submit a job to Flink, we serialize
>all the objects the user created (mappers, sources, ...) and send it to
>the
>cluster.
>There is a method StreamExecutionEnvironment.fromElements(..) which allows
>users to serialize a few objects along with the job submission. But the
>amount of data you can transfer like this is limited by the Akka frame
>size. In our case I think the default is 10 megabytes.
>After that, Akka will probably just drop or reject the deployment message.
>
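To make the failure mode concrete, here is a sketch of the kind of pattern being described: the lookup map is built in the client JVM and captured by the mapper's closure, so it is serialized into the job graph and shipped over Akka. Types, host, port, and names are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClientSideLookupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Built in the client before submission; with a ~1 GB CSV this map
        // travels inside the serialized MapFunction and can exceed the Akka frame size.
        final Map<String, String> imsiToGroup = new HashMap<>();
        // ... populate from the CSV here ...

        DataStream<String> enriched = env.socketTextStream("localhost", 19001)
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String imsi) {
                    return imsi + "," + imsiToGroup.get(imsi); // closure captures the whole map
                }
            });

        enriched.print();
        env.execute("client-side lookup sketch");
    }
}
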
>I'm pretty sure the approach I've suggested will resolve the issue.
>
>Please let me know if you need further assistance.
>
>Regards,
>Robert
>
>
>
>On Thu, Nov 5, 2015 at 3:39 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> I did not load the CSV file using the approach you suggested. I was
>> loading it outside the operators (at the beginning of the main method of
>> my class), since the file will be needed by multiple operators for sure.
>> When the file was small, I saw the job registered and started, but when
>>I
>> used a big CSV file, the job never got registered with the task manager
>>(I
>> tried the ‘list’ command and got nothing).
>>
>> Here’s what I saw with the small(ish) file:
>>
>> # flink run analytics-flink.jar 19001 minisubs.csv output.csv
>> loaded 20 subscribers from csv file
>> 11/02/2015 16:36:59 Job execution switched to status RUNNING.
>> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
>> Sink(1/1) switched to SCHEDULED
>> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
>> Sink(1/1) switched to DEPLOYING
>> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
>> Sink(1/1) switched to RUNNING
>>
>>
>> And here’s what I saw with the big file:
>>
>> # flink run analytics-flink.jar 19001 subs.csv output.csv
>> loaded 1173547 subscribers from csv file
>>
>>
>> I’m already using the streaming mode. I’m running a single Flink node
>> right now on Centos 7 using the ‘start-local-streaming.sh’ script.
>>
>> Thanks,
>> Ali
>>
>> On 2015-11-05, 10:22 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
>>
>> >Okay.
>> >
>> >you should be able to implement it as you described initially. I would
>>do
>> >the transformation in a map() operator of Flink. The RichMapFunction
>> >provides you with an open() method which is called before the first
>>record
>> >arrives.
>> >In the open() method, I would read the csv file(s) from HDFS or another
>> >file system accessible by all nodes.
>> >
>> >Then, you can access the data from the files in the map operator.
>> >
>> >In order to utilize the memory best, I would recommend to start Flink
>>in
>> >the "streaming" mode. (-st argument on YARN). With that enabled, we
>> >provide
>> >more memory to streaming operators.
>> >Also, I would only expose one processing slot per TaskManager, this
>>way we
>> >ensure that the files are only read once per TaskManager. (make sure
>>you
>> >have only one TaskManager per machine).
>> >
>> >Why did your previous approach fail? Do you still have the error
>>message?
>> >
>&

Re: How to use static data with streams?

2015-11-05 Thread Kashmar, Ali
Hi Robert,

The CSV file (or files as there will definitely be more than one) can be
>> large (let's say 1 GB). Memory is not an issue though. Each node has at
least 64 GB RAM mounted. The CSV files should easily fit in the memory of
each node.

Regards,
Ali



On 2015-11-05, 6:30 AM, "Robert Metzger" <rmetz...@apache.org> wrote:

>Hi Ali,
>
>I'm excited to hear that EMC is looking into Apache Flink. I think the
>solution to this problem depends on one question: What is the size of the
>data in the CSV file compared to the memory you have available in the
>cluster?
>Would the mapping table from the file fit into the memory of all nodes
>running Flink?
>
>Regards,
>Robert
>
>PS: Did you subscribe to the mailing list? I've CCed you in case you're
>not
>subscribed yet
>
>On Wed, Nov 4, 2015 at 4:54 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi there,
>>
>> I'm trying to design and implement a use case in Flink where I'm
>>receiving
>> protocol packets over a socket. Each packet has the subscriber IMSI in
>>it
>> and a bunch of more data. At the same time, I have a csv file with a
>> mapping from IMSI -> subscriber group. I need to inject the group into
>> packet and then send it to the sink.
>>
>> I've tried loading the CSV into a memory map and then accessing the map
>> from within the Flink operators but that only works when the CSV is very
>> small (a few hundred subscribers). I've tried creating another stream
>>for
>> the CSV and connecting the streams but that doesn't yield anything as I
>> can't have access to objects from both streams at the same time.
>>
>> How would you guys approach this?
>>
>> Thanks,
>> Ali
>>