Re: Using a ProcessFunction as a "Source"

2018-08-24 Thread vino yang
Hi Addison,

There are a few things I don't understand. Does your source generate its own
messages? Why can't a source receive input? If a source cannot accept input,
then why is it called a source? Isn't the Kafka connector an example of input
acting as a source?

If you mean that under normal circumstances a source cannot receive a second
input carrying control messages, there are a few ways to work around it.

1) Access an external system, such as Zookeeper, from your source to fetch or
subscribe to control messages.
2) If your source is followed by a map or process operator, they can be
chained together into a "big" source, and you can then pass your control
messages via Flink's new "broadcast state" feature. See this blog post for
details. [1] A minimal sketch is shown below.
3) Mix control messages with normal messages in the same message flow, and
take the corresponding action once a control message is parsed. Of course,
this approach is not really recommended.

[1]:
https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
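As an illustration of option 2, here is a minimal sketch of the broadcast state
pattern described in [1]. This is not code from this thread: Event,
ControlMessage, dataStream and controlStream are hypothetical placeholders.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, ControlMessage> controlDescriptor =
        new MapStateDescriptor<>("control", Types.STRING,
                TypeInformation.of(ControlMessage.class));

// broadcast the (low-volume) control stream to all parallel instances
BroadcastStream<ControlMessage> controlBroadcast =
        controlStream.broadcast(controlDescriptor);

dataStream
    .connect(controlBroadcast)
    .process(new BroadcastProcessFunction<Event, ControlMessage, Event>() {

        @Override
        public void processElement(Event value, ReadOnlyContext ctx,
                                   Collector<Event> out) throws Exception {
            // read-only access to the latest control message on the data side
            ControlMessage ctrl = ctx.getBroadcastState(controlDescriptor).get("latest");
            // ... use ctrl to decide how to handle the element ...
            out.collect(value);
        }

        @Override
        public void processBroadcastElement(ControlMessage value, Context ctx,
                                            Collector<Event> out) throws Exception {
            // update the broadcast state whenever a control message arrives
            ctx.getBroadcastState(controlDescriptor).put("latest", value);
        }
    });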

Thanks, vino.

Addison Higham wrote on Saturday, August 25, 2018 at 12:46 AM:

> HI,
>
> I am writing a parallel source function that ideally needs to receive some
> messages as control information (specifically, a state message on where to
> start reading from a kinesis stream). As far as I can tell, there isn't a
> way to make a sourceFunction receive input (which makes sense) so I am
> thinking it makes sense to use a processFunction that will occasionally
> receive control messages and mostly just output a lot of messages.
>
> This works from an API perspective, with a few different options, I could
> either:
>
> A) have the processElement function block on calling the loop that will
> produce messages or
> B) have the processElement function return (by passing the collector to, and
> starting the reading on, a different thread), but continue to produce
> messages downstream
>
> This obviously does raise some concerns though:
>
> 1. Does this break any assumptions flink has of message lifecycle? Option
> A of blocking on processElement for very long periods seems straightforward
> but less than ideal, not to mention not being able to handle any
> other control messages.
>
> However, I am not sure if a processFunction sending messages after the
> processElement function has returned would break some expectations flink
> has of operator lifecycles. Messages are also emitted by timers, etc., but
> this would be completely outside any of those function calls as it is
> started on another thread. This is obviously how most SourceFunctions work,
> but it isn't clear if the same technique holds for ProcessFunctions
>
> 2. Would this have a negative impact on backpressure downstream? Since I
> am still going to be using the same collector instance, it seems like it
> should ultimately work, but I wonder if there are other details I am not
> aware of.
>
> 3. Is this just a terrible idea in general? It seems like I could maybe do
> this by implementing directly on top of an Operator, but I am not as
> familiar with that API
>
> Thanks in advance for any thoughts!
>
> Addison
>


Re: anybody can start flink with job mode?

2018-08-24 Thread Hao Sun
Thanks, I'll look into it.

On Fri, Aug 24, 2018, 19:44 vino yang  wrote:

> Hi Hao Sun,
>
> From the error log, it seems that the job's jar was not found.
> You must make sure your jar is on the classpath.
> Related documentation may not be up-to-date, and there is a discussion on
> this issue on this mailing list. [1]
>
> I see that FLINK-10001 [2] is closed, and the update will be included in the
> 1.6.1 and 1.7.0 releases.
>
> [1]:
> http://mail-archives.apache.org/mod_mbox/flink-dev/201808.mbox/%3CCAC27z=OaohMbmcryB-+m3GBmZP=xpha8mihv7zs1grgsekk...@mail.gmail.com%3E
> 
> [2]: https://issues.apache.org/jira/browse/FLINK-10001
> 
>
> Thanks, vino.
>
>
> Hao Sun wrote on Saturday, August 25, 2018 at 6:37 AM:
>
>> I got an error like this.
>>
>> $ docker run -it flink-job:latest job-cluster
>> Starting the job-cluster
>> config file:
>> jobmanager.rpc.address: localhost
>> jobmanager.rpc.port: 6123
>> jobmanager.heap.size: 1024m
>> taskmanager.heap.size: 1024m
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 1
>> rest.port: 8081
>> Starting standalonejob as a console application on host cf9bd047082c.
>> 2018-08-24 22:33:00,773 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> 
>> 2018-08-24 22:33:00,774 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting
>> StandaloneJobClusterEntryPoint (Version: 1.6.0, Rev:ff472b4,
>> Date:07.08.2018 @ 13:31:13 UTC)
>> 2018-08-24 22:33:00,775 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - OS current user:
>> flink
>> 2018-08-24 22:33:01,168 WARN org.apache.hadoop.util.NativeCodeLoader -
>> Unable to load native-hadoop library for your platform... using
>> builtin-java classes where applicable
>> 2018-08-24 22:33:01,232 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Current
>> Hadoop/Kerberos user: flink
>> 2018-08-24 22:33:01,232 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM: OpenJDK 64-Bit
>> Server VM - Oracle Corporation - 1.8/25.111-b14
>> 2018-08-24 22:33:01,232 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Maximum heap size:
>> 981 MiBytes
>> 2018-08-24 22:33:01,232 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JAVA_HOME:
>> /usr/lib/jvm/java-1.8-openjdk/jre
>> 2018-08-24 22:33:01,236 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Hadoop version:
>> 2.8.3
>> 2018-08-24 22:33:01,236 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM Options:
>> 2018-08-24 22:33:01,236 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xms1024m
>> 2018-08-24 22:33:01,236 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xmx1024m
>> 2018-08-24 22:33:01,237 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Dlog4j.configuration=file:/opt/flink-1.6.0/conf/log4j-console.properties
>> 2018-08-24 22:33:01,237 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Dlogback.configurationFile=file:/opt/flink-1.6.0/conf/logback-console.xml
>> 2018-08-24 22:33:01,237 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Program Arguments:
>> 2018-08-24 22:33:01,237 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --configDir
>> 2018-08-24 22:33:01,238 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> /opt/flink-1.6.0/conf
>> 2018-08-24 22:33:01,238 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Classpath:
>> /opt/flink-1.6.0/lib/flink-python_2.11-1.6.0.jar:/opt/flink-1.6.0/lib/flink-shaded-hadoop2-uber-1.6.0.jar:/opt/flink-1.6.0/lib/job.jar:/opt/flink-1.6.0/lib/log4j-1.2.17.jar:/opt/flink-1.6.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.6.0/lib/flink-dist_2.11-1.6.0.jar:::
>> 2018-08-24 22:33:01,238 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> 
>> 2018-08-24 22:33:01,240 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Registered UNIX
>> signal handlers for [TERM, HUP, INT]
>> 2018-08-24 22:33:01,248 ERROR 
>> *org.apache.flink.runtime.entrypoint.ClusterEntrypoint
>> - Could not parse command line arguments [--configDir,
>> /opt/flink-1.6.0/conf].*
>> org.apache.flink.runtime.entrypoint.FlinkParseException: Failed to parse
>> the command line arguments.
>> at
>> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:52)
>> at
>> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:143)
>> Caused by: org.apache.commons.cli.MissingOptionException: *Missing
>> required option: j*
>> at
>> org.apache.commons.cli.DefaultParser.checkRequiredOpt

Re: Can I only use checkpoints instead of savepoints in production?

2018-08-24 Thread vino yang
Hi Averell,

Checkpoints are triggered automatically and periodically according to the
checkpoint interval set by the user. I believe you have no doubts about that
part.

There are many reasons for a job failure.
Technically, a failure means that the job does not enter its final termination
state normally.
The document in [1] contains a transition map of the job status, so you can
see how a Flink job's status changes.
For example: a subtask fails or throws an exception; a subtask throws an
exception while taking a checkpoint (although this does not necessarily lead
to a job failure); the connection between a TM and the JM times out; a TM goes
down; the JM leader switches; and more.

*So, in these scenarios (including the ones you enumerated) you can simulate
the failure and recovery of a job.*

More specifically, job recovery is based on the child nodes of Zookeeper's
"/jobgraph" path.
If a job does not enter a termination state normally, its child nodes will
not be cleaned up.

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html
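For reference, a minimal sketch (not from this thread) of the settings that
make automatic recovery from checkpoints possible; the checkpoint interval and
restart values are arbitrary examples, and the externalized-checkpoint line
corresponds to the "retain on cancellation" option discussed elsewhere in this
digest.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointRecoverySetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // take a checkpoint every 60 seconds with exactly-once guarantees
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // keep the latest checkpoint even if the job is cancelled, so it can be resumed from
        env.getCheckpointConfig()
           .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // if a task fails (unhandled exception, lost TaskManager, TM/JM timeout, ...),
        // restart the job up to 3 times, waiting 10 seconds between attempts
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // ... add sources, transformations and sinks here, then:
        // env.execute("my-job");
    }
}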

Thanks, vino.

Averell wrote on Friday, August 24, 2018 at 9:17 PM:

> Hi Vino,
>
> Regarding this statement "/Checkpoints are taken automatically and are used
> for automatic restarting job in case of a failure/", I do not quite
> understand the definition of a failure, and how to simulate that while
> testing my application. Possible scenarios that I can think of:
>(1) flink application killed
>(2) cluster crashed
>(3) one of the servers in the cluster crashed
>(4) unhandled exception raised when abnormal data received
>...
>
> Could you please help explain?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: anybody can start flink with job mode?

2018-08-24 Thread vino yang
Hi Hao Sun,

From the error log, it seems that the job's jar was not found.
You must make sure your jar is on the classpath.
Related documentation may not be up-to-date, and there is a discussion on
this issue on this mailing list. [1]

I see that FLINK-10001 [2] is closed, and the update will be included in the
1.6.1 and 1.7.0 releases.

[1]:
http://mail-archives.apache.org/mod_mbox/flink-dev/201808.mbox/%3CCAC27z=OaohMbmcryB-+m3GBmZP=xpha8mihv7zs1grgsekk...@mail.gmail.com%3E
[2]: https://issues.apache.org/jira/browse/FLINK-10001

Thanks, vino.


Hao Sun wrote on Saturday, August 25, 2018 at 6:37 AM:

> I got an error like this.
>
> $ docker run -it flink-job:latest job-cluster
> Starting the job-cluster
> config file:
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 1024m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1
> rest.port: 8081
> Starting standalonejob as a console application on host cf9bd047082c.
> 2018-08-24 22:33:00,773 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> 
> 2018-08-24 22:33:00,774 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting
> StandaloneJobClusterEntryPoint (Version: 1.6.0, Rev:ff472b4,
> Date:07.08.2018 @ 13:31:13 UTC)
> 2018-08-24 22:33:00,775 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - OS current user:
> flink
> 2018-08-24 22:33:01,168 WARN org.apache.hadoop.util.NativeCodeLoader -
> Unable to load native-hadoop library for your platform... using
> builtin-java classes where applicable
> 2018-08-24 22:33:01,232 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Current
> Hadoop/Kerberos user: flink
> 2018-08-24 22:33:01,232 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM: OpenJDK 64-Bit
> Server VM - Oracle Corporation - 1.8/25.111-b14
> 2018-08-24 22:33:01,232 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Maximum heap size:
> 981 MiBytes
> 2018-08-24 22:33:01,232 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JAVA_HOME:
> /usr/lib/jvm/java-1.8-openjdk/jre
> 2018-08-24 22:33:01,236 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Hadoop version:
> 2.8.3
> 2018-08-24 22:33:01,236 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM Options:
> 2018-08-24 22:33:01,236 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xms1024m
> 2018-08-24 22:33:01,236 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xmx1024m
> 2018-08-24 22:33:01,237 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dlog4j.configuration=file:/opt/flink-1.6.0/conf/log4j-console.properties
> 2018-08-24 22:33:01,237 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dlogback.configurationFile=file:/opt/flink-1.6.0/conf/logback-console.xml
> 2018-08-24 22:33:01,237 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Program Arguments:
> 2018-08-24 22:33:01,237 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --configDir
> 2018-08-24 22:33:01,238 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> /opt/flink-1.6.0/conf
> 2018-08-24 22:33:01,238 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Classpath:
> /opt/flink-1.6.0/lib/flink-python_2.11-1.6.0.jar:/opt/flink-1.6.0/lib/flink-shaded-hadoop2-uber-1.6.0.jar:/opt/flink-1.6.0/lib/job.jar:/opt/flink-1.6.0/lib/log4j-1.2.17.jar:/opt/flink-1.6.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.6.0/lib/flink-dist_2.11-1.6.0.jar:::
> 2018-08-24 22:33:01,238 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> 
> 2018-08-24 22:33:01,240 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Registered UNIX
> signal handlers for [TERM, HUP, INT]
> 2018-08-24 22:33:01,248 ERROR 
> *org.apache.flink.runtime.entrypoint.ClusterEntrypoint
> - Could not parse command line arguments [--configDir,
> /opt/flink-1.6.0/conf].*
> org.apache.flink.runtime.entrypoint.FlinkParseException: Failed to parse
> the command line arguments.
> at
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:52)
> at
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:143)
> Caused by: org.apache.commons.cli.MissingOptionException: *Missing
> required option: j*
> at
> org.apache.commons.cli.DefaultParser.checkRequiredOptions(DefaultParser.java:199)
> at org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:130)
> at org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:81)
> at
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:50)
> ... 1 more
> Exception in thread "main" java.lang.IllegalArgumentException:
> cmdLineSyntax not provided
> at

anybody can start flink with job mode?

2018-08-24 Thread Hao Sun
I got an error like this.

$ docker run -it flink-job:latest job-cluster
Starting the job-cluster
config file:
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
rest.port: 8081
Starting standalonejob as a console application on host cf9bd047082c.
2018-08-24 22:33:00,773 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

2018-08-24 22:33:00,774 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting
StandaloneJobClusterEntryPoint (Version: 1.6.0, Rev:ff472b4,
Date:07.08.2018 @ 13:31:13 UTC)
2018-08-24 22:33:00,775 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - OS current user:
flink
2018-08-24 22:33:01,168 WARN org.apache.hadoop.util.NativeCodeLoader -
Unable to load native-hadoop library for your platform... using
builtin-java classes where applicable
2018-08-24 22:33:01,232 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Current
Hadoop/Kerberos user: flink
2018-08-24 22:33:01,232 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM: OpenJDK 64-Bit
Server VM - Oracle Corporation - 1.8/25.111-b14
2018-08-24 22:33:01,232 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Maximum heap size:
981 MiBytes
2018-08-24 22:33:01,232 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JAVA_HOME:
/usr/lib/jvm/java-1.8-openjdk/jre
2018-08-24 22:33:01,236 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Hadoop version:
2.8.3
2018-08-24 22:33:01,236 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM Options:
2018-08-24 22:33:01,236 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xms1024m
2018-08-24 22:33:01,236 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xmx1024m
2018-08-24 22:33:01,237 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlog4j.configuration=file:/opt/flink-1.6.0/conf/log4j-console.properties
2018-08-24 22:33:01,237 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlogback.configurationFile=file:/opt/flink-1.6.0/conf/logback-console.xml
2018-08-24 22:33:01,237 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Program Arguments:
2018-08-24 22:33:01,237 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --configDir
2018-08-24 22:33:01,238 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
/opt/flink-1.6.0/conf
2018-08-24 22:33:01,238 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Classpath:
/opt/flink-1.6.0/lib/flink-python_2.11-1.6.0.jar:/opt/flink-1.6.0/lib/flink-shaded-hadoop2-uber-1.6.0.jar:/opt/flink-1.6.0/lib/job.jar:/opt/flink-1.6.0/lib/log4j-1.2.17.jar:/opt/flink-1.6.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.6.0/lib/flink-dist_2.11-1.6.0.jar:::
2018-08-24 22:33:01,238 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

2018-08-24 22:33:01,240 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Registered UNIX
signal handlers for [TERM, HUP, INT]
2018-08-24 22:33:01,248 ERROR
*org.apache.flink.runtime.entrypoint.ClusterEntrypoint
- Could not parse command line arguments [--configDir,
/opt/flink-1.6.0/conf].*
org.apache.flink.runtime.entrypoint.FlinkParseException: Failed to parse
the command line arguments.
at
org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:52)
at
org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:143)
Caused by: org.apache.commons.cli.MissingOptionException: *Missing required
option: j*
at
org.apache.commons.cli.DefaultParser.checkRequiredOptions(DefaultParser.java:199)
at org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:130)
at org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:81)
at
org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:50)
... 1 more
Exception in thread "main" java.lang.IllegalArgumentException:
cmdLineSyntax not provided
at org.apache.commons.cli.HelpFormatter.printHelp(HelpFormatter.java:546)
at org.apache.commons.cli.HelpFormatter.printHelp(HelpFormatter.java:492)
at org.apache.commons.cli.HelpFormatter.printHelp(HelpFormatter.java:408)
at
org.apache.flink.runtime.entrypoint.parser.CommandLineParser.printHelp(CommandLineParser.java:60)
at
org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:146)


Re: Data loss when restoring from savepoint

2018-08-24 Thread Juho Autio
Hi,

Using StreamingFileSink is not a convenient option for production use for
us as it doesn't support s3*. I could use StreamingFileSink just to verify,
but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem.
This is because only when the 24-hour window triggers, BucketingSink gets a
burst of input. Around the state restoring point (middle of the day) it
doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could
be any difference. It's very real that the sink doesn't get any input for a
long time until the 24-hour window closes, and then it quickly writes out
everything because it's not that much data eventually for the distinct
values.

Any ideas for debugging what's happening around the savepoint & restoration
time?

*) I actually implemented StreamingFileSink as an alternative sink. This
was before I came to realize that most likely the sink component has
nothing to do with the data loss problem. I tried it with s3n:// path just
to see an exception being thrown. In the source code I indeed then found an
explicit check for the target path scheme to be "hdfs://".

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin 
wrote:

> Ok, I think before further debugging the window reduced state,
> could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0
> instead of the previous 'BucketingSink’?
>
> Cheers,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>
> On 24 Aug 2018, at 18:03, Juho Autio  wrote:
>
> Yes, sorry for my confusing comment. I just meant that it seems like
> there's a bug somewhere now that the output is missing some data.
>
> > I would wait and check the actual output in s3 because it is the main
> result of the job
>
> Yes, and that's what I have already done. There seems to be always some
> data loss with the production data volumes, if the job has been restarted
> on that day.
>
> Would you have any suggestions for how to debug this further?
>
> Many thanks for stepping in.
>
> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin 
> wrote:
>
>> Hi Juho,
>>
>> So it is a per key deduplication job.
>>
>> Yes, I would wait and check the actual output in s3 because it is the
>> main result of the job and
>>
>> > The late data around the time of taking savepoint might be not included
>> into the savepoint but it should be behind the snapshotted offset in Kafka.
>>
>> is not a bug, it is a possible behaviour.
>>
>> The savepoint is a snapshot of the in-flight data that has already been
>> consumed from Kafka.
>> Basically, the full contents of the window result are split between the
>> savepoint and what can come after the savepointed offset in Kafka but
>> before the window result is written into s3.
>>
>> Allowed lateness should not affect it, I am just saying that the final
>> result in s3 should include all records after it.
>> This is what should be guaranteed but not the contents of the
>> intermediate savepoint.
>>
>> Cheers,
>> Andrey
>>
>> On 24 Aug 2018, at 16:52, Juho Autio  wrote:
>>
>> Thanks for your answer!
>>
>> I check for the missed data from the final output on s3. So I wait until
>> the next day, then run the same thing re-implemented in batch, and compare
>> the output.
>>
>> > The late data around the time of taking savepoint might be not included
>> into the savepoint but it should be behind the snapshotted offset in Kafka.
>>
>> Yes, I would definitely expect that. It seems like there's a bug
>> somewhere.
>>
>> > Then it should just come later after the restore and should be reduced
>> within the allowed lateness into the final result which is saved into s3.
>>
>> Well, as far as I know, allowed lateness doesn't play any role here,
>> because I started running the job with allowedLateness=0, and still get the
>> data loss, while my late data output doesn't receive anything.
>>
>> > Also, is this `DistinctFunction.reduce` just an example or the actual
>> implementation, basically saving just one of records inside the 24h window
>> in s3? then what is missing there?
>>
>> Yes, it's the actual implementation. Note that there's a keyBy before
>> the DistinctFunction. So there's one record for each key (which is the
>> combination of a couple of fields). In practice I've seen that we're
>> missing ~2000-4000 elements on each restore, and the total output is
>> obviously much more than that.
>>
>> Here's the full code for the key selector:
>>
>> public class MapKeySelector implements KeySelector<Map<String, String>,
>> Object> {
>>
>> private final String[] fields;
>>
>> public MapKeySelector(String... fields) {
>> this.fields = fields;
>> }
>>
>> @Override
>> public Object getKey(Map<String, String> event) throws Exception {
>> Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>> for (int i = 0; i < fields.length; i++)

Re: [External] Re: How to do test in Flink?

2018-08-24 Thread Joe Malt
Hi Chang,

A time-saving tip for finding which library contains a class: go to
https://search.maven.org/
and enter fc: followed by the fully-qualified name of the class. You should
get the library as a search result.

In this case for example, you'd search for
fc:org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder

Best,

Joe Malt
Engineering Intern, Stream Processing
Yelp Inc.

On Fri, Aug 24, 2018 at 4:50 AM, Chang Liu  wrote:

> No worries, I found it here:
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-runtime_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
>   <type>test-jar</type>
>   <scope>test</scope>
> </dependency>
>
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
>
> On Fri, Aug 24, 2018 at 1:16 PM Chang Liu  wrote:
>
>> Hi Hequn,
>>
>> I have added the following dependencies:
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>>   <type>test-jar</type>
>>   <scope>test</scope>
>> </dependency>
>> <dependency>
>>   <groupId>org.mockito</groupId>
>>   <artifactId>mockito-core</artifactId>
>>   <version>2.21.0</version>
>>   <scope>test</scope>
>> </dependency>
>>
>>
>> But got the exception:   java.lang.NoClassDefFoundError:
>> org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder
>>
>> Do you know which library contains this class? Thanks :)
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>> DevOps Engineer
>> WB TECH / Cyber Crime Prevention Team
>>
>> Mobile: +31(0)687859981
>> Email: fluency...@gmail.com  &  chang.l...@ing.nl
>>
>>
>>
>> On Mon, Aug 13, 2018 at 1:42 PM Hequn Cheng  wrote:
>>
>>> Hi Chang,
>>>
>>> Try
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-java_2.11</artifactId>
>>>   <version>${flink.version}</version>
>>>   <type>test-jar</type>
>>>   <scope>test</scope>
>>> </dependency>
>>> .
>>>
>>> On Mon, Aug 13, 2018 at 6:42 PM, Chang Liu  wrote:
>>>
 And another question: which library should I include in order to use
 these harnesses? I do have this flink-test-utils_2.11 in my pom, but I
 cannot find the harnesses.

 I also have the following in my pom:

- flink-core
- flink-clients_2.11
- flink-scala_2.11
- flink-streaming-java_2.11
- flink-streaming-java_2.11
- flink-connector-kafka-0.11_2.11


 Best regards/祝好,

 Chang Liu 刘畅


 On 13 Aug 2018, at 04:01, Hequn Cheng  wrote:

 Hi Chang,

 There are some harness classes which can be used to test your function.
 This is also a common way to test functions and operators in Flink's internal
 tests. Currently, the harness classes mainly include:

- KeyedOneInputStreamOperatorTestHarness
- KeyedTwoInputStreamOperatorTestHarness
- OneInputStreamOperatorTestHarness
- TwoInputStreamOperatorTestHarness

 You can take a look at the source code of these classes.

 To be more specific, you can take a look at the
 testSlidingEventTimeWindowsApply[1], in which the RichSumReducer
 window function has been tested.

 Best, Hequn

 [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L213
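As an illustration of the harness classes listed above, here is a minimal
sketch of driving a keyed function through a test harness. It assumes the
flink-streaming-java and flink-runtime test-jars discussed in this thread are
on the test classpath; MyDedupFunction is a hypothetical
KeyedProcessFunction<String, String, String>, not code from this thread.

import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.Test;

public class MyDedupFunctionTest {

    @Test
    public void firesAfterWatermark() throws Exception {
        // wrap the user function in its operator so the harness can drive it
        KeyedProcessOperator<String, String, String> operator =
                new KeyedProcessOperator<>(new MyDedupFunction());

        KeyedOneInputStreamOperatorTestHarness<String, String, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator,
                        value -> value,                  // key selector
                        BasicTypeInfo.STRING_TYPE_INFO); // key type

        harness.open();
        harness.processElement("a", 10L);                // (record, event timestamp)
        harness.processElement("a", 20L);
        harness.processWatermark(new Watermark(100L));   // advance event time, fire timers

        // emitted StreamRecords and watermarks, in order
        ConcurrentLinkedQueue<Object> output = harness.getOutput();
        // ... assert on output here ...

        harness.close();
    }
}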


 On Mon, Aug 13, 2018 at 7:10 AM, Chang Liu 
 wrote:

> Dear all,
>
> I have some questions regarding testing in Flink. The more general
> question is: is there any guideline, template, or best practices that we
> can follow if we want to test our flink code (more in scala)?
>
> I know there is this page:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/testing.html but not
> so much written there. And I also did not find a more comprehensive
> documentation of this library: flink-test-utils_2.11.
>
> One detailed question: how do you test the WindowFunction below? Its return
> type is Unit, right? We cannot unit test it in the same way the
> ReduceFunction was tested in the example link above. Do we then only have the
> option of doing integration testing on it?
> 
>
>
> Your ideas would be very helpful :) Thanks in advance !
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
>


>>>


Re: Data loss when restoring from savepoint

2018-08-24 Thread Andrey Zagrebin
Ok, I think before further debugging the window reduced state, 
could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0 instead 
of the previous 'BucketingSink’?

Cheers,
Andrey

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
 


> On 24 Aug 2018, at 18:03, Juho Autio  wrote:
> 
> Yes, sorry for my confusing comment. I just meant that it seems like there's 
> a bug somewhere now that the output is missing some data.
> 
> > I would wait and check the actual output in s3 because it is the main 
> > result of the job
> 
> Yes, and that's what I have already done. There seems to be always some data 
> loss with the production data volumes, if the job has been restarted on that 
> day.
> 
> Would you have any suggestions for how to debug this further?
> 
> Many thanks for stepping in.
> 
> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin  > wrote:
> Hi Juho,
> 
> So it is a per key deduplication job.
> 
> Yes, I would wait and check the actual output in s3 because it is the main 
> result of the job and
> 
> > The late data around the time of taking savepoint might be not included 
> > into the savepoint but it should be behind the snapshotted offset in Kafka.
> 
> is not a bug, it is a possible behaviour.
> 
> The savepoint is a snapshot of the in-flight data that has already been
> consumed from Kafka.
> Basically, the full contents of the window result are split between the
> savepoint and what can come after the savepointed offset in Kafka but before
> the window result is written into s3.
> 
> Allowed lateness should not affect it, I am just saying that the final result 
> in s3 should include all records after it. 
> This is what should be guaranteed but not the contents of the intermediate 
> savepoint.
> 
> Cheers,
> Andrey
> 
>> On 24 Aug 2018, at 16:52, Juho Autio > > wrote:
>> 
>> Thanks for your answer!
>> 
>> I check for the missed data from the final output on s3. So I wait until the 
>> next day, then run the same thing re-implemented in batch, and compare the 
>> output.
>> 
>> > The late data around the time of taking savepoint might be not included 
>> > into the savepoint but it should be behind the snapshotted offset in Kafka.
>> 
>> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>> 
>> > Then it should just come later after the restore and should be reduced 
>> > within the allowed lateness into the final result which is saved into s3.
>> 
>> Well, as far as I know, allowed lateness doesn't play any role here, because 
>> I started running the job with allowedLateness=0, and still get the data 
>> loss, while my late data output doesn't receive anything.
>> 
>> > Also, is this `DistinctFunction.reduce` just an example or the actual 
>> > implementation, basically saving just one of records inside the 24h window 
>> > in s3? then what is missing there?
>> 
>> Yes, it's the actual implementation. Note that there's a keyBy before the 
>> DistinctFunction. So there's one record for each key (which is the 
>> combination of a couple of fields). In practice I've seen that we're missing 
>> ~2000-4000 elements on each restore, and the total output is obviously much 
>> more than that.
>> 
>> Here's the full code for the key selector:
>> 
>> public class MapKeySelector implements KeySelector, 
>> Object> {
>> 
>> private final String[] fields;
>> 
>> public MapKeySelector(String... fields) {
>> this.fields = fields;
>> }
>> 
>> @Override
>> public Object getKey(Map event) throws Exception {
>> Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>> for (int i = 0; i < fields.length; i++) {
>> key.setField(event.getOrDefault(fields[i], ""), i);
>> }
>> return key;
>> }
>> }
>> 
>> And a more exact example on how it's used:
>> 
>> .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", 
>> "KEY_NAME", "KEY_VALUE"))
>> .timeWindow(Time.days(1))
>> .reduce(new DistinctFunction())
>> 
>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin > > wrote:
>> Hi Juho,
>> 
>> Where exactly does the data miss? When do you notice that? 
>> Do you check it:
>> - debugging `DistinctFunction.reduce` right after resume in the middle of 
>> the day 
>> or 
>> - some distinct records miss in the final output of BucketingSink in s3 
>> after window result is actually triggered and saved into s3 at the end of 
>> the day? is this the main output?
>> 
>> The late data around the time of taking savepoint might be not included into 
>> the savepoint but it should be behind the snapshotted offset in Kafka. Then 
>> it should just come later after the restore and should be reduced within the 
>> allowed lateness into the fina

Using a ProcessFunction as a "Source"

2018-08-24 Thread Addison Higham
HI,

I am writing a parallel source function that ideally needs to receive some
messages as control information (specifically, a state message on where to
start reading from a kinesis stream). As far as I can tell, there isn't a
way to make a sourceFunction receive input (which makes sense) so I am
thinking it makes sense to use a processFunction that will occasionally
receive control messages and mostly just output a lot of messages.

This works from an API perspective, with a few different options, I could
either:

A) have the processElement function block on calling the loop that will
produce messages or
B) have the processElement function return (by passing the collector to, and
starting the reading on, a different thread), but continue to produce
messages downstream

This obviously does raise some concerns though:

1. Does this break any assumptions flink has of message lifecycle? Option A
of blocking on processElement for very long periods seems straightforward
but less than ideal, not to mention not being able to handle any other
control messages.

However, I am not sure if a processFunction sending messages after the
processElement function has returned would break some expectations flink
has of operator lifecycles. Messages are also emitted by timers, etc., but
this would be completely outside any of those function calls as it is
started on another thread. This is obviously how most SourceFunctions work,
but it isn't clear if the same technique holds for ProcessFunctions.

2. Would this have a negative impact on backpressure downstream? Since I am
still going to be using the same collector instance, it seems like it
should ultimately work, but I wonder if there are other details I am not
aware of.

3. Is this just a terrible idea in general? It seems like I could maybe do
this by implementing directly on top of an Operator, but I am not as
familiar with that API

Thanks in advance for any thoughts!

Addison
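For illustration only, a rough sketch of what option B could look like;
ControlMessage, OutputRecord and KinesisReader are hypothetical placeholders,
and whether emitting from a user thread like this is safe is exactly what the
questions above ask.

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ReadingProcessFunction extends ProcessFunction<ControlMessage, OutputRecord> {

    private transient Thread readerThread;

    @Override
    public void processElement(ControlMessage control, Context ctx,
                               Collector<OutputRecord> out) {
        if (readerThread == null) {
            readerThread = new Thread(() -> {
                // hypothetical reader; starts at the position carried by the control message
                KinesisReader reader = new KinesisReader(control.startingPosition());
                while (reader.hasNext()) {
                    // emitting here happens outside processElement, which is
                    // exactly the concern raised in question 1
                    out.collect(reader.next());
                }
            });
            readerThread.start();
        }
    }
}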


Re: Data loss when restoring from savepoint

2018-08-24 Thread Juho Autio
Yes, sorry for my confusing comment. I just meant that it seems like
there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main
result of the job

Yes, and that's what I have already done. There seems to be always some
data loss with the production data volumes, if the job has been restarted
on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin 
wrote:

> Hi Juho,
>
> So it is a per key deduplication job.
>
> Yes, I would wait and check the actual output in s3 because it is the main
> result of the job and
>
> > The late data around the time of taking savepoint might be not included
> into the savepoint but it should be behind the snapshotted offset in Kafka.
>
> is not a bug, it is a possible behaviour.
>
> The savepoint is a snapshot of the in-flight data that has already been
> consumed from Kafka.
> Basically, the full contents of the window result are split between the
> savepoint and what can come after the savepointed offset in Kafka but
> before the window result is written into s3.
>
> Allowed lateness should not affect it, I am just saying that the final
> result in s3 should include all records after it.
> This is what should be guaranteed but not the contents of the intermediate
> savepoint.
>
> Cheers,
> Andrey
>
> On 24 Aug 2018, at 16:52, Juho Autio  wrote:
>
> Thanks for your answer!
>
> I check for the missed data from the final output on s3. So I wait until
> the next day, then run the same thing re-implemented in batch, and compare
> the output.
>
> > The late data around the time of taking savepoint might be not included
> into the savepoint but it should be behind the snapshotted offset in Kafka.
>
> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>
> > Then it should just come later after the restore and should be reduced
> within the allowed lateness into the final result which is saved into s3.
>
> Well, as far as I know, allowed lateness doesn't play any role here,
> because I started running the job with allowedLateness=0, and still get the
> data loss, while my late data output doesn't receive anything.
>
> > Also, is this `DistinctFunction.reduce` just an example or the actual
> implementation, basically saving just one of records inside the 24h window
> in s3? then what is missing there?
>
> Yes, it's the actual implementation. Note that there's a keyBy before
> the DistinctFunction. So there's one record for each key (which is the
> combination of a couple of fields). In practice I've seen that we're
> missing ~2000-4000 elements on each restore, and the total output is
> obviously much more than that.
>
> Here's the full code for the key selector:
>
> public class MapKeySelector implements KeySelector<Map<String, String>,
> Object> {
>
> private final String[] fields;
>
> public MapKeySelector(String... fields) {
> this.fields = fields;
> }
>
> @Override
> public Object getKey(Map<String, String> event) throws Exception {
> Tuple key = Tuple.getTupleClass(fields.length).newInstance();
> for (int i = 0; i < fields.length; i++) {
> key.setField(event.getOrDefault(fields[i], ""), i);
> }
> return key;
> }
> }
>
> And a more exact example on how it's used:
>
> .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD",
> "KEY_NAME", "KEY_VALUE"))
> .timeWindow(Time.days(1))
> .reduce(new DistinctFunction())
>
> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin 
> wrote:
>
>> Hi Juho,
>>
>> Where exactly does the data miss? When do you notice that?
>> Do you check it:
>> - debugging `DistinctFunction.reduce` right after resume in the middle of
>> the day
>> or
>> - some distinct records miss in the final output of BucketingSink in s3
>> after window result is actually triggered and saved into s3 at the end of
>> the day? is this the main output?
>>
>> The late data around the time of taking savepoint might be not included
>> into the savepoint but it should be behind the snapshotted offset in Kafka.
>> Then it should just come later after the restore and should be reduced
>> within the allowed lateness into the final result which is saved into s3.
>>
>> Also, is this `DistinctFunction.reduce` just an example or the actual
>> implementation, basically saving just one of records inside the 24h window
>> in s3? then what is missing there?
>>
>> Cheers,
>> Andrey
>>
>> On 23 Aug 2018, at 15:42, Juho Autio  wrote:
>>
>> I changed to allowedLateness=0, no change, still missing data when
>> restoring from savepoint.
>>
>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio  wrote:
>>
>>> I realized that BucketingSink must not play any role in this problem.
>>> This is because only when the 24-hour window triggers, BucketingSink gets a
>>> burst of input. Around the state restoring point (middle of the day) it
>>> do

Re: Data loss when restoring from savepoint

2018-08-24 Thread Andrey Zagrebin
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main 
result of the job and

> The late data around the time of taking savepoint might be not included into 
> the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the in-flight data that has already been
consumed from Kafka.
Basically, the full contents of the window result are split between the
savepoint and what can come after the savepointed offset in Kafka but before
the window result is written into s3.

Allowed lateness should not affect it, I am just saying that the final result 
in s3 should include all records after it. 
This is what should be guaranteed but not the contents of the intermediate 
savepoint.

Cheers,
Andrey

> On 24 Aug 2018, at 16:52, Juho Autio  wrote:
> 
> Thanks for your answer!
> 
> I check for the missed data from the final output on s3. So I wait until the 
> next day, then run the same thing re-implemented in batch, and compare the 
> output.
> 
> > The late data around the time of taking savepoint might be not included 
> > into the savepoint but it should be behind the snapshotted offset in Kafka.
> 
> Yes, I would definitely expect that. It seems like there's a bug somewhere.
> 
> > Then it should just come later after the restore and should be reduced 
> > within the allowed lateness into the final result which is saved into s3.
> 
> Well, as far as I know, allowed lateness doesn't play any role here, because 
> I started running the job with allowedLateness=0, and still get the data 
> loss, while my late data output doesn't receive anything.
> 
> > Also, is this `DistinctFunction.reduce` just an example or the actual 
> > implementation, basically saving just one of records inside the 24h window 
> > in s3? then what is missing there?
> 
> Yes, it's the actual implementation. Note that there's a keyBy before the 
> DistinctFunction. So there's one record for each key (which is the 
> combination of a couple of fields). In practice I've seen that we're missing 
> ~2000-4000 elements on each restore, and the total output is obviously much 
> more than that.
> 
> Here's the full code for the key selector:
> 
> public class MapKeySelector implements KeySelector<Map<String, String>,
> Object> {
> 
> private final String[] fields;
> 
> public MapKeySelector(String... fields) {
> this.fields = fields;
> }
> 
> @Override
> public Object getKey(Map<String, String> event) throws Exception {
> Tuple key = Tuple.getTupleClass(fields.length).newInstance();
> for (int i = 0; i < fields.length; i++) {
> key.setField(event.getOrDefault(fields[i], ""), i);
> }
> return key;
> }
> }
> 
> And a more exact example on how it's used:
> 
> .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", 
> "KEY_NAME", "KEY_VALUE"))
> .timeWindow(Time.days(1))
> .reduce(new DistinctFunction())
> 
> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin  > wrote:
> Hi Juho,
> 
> Where exactly does the data miss? When do you notice that? 
> Do you check it:
> - debugging `DistinctFunction.reduce` right after resume in the middle of the 
> day 
> or 
> - some distinct records miss in the final output of BucketingSink in s3 after 
> window result is actually triggered and saved into s3 at the end of the day? 
> is this the main output?
> 
> The late data around the time of taking savepoint might be not included into 
> the savepoint but it should be behind the snapshotted offset in Kafka. Then 
> it should just come later after the restore and should be reduced within the 
> allowed lateness into the final result which is saved into s3.
> 
> Also, is this `DistinctFunction.reduce` just an example or the actual 
> implementation, basically saving just one of records inside the 24h window in 
> s3? then what is missing there?
> 
> Cheers,
> Andrey
> 
>> On 23 Aug 2018, at 15:42, Juho Autio > > wrote:
>> 
>> I changed to allowedLateness=0, no change, still missing data when restoring 
>> from savepoint.
>> 
>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio > > wrote:
>> I realized that BucketingSink must not play any role in this problem. This 
>> is because only when the 24-hour window triggers, BucketingSink gets a burst
>> of input. Around the state restoring point (middle of the day) it doesn't 
>> get any input, so it can't lose anything either (right?).
>> 
>> I will next try removing the allowedLateness entirely from the equation.
>> 
>> In the meanwhile, please let me know if you have any suggestions for 
>> debugging the lost data, for example what logs to enable.
>> 
>> We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that 
>> could contribute to lost data when restoring a savepoint?
>> 
>> On Fri, Au

Re: AvroSchemaConverter and Tuple classes

2018-08-24 Thread françois lacombe
Hi Timo,

Thanks for your answer.
I was looking for a Tuple to feed a BatchTableSink subclass, but maybe it
can be achieved with a Row instead?

All the best

François
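For reference, a minimal sketch of the row-to-tuple converter Timo suggests
below, assuming at most 25 fields; this is not code from the thread.

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.types.Row;

public class RowToTupleConverter {

    // maps a Row of known arity (at most 25 fields) onto the matching TupleN at runtime
    public static Tuple toTuple(Row row) throws Exception {
        Tuple tuple = Tuple.getTupleClass(row.getArity()).newInstance();
        for (int i = 0; i < row.getArity(); i++) {
            tuple.setField(row.getField(i), i);
        }
        return tuple;
    }
}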

2018-08-24 10:21 GMT+02:00 Timo Walther :

> Hi,
>
> tuples are just a sub-category of rows, because the tuple arity is limited
> to 25 fields. I think the easiest solution would be to write your own
> converter that maps rows to tuples if you know that you will not need more
> than 25 fields. Otherwise it might be easier to just use a TextInputFormat
> and do the parsing yourself with a library.
>
> Regards,
> Timo
>
>
> Am 23.08.18 um 18:54 schrieb françois lacombe:
>
> Hi all,
>>
>> I'm looking for best practices regarding Tuple instances creation.
>>
>> I have a TypeInformation object produced by
>> AvroSchemaConverter.convertToTypeInfo("{...}");
>> Is this possible to define a corresponding Tuple instance with it?
>> (get the T from the TypeInformation)
>>
>> Example :
>> {
>>   "type": "record",
>>   "fields": [
>> { "name": "field1", "type": "int" },
>> { "name": "field2", "type": "string"}
>> ]}
>>  = Tuple2<Integer, String>
>>
>> The same question rises with DataSet or other any record handling class
>> with parametrized types.
>>
>> My goal is to parse several CsvFiles with different structures described
>> in an Avro schema.
>> It would be great to not hard-code structures in my Java code and only
>> get types information at runtime from Avro schemas
>>
>> Is this possible?
>>
>> Thanks in advance
>>
>> François Lacombe
>>
>
>
>


Re: Data loss when restoring from savepoint

2018-08-24 Thread Juho Autio
Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until
the next day, then run the same thing re-implemented in batch, and compare
the output.

> The late data around the time of taking savepoint might be not included
into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced
within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here,
because I started running the job with allowedLateness=0, and still get the
data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual
implementation, basically saving just one of records inside the 24h window
in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before
the DistinctFunction. So there's one record for each key (which is the
combination of a couple of fields). In practice I've seen that we're
missing ~2000-4000 elements on each restore, and the total output is
obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String, String>,
Object> {

private final String[] fields;

public MapKeySelector(String... fields) {
this.fields = fields;
}

@Override
public Object getKey(Map<String, String> event) throws Exception {
Tuple key = Tuple.getTupleClass(fields.length).newInstance();
for (int i = 0; i < fields.length; i++) {
key.setField(event.getOrDefault(fields[i], ""), i);
}
return key;
}
}

And a more exact example on how it's used:

.keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD",
"KEY_NAME", "KEY_VALUE"))
.timeWindow(Time.days(1))
.reduce(new DistinctFunction())
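As a debugging aid, here is a sketch (not Juho's actual code) of how the late
data side output (lateDataTag in the original job) would typically be consumed,
to verify whether any late records arrive at all; lateSink is a hypothetical
placeholder, and the surrounding context (kafkaStream, Time, OutputTag and
SingleOutputStreamOperator imports) is assumed to match the original job.

OutputTag<Map<String, String>> lateDataTag =
        new OutputTag<Map<String, String>>("late-data") {};

SingleOutputStreamOperator<Map<String, String>> distinct = kafkaStream
        .flatMap(new ExtractFieldsFunction())
        .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
        .timeWindow(Time.days(1))
        .allowedLateness(Time.minutes(0))
        .sideOutputLateData(lateDataTag)
        .reduce(new DistinctFunction());

// records dropped from the window because they arrived after the watermark
// passed the end of their window (plus allowed lateness) are emitted here instead
distinct.getSideOutput(lateDataTag).addSink(lateSink);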

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin 
wrote:

> Hi Juho,
>
> Where exactly does the data miss? When do you notice that?
> Do you check it:
> - debugging `DistinctFunction.reduce` right after resume in the middle of
> the day
> or
> - some distinct records miss in the final output of BucketingSink in s3
> after window result is actually triggered and saved into s3 at the end of
> the day? is this the main output?
>
> The late data around the time of taking savepoint might be not included
> into the savepoint but it should be behind the snapshotted offset in Kafka.
> Then it should just come later after the restore and should be reduced
> within the allowed lateness into the final result which is saved into s3.
>
> Also, is this `DistinctFunction.reduce` just an example or the actual
> implementation, basically saving just one of records inside the 24h window
> in s3? then what is missing there?
>
> Cheers,
> Andrey
>
> On 23 Aug 2018, at 15:42, Juho Autio  wrote:
>
> I changed to allowedLateness=0, no change, still missing data when
> restoring from savepoint.
>
> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio  wrote:
>
>> I realized that BucketingSink must not play any role in this problem.
>> This is because only when the 24-hour window triggers, BucketingSink gets a
>> burst of input. Around the state restoring point (middle of the day) it
>> doesn't get any input, so it can't lose anything either (right?).
>>
>> I will next try removing the allowedLateness entirely from the equation.
>>
>> In the meanwhile, please let me know if you have any suggestions for
>> debugging the lost data, for example what logs to enable.
>>
>> We use FlinkKafkaConsumer010 btw. Are there any known issues with that,
>> that could contribute to lost data when restoring a savepoint?
>>
>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio  wrote:
>>
>>> Some data is silently lost on my Flink stream job when state is restored
>>> from a savepoint.
>>>
>>> Do you have any debugging hints to find out where exactly the data gets
>>> dropped?
>>>
>>> My job gathers distinct values using a 24-hour window. It doesn't have
>>> any custom state management.
>>>
>>> When I cancel the job with savepoint and restore from that savepoint,
>>> some data is missed. It seems to be losing just a small amount of data. The
>>> event time of lost data is probably around the time of savepoint. In other
>>> words the rest of the time window is not entirely missed – collection works
>>> correctly also for (most of the) events that come in after restoring.
>>>
>>> When the job processes a full 24-hour window without interruptions it
>>> doesn't miss anything.
>>>
>>> Usually the problem doesn't happen in test environments that have
>>> smaller parallelism and smaller data volumes. But in production volumes the
>>> job seems to be consistently missing at least something on every restore.
>>>
>>> This issue has consistently happened since the job was initially
>>> created. It was a

Re: Data loss when restoring from savepoint

2018-08-24 Thread Andrey Zagrebin
Hi Juho,

Where exactly does the data miss? When do you notice that? 
Do you check it:
- debugging `DistinctFunction.reduce` right after resume in the middle of the 
day 
or 
- some distinct records miss in the final output of BucketingSink in s3 after 
window result is actually triggered and saved into s3 at the end of the day? is 
this the main output?

The late data around the time of taking the savepoint might not be included in
the savepoint, but it should be behind the snapshotted offset in Kafka. It
should then simply come later, after the restore, and be reduced within the
allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual 
implementation, basically saving just one of records inside the 24h window in 
s3? then what is missing there?

Cheers,
Andrey

> On 23 Aug 2018, at 15:42, Juho Autio  wrote:
> 
> I changed to allowedLateness=0, no change, still missing data when restoring 
> from savepoint.
> 
> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio  > wrote:
> I realized that BucketingSink must not play any role in this problem. This is 
> because only when the 24-hour window triggers, BucketingSink gets a burst of
> input. Around the state restoring point (middle of the day) it doesn't get 
> any input, so it can't lose anything either (right?).
> 
> I will next try removing the allowedLateness entirely from the equation.
> 
> In the meanwhile, please let me know if you have any suggestions for 
> debugging the lost data, for example what logs to enable.
> 
> We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that 
> could contribute to lost data when restoring a savepoint?
> 
> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio  > wrote:
> Some data is silently lost on my Flink stream job when state is restored from 
> a savepoint.
> 
> Do you have any debugging hints to find out where exactly the data gets 
> dropped?
> 
> My job gathers distinct values using a 24-hour window. It doesn't have any 
> custom state management.
> 
> When I cancel the job with savepoint and restore from that savepoint, some 
> data is missed. It seems to be losing just a small amount of data. The event 
> time of lost data is probably around the time of savepoint. In other words 
> the rest of the time window is not entirely missed – collection works 
> correctly also for (most of the) events that come in after restoring.
> 
> When the job processes a full 24-hour window without interruptions it doesn't 
> miss anything.
> 
> Usually the problem doesn't happen in test environments that have smaller 
> parallelism and smaller data volumes. But in production volumes the job seems 
> to be consistently missing at least something on every restore.
> 
> This issue has consistently happened since the job was initially created. It 
> was at first run on an older version of Flink 1.5-SNAPSHOT and it still 
> happens on both Flink 1.5.2 & 1.6.0.
> 
> I'm wondering if this could be for example some synchronization issue between 
> the kafka consumer offsets vs. what's been written by BucketingSink?
> 
> 1. Job content, simplified
> 
> kafkaStream
> .flatMap(new ExtractFieldsFunction())
> .keyBy(new MapKeySelector(1, 2, 3, 4))
> .timeWindow(Time.days(1))
> .allowedLateness(allowedLateness)
> .sideOutputLateData(lateDataTag)
> .reduce(new DistinctFunction())
> .addSink(sink)
> // use a fixed number of output partitions
> .setParallelism(8))
> 
> /**
>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>  */
> public class DistinctFunction implements ReduceFunction<Map<String, String>> {
> @Override
> public Map<String, String> reduce(Map<String, String> value1,
> Map<String, String> value2) {
> return value1;
> }
> }
> 
> 2. State configuration
> 
> boolean enableIncrementalCheckpointing = true;
> String statePath = "s3n://bucket/savepoints";
> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
> 
> Checkpointing Mode                   Exactly Once
> Interval                             1m 0s
> Timeout                              10m 0s
> Minimum Pause Between Checkpoints    1m 0s
> Maximum Concurrent Checkpoints       1
> Persist Checkpoints Externally       Enabled (retain on cancellation)
> 
> 3. BucketingSink configuration
> 
> We use BucketingSink, I don't think there's anything special here, if not the 
> fact that we're writing to S3.
> 
> String outputPath = "s3://bucket/output";
> BucketingSink<Map<String, String>> sink = new
> BucketingSink<Map<String, String>>(outputPath)
> .setBucketer(new ProcessdateBucketer())
> .setBatchSize(batchSize)
> .setInactiveBucketThreshold(inactiveBucketThreshold)
> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
> sink.setWriter(new IdJsonWriter());
> 
> 4. Kafka & event time

Re: Can I only use checkpoints instead of savepoints in production?

2018-08-24 Thread Averell
Hi Vino,

Regarding this statement "/Checkpoints are taken automatically and are used
for automatic restarting job in case of a failure/", I do not quite
understand the definition of a failure, and how to simulate that while
testing my application. Possible scenarios that I can think of:
   (1) flink application killed
   (2) cluster crashed
   (3) one of the servers in the cluster crashed
   (4) unhandled exception raised when abnormal data received
   ...

Could you please help explain?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can I only use checkpoints instead of savepoints in production?

2018-08-24 Thread Andrey Zagrebin
This thread is also useful in this context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html
 


> On 24 Aug 2018, at 14:49, Andrey Zagrebin  wrote:
> 
> Hi Henry,
> 
> In addition to Vino’s answer, there are several things to keep in mind about 
> “checkpoints vs savepoints".
> 
> Checkpoints are designed mostly for fault tolerance of running Flink job and 
> automatic recovery
> that is why by default Flink manages their storage itself. Though it is 
> correct that you can configure the checkpoints to be retained (externalised), 
> have control over their storage and resume a failed/canceled job from them.
> 
> But their format might be optimised for any of new Flink versions and change 
> between them.
> It means that in general you might not be able to upgrade Flink version or 
> the running job structure using only checkpoints.
> 
> Moreover, currently, it is not guaranteed that you will be always able to 
> rescale your job from the checkpoint (change parallelism). Although, it is 
> technically possible for Flink 1.6.0 at the moment, even for incremental 
> checkpoints.
> 
> Savepoints are designed for manual intervention of the user for maintenance 
> operations
> that is why their storage is under control of the user in the first place. 
> They have more stable internal format which allows manual migration between 
> Flink or job versions and rescaling.
> 
> Cheers,
> Andrey
> 
>> On 24 Aug 2018, at 12:55, vino yang wrote:
>> 
>> Hi Henry,
>> 
>> A good answer from stackoverflow:
>> 
>> Apache Flink's Checkpoints and Savepoints are similar in that way they both 
>> are mechanisms for preserving internal state of Flink's applications.
>> 
>> Checkpoints are taken automatically and are used for automatic restarting 
>> job in case of a failure.
>> 
>> Savepoints on the other hand are taken manually, are always stored 
>> externally and are used for starting a "new" job with previous internal 
>> state in case of e.g.
>> 
>> Bug fixing
>> Flink version upgrade
>> A/B testing, etc.
>> Underneath they are in fact the same mechanism/code path with some subtle 
>> nuances.
>> 
>> About your question: 
>> 
>> 1) No problem, The main purpose of checkpoint itself is to automatically 
>> restart the recovery when the job fails.
>> 2) You can also use REST client to trigger savepoint.
>> 3) I don't know, But it seems that their usage scenarios and purposes are 
>> still different. May Till and Chesnay can answer this question.
>> 
>> Thanks, vino.
>> 
>> 徐涛 <happydexu...@gmail.com> wrote on Fri, Aug 24, 2018 at 3:19 PM:
>> Hi All,
>>  I check the documentation of Flink release 1.6, find that I can use 
>> checkpoints to resume the program either. As I encountered some problems 
>> when using savepoints, I have the following questions:
>>  1. Can I use checkpoints only, but not use savepoints, because it can 
>> also use to resume programs. If I do so, is there any problem?
>>  2. Checkpoint can be generated automatically, but savepoints seems can 
>> only be generated manually. I have to write a crontab to generate the 
>> savepoint, more than this, my Flink program is run on Yarn, and on the 
>> machines, only the Hadoop and Yarn are installed, so I can not use flink 
>> savepoint command to generate savepoint, and I have no authorization to 
>> install Flink on the machines.
>>  3. Will checkpoint and savepoint merged in later releases?
>>  Thanks very much.
>> 
>> Best,
>> Henry
> 



Re: Can I only use checkpoints instead of savepoints in production?

2018-08-24 Thread Andrey Zagrebin
Hi Henry,

In addition to Vino’s answer, there are several things to keep in mind about 
“checkpoints vs savepoints".

Checkpoints are designed mostly for fault tolerance of running Flink job and 
automatic recovery
that is why by default Flink manages their storage itself. Though it is correct 
that you can configure the checkpoints to be retained (externalised), have 
control over their storage and resume a failed/canceled job from them.

But their format might be optimised for any of new Flink versions and change 
between them.
It means that in general you might not be able to upgrade Flink version or the 
running job structure using only checkpoints.

Moreover, currently, it is not guaranteed that you will be always able to 
rescale your job from the checkpoint (change parallelism). Although, it is 
technically possible for Flink 1.6.0 at the moment, even for incremental 
checkpoints.

Savepoints are designed for manual intervention of the user for maintenance 
operations
that is why their storage is under control of the user in the first place. They 
have more stable internal format which allows manual migration between Flink or 
job versions and rescaling.

Cheers,
Andrey
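
To make the retained (externalised) checkpoint part concrete, a minimal configuration sketch (assuming the Flink 1.6 streaming API; the interval and cleanup mode are only an example):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // one checkpoint per minute
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep checkpoints after cancel
        // ... add sources, transformations and sinks, then call env.execute()
    }
}

A retained checkpoint directory can then be passed to "flink run -s <checkpoint-path>" in the same way as a savepoint path, with the caveats about format stability and rescaling described above.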

> On 24 Aug 2018, at 12:55, vino yang  wrote:
> 
> Hi Henry,
> 
> A good answer from stackoverflow:
> 
> Apache Flink's Checkpoints and Savepoints are similar in that way they both 
> are mechanisms for preserving internal state of Flink's applications.
> 
> Checkpoints are taken automatically and are used for automatic restarting job 
> in case of a failure.
> 
> Savepoints on the other hand are taken manually, are always stored externally 
> and are used for starting a "new" job with previous internal state in case of 
> e.g.
> 
> Bug fixing
> Flink version upgrade
> A/B testing, etc.
> Underneath they are in fact the same mechanism/code path with some subtle 
> nuances.
> 
> About your question: 
> 
> 1) No problem, The main purpose of checkpoint itself is to automatically 
> restart the recovery when the job fails.
> 2) You can also use REST client to trigger savepoint.
> 3) I don't know, But it seems that their usage scenarios and purposes are 
> still different. May Till and Chesnay can answer this question.
> 
> Thanks, vino.
> 
> 徐涛 <happydexu...@gmail.com> wrote on Fri, Aug 24, 2018 at 3:19 PM:
> Hi All,
>   I check the documentation of Flink release 1.6, find that I can use 
> checkpoints to resume the program either. As I encountered some problems when 
> using savepoints, I have the following questions:
>   1. Can I use checkpoints only, but not use savepoints, because it can 
> also use to resume programs. If I do so, is there any problem?
>   2. Checkpoint can be generated automatically, but savepoints seems can 
> only be generated manually. I have to write a crontab to generate the 
> savepoint, more than this, my Flink program is run on Yarn, and on the 
> machines, only the Hadoop and Yarn are installed, so I can not use flink 
> savepoint command to generate savepoint, and I have no authorization to 
> install Flink on the machines.
>   3. Will checkpoint and savepoint merged in later releases?
>   Thanks very much.
> 
> Best,
> Henry



Re: How to do test in Flink?

2018-08-24 Thread Chang Liu
No worries, I found it here:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>



Best regards/祝好,

Chang Liu 刘畅
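
With the test-jar dependencies above on the classpath, a minimal harness-based test might look like the following sketch (the wrapped StreamMap operator, input record and assertion are illustrative):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

public class UppercaseMapTest {

    @Test
    public void mapsToUppercase() throws Exception {
        MapFunction<String, String> toUpper = value -> value.toUpperCase();

        // Wrap the function in its operator and drive it through the harness
        OneInputStreamOperatorTestHarness<String, String> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamMap<>(toUpper));
        harness.open();

        harness.processElement(new StreamRecord<>("hello", 1L));

        // getOutput() holds the records emitted by the operator, in order
        StreamRecord<String> result = (StreamRecord<String>) harness.getOutput().poll();
        Assert.assertEquals("HELLO", result.getValue());

        harness.close();
    }
}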



On Fri, Aug 24, 2018 at 1:16 PM Chang Liu  wrote:

> Hi Hequn,
>
> I have added the following dependencies:
>
> 
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>     <type>test-jar</type>
>     <scope>test</scope>
> </dependency>
> <dependency>
>     <groupId>org.mockito</groupId>
>     <artifactId>mockito-core</artifactId>
>     <version>2.21.0</version>
>     <scope>test</scope>
> </dependency>
> 
>
>
> But got the exception:   java.lang.NoClassDefFoundError:
> org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder
>
> Do you know which library contains this class? Thanks :)
>
> Best regards/祝好,
>
> Chang Liu 刘畅
> DevOps Engineer
> WB TECH / Cyber Crime Prevention Team
>
> Mobile: +31(0)687859981
> Email: fluency...@gmail.com  &  chang.l...@ing.nl
>
>
>
> On Mon, Aug 13, 2018 at 1:42 PM Hequn Cheng  wrote:
>
>> Hi Change,
>>
>> Try
>> 
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-streaming-java_2.11</artifactId>
>>     <version>${flink.version}</version>
>>     <type>test-jar</type>
>>     <scope>test</scope>
>> </dependency>
>> 
>> .
>>
>> On Mon, Aug 13, 2018 at 6:42 PM, Chang Liu  wrote:
>>
>>> And another question: which library should I include in order to use
>>> these harnesses? I do have this flink-test-utils_2.11 in my pom, but I
>>> cannot find the harnesses.
>>>
>>> I also have the following in my pom:
>>>
>>>- flink-core
>>>- flink-clients_2.11
>>>- flink-scala_2.11
>>>- flink-streaming-java_2.11
>>>- flink-streaming-java_2.11
>>>- flink-connector-kafka-0.11_2.11
>>>
>>>
>>> Best regards/祝好,
>>>
>>> Chang Liu 刘畅
>>>
>>>
>>> On 13 Aug 2018, at 04:01, Hequn Cheng  wrote:
>>>
>>> Hi Chang,
>>>
>>> There are some harness tests which can be used to test your function. It
>>> is also a common way to test function or operator in flink internal tests.
>>> Currently, the harness classes mainly include:
>>>
>>>- KeyedOneInputStreamOperatorTestHarness
>>>- KeyedTwoInputStreamOperatorTestHarness
>>>- OneInputStreamOperatorTestHarness
>>>- TwoInputStreamOperatorTestHarness
>>>
>>> You can take a look at the source code of these classes.
>>>
>>> To be more specific, you can take a look at
>>> the testSlidingEventTimeWindowsApply[1], in which the RichSumReducer window
>>> function has been tested.
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L213
>>>
>>>
>>> On Mon, Aug 13, 2018 at 7:10 AM, Chang Liu  wrote:
>>>
 Dear all,

 I have some questions regarding testing in Flink. The more general
 question is: is there any guideline, template, or best practices that we
 can follow if we want to test our flink code (more in scala)?

 I know there is this page:
 https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/testing.html
  but
 not so much written there. And I also did not find a more comprehensive
 documentation of this library: flink-test-utils_2.11.

 One detailed question: how do you test this WindowFunction below? The
 return type is Unit right? We cannot do unit test on like, like how the
 ReduceFunction was tested in the example link above. Then we only have the
 option of doing integration testing on it?
 


 Your ideas would be very helpful :) Thanks in advance !

 Best regards/祝好,

 Chang Liu 刘畅



>>>
>>>
>>


Re: How to do test in Flink?

2018-08-24 Thread Chang Liu
Hi Hequn,

I have added the following dependencies:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-core</artifactId>
    <version>2.21.0</version>
    <scope>test</scope>
</dependency>



But got the exception:   java.lang.NoClassDefFoundError:
org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder

Do you know which library contains this class? Thanks :)

Best regards/祝好,

Chang Liu 刘畅
DevOps Engineer
WB TECH / Cyber Crime Prevention Team

Mobile: +31(0)687859981
Email: fluency...@gmail.com  &  chang.l...@ing.nl



On Mon, Aug 13, 2018 at 1:42 PM Hequn Cheng  wrote:

> Hi Change,
>
> Try
> 
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-java_2.11</artifactId>
>     <version>${flink.version}</version>
>     <type>test-jar</type>
>     <scope>test</scope>
> </dependency>
> 
> .
>
> On Mon, Aug 13, 2018 at 6:42 PM, Chang Liu  wrote:
>
>> And another question: which library should I include in order to use
>> these harnesses? I do have this flink-test-utils_2.11 in my pom, but I
>> cannot find the harnesses.
>>
>> I also have the following in my pom:
>>
>>- flink-core
>>- flink-clients_2.11
>>- flink-scala_2.11
>>- flink-streaming-java_2.11
>>- flink-streaming-java_2.11
>>- flink-connector-kafka-0.11_2.11
>>
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>>
>>
>> On 13 Aug 2018, at 04:01, Hequn Cheng  wrote:
>>
>> Hi Chang,
>>
>> There are some harness tests which can be used to test your function. It
>> is also a common way to test function or operator in flink internal tests.
>> Currently, the harness classes mainly include:
>>
>>- KeyedOneInputStreamOperatorTestHarness
>>- KeyedTwoInputStreamOperatorTestHarness
>>- OneInputStreamOperatorTestHarness
>>- TwoInputStreamOperatorTestHarness
>>
>> You can take a look at the source code of these classes.
>>
>> To be more specific, you can take a look at
>> the testSlidingEventTimeWindowsApply[1], in which the RichSumReducer window
>> function has been tested.
>>
>> Best, Hequn
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java#L213
>>
>>
>> On Mon, Aug 13, 2018 at 7:10 AM, Chang Liu  wrote:
>>
>>> Dear all,
>>>
>>> I have some questions regarding testing in Flink. The more general
>>> question is: is there any guideline, template, or best practices that we
>>> can follow if we want to test our flink code (more in scala)?
>>>
>>> I know there is this page:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/testing.html
>>>  but
>>> not so much written there. And I also did not find a more comprehensive
>>> documentation of this library: flink-test-utils_2.11.
>>>
>>> One detailed question: how do you test this WindowFunction below? The
>>> return type is Unit right? We cannot do unit test on like, like how the
>>> ReduceFunction was tested in the example link above. Then we only have the
>>> option of doing integration testing on it?
>>> 
>>>
>>>
>>> Your ideas would be very helpful :) Thanks in advance !
>>>
>>> Best regards/祝好,
>>>
>>> Chang Liu 刘畅
>>>
>>>
>>>
>>
>>
>


Raising a bug in Flink's unit test scripts

2018-08-24 Thread Averell
Good day everyone,

I'm writing a unit test for the bug fix FLINK-9940, and found that some
existing tests in flink-fs-tests cannot detect the issue when the file
monitoring function emits duplicated files (i.e. the same file is reported
multiple times).
Could I just fix this as part of the FLINK-9940 bug fix, or do I have to raise
a separate bug?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can I only use checkpoints instead of savepoints in production?

2018-08-24 Thread vino yang
Hi Henry,

A good answer from stackoverflow:

Apache Flink's Checkpoints and Savepoints are similar in that way they both
are mechanisms for preserving internal state of Flink's applications.

Checkpoints are taken automatically and are used for automatic restarting
job in case of a failure.

Savepoints on the other hand are taken manually, are always stored
externally and are used for starting a "new" job with previous internal
state in case of e.g.


   - Bug fixing
   - Flink version upgrade
   - A/B testing, etc.

Underneath they are in fact the same mechanism/code path with some subtle
nuances.

About your question:

1) No problem. The main purpose of checkpoints is to allow automatic restart
and recovery when the job fails.
2) You can also use the REST client to trigger a savepoint.
3) I don't know, but it seems that their usage scenarios and purposes are
still different. Maybe Till and Chesnay can answer this question.

Thanks, vino.
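
Regarding (2), with the Flink 1.6 REST API the savepoint trigger is roughly the following exchange (default web port 8081; the call is asynchronous and returns a trigger id that can be polled):

POST http://<jobmanager>:8081/jobs/<job-id>/savepoints
{"target-directory": "hdfs:///flink/savepoints", "cancel-job": false}

GET http://<jobmanager>:8081/jobs/<job-id>/savepoints/<trigger-id>   (poll for completion)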

徐涛 wrote on Fri, Aug 24, 2018 at 3:19 PM:

> Hi All,
> I check the documentation of Flink release 1.6, find that I can use
> checkpoints to resume the program either. As I encountered some problems
> when using savepoints, I have the following questions:
> 1. Can I use checkpoints only, but not use savepoints, because it can also
> use to resume programs. If I do so, is there any problem?
> 2. Checkpoint can be generated automatically, but savepoints seems can
> only be generated manually. I have to write a crontab to generate the
> savepoint, more than this, my Flink program is run on Yarn, and on the
> machines, only the Hadoop and Yarn are installed, so I can not use flink
> savepoint command to generate savepoint, and I have no authorization to
> install Flink on the machines.
> 3. Will checkpoint and savepoint merged in later releases?
> Thanks very much.
>
> Best,
> Henry
>


Re: Aggregator in CEP

2018-08-24 Thread antonio saldivar
Hello, thank you very much.
I took a look at the link, but now how can I check the conditions to get the
aggregated results?

On Fri, Aug 24, 2018 at 5:27, aitozi () wrote:

> Hi,
>
> Now that it still not support the aggregator function in cep
> iterativeCondition. Now may be you need to check the condition by yourself
> to get the aggregator result. I will work for this these day, you can take
> a
> look on this issue:
>
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-9507
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Aggregator in CEP

2018-08-24 Thread aitozi
Hi,

CEP does not yet support aggregation functions in an IterativeCondition, so for
now you need to compute the aggregate yourself inside the condition. I will be
working on this over the coming days; you can take a look at this issue:

https://issues.apache.org/jira/projects/FLINK/issues/FLINK-9507



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
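
Until FLINK-9507 is available, computing the aggregate inside the condition looks roughly like the sketch below (a hypothetical Event POJO with getAmount(), the pattern name "start" and the threshold are illustrative):

import org.apache.flink.cep.pattern.conditions.IterativeCondition;

public class CumulativeAmountCondition extends IterativeCondition<Event> {

    private final double threshold;

    public CumulativeAmountCondition(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        // Sum the amounts of the events already matched for the "start" pattern,
        // plus the current candidate event.
        double sum = value.getAmount();
        for (Event previous : ctx.getEventsForPattern("start")) {
            sum += previous.getAmount();
        }
        // Accept the event only once the cumulative amount reaches the threshold
        return sum >= threshold;
    }
}

Such a condition would typically be attached with .where(new CumulativeAmountCondition(10_000)) on the pattern stage it should guard, combined with a within(...) window for the time bound.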


Re: State TTL in Flink 1.6.0

2018-08-24 Thread Andrey Zagrebin
Hi Juho,

As Aljoscha mentioned the current TTL implementation was mostly targeted to 
data privacy applications
where only processing time matters.

I think the event time can be also useful for TTL and should address your 
concerns. 
The event time extension is on the road map for the future Flink releases.

Cheers,
Andrey

> On 22 Aug 2018, at 11:57, Aljoscha Krettek  wrote:
> 
> Hi Juho,
> 
> The main motivation for the initial implementation of TTL was compliance with 
> new GDPR rules. I.e. data cannot be accessible and must be dropped according 
> to time in the real world, i.e. processing time. The behaviour you describe, 
> with data being dropped if you keep a savepoint for too long, is actually 
> what is required for this use case.
> 
> I do see that also having this for event time can also be useful and it might 
> get implemented in the future. Maybe Stefan can chime in here.
> 
> Best,
> Aljoscha
> 
>> On 22. Aug 2018, at 11:01, Chesnay Schepler  wrote:
>> 
>> Just a quick note for the docs: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl
>> 
>> On 22.08.2018 10:53, Juho Autio wrote:
>>> First, I couldn't find anything about State TTL in Flink docs, is there 
>>> anything like that? I can manage based on Javadocs & source code, but just 
>>> wondering.
>>> 
>>> Then to main main question, why doesn't the TTL support event time, and is 
>>> there any sensible use case for the TTL if the streaming charateristic of 
>>> my job is event time?
>>> 
>>> I have a job that is cleaning up old entries from a keyed MapState by 
>>> calling registerEventTimeTimer & implementing the onTimer method. This way 
>>> I can keep the state for a certain time in _event time_.
>>> 
>>> That's more complicated code than it would have to be, so I wanted to 
>>> convert my function to use Flink's own state TTL. I started writing this:
>>> 
>>> MapStateDescriptor<String, String> stateDesc = new MapStateDescriptor<>(
>>>         "deviceState", String.class, String.class);
>>> StateTtlConfig ttlConfig = StateTtlConfig
>>>         .newBuilder(Time.milliseconds(stateRetentionMillis))
>>>         // TODO EventTime is not supported?
>>>         .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
>>>         .build();
>>> stateDesc.enableTimeToLive(ttlConfig);
>>> 
>>> So, I realized that ProcessingTime is the only existing TimeCharacteristic 
>>> in StateTtlConfig.
>>> 
>>> Based on some comments in Flink tickets it seems that it was a conscious 
>>> choice, because supporting EventTime TTL would be much heavier:
>>> 
>>> https://issues.apache.org/jira/browse/FLINK-3089?focusedCommentId=16318013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16318013
>>> 
>>> So I can't exactly match the current behaviour that guarantees to keep the 
>>> state available for 24 hours (or whatever is passed as 
>>> --stateRetentionMillis).
>>> 
>>> However, if we accept the restriction and switch to processing time in 
>>> state cleanup, what does it mean?
>>> 
>>> - As long as stream keeps up with the input rate (from kafka), there's no 
>>> big difference, because 24 hours in processing time ~= 24 hours in even 
>>> time.
>>> - If the stream is lagging behind a lot, then it would be possible that the 
>>> state is cleaned "too early". However we aim at not having a lot of lag, so 
>>> this is not a real issue – job would be scaled up to catch up before it 
>>> starts lagging too much to get misses because of cleared state. Still, if 
>>> we fail to scale up quickly enough, the state might be cleared too early 
>>> and cause real trouble.
>>> - One problem is that if the stream is quickly processing a long backlog 
>>> (say, start streaming 7 days back in event time), then the state size can 
>>> temporarily grow bigger than usual – maybe this wouldn't be a big problem, 
>>> but it could at least require extraneous upscaling of resources.
>>> - After restoring from a savepoint, the processing time on the state is as 
>>> much older than what was the time of downtime due to job restart. Even this 
>>> is not a huge issue as long as the deployment downtime is short compared to 
>>> the 24 hour TTL.
>>> 
>>> Any way, all these issues combined, I'm a bit confused on the whole TTL 
>>> feature. Can it be used in event time based streaming in any sensible way? 
>>> It seems like it would be more like a cache then, and can't be relied on 
>>> well enough.
>>> 
>>> Thanks.
>>> 
>>> Juho
>> 
>> 
> 
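
For reference, the timer-based event-time cleanup described above looks roughly like the sketch below (assuming a keyed stream with assigned timestamps and a hypothetical Event type; it is simplified in that the key's whole MapState is cleared when any of its timers fires):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TimerBasedTtlFunction extends ProcessFunction<Event, Event> {

    private static final long TTL_MS = 24 * 60 * 60 * 1000L; // 24 hours in event time

    private transient MapState<String, String> deviceState;

    @Override
    public void open(Configuration parameters) {
        deviceState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("deviceState", String.class, String.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        deviceState.put(value.getDeviceId(), value.getPayload());
        // Schedule cleanup TTL_MS after the event's timestamp, measured in event time
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TTL_MS);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        // Fires once the watermark passes the registered event time
        deviceState.clear();
    }
}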



Aggregator in CEP

2018-08-24 Thread antonio saldivar
Hello

I am developing an application where I use Flink (v1.4.2) CEP. Is there
any aggregation function to match cumulative amounts or counts in an
IterativeCondition within a period of time for keyed (keyBy) elements?

If a cumulative amount reaches a threshold, fire a result.

Thank you
Regards


Re: jobmanager holds too many CLOSE_WAIT connection to datanode

2018-08-24 Thread Yuan,Youjun
One more safer approach is to execute cancel with savepoint on all jobs first
>> this sounds great!

Thanks
Youjun

From: vino yang 
Sent: Friday, August 24, 2018 2:43 PM
To: Yuan,Youjun ; user 
Subject: Re: jobmanager holds too many CLOSE_WAIT connection to datanode

Hi Youjun,

You can see if there is any real data transfer between these connections.
I guess there may be some connection leaks here, and if so, it's a bug.
On the other hand, version 1.4 is a bit old; can you check whether the same
problem exists on 1.5 or 1.6?
I suggest you create an issue on JIRA and maybe get more feedback.

Regarding how to force these connections to be closed:
if you have configured HA mode and checkpointing is enabled for the job, you
can try shutting down the JM leader, then let ZK conduct the leader election
and switch to the new JM.

But please be cautious about this process. One more safer approach is to 
execute cancel with savepoint on all jobs first. Then switch JM.

Thanks, vino.
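
For reference, cancelling with a savepoint from the CLI is roughly the following (target directory and job id are placeholders):

./bin/flink cancel -s hdfs:///flink/savepoints <job-id>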

Yuan,Youjun <yuanyou...@baidu.com> wrote on Fri, Aug 24, 2018 at 1:06 PM:
Hi vino,

My jobs have been running for months now, on a standalone cluster, using Flink 1.4.0.
The connections accumulated over time, not in a short period.
There is no timeout error in the jobmanager log.

So there are two questions:
1. How to force-close those connections, ideally without restarting the running
jobs?
2. In the future, how to avoid the jobmanager holding so many apparently
unnecessary TCP connections?

Thanks
Youjun

From: vino yang <yanghua1...@gmail.com>
Sent: Friday, August 24, 2018 10:26 AM
To: Yuan,Youjun <yuanyou...@baidu.com>
Cc: user <user@flink.apache.org>
Subject: Re: jobmanager holds too many CLOSE_WAIT connection to datanode

Hi Youjun,

How long has your job been running?
As far as I know, over a short period the jobmanager should not generate so many
connections to HDFS just for checkpoints.
What is your Flink cluster environment? Standalone or Flink on YARN?
In addition, does JM's log show any timeout information? Has Checkpoint timed 
out?
If you can provide more information, it will help locate the problem.

Thanks, vino.

Yuan,Youjun <yuanyou...@baidu.com> wrote on Thu, Aug 23, 2018 at 10:53 PM:
Hi,

After running for a while, my job manager holds thousands of CLOSE_WAIT TCP
connections to the HDFS datanode. The number is growing slowly, and it will
likely hit the max open file limit. My jobs checkpoint to HDFS every minute.
If I run lsof -i -a -p $JMPID, I get tons of the following output:
java9433  iot  408u  IPv4 4060901898  0t0  TCP 
jmHost:17922->datanode:50010 (CLOSE_WAIT)
java9433  iot  409u  IPv4 4061478455  0t0  TCP 
jmHost:52854->datanode:50010 (CLOSE_WAIT)
java9433  iot  410r  IPv4 4063170767  0t0  TCP 
jmHost:49384->datanode:50010 (CLOSE_WAIT)
java9433  iot  411w  IPv4 4063188376  0t0  TCP 
jmHost:50516->datanode:50010 (CLOSE_WAIT)
java9433  iot  412u  IPv4 4061459881  0t0  TCP 
jmHost:51651->datanode:50010 (CLOSE_WAIT)
java9433  iot  413u  IPv4 4063737603  0t0  TCP 
jmHost:31318->datanode:50010 (CLOSE_WAIT)
java9433  iot  414w  IPv4 4062030625  0t0  TCP 
jmHost:34033->datanode:50010 (CLOSE_WAIT)
java9433  iot  415u  IPv4 4062049134  0t0  TCP 
jmHost:35156->datanode:50010 (CLOSE_WAIT)
java9433  iot  416u  IPv4 4062615550  0t0  TCP 
jmHost:16962->datanode:50010 (CLOSE_WAIT)
java9433  iot  417r  IPv4 4063757056  0t0  TCP 
jmHost:32553->datanode:50010 (CLOSE_WAIT)
java9433  iot  418w  IPv4 4064304789  0t0  TCP 
jmHost:13375->datanode:50010 (CLOSE_WAIT)
java9433  iot  419u  IPv4 4062599328  0t0  TCP 
jmHost:15915->datanode:50010 (CLOSE_WAIT)
java9433  iot  420w  IPv4 4065462963  0t0  TCP 
jmHost:30432->datanode:50010 (CLOSE_WAIT)
java9433  iot  421u  IPv4 4067178257  0t0  TCP 
jmHost:28334->datanode:50010 (CLOSE_WAIT)
java9433  iot  422u  IPv4 4066022066  0t0  TCP 
jmHost:11843->datanode:50010 (CLOSE_WAIT)


I know restarting the job manager should clean up those connections, but I
wonder if there is a better solution?
Btw, I am using flink 1.4.0, and running a standalone cluster.

Thanks
Youjun


Re: AvroSchemaConverter and Tuple classes

2018-08-24 Thread Timo Walther

Hi,

Tuples are just a subcategory of rows, because tuple arity is limited to
25 fields. I think the easiest solution would be to write your own converter
that maps rows to tuples if you know that you will not need more than
25 fields. Otherwise it might be easier to just use a TextInputFormat and do
the parsing yourself with a library.


Regards,
Timo
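
For the row-based route, a rough sketch of reading such a CSV with field types taken from the Avro schema at runtime (assuming flink-avro's AvroSchemaConverter yields a RowTypeInfo for a record schema and that RowCsvInputFormat matches the CSV layout; the schema string and path are placeholders):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.types.Row;

public class AvroDrivenCsvJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        String avroSchemaString = "{ ... Avro record schema ... }"; // placeholder

        // Field types come from the schema at runtime, nothing is hard-coded in Java
        RowTypeInfo rowType = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(avroSchemaString);
        RowCsvInputFormat csvFormat =
                new RowCsvInputFormat(new Path("file:///data/input.csv"), rowType.getFieldTypes());

        DataSet<Row> rows = env.createInput(csvFormat, rowType);
        rows.print();
    }
}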


On 23.08.18 at 18:54, françois lacombe wrote:

Hi all,

I'm looking for best practices regarding Tuple instances creation.

I have a TypeInformation object produced by 
AvroSchemaConverter.convertToTypeInfo("{...}");
Is it possible to define a corresponding Tuple instance with it?
(i.e. get the T from the TypeInformation)


Example :
{
  "type": "record",
  "fields": [
    { "name": "field1", "type": "int" },
    { "name": "field2", "type": "string"}
]}
 = Tuple2<Integer, String>

The same question arises with DataSet or any other record-handling
class with parameterized types.


My goal is to parse several CSV files with different structures
described in an Avro schema.
It would be great not to hard-code the structures in my Java code and only
get type information at runtime from Avro schemas.


Is this possible?

Thanks in advance

François Lacombe





Can I only use checkpoints instead of savepoints in production?

2018-08-24 Thread 徐涛
Hi All,
I check the documentation of Flink release 1.6, find that I can use checkpoints to resume the program either. As I encountered some problems when using savepoints, I have the following questions:
1. Can I use checkpoints only, but not use savepoints, because it can also use to resume programs. If I do so, is there any problem?
2. Checkpoint can be generated automatically, but savepoints seems can only be generated manually. I have to write a crontab to generate the savepoint, more than this, my Flink program is run on Yarn, and on the machines, only the Hadoop and Yarn are installed, so I can not use flink savepoint command to generate savepoint, and I have no authorization to install Flink on the machines.
3. Will checkpoint and savepoint merged in later releases?
Thanks very much.

Best,
Henry