Re: Cannot cancel job with savepoint due to timeout

2017-01-31 Thread Yury Ruchin
Hi Bruno,

From the code I conclude that the "akka.client.timeout" setting is what affects
this. It defaults to 60 seconds.
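
For example, you could raise it in flink-conf.yaml. A minimal sketch, with an
arbitrary 600-second value purely for illustration (not a recommendation):

# flink-conf.yaml
akka.client.timeout: 600 s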

I'm not sure why this setting, like many other "akka.*" settings, is not
documented, though. Maybe there are good reasons behind that.

Regards,
Yury

2017-01-31 17:47 GMT+03:00 Bruno Aranda :

> Hi there,
>
> I am trying to cancel a job and create a savepoint (i.e. flink cancel -s),
> but it takes more than a minute to do that and then it fails due to the
> timeout. However, it seems that the job will be cancelled successfully and
> the savepoint made, but I can only see that through the dashboard.
>
> Cancelling job 790b60a2b44bc98854782d4e0cac05d5 with savepoint to default
> savepoint directory.
>
> 
>  The program finished with the following exception:
>
> java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(
> BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at scala.concurrent.Await.result(package.scala)
> at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:618)
> at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1079)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(
> HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1698)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(
> HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
>
> Is there any way to configure this timeout? So we can depend on the
> outcome of this execution for scripts, etc.
>
> Thanks!
>
> Bruno
>


Re: How to get help on ClassCastException when re-submitting a job

2017-01-18 Thread Yury Ruchin
For my case I tracked down the culprit - it was indeed Avro. I'm
providing details below, since I believe the pattern is pretty common for
such issues.

In a YARN setup there are several sources classes are loaded from: the
Flink lib directory, YARN lib directories, and user code. The first two
are handled by the system classloader, the last one is loaded by
FlinkUserCodeClassLoader.

My streaming job parses Avro-encoded data using the SpecificRecord facility. In
essence, the job looks like this: Source -> Avro parser (Map) -> Sink.
Parallelism is 1. The job operates inside a long-lived YARN session. I have a
subclass of SpecificRecord, say its name is MySpecificRecord. From a class
loading perspective, Avro library classes, including SpecificRecord,
are loaded by the system class loader from the YARN lib dir - such classes are
shared across different Flink tasks within a task manager. On the other hand,
MySpecificRecord is in the job fat jar, so it gets loaded by
FlinkUserCodeClassLoader. Upon every job restart, the task gets a new
FlinkUserCodeClassLoader instance, so classes from user code are confined
to a task instance.

Simply put, the parsing itself looks like this:

val bean = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema).read(...)

Now, the scenario:

1. I start my job. Parsing is initiated, so SpecificDatumReader and
SpecificData get loaded by the system classloader. A new
FlinkUserCodeClassLoader is instantiated, let's denote its instance as "A".
MySpecificRecord then gets loaded by A.

2. SpecificData has a singleton, SpecificData.INSTANCE, that holds a cache
mapping a string key derived from the Avro schema to the implementing
class. So during parsing MySpecificRecord (A) gets cached there.

3. I stop the job and re-submit it. The JVM process is the same, so all
standard Avro classes, including SpecificData, remain loaded. A new task
instance is created and gets a new FlinkUserCodeClassLoader instance, let's
name it "B". A new MySpecificRecord class incarnation is loaded by B. From
JVM standpoint MySpecificRecord (B) is different from MySpecificRecord (A),
even though their bytecode is identical.

4. The job starts parsing again. SpecificDatumReader consults
SpecificData.INSTANCE's cache for any stashed classes and finds
MySpecificRecord (A) there.

5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate
a bean for filling the parsed data in.

6. SpecificDatumReader hands the filled instance of MySpecificRecord (A)
back to the job.

7. Job tries to cast MySpecificRecord (A) to MySpecificRecord (B).

8. ClassCastException :^(

I fixed the issue by not using the SpecificData.INSTANCE singleton (even
though this is considered a common and expected practice). I feed every
parser a new instance of SpecificData. This way the class cache is confined
to a parser instance and gets recycled along with it.
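
To illustrate, here is a minimal sketch of that fix. The parser wrapper class,
the byte-array input and the binary decoder are my assumptions - the essential
part is constructing the reader with its own SpecificData instead of relying
on SpecificData.INSTANCE:

import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.{SpecificData, SpecificDatumReader}

// Hypothetical parser wrapper: one SpecificData per parser instance, so the
// class cache it holds is dropped together with the parser (and with the
// FlinkUserCodeClassLoader that loaded MySpecificRecord) on job restart.
class MySpecificRecordParser {
  private val specificData = new SpecificData(getClass.getClassLoader)

  private val reader = new SpecificDatumReader[MySpecificRecord](
    MySpecificRecord.getClassSchema, MySpecificRecord.getClassSchema, specificData)

  def parse(bytes: Array[Byte]): MySpecificRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }
}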

Hope this helps,
Yury

2017-01-16 14:03 GMT+03:00 Stephan Ewen <se...@apache.org>:

> Hi!
>
> I think Yury pointed out the correct diagnosis. Caching the classes across
> multiple jobs in the same session can cause these types of issues.
>
> For YARN single-job deployments, Flink 1.2 will not do any dynamic
> classloading any more, but will start with everything in the application
> classpath.
> For YARN sessions, Flink 1.2 still uses dynamic loading, to re-use hot
> containers.
>
> Best,
> Stephan
>
>
>
> On Mon, Jan 16, 2017 at 11:07 AM, Ufuk Celebi <u...@apache.org> wrote:
>
>> @Giuliano: any updates? Very curious to figure out what's causing
>> this. As Fabian said, this is most likely a class loading issue.
>> Judging from the stack trace, you are not running with YARN but a
>> standalone cluster. Is that correct? Nothing changed between Flink 1.1
>> and Flink 1.2 with respect to class loading on standalone clusters. Did
>> you put any JARs into the lib folder of
>> Flink before submitting the job?
>>
>> – Ufuk
>>
>> On Thu, Jan 12, 2017 at 7:16 PM, Yury Ruchin <yuri.ruc...@gmail.com>
>> wrote:
>> > Hi,
>> >
>> > I'd like to chime in since I've faced the same issue running Flink
>> 1.1.4. I
>> > have a long-running YARN session which I use to run multiple streaming
>> jobs
>> > concurrently. Once after cancelling and resubmitting the job I saw the
>> "X
>> > cannot be cast to X" ClassCastException exception in logs. I restarted
>> YARN
>> > session, then the problem disappeared.
>> >
>> > The class that failed to be cast was autogenerated by Avro compiler. I
>> know
>> > that Avro's Java binding does caching schemas in some static
>> WeakHashMap.
>> > I'm wondering whether that may step in the way of Flink classloading

Re: How to get help on ClassCastException when re-submitting a job

2017-01-12 Thread Yury Ruchin
Hi,

I'd like to chime in since I've faced the same issue running Flink 1.1.4. I
have a long-running YARN session which I use to run multiple streaming jobs
concurrently. Once, after cancelling and resubmitting a job, I saw the "X
cannot be cast to X" ClassCastException in the logs. I restarted the YARN
session, and the problem disappeared.

The class that failed to be cast was autogenerated by the Avro compiler. I know
that Avro's Java binding caches schemas in some static WeakHashMap.
I'm wondering whether that may step in the way of Flink's classloading design.

Anyway, I would be interested in watching the issue in Flink JIRA.

Giuliano, could you provide the issue number?

Thanks,
Yury

2017-01-11 14:11 GMT+03:00 Fabian Hueske :

> Hi Giuliano,
>
> thanks for bringing up this issue.
> A "ClassCastException: X cannot be cast to X" often points to a
> classloader issue.
> So it might actually be a bug in Flink.
>
> I assume you submit the same application (same jar file) with the same
> command right?
> Did you cancel the job before resubmitting?
>
> Can you create a JIRA issue [1] for this bug (hit the red CREATE button
> on top) and include the commit hash from which you built Flink?
> It would be great if you could provide a short example program and
> instructions how to reproduce the problem.
>
> Thank you very much,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK
>
>
>
> 2017-01-11 1:22 GMT+01:00 Giuliano Caliari :
>
>> Hello,
>>
>>
>>
>> I need some guidance on how to report a bug.
>>
>>
>>
>> I’m testing version 1.2 on my local cluster and the first time I submit
>> the job everything works but whenever I re-submit the same job it fails
>> with
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Job execution failed.
>>
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:427)
>>
>> at org.apache.flink.client.program.StandaloneClusterClient.subm
>> itJob(StandaloneClusterClient.java:101)
>>
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:400)
>>
>> at org.apache.flink.streaming.api.environment.StreamContextEnvi
>> ronment.execute(StreamContextEnvironment.java:66)
>>
>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironm
>> ent.execute(StreamExecutionEnvironment.scala:634)
>>
>> at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>>
>> at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoi
>> nt$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>>
>> at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$
>> body.apply(TraitorApp.scala:21)
>>
>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>
>> at scala.runtime.AbstractFunction0.apply$mcV$sp(
>> AbstractFunction0.scala:12)
>>
>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>
>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>
>> at scala.collection.immutable.List.foreach(List.scala:381)
>>
>> at scala.collection.generic.TraversableForwarder$class.foreach(
>> TraversableForwarder.scala:35)
>>
>> at scala.App$class.main(App.scala:76)
>>
>> at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorA
>> pp.scala:21)
>>
>> at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>>
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at org.apache.flink.client.program.PackagedProgram.callMainMeth
>> od(PackagedProgram.java:528)
>>
>> at org.apache.flink.client.program.PackagedProgram.invokeIntera
>> ctiveModeForExecution(PackagedProgram.java:419)
>>
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:339)
>>
>> at org.apache.flink.client.CliFrontend.executeProgram(CliFronte
>> nd.java:831)
>>
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>>
>> at org.apache.flink.client.CliFrontend.parseParameters(CliFront
>> end.java:1073)
>>
>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>>
>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>>
>> at org.apache.flink.runtime.security.NoOpSecurityContext.runSec
>> ured(NoOpSecurityContext.java:29)
>>
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>>
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>>
>> at 

Re: window function outputs two different values

2017-01-10 Thread Yury Ruchin
Hi,

Is there a strict requirement that elements must proceed along the
processing pipeline only after being accounted for by the reduce function?
If not, you could derive two streams from the original one and process them
concurrently, something like this:

val protoStream = kafka source -> keyBy

val aggregateStream = protoStream -> window -> reduce
val someOtherStream = protoStream -> <further processing>

Or, if the above is not an option and window collection latency is not an
issue, you could just use a generic window function or a fold function. The
former gives access to the window elements as an iterable; the latter allows
using a custom accumulator that holds the intermediate count and the window
elements seen so far.
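
For illustration, a minimal sketch of the generic window function variant. The
event type, the timestamps, the fromElements stand-in for the Kafka source and
the 10-minute session gap are all assumptions, not taken from the original job:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class MyEvent(key: String, ts: Long, payload: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val events = env
  .fromElements(MyEvent("a", 1000L, "x"), MyEvent("a", 2000L, "y"), MyEvent("b", 3000L, "z"))
  .assignAscendingTimestamps(_.ts)

// Emits both the element count and the elements themselves for each session window.
val countsWithElements: DataStream[(Long, Iterable[MyEvent])] = events
  .keyBy(_.key)
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  .apply { (key: String, window: TimeWindow, elements: Iterable[MyEvent],
            out: Collector[(Long, Iterable[MyEvent])]) =>
    out.collect((elements.size.toLong, elements))
  }

countsWithElements.print()
env.execute("count-and-forward-example")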

Regards,
Yury

2017-01-10 17:43 GMT+03:00 Aljoscha Krettek :

> Hi,
> I'm afraid this is not possible with the current model. A reduce function
> is only meant to combine two values and output the result of that. Side
> effects, such as emitting further data, are not allowed right now.
>
> Cheers,
> Aljoscha
>
> On Mon, 9 Jan 2017 at 15:27 tao xiao  wrote:
>
>> Hi team,
>>
>> any suggestions on below topic?
>>
>> I have a requirement that wants to output two different values from a
>> time window reduce function. Here is basic workflow
>>
>> 1. fetch data from Kafka
>> 2. flow the data to an event session window: kafka source -> keyBy ->
>> session window -> reduce
>> 3. inside the reduce function, count the number of records and also emit the
>> data itself to another operator for further processing
>>
>> As the reduce function can only emit the count, I want to know how to
>> also emit the data as well?
>>
>>
>>
>> On Sat, 7 Jan 2017 at 20:30 tao xiao  wrote:
>>
>> Hi team,
>>
>> I have a requirement that wants to output two different values from a
>> time window reduce function. Here is basic workflow
>>
>> 1. fetch data from Kafka
>> 2. flow the data to an event session window: kafka source -> keyBy ->
>> session window -> reduce
>> 3. inside the reduce function, count the number of records and also emit the
>> data itself to another operator for further processing
>>
>> As the reduce function can only emit the count, I want to know how to
>> also emit the data as well?
>>
>>


Re: Running into memory issues while running on Yarn

2017-01-05 Thread Yury Ruchin
Hi,

Your containers got killed by YARN for exceeding virtual memory limits. For
some reason your containers intensively allocate virtual memory while plenty
of physical memory remains free.

There are some known gotchas around this issue on CentOS, caused by
OS-specific aggressive virtual memory allocation: [1], [2]. Both work around
it by disabling the YARN virtual memory checker.

People on this mailing list also recently reported that high virtual memory
consumption may be caused by certain libraries.

Links:
[1]
http://blog.cloudera.com/blog/2014/04/apache-hadoop-yarn-avoiding-6-time-consuming-gotchas/,
section "Killing of Tasks Due to Virtual Memory Usage"
[2] https://www.mapr.com/blog/best-practices-yarn-resource-management,
section "3. Virtual/physical memory checker".

Regards,
Yury

2017-01-05 11:54 GMT+03:00 Sachin Goel :

> Hey!
>
> I'm running locally under this configuration(copied from nodemanager logs):
> physical-memory=8192 virtual-memory=17204 virtual-cores=8
>
> Before starting a flink deployment, memory usage stats show 3.7 GB used on
> system, indicating lots of free memory for flink containers.
> However, after I submit using minimal resource requirements,
> ./yarn-session.sh -n 1 -tm 768, the cluster deploys successfully but then
> every application on system receives a sigterm and it basically kills the
> current user session, logging out of the system.
>
> The job manager and task manager logs contain just the information that a
> SIGTERM was received and shut down gracefully.
> All yarn and dfs process contain the log information showing the receipt
> of a sigterm.
>
> Here's the relevant log from nodemanager:
>
> 2017-01-05 17:00:06,089 INFO 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
>  Container container_1483603191971_0002_01_02 transitioned from LOCALIZED 
> to RUNNING
> 2017-01-05 17:00:06,092 INFO 
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: 
> launchContainer: [bash, 
> /opt/hadoop-2.7.3/tmp/nm-local-dir/usercache/kirk/appcache/application_1483603191971_0002/container_1483603191971_0002_01_02/default_container_executor.sh]
> 2017-01-05 17:00:08,731 INFO 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>  Starting resource-monitoring for container_1483603191971_0002_01_02
> 2017-01-05 17:00:08,744 INFO 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>  Memory usage of ProcessTree 17872 for container-id 
> container_1483603191971_0002_01_01: 282.7 MB of 1 GB physical memory 
> used; 2.1 GB of 2.1 GB virtual memory used
> 2017-01-05 17:00:08,744 WARN 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>  Process tree for container: container_1483603191971_0002_01_01 has 
> processes older than 1 iteration running over the configured limit. 
> Limit=2254857728, current usage = 2255896576
> 2017-01-05 17:00:08,745 WARN 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>  Container [pid=17872,containerID=container_1483603191971_0002_01_01] is 
> running beyond virtual memory limits. Current usage: 282.7 MB of 1 GB 
> physical memory used; 2.1 GB of 2.1 GB virtual memory used. Killing container.
> Dump of the process-tree for container_1483603191971_0002_01_01 :
>   |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>   |- 17872 17870 17872 17872 (bash) 0 0 21409792 812 /bin/bash -c 
> /usr/lib/jvm/java-8-openjdk-amd64//bin/java -Xmx424M  
> -Dlog.file=/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.log
>  -Dlogback.configurationFile=file:logback.xml 
> -Dlog4j.configuration=file:log4j.properties 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> 1>/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.out
>  
> 2>/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.err
>   |- 17879 17872 17872 17872 (java) 748 20 2234486784 71553 
> /usr/lib/jvm/java-8-openjdk-amd64//bin/java -Xmx424M 
> -Dlog.file=/opt/hadoop-2.7.3/logs/userlogs/application_1483603191971_0002/container_1483603191971_0002_01_01/jobmanager.log
>  -Dlogback.configurationFile=file:logback.xml 
> -Dlog4j.configuration=file:log4j.properties 
> org.apache.flink.yarn.YarnApplicationMasterRunner
>
> 2017-01-05 17:00:08,745 INFO 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>  Removed ProcessTree with root 17872
> 2017-01-05 17:00:08,746 INFO 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
>  Container container_1483603191971_0002_01_01 transitioned from RUNNING 
> to KILLING
> 

Re: Rapidly failing job eventually causes "Not enough free slots"

2017-01-05 Thread Yury Ruchin
Hi,

I've faced a similar issue recently. Hope sharing my findings will help.
The problem can be split into 2 parts:

*Source of container failures*
The logs you provided indicate that YARN kills its containers for exceeding
memory limits. The important point here is that the memory limit = JVM heap
memory + off-heap memory. So if off-heap memory usage is high, YARN may kill
containers even though JVM heap consumption is fine. To address this, Flink
reserves a share of the container memory for off-heap use. How much gets
reserved is controlled by the yarn.heap-cutoff-ratio and yarn.heap-cutoff-min
configuration options. By default 25% of the requested container memory is
reserved for off-heap. This seems to be a good start, but one should
experiment and tune it to meet their job's specifics.

It's also worthwhile to figure out who consumes off-heap memory. Is it
Flink managed memory moved off heap (taskmanager.memory.off-heap = true)?
Is it some external library allocating something off heap? Is it your own
code?

*How Flink handles task manager failures*
Whenever a task manager fails, the Flink jobmanager decides whether it
should:
- reallocate the failed task manager container, or
- fail the application entirely
These decisions can be guided by configuration
(https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn).
With default settings, the job manager reallocates task manager containers up
to the point when N failures have been observed, where N is the number of
requested task managers. After that the application is stopped.

According to the logs, you have a finite number in
yarn.maximum-failed-containers (11, as I can see from the logs - this may
be set by Flink if not provided explicitly). On the 12th container restart,
the jobmanager gives up and the application stops. I'm not sure why it keeps
reporting not enough slots after that point. In my experience this may
happen when a job eats up all the available slots, so that after a container
failure its tasks cannot be restarted in other (live) containers. But I
believe that once the decision to stop the application is made, there should
not be any further attempts to restart the job, hence no logs like those.
Hopefully, someone else will explain this to us :)

In my case I made the jobmanager restart containers infinitely by setting
yarn.maximum-failed-containers = -1, so that a taskmanager failure never
results in application death. Note this is unlikely to be a good choice for
a batch job.
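
For completeness, a minimal flink-conf.yaml sketch of the knobs mentioned
above. The cutoff values are illustrative assumptions only, and -1 removes the
container failure limit as described:

# flink-conf.yaml
yarn.heap-cutoff-ratio: 0.3
yarn.heap-cutoff-min: 600
yarn.maximum-failed-containers: -1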

Regards,
Yury

2017-01-05 3:21 GMT+03:00 Shannon Carey :

> In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem twice
> and I'm wondering if anyone has some insight about it.
>
> In both cases, we deployed a job that fails very frequently (within 15s-1m
> of launch). Eventually, the Flink cluster dies.
>
> The sequence of events looks something like this:
>
>- bad job is launched
>- bad job fails & is restarted many times (I didn't have the
>"failure-rate" restart strategy configuration right)
>- Task manager logs: org.apache.flink.yarn.YarnTaskManagerRunner
>(SIGTERM handler): RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>- At this point, the YARN resource manager also logs the container
>failure
>- More logs: Container ResourceID{resourceId='
>container_1481658997383_0003_01_13'} failed. Exit status: Pmem
>limit exceeded (-104)
>- Diagnostics for container ResourceID{resourceId='
>container_1481658997383_0003_01_13'} in state COMPLETE :
>exitStatus=Pmem limit exceeded (-104) diagnostics=Container
>[pid=21246,containerID=container_1481658997383_0003_01_13] is
>running beyond physical memory limits. Current usage: 5.6 GB of 5.6 GB
>physical memory used; 9.6 GB of 28.1 GB virtual memory used. Killing
>container.
>Container killed on request. Exit code is 143
>Container exited with a non-zero exit code 143
>Total number of failed containers so far: 12
>Stopping YARN session because the number of failed containers (12)
>exceeded the maximum failed containers (11). This number is controlled by
>the 'yarn.maximum-failed-containers' configuration setting. By default
>its the number of requested containers.
>- From here onward, the logs repeatedly show that jobs fail to restart
>due to 
> "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>Not enough free slots available to run the job. You can decrease the
>operator parallelism or increase the number of slots per TaskManager in the
>configuration. Task to schedule: < Attempt #68 (Source: …) @ (unassigned) -
>[SCHEDULED] > with groupID < 73191c171abfff61fb5102c161274145 > in
>sharing group < SlotSharingGroup [73191c171abfff61fb5102c161274145,
>19596f7834805c8409c419f0edab1f1b] >. Resources available to scheduler:
>Number of instances=0, total number of slots=0, available slots=0"
>- 

Re: 1.1.4 IntelliJ Problem

2017-01-04 Thread Yury Ruchin
Hi Stephan,

It looks like you have libraries from different versions of Flink
distribution on the same classpath.

ForkableFlinkMiniCluster resides in flink-test-utils. As of distribution
version 1.1.3 it invokes JobManager.startJobManagerActors() with 6
arguments. The signature changed in 1.1.4, and ForkableFlinkMiniCluster now
invokes the method with 8 arguments of different types. This might mean
that the flink-test-utils library on your classpath has version 1.1.3 whereas
flink-runtime has 1.1.4.

You might want to thoroughly inspect your classpath to ensure that every
Flink-related dependency has version 1.1.4.
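
One way to keep the versions aligned is to declare the version once and reuse
it for every Flink artifact - a minimal pom.xml sketch (the artifact list here
is just an example); running "mvn dependency:tree" afterwards helps spot any
stray 1.1.3 artifact pulled in transitively:

<properties>
  <flink.version>1.1.4</flink.version>
  <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
  </dependency>
</dependencies>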

Regards,
Yury

2017-01-04 11:20 GMT+03:00 Stephan Epping :

> I also changed the scala version of the packages/artifacts to 2.11, with
> no success.
> Further, I am not deeply familiar with maven or java dependency management
> at all.
>
> best Stephan
>
> On 03 Jan 2017, at 22:06, Stephan Ewen  wrote:
>
> Hi!
>
> It is probably some inconsistent configuration in the IDE.
>
> It often helps to do "Maven->Reimport" or use "restart and clear caches".
>
>
> On Tue, Jan 3, 2017 at 9:48 AM, Stephan Epping 
> wrote:
>
>> Hey,
>>
>>
>> thanks for the reply. I didn’t change the scala version, as it worked
>> before. I just changed the flink version in my pom.xml, that's it - a one line
>> change.
>>
>> Maybe you could elaborate a bit more, what I can do to change the scala 
>> version?
>>
>>
>> best Stephan
>>
>>
>>
>> On 03 Jan 2017, at 03:11, Kurt Young  wrote:
>>
>> Seems like you didn't setup the correct scala SDK
>>
>> best,
>> Kurt
>>
>> On Mon, Jan 2, 2017 at 10:41 PM, Stephan Epping <
>> stephan.epp...@zweitag.de> wrote:
>>
>>> Hi,
>>>
>>> I am getting this error running my tests with 1.1.4 inside intellij ide.
>>>
>>> java.lang.NoSuchMethodError: org.apache.flink.runtime.jobma
>>> nager.JobManager$.startJobManagerActors(Lorg/apache/flink/co
>>> nfiguration/Configuration;Lakka/actor/ActorSystem;Lscala/
>>> Option;Lscala/Option;Ljava/lang/Class;Ljava/lang/Class;)Lscala/Tuple2;
>>>
>>> at org.apache.flink.test.util.ForkableFlinkMiniCluster.startJob
>>> Manager(ForkableFlinkMiniCluster.scala:103)
>>> at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonf
>>> un$1.apply(FlinkMiniCluster.scala:292)
>>> at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonf
>>> un$1.apply(FlinkMiniCluster.scala:286)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(Traver
>>> sableLike.scala:244)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(Traver
>>> sableLike.scala:244)
>>> at scala.collection.immutable.Range.foreach(Range.scala:141)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>> at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(
>>> FlinkMiniCluster.scala:286)
>>> at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(
>>> FlinkMiniCluster.scala:277)
>>> at org.apache.flink.test.util.ForkableFlinkMiniCluster.start(Fo
>>> rkableFlinkMiniCluster.scala:255)
>>> at org.apache.flink.test.util.TestBaseUtils.startCluster(TestBa
>>> seUtils.java:152)
>>> at org.apache.flink.test.util.TestBaseUtils.startCluster(TestBa
>>> seUtils.java:126)
>>> at org.flinkspector.datastream.DataStreamTestEnvironment.create
>>> TestEnvironment(DataStreamTestEnvironment.java:72)
>>>
>>> Any ideas?
>>>
>>> best,
>>> Stephan
>>>
>>>
>>
>>
>
>


Re: Implementing tee functionality in a streaming job

2016-12-20 Thread Yury Ruchin
My bad, the "Records Out" in the previous message should be read "Records
sent" as per Flink UI.

2016-12-20 18:42 GMT+03:00 Yury Ruchin <yuri.ruc...@gmail.com>:

> Well, it seems I figured it out. You're right, Fabian, it works the way
> you described. I wrote a simple test job:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(Seq.range(0, 100))
>
> stream.addSink(new DiscardingSink[Int]).disableChaining()
> stream.map {_ => 1}.countWindowAll(100).sum(0).print().disableChaining()
>
> env.setParallelism(2)
> env.execute("tee-test-job")
>
> I saw that "Records received" was 100 for both the DiscardingSink and
> window operators. I also noticed that "Records out" for the
> fromCollection() sink was 200 - and that was the key to understanding. In
> the original job I use Kafka source, and I treated its "Records out" as the
> number of records consumed by it, but it's not true. The correct
> interpretation should be " =  *  successor operators>". The additional source of confusion for me was some
> inaccuracy of sampled numbers - "Records In" values in successor operators
> were not exactly equal which made me think they receive different portions
> of the stream. I believe the inaccuracy is somewhat intrinsic to live
> stream sampling, so that's fine.
>
>
> 2016-12-20 14:35 GMT+03:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>
>> Thanks Fabian, I will try creating a toy job illustrating the issue and
>> get back.
>>
>> 2016-12-20 12:58 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi Yury,
>>>
>>> your solution should exactly solve your problem.
>>> An operator sends all outgoing records to all connected successor
>>> operators.
>>> There should not be any non-deterministic behavior or splitting of
>>> records.
>>>
>>> Can you share some example code that produces the non-deterministic
>>> behavior?
>>>
>>> Best, Fabian
>>>
>>>
>>> 2016-12-20 10:50 GMT+01:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> I have a streaming job that I want at some point to duplicate stream,
>>>> so that one copy of it ends up in a sink, and another one goes down the
>>>> processing pipeline. This way I want to implement something similar to
>>>> "tee" Linux utility.
>>>>
>>>> I tried a naive approach that looked like this:
>>>>
>>>> val streamToMirror = env.addSource(mySource).<some operators>
>>>> streamToMirror.addSink(someSink) // (1) tee
>>>> streamToMirror.<some other operators>.addSink(anotherSink) // (2)
>>>> process further
>>>>
>>>> But the above results in the stream being split between the mySink (1)
>>>> and further processors (2) in a seemingly nondeterministic way.
>>>>
>>>> I could write through Kafka as shown in this pseudocode:
>>>>
>>>> val headStream = env.addSource(mySource).<some operators>
>>>> headStream.addSink(KafkaSink("myTopic"))
>>>> val tailStream = env.addSource(KafkaSource("myTopic")).<process further here>
>>>>
>>>> But this would incur additional latency + deserialization overhead that
>>>> I would like to avoid.
>>>>
>>>> Is there any way to implement stream teeing as described?
>>>>
>>>> Thanks,
>>>> Yury
>>>>
>>>
>>>
>>
>


Re: Implementing tee functionality in a streaming job

2016-12-20 Thread Yury Ruchin
Well, it seems I figured it out. You're right, Fabian, it works the way you
described. I wrote a simple test job:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(Seq.range(0, 100))

stream.addSink(new DiscardingSink[Int]).disableChaining()
stream.map {_ => 1}.countWindowAll(100).sum(0).print().disableChaining()

env.setParallelism(2)
env.execute("tee-test-job")

I saw that "Records received" was 100 for both the DiscardingSink and
window operators. I also noticed that "Records out" for the
fromCollection() sink was 200 - and that was the key to understanding. In
the original job I use Kafka source, and I treated its "Records out" as the
number of records consumed by it, but it's not true. The correct
interpretation should be " =  * ". The additional source of confusion for me was some
inaccuracy of sampled numbers - "Records In" values in successor operators
were not exactly equal which made me think they receive different portions
of the stream. I believe the inaccuracy is somewhat intrinsic to live
stream sampling, so that's fine.


2016-12-20 14:35 GMT+03:00 Yury Ruchin <yuri.ruc...@gmail.com>:

> Thanks Fabian, I will try creating a toy job illustrating the issue and
> get back.
>
> 2016-12-20 12:58 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Yury,
>>
>> your solution should exactly solve your problem.
>> An operator sends all outgoing records to all connected successor
>> operators.
>> There should not be any non-deterministic behavior or splitting of
>> records.
>>
>> Can you share some example code that produces the non-deterministic
>> behavior?
>>
>> Best, Fabian
>>
>>
>> 2016-12-20 10:50 GMT+01:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>>
>>> Hi all,
>>>
>>> I have a streaming job that I want at some point to duplicate stream, so
>>> that one copy of it ends up in a sink, and another one goes down the
>>> processing pipeline. This way I want to implement something similar to
>>> "tee" Linux utility.
>>>
>>> I tried a naive approach that looked like this:
>>>
>>> val streamToMirror = env.addSource(mySource).<some operators>
>>> streamToMirror.addSink(someSink) // (1) tee
>>> streamToMirror.<some other operators>.addSink(anotherSink) // (2) process
>>> further
>>>
>>> But the above results in the stream being split between the mySink (1)
>>> and further processors (2) in a seemingly nondeterministic way.
>>>
>>> I could write through Kafka as shown in this pseudocode:
>>>
>>> val headStream = env.addSource(mySource).<some operators>
>>> headStream.addSink(KafkaSink("myTopic"))
>>> val tailStream = env.addSource(KafkaSource("myTopic")).<process further here>
>>>
>>> But this would incur additional latency + deserialization overhead that
>>> I would like to avoid.
>>>
>>> Is there any way to implement stream teeing as described?
>>>
>>> Thanks,
>>> Yury
>>>
>>
>>
>


Re: Implementing tee functionality in a streaming job

2016-12-20 Thread Yury Ruchin
Thanks Fabian, I will try creating a toy job illustrating the issue and get
back.

2016-12-20 12:58 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Yury,
>
> your solution should exactly solve your problem.
> An operator sends all outgoing records to all connected successor
> operators.
> There should not be any non-deterministic behavior or splitting of records.
>
> Can you share some example code that produces the non-deterministic
> behavior?
>
> Best, Fabian
>
>
> 2016-12-20 10:50 GMT+01:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>
>> Hi all,
>>
>> I have a streaming job that I want at some point to duplicate stream, so
>> that one copy of it ends up in a sink, and another one goes down the
>> processing pipeline. This way I want to implement something similar to
>> "tee" Linux utility.
>>
>> I tried a naive approach that looked like this:
>>
>> val streamToMirror = env.addSource(mySource).<some operators>
>> streamToMirror.addSink(someSink) // (1) tee
>> streamToMirror.<some other operators>.addSink(anotherSink) // (2) process
>> further
>>
>> But the above results in the stream being split between the mySink (1)
>> and further processors (2) in a seemingly nondeterministic way.
>>
>> I could write through Kafka as shown in this pseudocode:
>>
>> val headStream = env.addSource(mySource).<some operators>
>> headStream.addSink(KafkaSink("myTopic"))
>> val tailStream = env.addSource(KafkaSource("myTopic")).<process further here>
>>
>> But this would incur additional latency + deserialization overhead that I
>> would like to avoid.
>>
>> Is there any way to implement stream teeing as described?
>>
>> Thanks,
>> Yury
>>
>
>


Implementing tee functionality in a streaming job

2016-12-20 Thread Yury Ruchin
Hi all,

I have a streaming job that I want at some point to duplicate stream, so
that one copy of it ends up in a sink, and another one goes down the
processing pipeline. This way I want to implement something similar to
"tee" Linux utility.

I tried a naive approach that looked like this:

val streamToMirror = env.addSource(mySource).<some operators>
streamToMirror.addSink(someSink) // (1) tee
streamToMirror.<some other operators>.addSink(anotherSink) // (2) process
further

But the above results in the stream being split between the mySink (1) and
further processors (2) in a seemingly nondeterministic way.

I could write through Kafka as shown in this pseudocode:

val headStream = env.addSource(mySource).<some operators>
headStream.addSink(KafkaSink("myTopic"))
val tailStream = env.addSource(KafkaSource("myTopic")).<process further here>

But this would incur additional latency + deserialization overhead that I
would like to avoid.

Is there any way to implement stream teeing as described?

Thanks,
Yury


Re: Blocking RichFunction.open() and backpressure

2016-12-19 Thread Yury Ruchin
Thanks Fabian, that quite explains what's going on.

2016-12-19 12:19 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Yury,
>
> Flink's operators start processing as soon as they receive data. If an
> operator produces more data than its successor task can process, the data
> is buffered in Flink's network stack, i.e., its network buffers.
> The backpressure mechanism kicks in when all network buffers are in use
> and no more data can be buffered. In this case, a producing task will block
> until a network buffer becomes available.
>
> If the window operator in your job aggregates the data, only the
> aggregates will be buffered.
> This might explain why the first operators of the job are able to start
> processing while the FlatMap operator is still setting itself up.
>
> Best,
> Fabian
>
> 2016-12-17 13:42 GMT+01:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>
>> Hi all,
>>
>> I have a streaming job that essentially looks like this: KafkaSource ->
>> Map -> EventTimeWindow -> RichFlatMap -> CustomSink. The RichFlatMap part
>> does some heavy lifting in open(), so that the open() call blocks for
>> several minutes. I assumed that until open() returns the backpressure
>> mechanism would slow down the entire upstream up to the KafkaSource, so
>> that no new records would be emitted to the pipeline until the RichFlatMap
>> is ready. What I actually observe is that the KafkaSource, Map and
>> EventTimeWindow continue processing - the in/out records, in/out MB
>> counters keep increasing. The RichFlatMap and its downstream CustomSink
>> have 0 as expected, until the RichFlatMap is actually done with open(). The
>> backpressure monitor in Flink UI shows "OK" for all operators.
>>
>> Why doesn't backpressure mechanism work in this case?
>>
>> Thanks,
>> Yury
>>
>
>


Blocking RichFunction.open() and backpressure

2016-12-17 Thread Yury Ruchin
Hi all,

I have a streaming job that essentially looks like this: KafkaSource -> Map
-> EventTimeWindow -> RichFlatMap -> CustomSink. The RichFlatMap part does
some heavy lifting in open(), so that the open() call blocks for several
minutes. I assumed that until open() returns the backpressure mechanism
would slow down the entire upstream up to the KafkaSource, so that no new
records would be emitted to the pipeline until the RichFlatMap is ready.
What I actually observe is that the KafkaSource, Map and EventTimeWindow
continue processing - the in/out records, in/out MB counters keep
increasing. The RichFlatMap and its downstream CustomSink have 0 as
expected, until the RichFlatMap is actually done with open(). The
backpressure monitor in Flink UI shows "OK" for all operators.

Why doesn't backpressure mechanism work in this case?

Thanks,
Yury


Re: Flink 1.1.3 web UI is loading very slowly

2016-12-16 Thread Yury Ruchin
Thanks for the clue, Stephan! I will check.

2016-12-16 12:44 GMT+03:00 Stephan Ewen <se...@apache.org>:

> Not sure if that is the problem, but upon first access to a resource, the
> web server extracts the resource from the JAR file and puts it into the
> temp directory. Maybe that is slow for whatever reason?
>
> On Thu, Dec 15, 2016 at 8:04 PM, Yury Ruchin <yuri.ruc...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm seeing an issue with the load speed of Flink Web UI when running in
>> YARN session. Initial load takes several minutes or even more, although
>> according to the browser console there are only a couple of MBs to
>> download. When the loading is complete, the UI itself is quite responsive.
>>
>> I don't see such an issue with YARN UI loading from the same cluster over
>> the same network. I suspected that YARN proxy may cause the slowdown and
>> hit the direct Flink web UI URL - the same story. CPU, load average and
>> memory consumption on the node hosting the job manager are low. I gave the
>> jobmanager from 4 GB to 10 GB of heap - no effect.
>>
>> Any ideas on how to approach the problem are appreciated.
>>
>> Thanks,
>> Yury
>>
>
>


Re: Jar hell when running streaming job in YARN session

2016-12-15 Thread Yury Ruchin
Hi Kidong, Stephan,

First of all, you've saved me days of investigation - thanks a lot! The
problem is solved now. More details follow.

I use the official Flink 1.1.3 + Hadoop 2.7 distribution. My problem was
indeed caused by a clash of classes under "com.google" in my fat jar and in
the YARN library directories. The shaded Guava classes in the Flink
distribution didn't hurt. Initially I took the wrong approach - I tried to
change the class loading order. Instead, I should have used the same shading
approach that Flink uses and that Kidong described above - simply relocate
the problematic classes to another package in the fat jar.

Thanks again,
Yury

2016-12-15 14:21 GMT+03:00 Stephan Ewen <se...@apache.org>:

> Hi Yuri!
>
> Flink should hide Hadoop's Guava, to avoid this issue.
>
> Did you build Flink yourself from source? Maybe you are affected by this
> issue: https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html#dependency-shading
>
> Stephan
>
>
> On Thu, Dec 15, 2016 at 11:18 AM, Kidong Lee <mykid...@gmail.com> wrote:
>
>> To avoid guava conflict, I use maven shade plugin to package my fat jar.
>> If you use maven, the shade plugin looks like this:
>> ...
>>
>> <plugin>
>>    <groupId>org.apache.maven.plugins</groupId>
>>    <artifactId>maven-shade-plugin</artifactId>
>>    <version>2.4.2</version>
>>    <configuration>
>>       <createDependencyReducedPom>false</createDependencyReducedPom>
>>       <shadedArtifactAttached>true</shadedArtifactAttached>
>>       <shadedClassifierName>flink-job</shadedClassifierName>
>>       <relocations>
>>          <relocation>
>>             <pattern>com.google</pattern>
>>             <shadedPattern>yourpackage.shaded.google</shadedPattern>
>>          </relocation>
>>       </relocations>
>>       <transformers>
>>          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>>          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
>>             <resource>META-INF/spring.handlers</resource>
>>          </transformer>
>>          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
>>             <resource>META-INF/spring.schemas</resource>
>>          </transformer>
>>       </transformers>
>>       <filters>
>>          <filter>
>>             <artifact>*:*</artifact>
>>             <excludes>
>>                <exclude>org/datanucleus/**</exclude>
>>                <exclude>META-INF/*.SF</exclude>
>>                <exclude>META-INF/*.DSA</exclude>
>>                <exclude>META-INF/*.RSA</exclude>
>>             </excludes>
>>          </filter>
>>       </filters>
>>    </configuration>
>> </plugin>
>> ...
>>
>>
>> To package fat jar:
>>
>> mvn -e -DskipTests=true clean install shade:shade;
>>
>>
>> I hope, it helps.
>>
>> - Kidong Lee.
>>
>>
>>
>>
>>
>> 2016-12-15 19:04 GMT+09:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>>
>>> Hi,
>>>
>>> I have run into a classpath issue when running Flink streaming job in
>>> YARN session. I package my app into a fat jar with all the dependencies
>>> needed. One of them is Google Guava. I then submit the jar to the session.
>>> The task managers pre-created by the session build their classpath from the
>>> FLINK_LIB_DIR and Hadoop / YARN lib directories. Unfortunately, there is a
>>> dated Guava version pulled along with Hadoop dependencies which conflicts
>>> with the version my app needs. Even worse, the Flink lib dir and Hadoop
>>> libraries take precedence over my jar.
>>>
>>> If I remember correctly, in Spark there is an option meaning "user
>>> classpath goes first when looking for classes". I couldn't find anything
>>> similar for Flink. I tried "flink run -C file:///path/to/my/libraries" in
>>> the hope to extend the classpath but the old Guava version takes precedence
>>> anyway.
>>>
>>> How else can I bump "priority" of my app jar during the load process?
>>>
>>> Thanks,
>>> Yury
>>>
>>
>>
>


Jar hell when running streaming job in YARN session

2016-12-15 Thread Yury Ruchin
Hi,

I have run into a classpath issue when running Flink streaming job in YARN
session. I package my app into a fat jar with all the dependencies needed.
One of them is Google Guava. I then submit the jar to the session. The task
managers pre-created by the session build their classpath from the
FLINK_LIB_DIR and Hadoop / YARN lib directories. Unfortunately, there is a
dated Guava version pulled along with Hadoop dependencies which conflicts
with the version my app needs. Even worse, the Flink lib dir and Hadoop
libraries take precedence over my jar.

If I remember correctly, in Spark there is an option meaning "user
classpath goes first when looking for classes". I couldn't find anything
similar for Flink. I tried "flink run -C file:///path/to/my/libraries" in
the hope to extend the classpath but the old Guava version takes precedence
anyway.

How else can I bump "priority" of my app jar during the load process?

Thanks,
Yury


Re: Container JMX port setting / discovery for Flink on YARN

2016-11-25 Thread Yury Ruchin
Thanks Robert, it works like a charm.

2016-11-25 12:55 GMT+03:00 Robert Metzger <rmetz...@apache.org>:

> Hi Yury,
>
> Flink is using its own JMX server instance (not the JVM's one). Therefore,
> you can configure the server yourself.
> Check out this documentation page: https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html#reporter
>
> metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporter.my_jmx_reporter.port: 9020-9040
>
>
> Flink will print the port it ends up using into the log of each
> TaskManager.
>
>
> On Fri, Nov 25, 2016 at 10:53 AM, Yury Ruchin <yuri.ruc...@gmail.com>
> wrote:
>
>> Thanks Stefan! I think this would help if I had just one container per
>> node. But that's not my case - there are multiple TaskManagers running on
>> the same node, so setting the same value will likely result in port
>> conflict.
>>
>> 2016-11-25 12:28 GMT+03:00 Stefan Richter <s.rich...@data-artisans.com>:
>>
>>> Hi,
>>>
>>> can you try adding the following to your flink.yaml?
>>>
>>> env.java.opts: -Dcom.sun.management.jmxremote
>>> -Dcom.sun.management.jmxremote.port=9999
>>> -Dcom.sun.management.jmxremote.authenticate=false
>>> -Dcom.sun.management.jmxremote.ssl=false
>>>
>>> Best,
>>> Stefan
>>>
>>>
>>> > Am 24.11.2016 um 16:47 schrieb Yury Ruchin <yuri.ruc...@gmail.com>:
>>> >
>>> > Hi,
>>> >
>>> > I want to enable JMX for my Flink streaming app running in YARN
>>> session. How can I specify which ports containers will listen to? If I
>>> cannot control it (e. g. they will be chosen randomly) - how can I detect
>>> which ports were picked by containers: inspecting logs, looking at the Web
>>> UI etc.?
>>> >
>>> > Example: in Apache Storm it is easy to derive JMX port numbers from
>>> worker port (slot) numbers and pass it as a JVM argument to the worker
>>> start command, so that every slot is always associated with a stable and
>>> well-known JMX port.
>>> >
>>> > Any clue is appreciated. Thanks!
>>>
>>>
>>
>


Re: Container JMX port setting / discovery for Flink on YARN

2016-11-25 Thread Yury Ruchin
Thanks Stefan! I think this would help if I had just one container per
node. But that's not my case - there are multiple TaskManagers running on
the same node, so setting the same value will likely result in port
conflict.

2016-11-25 12:28 GMT+03:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> can you try adding the following to your flink.yaml?
>
> env.java.opts: -Dcom.sun.management.jmxremote
> -Dcom.sun.management.jmxremote.port=9999
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.ssl=false
>
> Best,
> Stefan
>
>
> > Am 24.11.2016 um 16:47 schrieb Yury Ruchin <yuri.ruc...@gmail.com>:
> >
> > Hi,
> >
> > I want to enable JMX for my Flink streaming app running in YARN session.
> How can I specify which ports containers will listen to? If I cannot
> control it (e. g. they will be chosen randomly) - how can I detect which
> ports were picked by containers: inspecting logs, looking at the Web UI
> etc.?
> >
> > Example: in Apache Storm it is easy to derive JMX port numbers from
> worker port (slot) numbers and pass it as a JVM argument to the worker
> start command, so that every slot is always associated with a stable and
> well-known JMX port.
> >
> > Any clue is appreciated. Thanks!
>
>


Container JMX port setting / discovery for Flink on YARN

2016-11-24 Thread Yury Ruchin
Hi,

I want to enable JMX for my Flink streaming app running in YARN session.
How can I specify which ports containers will listen to? If I cannot
control it (e. g. they will be chosen randomly) - how can I detect which
ports were picked by containers: inspecting logs, looking at the Web UI
etc.?

Example: in Apache Storm it is easy to derive JMX port numbers from worker
port (slot) numbers and pass it as a JVM argument to the worker start
command, so that every slot is always associated with a stable and
well-known JMX port.

Any clue is appreciated. Thanks!


Freeing resources in SourceFunction

2016-11-03 Thread Yury Ruchin
Hello,

I'm writing a custom source function for my streaming job. The source
function manages some connection pool. I want to close that pool once my
job is "finished" (since the stream is unbounded, the only way I see is to
cancel the streaming job). Since I inherit from RichSourceFunction, there are
two candidates: cancel() and close(). I'm wondering which one should be
picked. Looking for best practices, I turned to the existing sources. One
example is FlinkKafkaConsumerBase, which has both callbacks implemented
identically (one delegating to the other). A counterexample is
InputFormatSourceFunction, which uses cancel() only to reset a flag, while
the actual cleanup is done in close(). Which of these approaches is a better
fit in the described case?

Just FYI, Flink version I use is 1.1.2.
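
For reference, here is a minimal sketch of what the second approach would look
like in my case (cancel() only flips a flag, the actual cleanup happens in
close()). ConnectionPool and its methods are placeholders, not a real API:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

// Placeholder pool - stands in for whatever connection pool the job really uses.
class ConnectionPool {
  def fetchNext(): String = "record"
  def shutdown(): Unit = ()
}

class PoolBackedSource extends RichSourceFunction[String] {

  @volatile private var running = true
  @transient private var pool: ConnectionPool = _

  override def open(parameters: Configuration): Unit = {
    pool = new ConnectionPool()  // build the real pool here
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) {
      val record = pool.fetchNext()
      ctx.getCheckpointLock.synchronized {
        ctx.collect(record)
      }
    }
  }

  // Only signal the run() loop to stop.
  override def cancel(): Unit = {
    running = false
  }

  // Free resources here - close() is called on both cancellation and failure.
  override def close(): Unit = {
    if (pool != null) pool.shutdown()
  }
}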

Thanks,
Yury