Re: watermark trigger doesn't check whether element's timestamp is passed

2016-10-31 Thread Manu Zhang
Yes, here's the example
https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala

If you print and compare the timestamp of the timer with that of the
"PageView" in the outputs, you can see what I mean.

I think the recently introduced TimelyFlatMapFunction is close to what I
want to achieve. It would be great if we could query time information in
the window function, so I filed https://issues.apache.org/jira/browse/FLINK-4953
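
For concreteness, a minimal sketch of the kind of per-element event-time
trigger being discussed (this is not the trigger from the linked example;
the signatures follow the Flink 1.1 Trigger API and may differ slightly
across versions, and a trigger used with merging session windows would
additionally need canMerge()/onMerge()):

  import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow

  // Fires whenever the watermark passes an element's timestamp and purges
  // at the end of the window.
  class PerElementEventTimeTrigger[T] extends Trigger[T, TimeWindow] {

    override def onElement(element: T, timestamp: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
      // one timer at the element's own timestamp, one at the window end
      ctx.registerEventTimeTimer(timestamp)
      ctx.registerEventTimeTimer(window.maxTimestamp())
      TriggerResult.CONTINUE
    }

    override def onEventTime(time: Long, window: TimeWindow,
                             ctx: Trigger.TriggerContext): TriggerResult =
      if (time == window.maxTimestamp()) TriggerResult.FIRE_AND_PURGE
      else TriggerResult.FIRE

    override def onProcessingTime(time: Long, window: TimeWindow,
                                  ctx: Trigger.TriggerContext): TriggerResult =
      TriggerResult.CONTINUE

    override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
      ctx.deleteEventTimeTimer(window.maxTimestamp())
  }

When such a trigger fires, the window function still receives all buffered
elements, including those whose timestamps lie beyond the firing timer,
which is exactly why being able to query time information inside the
window function matters here.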

Thanks for your time.

Manu

On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek 
wrote:

> Hmm, I don't completely understand what's going on. Could you maybe post
> an example, with the trigger code that shows this behaviour?
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 17:12 Manu Zhang  wrote:
>
> Hi,
>
> It's what I'm seeing. If timers are not fired at the end of the window,
> state (in the window) whose timestamp is *after* the timer will also be
> emitted. That's a problem for event-time triggers.
>
> Thanks,
> Manu
>
>
> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek 
> wrote:
>
> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang  wrote:
>
> Hi Aljoscha,
>
> Thanks for your response.  My use case is to track user trajectories based
> on page view events when users visit a website.  The input would be like a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with some event-time
> trigger. Note we can't wait for the end of the session window due to latency.
> Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time is passed by the watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets a timer on each
> element's timestamp. For both triggers I've witnessed a problem like the
> following (e.g. with a session gap of 5):
>
> PageView("user1", "http://foo;, 1)
> PageView("user1", "http://foo/bar;, 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar
> *", [1,6])
> PageView("user1", "http://foo/bar/foobar;, 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar ->
> *http://foo/bar/foobar *", [1, 10])
>
> The URLs in bold should not have been included, since there could be
> events before them that have not arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek 
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your example and what is the input stream (including watermarks)?
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 06:30 Manu Zhang  wrote:
>
> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>
> It seems that if *("c", 1:06) is processed before watermark(1:04)*,
> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
> watermark(1:04). This is incorrect since there could be elements with
> timestamps between 1:04 and 1:06 that have not arrived yet.
>
> I guess this is because the watermark trigger doesn't check whether the
> element's timestamp has been passed.
>
> Please correct me if any of the above is not right.
>
> Thanks,
> Manu Zhang
>
>
>
>


Kinesis Connector Dependency Problems

2016-10-31 Thread Justin Yan
Hi all - first time on the mailing list, so my apologies if I break
protocol on anything.  Really excited to be using Flink, and hoping to be
active here in the future!  Also, apologies for the length of this email -
I tried to include details but may have gone overboard.

The gist of my problem is an issue with packaging the Flink Kinesis
Connector into my user code for execution on a YARN cluster in EMR -
there's some dependency trouble happening, but after about 48 hours of
attempts, I'm not sure how to make progress, and I'd really appreciate any
ideas or assistance. Thank you in advance!

### First, Some Context.

We're hoping to write our Flink jobs in scala 2.11.  The Flink JM/TMs
currently run on an EMR cluster with Hadoop 2.7 as YARN containers.  We run
our jobs via an Azkaban server, which has the Hadoop and Flink clients
installed, and the configurations are set to point at the YARN master on
our EMR cluster (with $HADOOP_HOME set so Flink can discover the hadoop
configs).  We're using Java OpenJDK7 everywhere, and Maven 3.3.9 when
building Flink from source.

We use SBT and the assembly plugin to create an Uberjar of our code and its
dependencies.  This gets uploaded to Azkaban, whereupon the following
command is run on the azkaban server to execute a Flink job:

flink run -c  usercodeuberjar-assembly-1.0.jar

I've successfully run a few flink jobs that execute on our EMR cluster in
this fashion (the WordCount example, etc.).

### The Problem

We use AWS Kinesis, and are hoping to integrate Flink with it.  Naturally,
we were hoping to use the Kinesis connector: <
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kinesis.html
>.

After following the instructions with some experimentation, I was able to
run a Flink Kinesis application on my laptop in Local Cluster mode
(Ubuntu 16.04, local cluster initiated with the `./start-local.sh`
command, job submitted via `flink run -c 
usercodeuberjar-assembly-1.0.jar`).

I uploaded the same JAR to Azkaban and tried to run the same command to
submit to our EMR cluster, and got a `java.lang.NoSuchMethodError:
com.amazonaws.SDKGlobalConfiguration.isInRegionOptimizedModeEnabled()`
(I've included the full stack trace at the bottom of this email).  I went
to inspect the uploaded JAR with a `unzip
usercodeuberjar-assembly-1.0.jar`, looked in `com/amazonaws` and found the
SDKGlobalConfiguration.class file.  I decompiled and inspected it, and the
isInRegionOptimizedModeEnabled method that was purportedly missing was
indeed present.
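
A symptom like this (the method is present in the uber jar yet a
NoSuchMethodError is thrown at runtime) usually means an older
aws-java-sdk on the EMR/YARN classpath is being loaded ahead of the
bundled one. One commonly suggested workaround is to relocate the AWS
classes inside the uber jar; a build.sbt sketch, assuming sbt-assembly
0.14+ (the rule syntax should be verified against the plugin docs):

  // relocate the AWS SDK classes bundled in the uber jar so that an older
  // aws-java-sdk provided by the cluster cannot shadow them at runtime
  assemblyShadeRules in assembly := Seq(
    ShadeRule.rename("com.amazonaws.**" -> "shaded.com.amazonaws.@1").inAll
  )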

I've included the steps I took to manifest this problem below, along with a
variety of things that I tried to do to resolve the problem - any help or
insight is greatly appreciated!

### Repro

I'm not sure how to provide a clear repro, but I'll try to include as much
detail as I can about the sequence of actions and commands I ran since
there may be some obvious mistakes:

Downloading the flink release to my laptop:

wget
http://www-us.apache.org/dist/flink/flink-1.1.3/flink-1.1.3-bin-hadoop27-scala_2.11.tgz
tar xfzv flink-1.1.3-bin-hadoop27-scala_2.11.tgz

I then SSH'd into Azkaban, and ran the same two commands, while adding the
bin/ directory to my PATH and tweaking the config for fs.hdfs.hadoopconf.
Next, after getting the flink binaries, I went to fetch the source code in
order to follow the instructions here: <
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kinesis.html
>

wget https://github.com/apache/flink/archive/release-1.1.3.tar.gz
tar xfzv release-1.1.3.tar.gz

Here, I wanted to leverage our EMR instance profile Role instead of passing
in credentials, hence I wanted the AUTO value for the
"aws.credentials.provider" config, which seems to have been added after
1.1.3 - I made a couple of small tweaks to AWSConfigConstants.java and
AWSUtil.java to allow for that AUTO value.

Next, we're using Scala 2.11, so per the instructions here, I changed the
scala version: <
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/building.html#scala-versions
>

tools/change-scala-version.sh 2.11

Back to the Kinesis Connector documentation...

mvn clean install -Pinclude-kinesis -DskipTests
cd flink-dist
mvn clean install -Pinclude-kinesis -DskipTests

When running that second mvn clean install, I get some warnings about the
maven shade plugin having conflicting versions.  I also get a "[WARNING]
The requested profile "include-kinesis" could not be activated because it
does not exist."

At this point, the instructions are not too clear on what to do.  I proceed
to this section to try and figure it out: <
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
>

My goal is to package everything in my usercode JAR, and I'll try to do
that with SBT.  My first try is to install the Flink Kinesis Connector JAR
generated by mvn clean install to my local Maven 

Re: Kafka + Flink, Basic Questions

2016-10-31 Thread Robert Metzger
Hi Matt,

This is a fairly extensive question. I'll try to answer all of them, but I
don't have the time right now to extensively discuss the architecture of
your application. Maybe there's some other person on the ML who can extend
my answers.

(Answers in-line below)

On Mon, Oct 31, 2016 at 3:37 AM, Dromit  wrote:

> Hello,
>
> I'm writing an application to generate reports of a real time stream of
> sale records (product, quantity, price, time) that I get from a websocket,
> and I plan to use Kafka and Flink to do so. The result of the pipeline
> should be, for instance, "the average price for a certain product in the
> last 3 hours, minute by minute", "the volume (sum of quantities) for a
> certain product in the last hour, second by second", etc.
>
> I want to balance the load on a cluster given that I'm receiving way too
> many records to even keep them in memory in a single computer, around
> 10,000 each second.  I intend to distribute them on a Kafka topic "sales",
> with multiple partitions (one for each product). That way all the records
> for a given product are kept on the same node but different products are
> processed on different nodes.
>

The data for such windows doesn't need to fit into memory. With the RocksDB
state backend, your limit is the available disk space on each machine.
What size does each individual record have? (I'm asking to get a feeling
for the load you are putting on Flink ... I think 10k records / sec isn't
that much).
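
For completeness, a minimal sketch of enabling the RocksDB backend from
the Scala API (it assumes the flink-statebackend-rocksdb dependency is on
the classpath; the checkpoint URI is a placeholder):

  import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // placeholder URI; point this at a durable filesystem such as HDFS or S3
  env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))
  // checkpoint periodically so the window state survives failures
  env.enableCheckpointing(60000L)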

As long as the products have a similar number of messages, you won't run
into any load balancing issues.
I would recommend not to over-optimize this on the first implementation.
The bottleneck can very well be somewhere else in the end.


>
> There are some things I still don't get about how Kafka and Flink are
> supposed to work together. The following questions may be rather general
> and that's because I've no experience with either of them, so any
> additional insight or advice you can provide is very welcome.
>

Both Flink and Kafka are easy to set up. I would recommend to just dive in
and try it out.


>
> *1. Architecture*
>
> Keeping all records for a given product on the same node as mentioned
> should lower the latency since all consumers will handle only local data.
> That works in theory only, because if I'm not wrong Flink executes the
> operators over its own cluster of nodes.
>
> How does Flink know where to process what? What happens if there are more
> nodes running Flink than Kafka, or the other way around? In case I have one
> pair of Kafka and Flink node on each node on the cluster (AWS), how am I
> sure that nodes get the correct data (ie, only local data)?
>

Flink currently does not account for locality when reading from Kafka.
What latencies are you aiming for? I don't expect the Kafka connector to
add a lot of latency to your pipeline.

This blog post goes into the details of how data consumers are assigned to
partitions: http://data-artisans.com/kafka-flink-a-practical-how-to/


>
> *2. Performance*
>
> In the case of computing the volume of a product in a sliding window,
> provided the quantities are 10, 3, 6, 1, 0, 15, ... For the first window I
> may need to compute 10+3+6+1, for the second 3+6+1+0, for the third
> 6+1+0+15, and so on. The problem here is that in each case I compute some
> of the sums multiple times (6+1 for instance is computed 3 times), and it
> is even worse considering a window may have thousands of records and that
> some operations are not that simple (standard deviation for instance).
>
> Is there a way to reuse the result of previous operations to avoid this
> problem? Any performance improvement to apply in these cases?
>

I think the default operators are not doing any of these optimizations. But
I think you could implement them with our custom windowing API.
Check out this blog post:
https://flink.apache.org/news/2015/12/04/Introducing-windows.html
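
For the running-sum case specifically, a pre-aggregating (reduce-based)
window keeps a single running value per window instead of buffering and
re-summing every record. A sketch, where the Sale case class and the
sales stream are assumptions for illustration:

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.windowing.time.Time

  case class Sale(product: String, quantity: Long, price: Double, timestamp: Long)

  // 'sales' is assumed to be a DataStream[Sale] with timestamps and
  // watermarks already assigned
  def volumePerProduct(sales: DataStream[Sale]): DataStream[Sale] =
    sales
      .keyBy(_.product)
      .timeWindow(Time.hours(1), Time.seconds(1))
      // the reduce function runs incrementally as elements arrive, so each
      // window holds one running aggregate instead of all raw records
      .reduce((a, b) => a.copy(quantity = a.quantity + b.quantity))

Statistics such as the standard deviation can be maintained the same way
by aggregating (count, sum, sum of squares) instead of raw records. Note
that with sliding windows each element is still assigned to every
overlapping window (3600 windows for a 1-hour window sliding by 1 second),
so a coarser slide may be worth considering.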



>
> *3. Topology*
>
> The pipeline of operations to get all the information for a report is
> really big. It may require an average of prices, volume, standard
> deviation, etc. But some of these operations can be executed concurrently.
>
> How do I define the workflow (topology) so that certain "steps" are
> executed concurrently, while others wait for all the previous steps to be
> completed before proceeding?
>

I'm skipping this one. I would recommend to play around with our windowing
API. Then, it should be obvious how we parallelize, how you emit results,
how you can compute multiple reports / numbers at once.
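
As a sketch of computing several reports at once: window operators
declared on the same keyed stream become parallel branches of a single
job. This reuses the hypothetical Sale stream and imports from the sketch
above:

  val byProduct = sales.keyBy(_.product)

  // volume over the last hour, updated every second
  val volume = byProduct
    .timeWindow(Time.hours(1), Time.seconds(1))
    .reduce((a, b) => a.copy(quantity = a.quantity + b.quantity))

  // (product, count, price sum) over the last 3 hours, updated every
  // minute; the average price can be derived downstream from count and sum
  val priceStats = byProduct
    .timeWindow(Time.hours(3), Time.minutes(1))
    .fold(("", 0L, 0.0)) { (acc, s) => (s.product, acc._2 + 1, acc._3 + s.price) }

Both branches run concurrently; emitting the results is then a matter of
adding a sink to each of them.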


>
> *4. Launch*
>
> The producer and all the consumers have in common many Java classes, so
> for simplicity I intend to create and launch them from a single
> application/process, maybe creating one thread for each one if required.
>
> Is there any problem with that? Any advantage of creating an independent
> application for each producer and consumer as shown in the documentation?
>

Check out this 

Re: Flink on YARN - Fault Tolerance | use case supported or not

2016-10-31 Thread Stephan Ewen
Hi Anchit!

In high-availability cases, a Flink cluster recovers jobs that it considers
belonging to the cluster. That is determined by what is set in the
Zookeeper Cluster Namespace: "recovery.zookeeper.path.namespace"
https://github.com/apache/flink/blob/release-1.1.3/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java#L646

If you submit the job in the "per-job-yarn" mode (via 'bin/flink run -m
yarn-cluster ...') then this gets a unique auto-generated namespace. The
assumption is that the job recovers itself as long as the yarn job keeps
running. If you force yarn to terminate the job, it is gone.

If you start a "yarn session", then it picks up the namespace from the
config. If you kill that yarn session while jobs are running, and then
start a new session with the same namespace, it will start recovering the
previously running jobs.
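
For reference, a sketch of the flink-conf.yaml entries involved when
running a YARN session with a fixed namespace (host names and the
namespace value are placeholders; the key names are as of Flink 1.1 and
can be double-checked in the ConfigConstants class linked above):

  recovery.mode: zookeeper
  recovery.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181
  recovery.zookeeper.path.namespace: /my-flink-session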

Does that make sense?

Greetings,
Stephan



On Mon, Oct 31, 2016 at 4:17 PM, Kostas Kloudas  wrote:

> Hi Jatana,
>
> As you pointed out, the correct way to do the above is to use savepoints.
> If you kill your application, then this is not a crash but rather a
> voluntary action.
>
> I am also looping in Max, as he may have something more to say on this.
>
> Cheers,
> Kostas
>
> On Sat, Oct 29, 2016 at 12:13 AM, Anchit Jatana <
> development.anc...@gmail.com> wrote:
>
>> Hi All,
>>
>> I tried testing fault tolerance of my running Flink application in a
>> different way (not sure if it is an appropriate way). I ran the Flink
>> application on YARN and, after completing a few checkpoints, killed the
>> YARN application using:
>>
>> yarn application -kill application_1476277440022_
>>
>> Further, I tried restarting the application by providing the same path of
>> the checkpointing directory. The application started afresh and did not
>> resume from the last check-pointed state. Just wanted to make sure whether
>> fault tolerance in this use case is valid or not. If yes, what am I doing
>> wrong?
>>
>> I'm aware of the savepoint process (create a savepoint, stop the
>> application, and resume a new application from the same savepoint), but
>> wished to check the above use case: if the YARN application gets killed,
>> perhaps accidentally or for some other reason, is this kind of fault
>> tolerance supported or not?
>>
>>
>> Regards,
>> Anchit
>>
>
>


Re: Flink on YARN - Fault Tolerance | use case supported or not

2016-10-31 Thread Kostas Kloudas
Hi Jatana,

As you pointed out, the correct way to do the above is to use savepoints.
If you kill your application, then this is not a crash but rather a
voluntary action.

I am also looping in Max, as he may have something more to say on this.

Cheers,
Kostas

On Sat, Oct 29, 2016 at 12:13 AM, Anchit Jatana <
development.anc...@gmail.com> wrote:

> Hi All,
>
> I tried testing fault tolerance of my running Flink application in a
> different way (not sure if it is an appropriate way). I ran the Flink
> application on YARN and, after completing a few checkpoints, killed the
> YARN application using:
>
> yarn application -kill application_1476277440022_
>
> Further, I tried restarting the application by providing the same path of
> the checkpointing directory. The application started afresh and did not
> resume from the last check-pointed state. Just wanted to make sure whether
> fault tolerance in this use case is valid or not. If yes, what am I doing
> wrong?
>
> I'm aware of the savepoint process (create a savepoint, stop the
> application, and resume a new application from the same savepoint), but
> wished to check the above use case: if the YARN application gets killed,
> perhaps accidentally or for some other reason, is this kind of fault
> tolerance supported or not?
>
>
> Regards,
> Anchit
>


Re: watermark trigger doesn't check whether element's timestamp is passed

2016-10-31 Thread Aljoscha Krettek
Hmm, I don't completely understand what's going on. Could you maybe post an
example, with the trigger code that shows this behaviour?

Cheers,
Aljoscha

On Thu, 27 Oct 2016 at 17:12 Manu Zhang  wrote:

> Hi,
>
> It's what I'm seeing. If timers are not fired at the end of the window,
> state (in the window) whose timestamp is *after* the timer will also be
> emitted. That's a problem for event-time triggers.
>
> Thanks,
> Manu
>
>
> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek 
> wrote:
>
> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang  wrote:
>
> Hi Aljoscha,
>
> Thanks for your response.  My use case is to track user trajectories based
> on page view events when users visit a website.  The input would be like a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with some event-time
> trigger. Note we can't wait for the end of the session window due to latency.
> Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time is passed by the watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets a timer on each
> element's timestamp. For both triggers I've witnessed a problem like the
> following (e.g. with a session gap of 5):
>
> PageView("user1", "http://foo;, 1)
> PageView("user1", "http://foo/bar;, 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar
> *", [1,6])
> PageView("user1", "http://foo/bar/foobar;, 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar ->
> *http://foo/bar/foobar *", [1, 10])
>
> The URLs in bold should not have been included, since there could be
> events before them that have not arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek 
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your example and what is the input stream (including watermarks)?
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 06:30 Manu Zhang  wrote:
>
> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>
> It seems that if *("c", 1:06) is processed before watermark(1:04)*,
> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
> watermark(1:04). This is incorrect since there could be elements with
> timestamps between 1:04 and 1:06 that have not arrived yet.
>
> I guess this is because the watermark trigger doesn't check whether the
> element's timestamp has been passed.
>
> Please correct me if any of the above is not right.
>
> Thanks,
> Manu Zhang
>
>
>
>


Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

2016-10-31 Thread Aljoscha Krettek
Hi Anchit,
the timers don't necessarily have to be cleaned up. So you should be good
to go.

Cheers,
Aljoscha

On Fri, 28 Oct 2016 at 23:33 Anchit Jatana 
wrote:

> Hi Aljoscha,
>
> I am using the custom trigger with the GlobalWindows window assigner. Do I
> still need to override the clear method and delete the ProcessingTimeTimer
> using triggerContext.deleteProcessingTimeTimer(prevTime)?
>
> Regards,
> Anchit
>
>
>
>


Re: Looping over a DataSet and accesing another DataSet

2016-10-31 Thread otherwise777
Thank you for your reply, this is new information for me.

Regarding the algorithm, I gave it a better look and I don't think it will
work with joining. When looping over the edge set (u,v), we need to be able
to write and read A[u] and A[v]. If I join them, it will create new
instances of those values, so a change made in one instance won't be
visible in the others.

For example, I have the following edges:
 u v
 1 2
 1 3

With vertices and values:
 1 a
 2 b
 3 c

If I join them I get:
 u v u' v'
 1 2 a b
 1 3 a c

If I loop over the joined set and change the u' value of the first instance
to "d", then in my next loop step it will still be 'a'.






Re: Looping over a DataSet and accesing another DataSet

2016-10-31 Thread Greg Hogan
The DataSet API only supports binary joins but one can simulate an n-ary
join by chaining successive join operations.
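
For the A[u]/A[v] question quoted below, a sketch of such chained joins
with the Scala DataSet API (the tuple layouts for the edges and the
vertex values are assumptions for illustration):

  import org.apache.flink.api.scala._

  // edges:  DataSet[(Long, Long)]   as (u, v)
  // values: DataSet[(Long, String)] as (vertexId, A[vertexId])
  def attachEndpointValues(
      edges: DataSet[(Long, Long)],
      values: DataSet[(Long, String)]): DataSet[(Long, Long, String, String)] =
    edges
      .join(values).where(0).equalTo(0) { (e, a) => (e._1, e._2, a._2) }       // adds A[u]
      .join(values).where(1).equalTo(0) { (t, a) => (t._1, t._2, t._3, a._2) } // adds A[v]

This gives read access to both endpoint values per edge; feeding updated
values back would still have to go through Flink's iteration operators
rather than mutating the joined tuples.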

Your algorithm requires a global ordering on edges, requiring a parallelism
of 1, and will not scale in a distributed processing system. Flink excels
at processing bulk (larger than memory) data in serial.

Greg

On Mon, Oct 31, 2016 at 5:54 AM, otherwise777 
wrote:

> Thank you for your reply and explanation. I think there is one issue with
> your method though: you said that I should join the key-value pair A on v
> with the edge set (u,v). This would work; however, I not only need to
> access A[v] in one iteration but also A[u], so if I join on v that won't be
> possible.
>
> Did I understand it correctly?
>
>
>
>


Re: Looping over a DataSet and accesing another DataSet

2016-10-31 Thread otherwise777
Thank you for your reply and explanation. I think there is one issue with
your method though: you said that I should join the key-value pair A on v
with the edge set (u,v). This would work; however, I not only need to
access A[v] in one iteration but also A[u], so if I join on v that won't be
possible.

Did I understand it correctly?


