Can Flink aggregate in local TM, then aggregate in global TM?

2017-12-14 Thread zhaifengwei
I have a cluster environment and I need to aggregate a DataStream on it.
I'm wondering whether I can aggregate on the local server first, and then
aggregate globally.
When I aggregate the DataStream globally, the network IO increases quickly.
I just want to decrease the network IO, so I need to aggregate on the local
server first.
How can I do it?

DataStream dataIn 
dataIn.map().filter().assignTimestampsAndWatermarks().keyBy().window().fold()
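
For reference, the DataStream API does not apply a combiner automatically for keyed window aggregations, but a commonly discussed workaround is to pre-aggregate inside each parallel subtask before the keyBy(), e.g. with a flatMap that buffers partial sums and flushes them periodically. A rough, illustrative sketch (all names and the flush threshold are made up, and the buffer is not checkpointed):

```java
// Illustrative only: pre-aggregate per subtask before the keyBy() so that less
// data is shuffled over the network. The input is assumed to already be
// (key, count) pairs; FLUSH_THRESHOLD is an arbitrary placeholder.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class LocalPreAggregator extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int FLUSH_THRESHOLD = 1000;

    private transient Map<String, Long> buffer;
    private transient int seen;

    @Override
    public void open(Configuration parameters) {
        buffer = new HashMap<>();
        seen = 0;
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
        buffer.merge(value.f0, value.f1, Long::sum);
        if (++seen >= FLUSH_THRESHOLD) {
            // note: flushed records take the timestamp of the record that triggered
            // the flush, which can perturb event-time windows slightly
            for (Map.Entry<String, Long> e : buffer.entrySet()) {
                out.collect(Tuple2.of(e.getKey(), e.getValue()));
            }
            buffer.clear();
            seen = 0;
        }
    }
}

// usage sketch: dataIn.map(...).flatMap(new LocalPreAggregator()).keyBy(0).window(...).sum(1)
```

Since the pre-aggregator runs before the keyBy(), it stays chained to the upstream operators and only the flushed partial sums cross the network.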





Re: Flink 1.4.0 can not override JAVA_HOME for single-job deployment on YARN

2017-12-14 Thread 杨光
Yes, I'm using Java 8, and I found that the 1.4 version provides new
parameters: "containerized.master.env.ENV_VAR1" and
"containerized.taskmanager.env".
I changed my start command from "-yD yarn.taskmanager.env.JAVA_HOME" to
"-yD containerized.taskmanager.env.JAVA_HOME=/opt/jdk1.8.0_121 -yD
containerized.master.env.JAVA_HOME=/opt/jdk1.8.0_121" and it works.
Thanks a lot.

2017-12-14 20:52 GMT+08:00 Nico Kruber :
> Hi,
> are you running Flink in a JRE >= 8? We dropped Java 7 support for
> Flink 1.4.
>
>
> Nico
>
> On 14/12/17 12:35, 杨光 wrote:
>> Hi,
>> I am using flink single-job mode on YARN. After I upgraded the Flink
>> version from 1.3.2 to 1.4.0, the parameter
>> "yarn.taskmanager.env.JAVA_HOME" doesn't work as before.
>> I can only find error logs on YARN like this:
>>
>> Exception in thread "main" java.lang.UnsupportedClassVersionError:
>> org/apache/flink/yarn/YarnApplicationMasterRunner : Unsupported
>> major.minor version 52.0
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)
>>
>> Is there something different I should know to avoid this problem?
>> Thanks!
>>
>


docker-flink images and CI

2017-12-14 Thread Colin Williams
I created the following issue:
https://github.com/docker-flink/docker-flink/issues/29 where it was
suggested that I should bring this up on the list.

1.4 has been released. Hurray!

But there isn't yet a Dockerfile / image for the release. Furthermore, it
would be nice to have Dockerfiles / images for each RC, so people
can incrementally test features using Docker before the release.

Is it possible to use CI to generate images and add Dockerfiles for each
release?


Consecutive windowed operations

2017-12-14 Thread Ron Crocker
In the 1.4 docs I stumbled on this section: Consecutive windowed operations 


This is very nice, but now I'm curious whether this has always applied and was just 
not well documented, or whether it is new behavior in 1.4 (side note: I'm trying to 
decide if I need to upgrade from 1.3.2 to 1.4).
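
For reference, the pattern described in that section looks roughly like the sketch below (modeled on the documented example; the key/field positions and window size are illustrative):

```java
// Keyed 5s windows whose per-key results feed a second, all-window aggregation
// over the same window size. This lines up because each per-key result carries
// the end timestamp of the window it was computed for.
DataStream<Tuple2<String, Long>> input = ...; // timestamped + watermarked source

DataStream<Tuple2<String, Long>> resultsPerKey = input
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .sum(1);

DataStream<Tuple2<String, Long>> globalMaxPerWindow = resultsPerKey
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .maxBy(1);
```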

Thanks!
Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835



Re: Flink long-running streaming job, Keytab authentication

2017-12-14 Thread Eron Wright
To my knowledge the various RPC clients take care of renewal (whether
reactively or using a renewal thread).  Some examples:
https://github.com/apache/hadoop/blob/release-2.7.3-RC2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java#L638
https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java#L139

So I don't think Flink needs a renewal thread but the overall situation is
complex.  Some stack traces and logs may be needed to understand the issue.

Eron

On Thu, Dec 14, 2017 at 8:17 AM, Oleksandr Nitavskyi  wrote:

> Hello all,
>
>
>
> I have a question about Kerberos authentication in Yarn environment for
> long running streaming job. According to the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/security-
> kerberos.html#yarnmesos-mode ) Flink’s solution is to use keytab in order
> to perform authentication in YARN perimeter.
>
>
>
> If keytab is configured, Flink uses
> *UserGroupInformation#loginUserFromKeytab* method in order to perform
> authentication. In the YARN Security documentation (
>
> https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-
> project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/
> YarnApplicationSecurity.md#keytabs-for-am-and-containers-
> distributed-via-yarn ) mentioned that it should be enough:
>
>
>
> *Launched containers must themselves log in
> via UserGroupInformation.loginUserFromKeytab(). UGI handles the login, and
> schedules a background thread to relogin the user periodically.*
>
>
>
> But in reality if we check the Source code of UGI, we can see that no
> background Thread is created: https://github.com/apache/
> hadoop/blob/trunk/hadoop-common-project/hadoop-common/
> src/main/java/org/apache/hadoop/security/UserGroupInformation.java#L1153.
> There are just created javax.security.auth.login.LoginContext
>
> and performed authentication. Looks like it is true for different Hadoop
> branches - 2.7, 2.8, 3.0, trunk. So Flink also doesn’t create any
> background Threads: https://github.com/apache/flink/blob/master/flink-
> runtime/src/main/java/org/apache/flink/runtime/security/
> modules/HadoopModule.java#L69. So in my case job loses credentials for
> ResourceManager and HDFS after some time (12 hours in my case).
>
>
>
> Looks like UGI’s code is not aligned with the documentation and it
> doesn’t relogin periodically.
>
> But do you think patching with background Thread which performs
> UGI#reloginUserFromKeytab can be a solution?
>
>
>
> P.S. We are running Flink as a single job on Yarn.
>
>
>
>
>


Re: Fenzo NoClassDefFoundError: ObjectMapper

2017-12-14 Thread Jared Stehler
No problem, thanks! 

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703



> On Dec 14, 2017, at 3:59 PM, Eron Wright  wrote:
> 
> Jared, I think you're correct that the shaded `ObjectMapper` is missing.   
> Based on the above details and a quick look at the Fenzo code, it appears 
> that this bug is expressed when you've configured some hard constraints that 
> Mesos couldn't satisfy.  Do you agree?  Thanks also for suggesting a fix, 
> would you mind annotating FLINK-8265?
> 
> https://issues.apache.org/jira/browse/FLINK-8265 
> 
> 
> Sorry about this!
>  
> Eron
> 
> 
> On Thu, Dec 14, 2017 at 12:17 PM, Jared Stehler wrote:
> Possibly missing an include for jackson here in the flink-mesos pom?
> 
>   <artifactSet>
>     <includes combine.children="append">
>       <include>com.google.protobuf:protobuf-java</include>
>       <include>org.apache.mesos:mesos</include>
>       <include>com.netflix.fenzo:fenzo-core</include>
>     </includes>
>   </artifactSet>
>   <relocations combine.children="append">
>     <relocation>
>       <pattern>com.google.protobuf</pattern>
>       <shadedPattern>org.apache.flink.mesos.shaded.com.google.protobuf</shadedPattern>
>     </relocation>
>     <relocation>
>       <pattern>com.fasterxml.jackson</pattern>
>       <shadedPattern>org.apache.flink.mesos.shaded.com.fasterxml.jackson</shadedPattern>
>     </relocation>
>   </relocations>
> 
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703 
> 
> 
> 
>> On Dec 14, 2017, at 3:10 PM, Jared Stehler wrote:
>> 
>> I see a shaded jackson class with jackson2 in the package, but none with the 
>> path shown below.
>> 
>> --
>> Jared Stehler
>> Chief Architect - Intellify Learning
>> o: 617.701.6330 x703 
>> 
>> 
>> 
>>> On Dec 14, 2017, at 3:05 PM, Jared Stehler wrote:
>>> 
>>> Getting the following error on app master startup with flink-mesos 1.4.0:
>>> 
>>> ExecutionException: java.lang.NoClassDefFoundError: 
>>> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at com.netflix.fenzo.TaskScheduler.doSchedule(TaskScheduler.java:678)
>>> at com.netflix.fenzo.TaskScheduler.scheduleOnce(TaskScheduler.java:600)
>>> at 
>>> org.apache.flink.mesos.scheduler.LaunchCoordinator$$anonfun$5.applyOrElse(LaunchCoordinator.scala:173)
>>> ...
>>> (17 additional frame(s) were not displayed)
>>> 
>>> NoClassDefFoundError: 
>>> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
>>> at 
>>> com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
>>> at 
>>> com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
>>> at 
>>> com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
>>> at 
>>> com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
>>> at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
>>> ...
>>> (6 additional frame(s) were not displayed)
>>> 
>>> ClassNotFoundException: 
>>> org.apache.flink.mesos.shaded.com.fasterxml.jackson.databind.ObjectMapper
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> at 
>>> com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
>>> ...
>>> (10 additional frame(s) were not displayed)
>>> 
>>> 
>>> --
>>> Jared Stehler
>>> Chief Architect - Intellify Learning
>>> o: 617.701.6330 x703 
>>> 
>>> 
>>> 
>> 
> 

Re: Fenzo NoClassDefFoundError: ObjectMapper

2017-12-14 Thread Eron Wright
Jared, I think you're correct that the shaded `ObjectMapper` is missing.
 Based on the above details and a quick look at the Fenzo code, it appears
that this bug is expressed when you've configured some hard constraints
that Mesos couldn't satisfy.  Do you agree?  Thanks also for suggesting a
fix, would you mind annotating FLINK-8265?

https://issues.apache.org/jira/browse/FLINK-8265

Sorry about this!

Eron


On Thu, Dec 14, 2017 at 12:17 PM, Jared Stehler <
jared.steh...@intellifylearning.com> wrote:

> Possibly missing an include for jackson here in the flink-mesos pom?
>
> <artifactSet>
>   <includes combine.children="append">
>     <include>com.google.protobuf:protobuf-java</include>
>     <include>org.apache.mesos:mesos</include>
>     <include>com.netflix.fenzo:fenzo-core</include>
>   </includes>
> </artifactSet>
> <relocations combine.children="append">
>   <relocation>
>     <pattern>com.google.protobuf</pattern>
>     <shadedPattern>org.apache.flink.mesos.shaded.com.google.protobuf</shadedPattern>
>   </relocation>
>   <relocation>
>     <pattern>com.fasterxml.jackson</pattern>
>     <shadedPattern>org.apache.flink.mesos.shaded.com.fasterxml.jackson</shadedPattern>
>   </relocation>
> </relocations>
>
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
>
>
>
> On Dec 14, 2017, at 3:10 PM, Jared Stehler wrote:
>
> I see a shaded jackson class with *jackson2* in the package, but none
> with the path shown below.
>
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
>
>
>
> On Dec 14, 2017, at 3:05 PM, Jared Stehler wrote:
>
> Getting the following error on app master startup with flink-mesos 1.4.0:
>
> ExecutionException: java.lang.NoClassDefFoundError: 
> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at com.netflix.fenzo.TaskScheduler.doSchedule(TaskScheduler.java:678)
> at com.netflix.fenzo.TaskScheduler.scheduleOnce(TaskScheduler.java:600)
> at 
> org.apache.flink.mesos.scheduler.LaunchCoordinator$$anonfun$5.applyOrElse(LaunchCoordinator.scala:173)
> ...
> (17 additional frame(s) were not displayed)
>
> NoClassDefFoundError: 
> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
> at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
> at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
> at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
> ...
> (6 additional frame(s) were not displayed)
>
> ClassNotFoundException: 
> org.apache.flink.mesos.shaded.com.fasterxml.jackson.databind.ObjectMapper
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
> ...
> (10 additional frame(s) were not displayed)
>
>
>
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
>
>
>
>
>
>


Re: Fenzo NoClassDefFoundError: ObjectMapper

2017-12-14 Thread Jared Stehler
Possibly missing an include for jackson here in the flink-mesos pom?

<artifactSet>
  <includes combine.children="append">
    <include>com.google.protobuf:protobuf-java</include>
    <include>org.apache.mesos:mesos</include>
    <include>com.netflix.fenzo:fenzo-core</include>
  </includes>
</artifactSet>
<relocations combine.children="append">
  <relocation>
    <pattern>com.google.protobuf</pattern>
    <shadedPattern>org.apache.flink.mesos.shaded.com.google.protobuf</shadedPattern>
  </relocation>
  <relocation>
    <pattern>com.fasterxml.jackson</pattern>
    <shadedPattern>org.apache.flink.mesos.shaded.com.fasterxml.jackson</shadedPattern>
  </relocation>
</relocations>

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703



> On Dec 14, 2017, at 3:10 PM, Jared Stehler wrote:
> 
> I see a shaded jackson class with jackson2 in the package, but none with the 
> path shown below.
> 
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
> 
> 
> 
>> On Dec 14, 2017, at 3:05 PM, Jared Stehler wrote:
>> 
>> Getting the following error on app master startup with flink-mesos 1.4.0:
>> 
>> ExecutionException: java.lang.NoClassDefFoundError: 
>> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at com.netflix.fenzo.TaskScheduler.doSchedule(TaskScheduler.java:678)
>> at com.netflix.fenzo.TaskScheduler.scheduleOnce(TaskScheduler.java:600)
>> at 
>> org.apache.flink.mesos.scheduler.LaunchCoordinator$$anonfun$5.applyOrElse(LaunchCoordinator.scala:173)
>> ...
>> (17 additional frame(s) were not displayed)
>> 
>> NoClassDefFoundError: 
>> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
>> at 
>> com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
>> at 
>> com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
>> at 
>> com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
>> at 
>> com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
>> at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
>> ...
>> (6 additional frame(s) were not displayed)
>> 
>> ClassNotFoundException: 
>> org.apache.flink.mesos.shaded.com.fasterxml.jackson.databind.ObjectMapper
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at 
>> com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
>> ...
>> (10 additional frame(s) were not displayed)
>> 
>> 
>> --
>> Jared Stehler
>> Chief Architect - Intellify Learning
>> o: 617.701.6330 x703
>> 
>> 
>> 
> 



Re: Fenzo NoClassDefFoundError: ObjectMapper

2017-12-14 Thread Jared Stehler
I see a shaded jackson class with jackson2 in the package, but none with the 
path shown below.

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703



> On Dec 14, 2017, at 3:05 PM, Jared Stehler wrote:
> 
> Getting the following error on app master startup with flink-mesos 1.4.0:
> 
> ExecutionException: java.lang.NoClassDefFoundError: 
> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at com.netflix.fenzo.TaskScheduler.doSchedule(TaskScheduler.java:678)
> at com.netflix.fenzo.TaskScheduler.scheduleOnce(TaskScheduler.java:600)
> at 
> org.apache.flink.mesos.scheduler.LaunchCoordinator$$anonfun$5.applyOrElse(LaunchCoordinator.scala:173)
> ...
> (17 additional frame(s) were not displayed)
> 
> NoClassDefFoundError: 
> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
> at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
> at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
> at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
> ...
> (6 additional frame(s) were not displayed)
> 
> ClassNotFoundException: 
> org.apache.flink.mesos.shaded.com.fasterxml.jackson.databind.ObjectMapper
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
> ...
> (10 additional frame(s) were not displayed)
> 
> 
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
> 
> 
> 



Fenzo NoClassDefFoundError: ObjectMapper

2017-12-14 Thread Jared Stehler
Getting the following error on app master startup with flink-mesos 1.4.0:

ExecutionException: java.lang.NoClassDefFoundError: 
org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.netflix.fenzo.TaskScheduler.doSchedule(TaskScheduler.java:678)
at com.netflix.fenzo.TaskScheduler.scheduleOnce(TaskScheduler.java:600)
at 
org.apache.flink.mesos.scheduler.LaunchCoordinator$$anonfun$5.applyOrElse(LaunchCoordinator.scala:173)
...
(17 additional frame(s) were not displayed)

NoClassDefFoundError: 
org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
at 
com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
at 
com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
...
(6 additional frame(s) were not displayed)

ClassNotFoundException: 
org.apache.flink.mesos.shaded.com.fasterxml.jackson.databind.ObjectMapper
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
...
(10 additional frame(s) were not displayed)


--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703





Re: Could not flush and close the file system output stream to s3a, is this fixed?

2017-12-14 Thread Bowen Li
Hi,

The problem reported in FLINK-7590 only happened one time on our end. And,
as you can see from its comments, we suspected it was caused by the AWS SDK or
Hadoop's s3a implementation, which we have no control over.

Flink 1.4.0 has its own S3 implementations. I haven't tried it yet.


On Thu, Dec 14, 2017 at 2:05 AM, Fabian Hueske  wrote:

> Bowen Li (in CC) closed the issue but there is no fix (or at least it is
> not linked in the JIRA).
> Maybe it was resolved in another issue or can be differently resolved.
>
> @Bowen, can you comment on how to fix this problem? Will it work in Flink
> 1.4.0?
>
> Thank you,
> Fabian
>
> 2017-12-13 5:28 GMT+01:00 Hao Sun :
>
>> https://issues.apache.org/jira/browse/FLINK-7590
>>
>> I have a similar situation with Flink 1.3.2 on K8S
>>
>> =
>> 2017-12-13 00:57:12,403 INFO 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>> - Source: KafkaSource(maxwell.tickets) -> 
>> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3) 
>> (6ad009755a6009975d197e75afa05e14)
>> switched from RUNNING to FAILED. AsynchronousException{java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3).} at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:970) at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3). ... 6 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at 
>> java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:906) ... 5 more Suppressed:
>> java.lang.Exception: Could not properly cancel managed operator state
>> future. at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:98) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.cleanup(StreamTask.java:1023) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:961) ... 5 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at 
>> java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at 
>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>> at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:96) ... 7 more Caused by:
>> java.io.IOException: Could not flush and close the file system output
>> stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at org.apache.flink.runtime.state
>> .filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutpu
>> tStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) at
>> org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOC
>> allable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend$
>> 1.performOperation(DefaultOperatorStateBackend.java:270) at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend$
>> 

Re: how does time-windowed join and Over-Window Aggregation implemented in flink SQL?

2017-12-14 Thread Yan Zhou [FDS Science]
Thanks for the information.

Best
Yan

From: Xingcan Cui 
Date: Wednesday, December 13, 2017 at 6:02 PM
To: "Yan Zhou [FDS Science]" 
Cc: "user@flink.apache.org" 
Subject: Re: how does time-windowed join and Over-Window Aggregation 
implemented in flink SQL?

Hi Yan Zhou,

as you may have noticed, the SQL-level stream join was not built on top of some
join APIs but was implemented with the low-level CoProcessFunction (see
TimeBoundedStreamInnerJoin.scala). The pipeline is generated in
DataStreamWindowJoin.scala.

Regarding the over-window aggregation, most of the implementations can be found
in this package. The pipeline is generated in DataStreamOverAggregate.scala.

In summary, they use built-in state tools to cache the rows/intermediate 
results and clean/fire them when necessary.
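
To make that a bit more concrete, a heavily condensed sketch of the time-bounded join idea is shown below. This is not Flink's actual TimeBoundedStreamInnerJoin, just an illustration of buffering both sides in keyed state and probing within the time bounds; the Order/Shipment types and the 4-hour bound are assumptions taken from the query further down.

```java
// Sketch: both sides are buffered in MapState indexed by event timestamp, each
// incoming element probes the other side's buffer within the allowed interval,
// and event-time timers drop entries that can no longer match.
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class Order { public String id; }
class Shipment { public String orderId; }

public class OrderShipmentIntervalJoin
        extends CoProcessFunction<Order, Shipment, Tuple2<Order, Shipment>> {

    private static final long UPPER_BOUND = 4 * 60 * 60 * 1000L; // shiptime - ordertime <= 4h

    private transient MapState<Long, List<Order>> orderBuffer;       // ordertime -> orders
    private transient MapState<Long, List<Shipment>> shipmentBuffer; // shiptime  -> shipments

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<Long, List<Order>> orderDesc = new MapStateDescriptor<>(
                "orders", BasicTypeInfo.LONG_TYPE_INFO, new ListTypeInfo<Order>(Order.class));
        MapStateDescriptor<Long, List<Shipment>> shipmentDesc = new MapStateDescriptor<>(
                "shipments", BasicTypeInfo.LONG_TYPE_INFO, new ListTypeInfo<Shipment>(Shipment.class));
        orderBuffer = getRuntimeContext().getMapState(orderDesc);
        shipmentBuffer = getRuntimeContext().getMapState(shipmentDesc);
    }

    @Override
    public void processElement1(Order order, Context ctx, Collector<Tuple2<Order, Shipment>> out) throws Exception {
        long orderTime = ctx.timestamp(); // assumes event-time timestamps are assigned
        append(orderBuffer, orderTime, order);
        for (Map.Entry<Long, List<Shipment>> e : shipmentBuffer.entries()) {
            if (e.getKey() >= orderTime && e.getKey() <= orderTime + UPPER_BOUND) {
                for (Shipment s : e.getValue()) {
                    out.collect(Tuple2.of(order, s));
                }
            }
        }
        // this order cannot match anymore once the watermark passes orderTime + 4h
        ctx.timerService().registerEventTimeTimer(orderTime + UPPER_BOUND);
    }

    @Override
    public void processElement2(Shipment shipment, Context ctx, Collector<Tuple2<Order, Shipment>> out) throws Exception {
        long shipTime = ctx.timestamp();
        append(shipmentBuffer, shipTime, shipment);
        for (Map.Entry<Long, List<Order>> e : orderBuffer.entries()) {
            if (e.getKey() >= shipTime - UPPER_BOUND && e.getKey() <= shipTime) {
                for (Order o : e.getValue()) {
                    out.collect(Tuple2.of(o, shipment));
                }
            }
        }
        ctx.timerService().registerEventTimeTimer(shipTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, Shipment>> out) throws Exception {
        // drop entries that can no longer participate in any join result
        orderBuffer.remove(timestamp - UPPER_BOUND);
        shipmentBuffer.remove(timestamp);
    }

    private static <T> void append(MapState<Long, List<T>> state, long key, T value) throws Exception {
        List<T> list = state.get(key);
        if (list == null) {
            list = new ArrayList<>();
        }
        list.add(value);
        state.put(key, list);
    }
}

// usage sketch: orders.connect(shipments).keyBy(o -> o.id, s -> s.orderId)
//                     .process(new OrderShipmentIntervalJoin())
```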

Hope that helps.

Best,
Xingcan

On Thu, Dec 14, 2017 at 7:09 AM, Yan Zhou [FDS Science] wrote:

Hi,



I am building a data pipeline with a lot of streaming joins and over-window 
aggregations, and Flink SQL supports these features. However, there are no 
similar DataStream APIs provided (maybe there are and I just didn't find them; 
please point them out if so). I got confused because I assume that the SQL logical 
plan will be translated into a graph of operators or transformations.



Could someone explain how these two SQL queries are implemented or translated 
into low-level code (operators or transformations)? I am asking because I 
have implemented these features without using SQL and the performance looks 
good. I would certainly love to migrate to SQL, but I want to understand them 
well first. Any information, hints, or links are appreciated.



  1.  Time-Windowed Join

The DataStream API only provides streaming joins within the same window, but the SQL 
API (time-windowed join) can join two streams over quite different time ranges. 
Below is a sample query from the official docs; note that Orders and Shipments can be 
up to 4 hours apart. Is it implemented with a CoProcessFunction or a TwoInputOperator 
that buffers events for a certain period?



SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
  o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

2. Over-Window Aggregation
There is no similar feature in the DataStream API. How does this get implemented? 
Does it use keyed state to buffer previous events and pull the records when needed? 
How does sorting get handled?


Best
Yan









Re: streaming Table API - strange behavior

2017-12-14 Thread Fabian Hueske
Hi,

yes you are right. I forgot that the interval is set by default when
enabling event time.

Also your comment about triggering the window is correct. Technically, you
don't need a record that falls into the next window, but just a watermark
that is past the window boundary.
In your case, watermarks only advance if the assigner sees more records and
you'd need a record with a timestamp of at least 2017-12-14 13:10:15 (or
16), because the watermark assigner subtracts 10 seconds.
Given the current watermark assigner, there is no other way than sending
more records to trigger a window computation. You can implement a custom
assigner to also emit watermarks without data, but that would somewhat bind
the event-time watermarks to the clock of the generating machine such that
watermarks wouldn't be only data-driven.
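
A rough sketch of such an assigner is shown below (illustrative only; it reuses the WC type from the program quoted further down, and the 10s bounds are arbitrary). While data flows it behaves like the BoundedOutOfOrdernessTimestampExtractor, but when no records have been seen for a while it lets the watermark advance with the machine clock:

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleTolerantWatermarkAssigner implements AssignerWithPeriodicWatermarks<WC> {

    private static final long MAX_OUT_OF_ORDERNESS = 10_000L; // 10s, as in the job below
    private static final long IDLE_TIMEOUT = 10_000L;         // arbitrary idleness bound

    private long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;
    private long lastUpdate = System.currentTimeMillis();

    @Override
    public long extractTimestamp(WC element, long previousElementTimestamp) {
        long ts = element.dt.getTime();
        maxTimestamp = Math.max(maxTimestamp, ts);
        lastUpdate = System.currentTimeMillis();
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long watermark = maxTimestamp - MAX_OUT_OF_ORDERNESS;
        if (System.currentTimeMillis() - lastUpdate > IDLE_TIMEOUT) {
            // no data for a while: fall back to the wall clock, with the caveats above
            watermark = Math.max(watermark, System.currentTimeMillis() - MAX_OUT_OF_ORDERNESS);
        }
        return new Watermark(watermark);
    }
}
```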

Best, Fabian

2017-12-14 17:25 GMT+01:00 Plamen Paskov :

> Hi Fabian,
>
> Thank you for your response! I think it's not necessary to do that because
> I already have this call anyway:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> which does exactly what you say. It sets the watermark interval to 200 ms.
> I think I found the problem: it is the default event-time trigger attached
> to the window assigner.
> According to the docs here
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html
> : "*all the event-time window assigners have an EventTimeTrigger as default
> trigger. This trigger simply fires once the watermark passes the end of a window.*"
> All I have to do in order to trigger the computation is to send an event
> which falls into the "next" window.
> So the question now is: how can I set a trigger to fire at regular intervals
> (e.g. every 5 seconds) using the Table API?
>
>
> On 14.12.2017 17:57, Fabian Hueske wrote:
>
> Hi,
>
> you are using a BoundedOutOfOrdernessTimestampExtractor to generate
> watermarks.
> The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark
> assigner and only generates watermarks if a watermark interval is
> configured.
> Without watermarks, the query cannot "make progress" and only computes its
> result when the program is closed (sources emit a MAX_LONG watermark when
> being canceled).
>
> Long story short: you need to configure the watermark interval:
> env.getConfig.setAutoWatermarkInterval(100L);
>
> Best, Fabian
>
> 2017-12-14 16:30 GMT+01:00 Plamen Paskov :
>
>> Hi,
>>
>> I'm trying to run the following streaming program in my local flink 1.3.2
>> environment. The program compile and run without any errors but the print()
>> call doesn't display anything. Once i stop the program i receive all
>> aggregated data. Any ideas how to make it output regularly or when new data
>> come/old data updated?
>>
>> package flink;
>> import org.apache.flink.api.common.functions.MapFunction;import 
>> org.apache.flink.api.java.tuple.Tuple2;import 
>> org.apache.flink.streaming.api.TimeCharacteristic;import 
>> org.apache.flink.streaming.api.datastream.DataStream;import 
>> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import 
>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;import
>>  org.apache.flink.streaming.api.windowing.time.Time;import 
>> org.apache.flink.table.api.Table;import 
>> org.apache.flink.table.api.java.Slide;import 
>> org.apache.flink.table.api.java.StreamTableEnvironment;
>> import java.sql.Timestamp;
>>
>> public class StreamingJob {
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> StreamTableEnvironment tEnv = 
>> StreamTableEnvironment.getTableEnvironment(env);
>>
>>
>> SingleOutputStreamOperator input = env
>> .socketTextStream("localhost", 9000, "\n")
>> .map(new MapFunction() {
>> @Overridepublic WC map(String value) 
>> throws Exception {
>> String[] row = value.split(",");
>> Timestamp timestamp = Timestamp.valueOf(row[2]);
>> return new WC(row[0], Long.valueOf(row[1]), 
>> timestamp);
>> }
>> })
>> .assignTimestampsAndWatermarks(new 
>> BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
>> @Overridepublic long 
>> extractTimestamp(WC element) {
>> return element.dt.getTime();
>> }
>> });
>>
>>
>> tEnv.registerDataStream("WordCount", input, "word, frequency, 
>> dt.rowtime");
>>
>> Table table = tEnv.scan("WordCount")
>> 
>> 

Re: streaming Table API - strange behavior

2017-12-14 Thread Plamen Paskov

Hi Fabian,

Thank you for your response! I think it's not necessary to do that
because I already have this call anyway:


env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

which does exactly what you say. It sets the watermark interval to 200 ms.
I think I found the problem: it is the default event-time trigger attached
to the window assigner.
According to the docs here https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html : "*all the event-time window assigners have an EventTimeTrigger as default 
trigger. This trigger simply fires once the watermark passes the end of 
a window.*" All I have to do in order to trigger the computation is to send an event which falls into the "next" window.

So the question now is: how can I set a trigger to fire at regular intervals (e.g. 
every 5 seconds) using the Table API?


On 14.12.2017 17:57, Fabian Hueske wrote:

Hi,

you are using a BoundedOutOfOrdernessTimestampExtractor to generate 
watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark 
assigner and only generates watermarks if a watermark interval is 
configured.
Without watermarks, the query cannot "make progress" and only computes 
its result when the program is closed (sources emit a MAX_LONG 
watermark when being canceled).


Long story short: you need to configure the watermark interval: 
env.getConfig.setAutoWatermarkInterval(100L);


Best, Fabian

2017-12-14 16:30 GMT+01:00 Plamen Paskov:


Hi,

I'm trying to run the following streaming program in my local
flink 1.3.2 environment. The program compile and run without any
errors but the print() call doesn't display anything. Once i stop
the program i receive all aggregated data. Any ideas how to make
it output regularly or when new data come/old data updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;


public class StreamingJob {
 public static void main(String[] args)throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 StreamTableEnvironment tEnv = 
StreamTableEnvironment.getTableEnvironment(env);


 SingleOutputStreamOperator input = env
 .socketTextStream("localhost",9000,"\n")
 .map(new MapFunction() {
 @Override public WC map(String value)throws Exception {
 String[] row = value.split(",");
 Timestamp timestamp = Timestamp.valueOf(row[2]);
 return new WC(row[0], Long.valueOf(row[1]), 
timestamp);
 }
 })
 .assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
 @Override public long extractTimestamp(WC element) {
 return element.dt.getTime();
 }
 });


 tEnv.registerDataStream("WordCount", input,"word, frequency, 
dt.rowtime");

 Table table = tEnv.scan("WordCount")
 
.window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
 .groupBy("w, word")
 .select("word, frequency.sum as frequency, w.start as 
dt");DataStream> result = tEnv.toRetractStream(table, WC.class);
 result.print();

 env.execute();
 }

 public static class WC {
 public Stringword;
 public long frequency;
 public Timestampdt;

 public WC() {
 }

 public WC(String word,long frequency, Timestamp dt) {
 this.word = word;
 this.frequency = frequency;
 this.dt = dt;
 }

 @Override public String toString() {
 return "WC " +word +" " +frequency +" " +dt.getTime();
 }
 }
}


Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02

Flink long-running streaming job, Keytab authentication

2017-12-14 Thread Oleksandr Nitavskyi
Hello all,

I have a question about Kerberos authentication in a YARN environment for a 
long-running streaming job. According to the documentation ( 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/security-kerberos.html#yarnmesos-mode
 ), Flink's solution is to use a keytab in order to perform authentication within the 
YARN perimeter.

If keytab is configured, Flink uses UserGroupInformation#loginUserFromKeytab 
method in order to perform authentication. In the YARN Security documentation (
https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#keytabs-for-am-and-containers-distributed-via-yarn
 ) mentioned that it should be enough:

Launched containers must themselves log in via 
UserGroupInformation.loginUserFromKeytab(). UGI handles the login, and 
schedules a background thread to relogin the user periodically.

But in reality, if we check the source code of UGI, we can see that no 
background thread is created: 
https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java#L1153.
 Only a javax.security.auth.login.LoginContext is created and authentication is 
performed. This looks to be true for the different Hadoop 
branches - 2.7, 2.8, 3.0, trunk. So Flink also doesn't create any background 
threads: 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L69.
 As a result, the job loses credentials for the ResourceManager and HDFS after some 
time (12 hours in my case).

It looks like UGI's code is not aligned with the documentation and it doesn't 
relogin periodically.
Do you think patching in a background thread which performs 
UGI#reloginUserFromKeytab could be a solution?

P.S. We are running Flink as a single job on Yarn.
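
For what it's worth, a minimal, illustrative sketch of such a background relogin thread (not an endorsed fix; the one-hour interval and the error handling are placeholder choices) could look like this:

```java
// Periodically asks Hadoop's UGI to re-login from the keytab so the TGT does
// not expire for a long-running job.
import org.apache.hadoop.security.UserGroupInformation;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class KeytabReloginTask {

    public static ScheduledExecutorService start() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                // re-acquires the TGT from the keytab if it is close to expiring
                UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
            } catch (Exception e) {
                // a real job should log / surface this instead of swallowing it
                e.printStackTrace();
            }
        }, 1, 1, TimeUnit.HOURS);
        return executor;
    }
}
```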




Re: streaming Table API - strange behavior

2017-12-14 Thread Fabian Hueske
Hi,

you are using a BoundedOutOfOrdernessTimestampExtractor to generate
watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark
assigner and only generates watermarks if a watermark interval is
configured.
Without watermarks, the query cannot "make progress" and only computes its
result when the program is closed (sources emit a MAX_LONG watermark when
being canceled).

Long story short: you need to configure the watermark interval:
env.getConfig.setAutoWatermarkInterval(100L);

Best, Fabian

2017-12-14 16:30 GMT+01:00 Plamen Paskov :

> Hi,
>
> I'm trying to run the following streaming program in my local flink 1.3.2
> environment. The program compile and run without any errors but the print()
> call doesn't display anything. Once i stop the program i receive all
> aggregated data. Any ideas how to make it output regularly or when new data
> come/old data updated?
>
> package flink;
> import org.apache.flink.api.common.functions.MapFunction;import 
> org.apache.flink.api.java.tuple.Tuple2;import 
> org.apache.flink.streaming.api.TimeCharacteristic;import 
> org.apache.flink.streaming.api.datastream.DataStream;import 
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import 
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;import
>  org.apache.flink.streaming.api.windowing.time.Time;import 
> org.apache.flink.table.api.Table;import 
> org.apache.flink.table.api.java.Slide;import 
> org.apache.flink.table.api.java.StreamTableEnvironment;
> import java.sql.Timestamp;
>
> public class StreamingJob {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> StreamTableEnvironment tEnv = 
> StreamTableEnvironment.getTableEnvironment(env);
>
>
> SingleOutputStreamOperator input = env
> .socketTextStream("localhost", 9000, "\n")
> .map(new MapFunction() {
> @Overridepublic WC map(String value) 
> throws Exception {
> String[] row = value.split(",");
> Timestamp timestamp = Timestamp.valueOf(row[2]);
> return new WC(row[0], Long.valueOf(row[1]), 
> timestamp);
> }
> })
> .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
> @Overridepublic long 
> extractTimestamp(WC element) {
> return element.dt.getTime();
> }
> });
>
>
> tEnv.registerDataStream("WordCount", input, "word, frequency, 
> dt.rowtime");
>
> Table table = tEnv.scan("WordCount")
> 
> .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
> .groupBy("w, word")
> .select("word, frequency.sum as frequency, w.start as dt");   
>  DataStream> result = tEnv.toRetractStream(table, 
> WC.class);
> result.print();
>
> env.execute();
> }
>
> public static class WC {
> public String word;
> public long frequency;
> public Timestamp dt;
>
> public WC() {
> }
>
> public WC(String word, long frequency, Timestamp dt) {
> this.word = word;
> this.frequency = frequency;
> this.dt = dt;
> }
>
> @Overridepublic String toString() {
> return "WC " + word + " " + frequency + " " + dt.getTime();
> }
> }
> }
>
>
> Sample input:
>
> hello,1,2017-12-14 13:10:01
> ciao,1,2017-12-14 13:10:02
> hello,1,2017-12-14 13:10:03
> hello,1,2017-12-14 13:10:04
>
>
> Thanks
>


Service discovery for flink-metrics-prometheus

2017-12-14 Thread Kien Truong

Hi,

Does anyone have recommendations about integrating 
flink-metrics-prometheus with some SD mechanism, 
so that Prometheus can pick up the Task Managers' locations dynamically?

Best regards,

Kien



streaming Table API - strange behavior

2017-12-14 Thread Plamen Paskov

Hi,

I'm trying to run the following streaming program in my local Flink 
1.3.2 environment. The program compiles and runs without any errors, but 
the print() call doesn't display anything. Once I stop the program I 
receive all the aggregated data. Any ideas how to make it output regularly, 
or whenever new data comes in / old data is updated?


package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;


public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

        SingleOutputStreamOperator<WC> input = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, WC>() {
                    @Override
                    public WC map(String value) throws Exception {
                        String[] row = value.split(",");
                        Timestamp timestamp = Timestamp.valueOf(row[2]);
                        return new WC(row[0], Long.valueOf(row[1]), timestamp);
                    }
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(WC element) {
                        return element.dt.getTime();
                    }
                });

        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        Table table = tEnv.scan("WordCount")
                .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                .groupBy("w, word")
                .select("word, frequency.sum as frequency, w.start as dt");

        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();

        env.execute();
    }

    public static class WC {
        public String word;
        public long frequency;
        public Timestamp dt;

        public WC() {
        }

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency + " " + dt.getTime();
        }
    }
}


Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04


Thanks



Re: [Docs] Can't add metrics to RichFilterFunction

2017-12-14 Thread Kien Truong

That syntax is incorrect; it should be:

@transient private var counter:Counter = _


Regards,

Kien


On 12/14/2017 8:03 PM, Julio Biason wrote:

@transient private var counter:Counter


Re: [Docs] Can't add metrics to RichFilterFunction

2017-12-14 Thread Julio Biason
Oh, obviously, code is Scala. Also we are using Flink 1.4.0 and
flink-metrics-core-1.4-SNAPSHOT.

On Thu, Dec 14, 2017 at 10:56 AM, Julio Biason 
wrote:

> Hello,
>
> I'm trying to add a metric to a filter function, but following the example
> in the docs is not working.
>
> So I have this class:
>
> ```
> class LogBrokenFilter extends RichFilterFunction[LineData] {
>   private val logger = LoggerFactory.getLogger(this.getClass)
>   @transient private var counter:Counter
>
>   override def open(parameters:Configuration):Unit = {
> counter = getRuntimeContext
>   .getMetricGroup()
>   .counter("brokenLogs")
>   }
>
>   // and then I override `filter` to filter events out and count those.
> ```
>
> The problem is that, when compiling, I get the following errors:
>
> abstract member may not have private modifier
> [error]   @transient private var counter:Counter
>
> and
>
> no valid targets for annotation on method counter - it is discarded
> unused. You may specify targets with meta-annotations, e.g. @(transient
> @getter)
> [warn]   @transient private var counter:Counter
> [warn]^
>
> Any ideas? Are the docs wrong?
>
> --
*Julio Biason*, Software Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51
> *99907 0554*
>



-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


[Docs] Can't add metrics to RichFilterFunction

2017-12-14 Thread Julio Biason
Hello,

I'm trying to add a metric to a filter function, but following the example
in the docs is not working.

So I have this class:

```
class LogBrokenFilter extends RichFilterFunction[LineData] {
  private val logger = LoggerFactory.getLogger(this.getClass)
  @transient private var counter:Counter

  override def open(parameters:Configuration):Unit = {
counter = getRuntimeContext
  .getMetricGroup()
  .counter("brokenLogs")
  }

  // and then I override `filter` to filter events out and count those.
```

The problem is that, when compiling, I get the following errors:

abstract member may not have private modifier
[error]   @transient private var counter:Counter

and

no valid targets for annotation on method counter - it is discarded unused.
You may specify targets with meta-annotations, e.g. @(transient @getter)
[warn]   @transient private var counter:Counter
[warn]^

Any ideas? Are the docs wrong?

-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


Re: Flink 1.4.0 can not override JAVA_HOME for single-job deployment on YARN

2017-12-14 Thread Nico Kruber
Hi,
are you running Flink in a JRE >= 8? We dropped Java 7 support for
Flink 1.4.


Nico

On 14/12/17 12:35, 杨光 wrote:
> Hi,
> I am usring flink single-job mode on YARN. After i upgrade flink
> verson from 1.3.2 to  1.4.0, the parameter
> "yarn.taskmanager.env.JAVA_HOME" doesn’t work  as before.
> I can only found error log on yarn like this:
> 
> Exception in thread "main" java.lang.UnsupportedClassVersionError:
> org/apache/flink/yarn/YarnApplicationMasterRunner : Unsupported
> major.minor version 52.0
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)
> 
> Is there something different I should know to avoid this problem?
> Thanks!
> 





Flink 1.4.0 can not override JAVA_HOME for single-job deployment on YARN

2017-12-14 Thread 杨光
Hi,
I am using flink single-job mode on YARN. After I upgraded the Flink
version from 1.3.2 to 1.4.0, the parameter
"yarn.taskmanager.env.JAVA_HOME" doesn't work as before.
I can only find error logs on YARN like this:

Exception in thread "main" java.lang.UnsupportedClassVersionError:
org/apache/flink/yarn/YarnApplicationMasterRunner : Unsupported
major.minor version 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)

Is there something different I should know to avoid this problem?
Thanks!


Fwd: Replace classes at runtime

2017-12-14 Thread Jayant Ameta
Jayant Ameta

-- Forwarded message --
From: Jayant Ameta 
Date: Thu, Dec 14, 2017 at 4:46 PM
Subject: Replace classes at runtime
To: user 


Hi,
I need to update a few classes of my flink job at runtime. What would be
the best way to achieve this?


Re: FlinkKafkaProducer011 and Flink 1.4.0 Kafka docs

2017-12-14 Thread Fabian Hueske
Hi Elias,

thanks for reporting this issue.
I created FLINK-8260 [1] to extend the documentation.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8260

2017-12-14 1:07 GMT+01:00 Elias Levy :

> Looks like the Flink Kafka connector page, in the Producer section,
> is missing a section for the new FlinkKafkaProducer011 producer.  Given
> that the new producer no longer has a static writeToKafkaWithTimestamps
> method, it would be good to add a section that specifies that you must now
> use DataStream.addSink.
>
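
For reference, a minimal sketch of the addSink-based usage (untested; the topic name, bootstrap servers, and the String schema are placeholder assumptions):

```java
// Attach the Kafka 0.11 producer as a regular sink; there is no static
// writeToKafkaWithTimestamps helper for FlinkKafkaProducer011.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

public class KafkaSinkExample {

    public static void addKafkaSink(DataStream<String> stream) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        FlinkKafkaProducer011<String> producer =
                new FlinkKafkaProducer011<>("my-topic", new SimpleStringSchema(), props);
        producer.setWriteTimestampToKafka(true); // forward Flink's record timestamps

        stream.addSink(producer);
    }
}
```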


Re: ProgramInvocationException: Could not upload the jar files to the job manager / No space left on device

2017-12-14 Thread Nico Kruber
Hi Regina,
judging from the exception you posted, this is not about storing the
file in HDFS, but a step before that where the BlobServer first puts the
incoming file into its local file system in the directory given by the
`blob.storage.directory` configuration property. If this property is not
set or empty, it will fall back to `java.io.tmpdir`. The BlobServer
creates a subdirectory `blobStore-` and puts incoming files into
`/blobStore-/incoming` with file names
`temp-12345678` (using an atomic file counter). It seems that there is
no space left in the file system containing this directory.

If you set the log level to INFO, you should see a message like "Created
BLOB server storage directory ..." with the path. Can you double check
whether there is really no space left there?


Nico

On 12/12/17 08:02, Chan, Regina wrote:
> And if it helps, I’m running on flink 1.2.1. I saw this ticket:
> https://issues.apache.org/jira/browse/FLINK-5828 It only started
> happening when I was running all 50 flows at the same time. However, it
> looks like it’s not an issue with creating the cache directory but with
> running out of space there? But what’s in there is also tiny.
> 
>  
> 
> bash-4.1$ hdfs dfs -du -h
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853
> 
> 1.1 K   
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853/5c71e4b6-2567-4d34-98dc-73b29c502736-taskmanager-conf.yaml
> 
> 1.4 K   
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853/flink-conf.yaml
> 
> 93.5 M  
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853/flink-dist_2.10-1.2.1.jar
> 
> 264.8 M 
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853/lib
> 
> 1.9 K   
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853/log4j.properties
> 
>  
> 
>  
> 
> *From:*Chan, Regina [Tech]
> *Sent:* Tuesday, December 12, 2017 1:56 AM
> *To:* 'user@flink.apache.org'
> *Subject:* ProgramInvocationException: Could not upload the jar files to
> the job manager / No space left on device
> 
>  
> 
> Hi,
> 
>  
> 
> I’m currently submitting 50 separate jobs to a 50TM, 1 slot set up. Each
> job has 1 parallelism. There’s plenty of space left in my cluster and on
> that node. It’s not clear to me what’s happening. Any pointers?
> 
>  
> 
> On the client side, when I try to execute, I see the following:
> 
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Could not upload the jar files to the job manager.
> 
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> 
>     at
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
> 
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
> 
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
> 
>     at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> 
>     at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
> 
>     at
> com.gs.ep.da.lake.refinerlib.flink.FlowData.execute(FlowData.java:143)
> 
>     at
> com.gs.ep.da.lake.refinerlib.flink.FlowData.flowPartialIngestionHalf(FlowData.java:107)
> 
>     at
> com.gs.ep.da.lake.refinerlib.flink.FlowData.call(FlowData.java:72)
> 
>     at
> com.gs.ep.da.lake.refinerlib.flink.FlowData.call(FlowData.java:39)
> 
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> 
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> 
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> 
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 
>     at java.lang.Thread.run(Thread.java:745)
> 
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could
> not upload the jar files to the job manager.
> 
>     at
> org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:150)
> 
>     at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
> 
>     at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 
>     at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 
>     at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 
>     at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 
>     at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 
>     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 
>     at
> 

Re: Watermark in broadcast

2017-12-14 Thread Tzu-Li (Gordon) Tai
Hi Seth,

Some clarifications to point out:

Quick follow up question. Is there some way to notify a TimestampAssigner that 
is consuming from an idle source? 

In Flink, idle sources would emit a special idleness marker event that notifies 
downstream time-based operators not to wait for their watermarks.
This avoids the need for the TimestampAssigner to generate watermarks just 
for the sake of letting downstream operators advance their watermarks when a 
source is idle.
However, there are two cases of idle sources, and only one of them is handled at 
the moment: 1) the source subtask simply has no Kafka partitions to read from, 
or 2) the assigned Kafka partitions do not have any records.
Only case 1) is handled, as of Flink 1.3+.

I think you are correct. This stream is consumed from Kafka and the number of 
partitions is much less than the parallelism of the program so there would be 
many partitions that never forward watermarks greater than Long.Min_Value.

In this case, Flink consumer subtasks that do not have Kafka partitions would 
mark themselves as idle and emit the special idleness marker.
Therefore, the expected behavior is that downstream time-based operators will 
not wait on these idle sources, even if they don’t produce watermarks.
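
For custom sources, the same can be done explicitly. A minimal sketch (assuming Flink 1.3+, 
where SourceContext#markAsTemporarilyIdle() is available; pollExternalSystem() is just a 
made-up placeholder for your actual read):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MaybeIdleSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String next = pollExternalSystem(); // made-up helper standing in for the real read
            if (next == null) {
                // nothing to read: tell downstream time-based operators not to wait for us
                ctx.markAsTemporarilyIdle();
                Thread.sleep(100);
            } else {
                synchronized (ctx.getCheckpointLock()) {
                    // emitting a record switches the subtask back to active
                    ctx.collect(next);
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String pollExternalSystem() {
        return null; // placeholder
    }
}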

Best,
Gordon

On 14 December 2017 at 6:08:20 PM, Fabian Hueske (fhue...@gmail.com) wrote:

Hi Seth,

that's not possible with the current interface.
There have been some discussions about how to address issues of idle sources 
(or partitions).
Aljoscha (in CC) should know more about that.

Best, Fabian

2017-12-13 18:13 GMT+01:00 Seth Wiesman :
Quick follow up question. Is there some way to notify a TimestampAssigner that 
is consuming from an idle source?

 



Seth Wiesman | Software Engineer, Data


4 World Trade Center, 46th Floor, New York, NY 10007


 

 

From: Seth Wiesman 
Date: Wednesday, December 13, 2017 at 12:04 PM
To: Timo Walther , "user@flink.apache.org" 

Subject: Re: Watermark in broadcast

 

Hi Timo,

 

I think you are correct. This stream is consumed from Kafka and the number of 
partitions is much less than the parallelism of the program so there would be 
many partitions that never forward watermarks greater than Long.Min_Value.

 

Thank you for the quick response.  

 



Seth Wiesman | Software Engineer, Data


4 World Trade Center, 46th Floor, New York, NY 10007


 

 

From: Timo Walther 
Date: Wednesday, December 13, 2017 at 11:46 AM
To: "user@flink.apache.org" 
Subject: Re: Watermark in broadcast

 

Hi Seth,

are you sure that all partitions of the broadcasted stream send a watermark? 
processWatermark is only called if a minimum watermark arrived from all 
partitions.
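
As an illustration, the combined watermark of a two-input operator only advances to the 
minimum of the two input watermarks, roughly like this simplified model (a sketch for 
intuition, not the actual Flink code):

// simplified model of how a two-input operator combines its input watermarks
class CombinedWatermarkModel {

    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // returns the new combined watermark, or null if it did not advance
    Long onWatermark1(long timestamp) {
        input1Watermark = timestamp;
        return advance();
    }

    Long onWatermark2(long timestamp) {
        input2Watermark = timestamp;
        return advance();
    }

    private Long advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return combinedWatermark;
        }
        return null; // one input (e.g. a broadcast side without watermarks) is still holding it back
    }
}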

Regards,
Timo

Am 12/13/17 um 5:10 PM schrieb Seth Wiesman:

Hi,

 

How are watermarks propagated during a broadcast partition? I have a 
TwoInputStreamTransformation that takes a broadcast stream as one of its 
inputs. Both streams are assigned timestamps and watermarks before being 
connected however I only ever see watermarks from my non-broadcast stream. Is 
this expected behavior? Currently I have overridden processWatermark1 to 
unconditionally call processWatermark but that does not seem like an ideal 
solution.

 

Thank you,



Seth Wiesman | Software Engineer, Data


4 World Trade Center, 46th Floor, New York, NY 10007


 

 




Re: Could not flush and close the file system output stream to s3a, is this fixed?

2017-12-14 Thread Stephan Ewen
@Hao Can you provide a better formatted stack trace? Very hard to read it
like it is...

On Thu, Dec 14, 2017 at 11:05 AM, Fabian Hueske  wrote:

> Bowen Li (in CC) closed the issue but there is no fix (or at least it is
> not linked in the JIRA).
> Maybe it was resolved as part of another issue, or it can be resolved differently.
>
> @Bowen, can you comment on how to fix this problem? Will it work in Flink
> 1.4.0?
>
> Thank you,
> Fabian
>
> 2017-12-13 5:28 GMT+01:00 Hao Sun :
>
>> https://issues.apache.org/jira/browse/FLINK-7590
>>
>> I have a similar situation with Flink 1.3.2 on K8S
>>
>> =
>> 2017-12-13 00:57:12,403 INFO 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>> - Source: KafkaSource(maxwell.tickets) -> 
>> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3) 
>> (6ad009755a6009975d197e75afa05e14)
>> switched from RUNNING to FAILED. AsynchronousException{java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3).} at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:970) at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3). ... 6 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at 
>> java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:906) ... 5 more Suppressed:
>> java.lang.Exception: Could not properly cancel managed operator state
>> future. at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:98) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.cleanup(StreamTask.java:1023) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:961) ... 5 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at 
>> java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at 
>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>> at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:96) ... 7 more Caused by:
>> java.io.IOException: Could not flush and close the file system output
>> stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at org.apache.flink.runtime.state
>> .filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutpu
>> tStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) at
>> org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOC
>> allable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend$
>> 1.performOperation(DefaultOperatorStateBackend.java:270) at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend$
>> 1.performOperation(DefaultOperatorStateBackend.java:233) at
>> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.ca
>> ll(AbstractAsyncIOCallable.java:72) at 
>> 

Re: Watermark in broadcast

2017-12-14 Thread Fabian Hueske
Hi Seth,

that's not possible with the current interface.
There have been some discussions about how to address issues of idle
sources (or partitions).
Aljoscha (in CC) should know more about that.

Best, Fabian

2017-12-13 18:13 GMT+01:00 Seth Wiesman :

> Quick follow up question. Is there some way to notify a TimestampAssigner
> that is consuming from an idle source?
>
>
>
> 
>
> *Seth Wiesman *| Software Engineer, Data
>
> 4 World Trade Center, 46th Floor, New York, NY 10007
>
>
>
>
>
> *From: *Seth Wiesman 
> *Date: *Wednesday, December 13, 2017 at 12:04 PM
> *To: *Timo Walther , "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Watermark in broadcast
>
>
>
> Hi Timo,
>
>
>
> I think you are correct. This stream is consumed from Kafka and the number
> of partitions is much less than the parallelism of the program so there
> would be many partitions that never forward watermarks greater than
> Long.Min_Value.
>
>
>
> Thank you for the quick response.
>
>
>
> 
>
> *Seth Wiesman *| Software Engineer, Data
>
> 4 World Trade Center, 46th Floor, New York, NY 10007
>
>
>
>
>
> *From: *Timo Walther 
> *Date: *Wednesday, December 13, 2017 at 11:46 AM
> *To: *"user@flink.apache.org" 
> *Subject: *Re: Watermark in broadcast
>
>
>
> Hi Seth,
>
> are you sure that all partitions of the broadcasted stream send a
> watermark? processWatermark is only called if a minimum watermark arrived
> from all partitions.
>
> Regards,
> Timo
>
> Am 12/13/17 um 5:10 PM schrieb Seth Wiesman:
>
> Hi,
>
>
>
> How are watermarks propagated during a broadcast partition? I have a
> TwoInputStreamTransformation that takes a broadcast stream as one of its
> inputs. Both streams are assigned timestamps and watermarks before being
> connected however I only ever see watermarks from my non-broadcast stream.
> Is this expected behavior? Currently I have overridden processWatermark1 to
> unconditionally call processWatermark but that does not seem like an ideal
> solution.
>
>
>
> Thank you,
>
> 
>
> *Seth Wiesman *| Software Engineer, Data
>
> 4 World Trade Center, 46th Floor, New York, NY 10007
>
>
>
>
>


Re: Could not flush and close the file system output stream to s3a, is this fixed?

2017-12-14 Thread Fabian Hueske
Bowen Li (in CC) closed the issue but there is no fix (or at least it is
not linked in the JIRA).
Maybe it was resolved as part of another issue, or it can be resolved differently.

@Bowen, can you comment on how to fix this problem? Will it work in Flink
1.4.0?

Thank you,
Fabian

2017-12-13 5:28 GMT+01:00 Hao Sun :

> https://issues.apache.org/jira/browse/FLINK-7590
>
> I have a similar situation with Flink 1.3.2 on K8S
>
> =
> 2017-12-13 00:57:12,403 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Source: KafkaSource(maxwell.tickets) -> 
> MaxwellFilter->Maxwell(maxwell.tickets)
> -> FixedDelayWatermark(maxwell.tickets) -> 
> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
> -> Sink: influxdbSink(maxwell.tickets) (1/3) (
> 6ad009755a6009975d197e75afa05e14) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) ->
> MaxwellFilter->Maxwell(maxwell.tickets) -> 
> FixedDelayWatermark(maxwell.tickets)
> -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink:
> influxdbSink(maxwell.tickets) (1/3).} at org.apache.flink.streaming.
> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception:
> Could not materialize checkpoint 803 for operator Source:
> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
> -> FixedDelayWatermark(maxwell.tickets) -> 
> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
> -> Sink: influxdbSink(maxwell.tickets) (1/3). ... 6 more Caused by:
> java.util.concurrent.ExecutionException: java.io.IOException: Could not
> flush and close the file system output stream to s3a://zendesk-euc1-fraud-
> prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0
> af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the
> stream state handle at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:906) ... 5 more Suppressed:
> java.lang.Exception: Could not properly cancel managed operator state
> future. at org.apache.flink.streaming.api.operators.
> OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98) at
> org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) at
> org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:961) ... 5 more Caused by:
> java.util.concurrent.ExecutionException: java.io.IOException: Could not
> flush and close the file system output stream to s3a://zendesk-euc1-fraud-
> prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0
> af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the
> stream state handle at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
> at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(
> OperatorSnapshotResult.java:96) ... 7 more Caused by:
> java.io.IOException: Could not flush and close the file system output
> stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/
> d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
> in order to obtain the stream state handle at org.apache.flink.runtime.
> state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.
> closeAndGetHandle(FsCheckpointStreamFactory.java:336) at
> org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.
> closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.
> performOperation(DefaultOperatorStateBackend.java:270) at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.
> performOperation(DefaultOperatorStateBackend.java:233) at
> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.
> call(AbstractAsyncIOCallable.java:72) at java.util.concurrent.
> FutureTask.run(FutureTask.java:266) at org.apache.flink.util.
> FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) at
> org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:906) ... 5 more Caused by:
> 

Re: Custom Metrics

2017-12-14 Thread Piotr Nowojski
Hi,

> I have a couple more questions related to metrics. I use the InfluxDB reporter to 
> report Flink metrics and I see a lot of metrics being reported. Is there 
> a way to select only a subset of metrics that we need to monitor the 
> application?

At this point that is up to either the reporter or the system the metrics are 
reported to. You would need to extend the InfluxDB reporter and add a 
configuration option for ignoring some metrics.
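
For illustration, a rough sketch of such a filtering subclass ("InfluxdbReporter" is a 
stand-in for whatever reporter class you actually use; the notifyOfAddedMetric signature 
comes from Flink's MetricReporter interface, so the same idea applies to any reporter):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;

// "InfluxdbReporter" is a placeholder name for the reporter class you are extending
public class FilteringInfluxdbReporter extends InfluxdbReporter {

    private static final Set<String> WANTED =
            new HashSet<>(Arrays.asList("numRecordsIn", "numRecordsOut", "Users"));

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        // only register (and therefore report) the metrics we actually want
        if (WANTED.contains(metricName)) {
            super.notifyOfAddedMetric(metric, metricName, group);
        }
    }
}

You would then point the metrics.reporter.<name>.class entry in flink-conf.yaml at the 
filtering subclass instead of the original reporter.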

> Also, is there a way to specify a custom metrics scope? Basically I register 
> metrics like below, add a custom metric group and then add a meter per user. 
> I would like this to be reported as measurement "Users" and tags with user 
> id. This way I can easily visualize the data in grafana or any other tool by 
> selecting the measurement and group by tag. Is there a way to report like 
> that instead of host, process_type, tm_id, job_name, task_name & 
> subtask_index?


Can you not ignore the first couple of groups/scopes in Grafana? I think you 
can also add more groups in the user scope:

metricGroup.addGroup("Users").addGroup("Foo").addGroup("Bar");
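
Putting it together, a sketch of registering one meter per user inside an operator could 
look like the following ("Event" and getUserId() are made-up stand-ins for your own record 
type); each group name becomes an additional scope component, which InfluxDB-style 
reporters typically turn into tags:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;

// "Event" is a made-up record type with a getUserId() accessor
public class PerUserMeteringMap extends RichMapFunction<Event, Event> {

    private transient MetricGroup usersGroup;
    private transient Map<String, Meter> metersPerUser;

    @Override
    public void open(Configuration parameters) {
        usersGroup = getRuntimeContext().getMetricGroup().addGroup("Users");
        metersPerUser = new HashMap<>();
    }

    @Override
    public Event map(Event event) {
        // lazily register one meter per user id under the "Users" group
        Meter meter = metersPerUser.computeIfAbsent(
                event.getUserId(),
                id -> usersGroup.meter(id, new DropwizardMeterWrapper(new com.codahale.metrics.Meter())));
        meter.markEvent();
        return event;
    }
}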

Piotrek

> On 13 Dec 2017, at 22:34, Navneeth Krishnan  wrote:
> 
> Thanks Pitor.
> 
> I have a couple more questions related to metrics. I use the InfluxDB reporter to 
> report Flink metrics and I see a lot of metrics being reported. Is there 
> a way to select only a subset of metrics that we need to monitor the 
> application?
> 
> Also, is there a way to specify a custom metrics scope? Basically I register 
> metrics like below, add a custom metric group and then add a meter per user. 
> I would like this to be reported as measurement "Users" and tags with user 
> id. This way I can easily visualize the data in grafana or any other tool by 
> selecting the measurement and group by tag. Is there a way to report like 
> that instead of host, process_type, tm_id, job_name, task_name & 
> subtask_index?
> 
> metricGroup.addGroup("Users")
> .meter(userId, new DropwizardMeterWrapper(new 
> com.codahale.metrics.Meter()));
> Thanks a bunch.
> 
> On Mon, Dec 11, 2017 at 11:12 PM, Piotr Nowojski  > wrote:
> Hi,
> 
> Reporting once per 10 seconds shouldn’t create problems. Best to try it out. 
> Let us know if you get into some troubles :)
> 
> Piotrek
> 
>> On 11 Dec 2017, at 18:23, Navneeth Krishnan > > wrote:
>> 
>> Thanks Piotr. 
>> 
>> Yes, passing the metric group should be sufficient. The subcomponents will 
>> not be able to provide the list of metrics to register since the metrics are 
>> created based on incoming data by tenant. Also I am planning to have the 
>> metrics reported every 10 seconds and hope it shouldn't be a problem. We use 
>> influx and grafana to plot the metrics.
>> 
>> The option 2 that I had in mind was to collect all metrics and use influx db 
>> sink to report it directly inside the pipeline. But it seems reporting per 
>> node might not be possible.
>> 
>> 
>> On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski > > wrote:
>> Hi,
>> 
>> I’m not sure if I completely understand your issue.
>> 
>> 1.
>> - You don’t have to pass RuntimeContext, you can always pass just the 
>> MetricGroup or ask your components/subclasses “what metrics do you want to 
>> register” and register them at the top level.
>> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for 
>> Flink, as long as you have a reasonable reporting interval. However keep in 
>> mind that Flink only reports your metrics and you still need something to 
>> read/handle/process/aggregate your metrics
>> 2.
>> I don’t think that reporting per node/jvm is possible with Flink’s metric 
>> system. For that you would need some other solution, like report your 
>> metrics using JMX (directly register MBeans from your code)
>> 
>> Piotrek
>> 
>> > On 10 Dec 2017, at 18:51, Navneeth Krishnan > > > wrote:
>> >
>> > Hi,
>> >
>> > I have a streaming pipeline running on flink and I need to collect metrics 
>> > to identify how my algorithm is performing. The entire pipeline is 
>> > multi-tenanted and I also need metrics per tenant. Lets say there would be 
>> > around 20 metrics to be captured per tenant. I have the following ideas 
>> > for implemention but any suggestions on which one might be better will 
>> > help.
>> >
>> > 1. Use flink metric group and register a group per tenant at the operator 
>> > level. The disadvantage of this approach for me is I need the 
>> > runtimecontext parameter to register a metric and I have various 
>> > subclasses to which I need to pass this object to limit the metric scope 
>> > within the operator. Also there will be too many metrics reported if there 
>> > are higher number of subtasks.
>> > How is everyone accessing 

Flink State monitoring

2017-12-14 Thread Netzer, Liron
Hi group,

We are using Flink keyed state in several operators.
Is there an easy way to expose the data that is stored in the state, i.e. the 
keys and the values?
This is needed for both monitoring and debugging. We would like to 
understand how many key/value pairs are stored in each state and also to view the 
data itself.
I know that there is the "Queryable state" option, but this is still in Beta, 
and doesn't really give us what we want easily.
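
For reference, registering a state as queryable looks roughly like the sketch below 
(Flink 1.3; the names are arbitrary examples). Reading the values back still requires an 
external QueryableStateClient, which is the part that is not that easy:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey extends RichFlatMapFunction<String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("perKeyCount", Long.class);
        // expose the state under an external name so a QueryableStateClient can read it
        descriptor.setQueryable("per-key-count");
        count = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        out.collect(updated);
    }
}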


*We are using Flink 1.3.2 with Java.

Thanks,
Liron


Re: Flink flick cancel vs stop

2017-12-14 Thread Piotr Nowojski
Hi,

Yes, we are aware of this issue and we would like to have it addressed soon, but at the 
moment it does not look like clean shutdown will be ready for Flink 1.5.

Another solution is a Kafka exactly-once producer implemented on top of the 
GenericWriteAheadSink. It could avoid this issue (at the cost of significantly 
higher overhead). There are plans to implement such a producer as an alternative 
to the current one, but I do not know the timeline for that. It should be a 
relatively easy task and we would welcome such a contribution. 

Piotrek

> On 14 Dec 2017, at 01:43, Elias Levy  wrote:
> 
> I am re-upping this thread now that FlinkKafkaProducer011 is out.  The new 
> producer, when used with the exactly once semantics, has the rather 
> troublesome behavior that it will fall back to at-most-once, rather than 
> at-least-once, if the job is down for longer than the Kafka broker's 
> transaction.max.timeout.ms  setting.
> 
> In situations that require extended maintenance downtime, this behavior is 
> nearly certain to lead to message loss, as canceling a job while taking a 
> savepoint will not wait for the Kafka transactions to be committed and is 
> not atomic.
> 
> So it seems like there is a need for an atomic stop or cancel with savepoint 
> that waits for transactional sinks to commit and then immediately stop any 
> further message processing.
>  
> 
> On Tue, Oct 24, 2017 at 4:46 AM, Piotr Nowojski  > wrote:
> I would propose implementations of NewSource to be non-blocking/asynchronous. 
> For example, something like
> 
> public abstract Future getCurrent();
> 
> Which would allow us to perform certain actions while there is no data 
> available to process (for example flush output buffers). Something like this 
> came up recently when we were discussing possible future changes in the 
> network stack. It wouldn’t complicate API by a lot, since default 
> implementation could just:
> 
> public Future getCurrent() {
>   return completedFuture(getCurrentBlocking());
> }
> 
> Another thing to consider is maybe we would like to leave the door open for 
> fetching records in some batches from the source’s input buffers? Source 
> functions (like Kafka) have some internal buffers and it would be more 
> efficient to read all/deserialise all data present in the input buffer at 
> once, instead of paying synchronisation/calling virtual method/etc costs once 
> per each record.
> 
> Piotrek
> 
>> On 22 Sep 2017, at 11:13, Aljoscha Krettek > > wrote:
>> 
>> @Eron Yes, that would be the difference in characterisation. I think 
>> technically all sources could be transformed by that by pushing data into a 
>> (blocking) queue and having the "getElement()" method pull from that.
>> 
>>> On 15. Sep 2017, at 20:17, Elias Levy >> > wrote:
>>> 
>>> On Fri, Sep 15, 2017 at 10:02 AM, Eron Wright >> > wrote:
>>> Aljoscha, would it be correct to characterize your idea as a 'pull' source 
>>> rather than the current 'push'?  It would be interesting to look at the 
>>> existing connectors to see how hard it would be to reverse their 
>>> orientation.   e.g. the source might require a buffer pool.
>>> 
>>> The Kafka client works that way.  As does the QueueingConsumer used by the 
>>> RabbitMQ source.  The Kinesis and NiFi sources also seems to poll. Those 
>>> are all the bundled sources.
>> 
> 
>