TimelyFlatMapFunction and DataStream

2016-11-01 Thread Ken Krugler
I’m curious why it seems like a TimelyFlatMapFunction can’t be used with a 
regular DataStream, but it can be used with a KeyedStream.

Or maybe I’m missing something obvious (this is with 1.2-SNAPSHOT, pulled 
today).

Also the documentation of TimelyFlatMapFunction
(https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.html)
shows using it with a DataStream.flatMap(xxx) call.

Thanks,

— Ken

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Anchit Jatana
I've set the metric reporting frequency to InfluxDB as 10s. In the
screenshot, I'm using a Grafana query interval of 1s. I've tried 10s and more
too; the graph shape changes a bit, but the incorrect negative values are
still plotted (it makes no difference).

Something to add: if the number of subtasks is less than or equal to 30, the same query
yields correct results. For subtask index > 30 (in my case 50) it
plots junk negative and positive values.

Regards,
Anchit





Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Jamie Grier
Hmm.  I can't recreate that behavior here.  I have seen some issues like
this if you're grouping by a time interval different from the metrics
reporting interval you're using, though.  How often are you reporting
metrics to Influx?  Are you using the same interval in your Grafana
queries?  I see in your queries you are using a time interval of 10
seconds.  Have you tried 1 second?  Do you see the same behavior?

-Jamie


On Tue, Nov 1, 2016 at 4:30 PM, Anchit Jatana 
wrote:

> Hi Jamie,
>
> Thank you so much for your response.
>
> The below query:
>
> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
> 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s)
>
> behaves the same as with the use of the templating variable in the 'All'
> case, i.e. it shows a plot of junk 'negative values'.
>
> It shows accurate results/plot when an additional where clause for
> "subtask_index" is applied to the query.
>
> But without the "subtask_index" where clause (which means for all the
> subtask_indexes) it shows some junk/incorrect values on the graph (both
> highly positive & highly negative values in orders of millions)
>
> Images:
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n9816/Incorrect_%28for_all_subtasks%29.png>
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n9816/Correct_for_specific_subtask.png>
>
> Regards,
> Anchit
>
>
>
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Flink on YARN - Fault Tolerance | use case supported or not

2016-11-01 Thread Anchit Jatana
Yes, thanks Stephan.

Regards,
Anchit





Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Anchit Jatana
Hi Jamie,

Thank you so much for your response. 

The below query:

SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
'Sink: Unnamed' AND $timeFilter GROUP BY time(1s)

behaves the same as with the use of the templating variable in the 'All'
case, i.e. it shows a plot of junk 'negative values'.

It shows accurate results/plot when an additional where clause for
"subtask_index" is applied to the query.

But without the "subtask_index" where clause (which means for all the
subtask_indexes) it shows some junk/incorrect values on the graph (both
highly positive & highly negative values in orders of millions)

Images:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n9816/Incorrect_%28for_all_subtasks%29.png>

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n9816/Correct_for_specific_subtask.png>

Regards,
Anchit





Re: watermark trigger doesn't check whether element's timestamp is passed

2016-11-01 Thread aj heller
Hi Manu, Aljoscha,

I had been interested in implementing FLIP-2, but I haven't been able to
make time for it. There is no implementation yet that I'm aware of, and
I'll gladly step aside (or help out how I can) if you or anyone is
interested to take charge of it.

That said, I'm also not sure if discussions are ongoing. I had hoped to
prototype the proposal as is, to have something more concrete to discuss.

Cheers,
aj
On Nov 1, 2016 3:24 PM, "Manu Zhang"  wrote:

> Thanks.  The ideal case is to fire after watermark past each element from
> the window but that requires a custom trigger and FLIP-2 as well. The
> enhanced window evictor will help to avoid the last firing.
>
> Are the discussions on FLIP-2 still going on ?
> Are there any opening JIRAs or PRs ? (The proposed `ProcessWindowFunction`
> will be sufficient for my case)
> Is there a workaround now for my case ?
>
> Thanks again for following through this.
> Manu
>
> On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek 
> wrote:
>
>> Ah, I finally understand it. You would need a way to query the current
>> watermark in the window function to only emit those elements where the
>> timestamp is lower than the watermark.
>>
>> When the window fires again, do you want to emit elements that you
>> emitted during the last firing again? If not, I think you also need to use
>> an evictor to evict the elements from the window where the timestamp is
>> lower than the watermark. With this FLIP https://cwiki.apache.org/
>> confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata we
>> should be able to extend the WindowFunction Context to also provide the
>> current watermark. With this recent PR https://github.com/apache/
>> flink/pull/2736 you would be able to evict elements from the window
>> state after the window function was called.
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 1 Nov 2016 at 02:27 Manu Zhang  wrote:
>>
>> Yes, here's the example https://github.com/manuzhang/flink/blob/pv/flink-
>> examples/flink-examples-streaming/src/main/scala/org/
>> apache/flink/streaming/scala/examples/session/
>> PageViewSessionWindowing.scala
>>
>> If you print and compare the timestamp of timer with that of "PageView"
>> in the outputs, you could see what I mean.
>>
>> I think the recently introduced TimelyFlatMapFunction is close to what I
>> want to achieve. It will be great if we can query time information in the
>> window function so I filed https://issues.apache.
>> org/jira/browse/FLINK-4953
>>
>> Thanks for your time.
>>
>> Manu
>>
>> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek 
>> wrote:
>>
>> Hmm, I don't completely understand what's going on. Could you maybe post
>> an example, with the trigger code that shows this behaviour?
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 27 Oct 2016 at 17:12 Manu Zhang  wrote:
>>
>> Hi,
>>
>> It's what I'm seeing. If timers are not fired at the end of window, a
>> state (in the window) whose timestamp is *after *the timer will also be
>> emitted. That's a problem for event-time trigger.
>>
>> Thanks,
>> Manu
>>
>>
>> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek 
>> wrote:
>>
>> Hi,
>> is that example input/output what you would like to achieve or what you
>> are currently seeing with Flink? I think for your use case a custom Trigger
>> would be required that works like the event-time trigger but additionally
>> registers timers for each element where you want to emit.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 26 Oct 2016 at 04:04 Manu Zhang  wrote:
>>
>> Hi Aljoscha,
>>
>> Thanks for your response.  My use case is to track user trajectory based
>> on page view event when they visit a website.  The input would be like a
>> list of PageView(userId, url, eventTimestamp) with watermarks (=
>> eventTimestamp - duration). I'm trying SessionWindows with some event time
>> trigger. Note we can't wait for the end of session window due to latency.
>> Instead, we want to emit the user trajectories whenever a buffered
>> PageView's event time is passed by watermark. I tried
>> ContinuousEventTimeTrigger and a custom trigger which sets timer on each
>> element's timestamp. For both triggers I've witnessed a problem like the
>> following (e.g. a session gap of 5)
>>
>> PageView("user1", "http://foo;, 1)
>> PageView("user1", "http://foo/bar;, 2)
>> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar
>> *", [1,6])
>> PageView("user1", "http://foo/bar/foobar;, 5)
>> Watermark(4) => emit UserTrajectory("user1", "http://foo ->
>> http://foo/bar -> *http://foo/bar/foobar *", [1,
>> 10])
>>
>> The urls in bold should be included since there could be events before
>> them not arrived yet.
>>
>>
>> Thanks,
>> Manu
>>
>>
>> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek 
>> wrote:
>>
>> Hi,
>> with some additional information we might be able to figure this out
>> together. What specific combination of WindowAssigner/Trigger are you using
>> for your example and what is the input stream (including watermarks)?

Re: Question about the checkpoint mechanism in Flink.

2016-11-01 Thread Renjie Liu
Hi, Till:
I think multiple inputs should include the more general case where
redistribution happens between subtasks, right? Since in this case we also
need to align the checkpoint barriers.

Till Rohrmann wrote on Tue, Nov 1, 2016 at 11:05 PM:

> The tuples are not buffered until the snapshot is globally complete (a
> snapshot is globally complete iff all operators have successfully taken a
> snapshot). They are only buffered until the corresponding checkpoint
> barrier on the second input is received. Once this is the case, the
> checkpoint barrier will directly be sent to the downstream operators. Next
> a snapshot is taken. Depending on the state backend this can happen
> asynchronously or synchronously. After this is done, the operator continues
> processing elements (for the first input, the buffered elements are
> consumed first).
>
> With multiple inputs I referred to a coFlatMap operator or a join operator
> which have both two inputs.
>
> Cheers,
> Till
>
> On Tue, Nov 1, 2016 at 3:29 PM, Renjie Liu 
> wrote:
>
> Hi, Till:
> By operator with multiple inputs, do you mean inputs from multiple
> subtasks?
>
> On Tue, Nov 1, 2016 at 8:56 PM Till Rohrmann  wrote:
>
> Hi Li,
>
> the statement refers to operators with multiple inputs (two in this case).
> With the current implementation you will indeed block one of the inputs
> after receiving a checkpoint barrier n until you've received the
> corresponding checkpoint barrier n on the other input as well. This is what
> we call checkpoint barrier alignment. If the processing time on both input
> paths is similar and thus there is no back pressure on any of the inputs,
> the alignment should not take too long. In case where one of the inputs is
> considerably slower than the other, you should see an additional delay.
>
> For single input operators, you don't have to align the checkpoint
> barriers.
>
> The checkpoint barrier alignment is not strictly necessary, but it allows
> us to not having to store all in flight records from the second input which
> arrive between the checkpoint barrier on the first input and the
> corresponding barrier on the second input. We might change this
> implementation in the future, though.
>
> Cheers,
> Till
>
> On Tue, Nov 1, 2016 at 8:05 AM, Li Wang  wrote:
>
> Hi all,
>
> I have a question regarding the state checkpoint mechanism in Flink. I
> find the statement  "Once the last stream has received barrier n, the
> operator emits all pending outgoing records, and then emits
> snapshot n barriers itself” on the document
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#exactly-once-vs-at-least-once
> .
>
> Does this mean that to achieve exactly-once semantic, instead of sending
> tuples downstream immediately the operator buffers its outgoing tuples in a
> pending queue until the current snapshot is committed? If yes, will this
> introduce significant processing delay?
>
> Thanks,
> Li
>
>
> --
> Liu, Renjie
> Software Engineer, MVAD
>
>
> --
Liu, Renjie
Software Engineer, MVAD


Re: watermark trigger doesn't check whether element's timestamp is passed

2016-11-01 Thread Manu Zhang
Thanks. The ideal case is to fire after the watermark passes each element in
the window, but that requires a custom trigger and FLIP-2 as well. The
enhanced window evictor will help to avoid the last firing.
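
For illustration, a minimal sketch of such a per-element event-time trigger
(Scala, against the 1.1/1.2-era Trigger API). The class name is made up, it
targets non-merging TimeWindows, and a real session-window variant would also
need canMerge()/onMerge():

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Fires whenever the watermark passes an element's timestamp, plus once at
// the end of the window. Note: as discussed above, the window function still
// sees *all* buffered elements on every firing, including those whose
// timestamp is still ahead of the watermark, which is what FLIP-2 /
// FLINK-4953 would help with.
class PerElementEventTimeTrigger[T] extends Trigger[T, TimeWindow] {

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.registerEventTimeTimer(timestamp)             // per-element timer
    ctx.registerEventTimeTimer(window.maxTimestamp()) // final firing
    TriggerResult.CONTINUE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp())
}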

Are the discussions on FLIP-2 still going on ?
Are there any opening JIRAs or PRs ? (The proposed `ProcessWindowFunction`
will be sufficient for my case)
Is there a workaround now for my case ?

Thanks again for following through this.
Manu

On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek  wrote:

> Ah, I finally understand it. You would need a way to query the current
> watermark in the window function to only emit those elements where the
> timestamp is lower than the watermark.
>
> When the window fires again, do you want to emit elements that you emitted
> during the last firing again? If not, I think you also need to use an
> evictor to evict the elements from the window where the timestamp is lower
> than the watermark. With this FLIP
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>  we
> should be able to extend the WindowFunction Context to also provide the
> current watermark. With this recent PR
> https://github.com/apache/flink/pull/2736 you would be able to evict
> elements from the window state after the window function was called.
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Nov 2016 at 02:27 Manu Zhang  wrote:
>
> Yes, here's the example
> https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala
>
> If you print and compare the timestamp of timer with that of "PageView" in
> the outputs, you could see what I mean.
>
> I think the recently introduced TimelyFlatMapFunction is close to what I
> want to achieve. It will be great if we can query time information in the
> window function so I filed
> https://issues.apache.org/jira/browse/FLINK-4953
>
> Thanks for your time.
>
> Manu
>
> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek 
> wrote:
>
> Hmm, I don't completely understand what's going on. Could you maybe post
> an example, with the trigger code that shows this behaviour?
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 17:12 Manu Zhang  wrote:
>
> Hi,
>
> It's what I'm seeing. If timers are not fired at the end of window, a
> state (in the window) whose timestamp is *after *the timer will also be
> emitted. That's a problem for event-time trigger.
>
> Thanks,
> Manu
>
>
> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek 
> wrote:
>
> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang  wrote:
>
> Hi Aljoscha,
>
> Thanks for your response.  My use case is to track user trajectory based
> on page view event when they visit a website.  The input would be like a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with some event time
> trigger. Note we can't wait for the end of session window due to latency.
> Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time is passed by watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets timer on each
> element's timestamp. For both triggers I've witnessed a problem like the
> following (e.g. a session gap of 5)
>
> PageView("user1", "http://foo;, 1)
> PageView("user1", "http://foo/bar;, 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar
> *", [1,6])
> PageView("user1", "http://foo/bar/foobar;, 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar ->
> *http://foo/bar/foobar *", [1, 10])
>
> The urls in bold should be included since there could be events before
> them not arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek 
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your example and what is the input stream (including watermarks)?
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 06:30 Manu Zhang  wrote:
>
> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted

Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Jamie Grier
Ahh... I haven’t used templating all that much, but this also works for your
subtask variable so that you don’t have to enumerate all the possible
values:

Template Variable Type: query

query: SHOW TAG VALUES FROM numRecordsIn WITH KEY = "subtask_index"

On Tue, Nov 1, 2016 at 2:51 PM, Jamie Grier  wrote:

> Another note.  In the example the template variable type is "custom" and
> the values have to be enumerated manually.  So in your case you would have
> to configure all the possible values of "subtask" to be 0-49.
>
> On Tue, Nov 1, 2016 at 2:43 PM, Jamie Grier 
> wrote:
>
>> This works well for me. This will aggregate the data across all sub-task
>> instances:
>>
>> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
>> = 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s)
>>
>> You can also plot each sub-task instance separately on the same graph by
>> doing:
>>
>> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
>> = 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s), "subtask_index"
>>
>> Or select just a single subtask instance by using:
>>
>> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
>> = 'Sink: Unnamed' AND "subtask_index" = '7' AND $timeFilter GROUP BY
>> time(1s)
>>
>> I haven’t used the templating features much but this also seems to work
>> fine and allows you to select an individual subtask_index or ‘all’ and it
>> works as it should — summing across all subtasks when you select ‘all’.
>>
>> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
>> = 'Sink: Unnamed' AND "subtask_index" =~ /^$subtask$/ AND $timeFilter GROUP
>> BY time(1s)
>>
>> On Fri, Oct 28, 2016 at 2:53 PM, Anchit Jatana <
>> development.anc...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I'm trying to plot the flink application metrics using grafana backed by
>>> influxdb. I need to plot/monitor the 'numRecordsIn' & 'numRecordsOut' for
>>> each operator/operation. I'm finding it hard to generate the influxdb query
>>> in grafana which can help me make this plot.
>>>
>>> I am able to plot the 'numRecordsIn' & 'numRecordsOut' for each
>>> subtask(parallelism set to 50) of the operator but not the operator as a
>>> whole.
>>>
>>> If somebody has knowledge or has successfully implemented this kind of a
>>> plot on grafana backed by influxdb, please share with me the process/query
>>> to achieve the same.
>>>
>>> Below is the query which I have to monitor the 'numRecordsIn' &
>>> 'numRecordsOut' for each subtask
>>>
>>> SELECT derivative(sum("count"), 10s) FROM "numRecordsOut" WHERE
>>> "task_name" = 'Source: Reading from Kafka' AND "subtask_index" =~
>>> /^$subtask$/ AND $timeFilter GROUP BY time(10s), "task_name"
>>>
>>> PS: $subtask is the templating variable that I'm using in order to have
>>> multiple subtask values. I have tried the 'All' option for this templating
>>> variable- This give me an incorrect plot showing me negative values while
>>> the individual selection of subtask values when selected from the
>>> templating variable drop down yields correct result.
>>>
>>> Thank you!
>>>
>>> Regards,
>>> Anchit
>>>
>>>
>>>
>>
>>
>> --
>>
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier 
>> ja...@data-artisans.com
>>
>>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier 
> ja...@data-artisans.com
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Jamie Grier
Another note.  In the example the template variable type is "custom" and
the values have to be enumerated manually.  So in your case you would have
to configure all the possible values of "subtask" to be 0-49.

On Tue, Nov 1, 2016 at 2:43 PM, Jamie Grier  wrote:

> This works well for me. This will aggregate the data across all sub-task
> instances:
>
> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
> = 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s)
>
> You can also plot each sub-task instance separately on the same graph by
> doing:
>
> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
> = 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s), "subtask_index"
>
> Or select just a single subtask instance by using:
>
> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
> = 'Sink: Unnamed' AND "subtask_index" = '7' AND $timeFilter GROUP BY
> time(1s)
>
> I haven’t used the templating features much but this also seems to work
> fine and allows you to select an individual subtask_index or ‘all’ and it
> works as it should — summing across all subtasks when you select ‘all’.
>
> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
> = 'Sink: Unnamed' AND "subtask_index" =~ /^$subtask$/ AND $timeFilter GROUP
> BY time(1s)
>
> On Fri, Oct 28, 2016 at 2:53 PM, Anchit Jatana <
> development.anc...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm trying to plot the flink application metrics using grafana backed by
>> influxdb. I need to plot/monitor the 'numRecordsIn' & 'numRecordsOut' for
>> each operator/operation. I'm finding it hard to generate the influxdb query
>> in grafana which can help me make this plot.
>>
>> I am able to plot the 'numRecordsIn' & 'numRecordsOut' for each
>> subtask(parallelism set to 50) of the operator but not the operator as a
>> whole.
>>
>> If somebody has knowledge or has successfully implemented this kind of a
>> plot on grafana backed by influxdb, please share with me the process/query
>> to achieve the same.
>>
>> Below is the query which I have to monitor the 'numRecordsIn' &
>> 'numRecordsOut' for each subtask
>>
>> SELECT derivative(sum("count"), 10s) FROM "numRecordsOut" WHERE
>> "task_name" = 'Source: Reading from Kafka' AND "subtask_index" =~
>> /^$subtask$/ AND $timeFilter GROUP BY time(10s), "task_name"
>>
>> PS: $subtask is the templating variable that I'm using in order to have
>> multiple subtask values. I have tried the 'All' option for this templating
>> variable- This give me an incorrect plot showing me negative values while
>> the individual selection of subtask values when selected from the
>> templating variable drop down yields correct result.
>>
>> Thank you!
>>
>> Regards,
>> Anchit
>>
>>
>>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier 
> ja...@data-artisans.com
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Jamie Grier
This works well for me. This will aggregate the data across all sub-task
instances:

SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
'Sink: Unnamed' AND $timeFilter GROUP BY time(1s)

You can also plot each sub-task instance separately on the same graph by
doing:

SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
'Sink: Unnamed' AND $timeFilter GROUP BY time(1s), "subtask_index"

Or select just a single subtask instance by using:

SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
'Sink: Unnamed' AND "subtask_index" = '7' AND $timeFilter GROUP BY time(1s)

I haven’t used the templating features much but this also seems to work
fine and allows you to select an individual subtask_index or ‘all’ and it
works as it should — summing across all subtasks when you select ‘all’.

SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
'Sink: Unnamed' AND "subtask_index" =~ /^$subtask$/ AND $timeFilter GROUP
BY time(1s)

On Fri, Oct 28, 2016 at 2:53 PM, Anchit Jatana  wrote:

> Hi All,
>
> I'm trying to plot the flink application metrics using grafana backed by
> influxdb. I need to plot/monitor the 'numRecordsIn' & 'numRecordsOut' for
> each operator/operation. I'm finding it hard to generate the influxdb query
> in grafana which can help me make this plot.
>
> I am able to plot the 'numRecordsIn' & 'numRecordsOut' for each
> subtask(parallelism set to 50) of the operator but not the operator as a
> whole.
>
> If somebody has knowledge or has successfully implemented this kind of a
> plot on grafana backed by influxdb, please share with me the process/query
> to achieve the same.
>
> Below is the query which I have to monitor the 'numRecordsIn' &
> 'numRecordsOut' for each subtask
>
> SELECT derivative(sum("count"), 10s) FROM "numRecordsOut" WHERE
> "task_name" = 'Source: Reading from Kafka' AND "subtask_index" =~
> /^$subtask$/ AND $timeFilter GROUP BY time(10s), "task_name"
>
> PS: $subtask is the templating variable that I'm using in order to have
> multiple subtask values. I have tried the 'All' option for this templating
> variable- This give me an incorrect plot showing me negative values while
> the individual selection of subtask values when selected from the
> templating variable drop down yields correct result.
>
> Thank you!
>
> Regards,
> Anchit
>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: BoundedOutOfOrdernessTimestampExtractor and timestamps in the future

2016-11-01 Thread Konstantin Knauf
Hi Dominik,

out of curiosity, how come you receive timestamps from the future? ;)

Depending on the semantics of these future events, it might also make
sense to already "floor" the timestamp to processing time in the
extractTimestamp()-Method.

I am not sure if I understand your follow-up question correctly, but
afaik Flink does not have a notion of future and past. Events just have
timestamps, and the general assumption is that time runs forward (at
least in the long run). "Future" events can potentially advance the
current watermark, so (event time) windows might be closed "too early"
w.r.t. the rest of the events (these events can be processed with
"allowed lateness"). There are some sections in the documentation which
might help you
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html).
Depending on the particular problem, you might be able to develop a
fancy watermarking mechanism, which mitigates the effect of these future
timestamps. Does this answer your question in any way? :)
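
For illustration, a minimal sketch of such a mechanism (Scala; the Reading
event type and its field names are made up): the element keeps its original
timestamp, but only non-future timestamps are allowed to advance the
watermark.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical event type, for illustration only.
case class Reading(sensorId: String, timestamp: Long)

class FutureTolerantWatermarkAssigner(maxOutOfOrdernessMs: Long)
    extends AssignerWithPeriodicWatermarks[Reading] {

  private var maxNonFutureTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

  override def extractTimestamp(element: Reading, previousElementTimestamp: Long): Long = {
    // Only timestamps that are not ahead of the wall clock advance the watermark.
    if (element.timestamp <= System.currentTimeMillis()) {
      maxNonFutureTimestamp = math.max(maxNonFutureTimestamp, element.timestamp)
    }
    // The element itself keeps its (possibly future) timestamp.
    element.timestamp
  }

  override def getCurrentWatermark(): Watermark =
    new Watermark(maxNonFutureTimestamp - maxOutOfOrdernessMs)
}

It would be attached with
stream.assignTimestampsAndWatermarks(new FutureTolerantWatermarkAssigner(maxOutOfOrderness)).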

Cheers,

Konstantin


On 01.11.2016 15:05, Dominik Bruhn wrote:
> Hey,
> I'm using a BoundedOutOfOrdernessTimestampExtractor for assigning my
> timestamps and discarding too-old events (which happens sometimes).
> 
> Now my problem is that some events, by accident have timestamps in the
> future. If the timestamps are more in the future than my
> `maxOutOfOrderness`, I'm discarding valid events. So I need a way of
> saying that the
> BoundedOutOfOrdernessTimestampExtractor should exclude timestamps from
> the future for the watermark calculation. I still want to keep the
> events if they are in the future and assign them to the right watermarks.
> 
> How can I achieve this? I thought about checking whether the potential
> timestamp is in the future before considering it for a watermark. I
> cloned the BoundedOutOfOrdernessTimestampExtractor and added the idea
> https://gist.github.com/theomega/090054032e0b3c3b9fb06767f0fec7e7
> 
> Does this make sense? Or is there a better approach?
> 
> In general, how does Flink handle readings from the future?
> 
> Thanks,
> Dominik
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082





Re: A custom FileInputFormat

2016-11-01 Thread Fabian Hueske
Hi Niklas,

I don't know exactly what is going wrong there, but I have a few pointers
for you:

1) in cluster setups, Flink redirects println() to ./log/*.out files, i.e.,
you have to search for the task manager that ran the DirReader and check
its ./log/*.out file
2) you are using Java's File class. That will only work if you are
accessing the local file system of the machine the DirReader runs on. If
you want to read the files from an HDFS you have to use the corresponding
HDFS client.
3) I would not extend the FileInputFormat for your purpose. The
FileInputFormat is meant to *read* files, not just look up file names. I'd
rather implement an InputFormat from scratch. Since you are only running a
single instance, you can return a single dummy InputSplit.
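
For illustration, a rough sketch along those lines (Scala). One way to avoid
writing the split handling yourself is to extend GenericInputFormat, which
hands every parallel instance a dummy GenericInputSplit. The class name and
constructor parameter below are made up, and java.io.File still only sees the
local file system of the machine running the source (see point 2 above).

import java.io.File

import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.core.io.GenericInputSplit

class FileNameInputFormat(dirPath: String) extends GenericInputFormat[String] {

  private var remaining: List[String] = Nil

  override def open(split: GenericInputSplit): Unit = {
    super.open(split)
    val dir = new File(dirPath)
    // Emit the names of the files directly below dirPath (adapt as needed).
    remaining =
      if (dir.isDirectory) dir.listFiles().map(_.toString).toList else Nil
  }

  override def reachedEnd(): Boolean = remaining.isEmpty

  override def nextRecord(reuse: String): String = {
    val next = remaining.head
    remaining = remaining.tail
    next
  }
}

// Usage, run with a single instance:
//   val source = env.createInput(new FileNameInputFormat("/tmp/mydir")).setParallelism(1)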

Let me know, if you have further questions.
Best, Fabian

2016-10-28 18:38 GMT+02:00 Niklas Semmler :

> Hello Flink community,
>
> I am running into an issue with a custom FileInputFormat class and would
> appreciate your help.
>
> My goal is to read all files from a directory as paths:
>
> val env : ExecutionEnvironment = ExecutionEnvironment.getExecut
> ionEnvironment
>
> var source : DataSet[String] = env.readFile(new DirReader,
> "/tmp/mydir").setParallelism(1)
>
> source.writeAsText("/tmp/results", WriteMode.OVERWRITE)
>
> env.execute("Job")
>
> It works, when I execute the program from within my IDE or execute it
> directly as a fat jar. When I run it through the Flink CLI the file
> "/tmp/results" is created, but not filled with entries.
>
> There seems to be something wrong with my custom DirReader (see below).
> The output of the println statements is not visible when running the code
> from the Flink CLI.
>
> No exception is stated in the logs (see below). I am at a loss at what to
> try. Even worse, when I copy the fat jar to a remote system, the problem
> appears also when I execute the fat jar directly.
>
> Local System
> Flink: 1.0.2
> Java: 1.8.0_102
> Scala: 2.11.8
>
> Remote System
> Flink: 1.1.3
> Java: 1.8.0_92
> Scala: 2.11.6
>
> Help or ideas to try out are welcome!
>
> Best,
> Niklas
>
>
>
> 
> import java.io.File
>
> import org.apache.flink.api.common.io.FileInputFormat
>
> class DirReader extends FileInputFormat[String] {
>   var running : Boolean = false
>   var fileList : Array[String] = null
>
>   override def openInputFormat() = {
>   println("Path: " + this.filePath.toString)
>   val directory = new File(this.filePath.toString)
>   if (directory != null && directory.isDirectory) {
> fileList = directory.listFiles.filter(_.isDirectory)
>   .map(_.listFiles).flatten.map(_.toString)
> running = if (fileList.length > 1) true else false
>   }
>   println("fileList " + fileList.length + " running " + running)
>   }
>
>   override def nextRecord(reuse: String): String = {
> val head = fileList.head
> println("File: " + head)
> fileList = fileList.tail
> running = if (fileList.length == 0) false else true
> head
>   }
>
>   override def reachedEnd(): Boolean = ! running
> }
> 
>
> The output from the CLI:
>
> 10/28/2016 18:27:56 Job execution switched to status RUNNING.
> 10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.Exe
> cutionEnvironment.readFile(ExecutionEnvironment.scala:385)
> (de.tuberlin.inet.plag.DirReader))(1/1) switched to SCHEDULED
> 10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.Exe
> cutionEnvironment.readFile(ExecutionEnvironment.scala:385)
> (de.tuberlin.inet.plag.DirReader))(1/1) switched to DEPLOYING
> 10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.Exe
> cutionEnvironment.readFile(ExecutionEnvironment.scala:385)
> (de.tuberlin.inet.plag.DirReader))(1/1) switched to RUNNING
> 10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) -
> UTF-8)(1/1) switched to SCHEDULED
> 10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) -
> UTF-8)(1/1) switched to DEPLOYING
> 10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.Exe
> cutionEnvironment.readFile(ExecutionEnvironment.scala:385)
> (de.tuberlin.inet.plag.DirReader))(1/1) switched to FINISHED
> 10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) -
> UTF-8)(1/1) switched to RUNNING
> 10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) -
> UTF-8)(1/1) switched to FINISHED
> 10/28/2016 18:27:56 Job execution switched to status FINISHED.
>
>
> --
> Niklas Semmler
> PhD Student / Research Assistant
> TU Berlin, INET, Room MAR 4.027
> Marchstr 23, 10587 Berlin
> Tel.: +49 (0)30 314 75739
> http://inet.tu-berlin.de/~nsemmler/
>


Re: Can we do batch writes on cassandra using flink while leveraging the locality?

2016-11-01 Thread Chesnay Schepler

Hello,

the main issue that prevented us from writing batches is that there is a
server-side limit on how big a batch may be; however, there was no way to
tell how big the batch you are currently building up actually is.


Regarding locality, I'm not sure if a partitioner alone solves this
issue. While the data and sink instance may be on the right node, the sink
would still have to know which Cassandra instance to write to in order to
actually make use of the locality. I never looked too deeply into data
locality, so I don't know whether/how we would have to change the sink to do
that :(

Regards,
Chesnay

On 01.11.2016 20:29, Stephan Ewen wrote:

Hi!

I do not know the details of how Cassandra supports batched writes, 
but here are some thoughts:


  - Grouping writes that go to the same partition together into one 
batch write request makes sense. If you have some sample code for 
that, it should be not too hard to integrate into the Flink Cassandra 
connector


  - If you know the partitioning scheme in Cassandra and you use 
"DataStream.partitionCustom(partitioner, key)" it should result in a 
way that all write requests from one parallel sink instance go to the 
same Cassandra node (or a small number of nodes). Would that help?


Greetings,
Stephan




On Fri, Oct 28, 2016 at 8:57 AM, kant kodali > wrote:


Spark Cassandra connector does it! but I don't think it really
implements a custom partitioner I think it just leverages token
aware policy and does batch writes by default within a partition
but you can also do across partitions with the same replica!

On Thu, Oct 27, 2016 at 8:41 AM, Shannon Carey > wrote:

It certainly seems possible to write a Partitioner that does
what you describe. I started implementing one but didn't have
time to finish it. I think the main difficulty is in properly
dealing with partition ownership changes in Cassandra… if you
are maintaining state in Flink and the partitioning changes,
your job might produce inaccurate output. If, on the other
hand, you are only using the partitioner just before the
output, dynamic partitioning changes might be ok.


From: kant kodali >
Date: Thursday, October 27, 2016 at 3:17 AM
To: >
Subject: Can we do batch writes on cassandra using flink while
leveraging the locality?

locality? For example the batch writes in Cassandra will put
pressure on the coordinator but since the connectors are built
by leveraging the locality I was wondering if we could do
batch of writes on a node where the batch belongs?







Re: Can we do batch writes on cassandra using flink while leveraging the locality?

2016-11-01 Thread Stephan Ewen
Hi!

I do not know the details of how Cassandra supports batched writes, but
here are some thoughts:

  - Grouping writes that go to the same partition together into one batch
write request makes sense. If you have some sample code for that, it should
be not too hard to integrate into the Flink Cassandra connector

  - If you know the partitioning scheme in Cassandra and you use
"DataStream.partitionCustom(partitioner, key)" it should result in a way
that all write requests from one parallel sink instance go to the same
Cassandra node (or a small number of nodes). Would that help?
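
As a rough illustration of the second point (Scala sketch; the record type,
key field, and hashCode-based partitioner are placeholders, not Cassandra's
real token calculation):

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

// Hypothetical record carrying the Cassandra partition key.
case class Row(partitionKey: String, payload: String)

// Routes all records with the same partition key to the same parallel sink
// instance. A locality-aware variant would map the key to the replica that
// owns the corresponding token range instead of using hashCode.
class KeyToSubtaskPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    (key.hashCode & Int.MaxValue) % numPartitions
}

object PartitionBeforeSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val rows: DataStream[Row] = env.fromElements(
      Row("user-1", "a"), Row("user-2", "b"), Row("user-1", "c"))

    rows
      .partitionCustom(new KeyToSubtaskPartitioner, _.partitionKey)
      .addSink(r => println(s"would write $r")) // stand-in for the Cassandra sink

    env.execute("partitionCustom sketch")
  }
}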

Greetings,
Stephan




On Fri, Oct 28, 2016 at 8:57 AM, kant kodali  wrote:

> Spark Cassandra connector does it! but I don't think it really implements
> a custom partitioner I think it just leverages token aware policy and does
> batch writes by default within a partition but you can also do across
> partitions with the same replica!
>
> On Thu, Oct 27, 2016 at 8:41 AM, Shannon Carey  wrote:
>
>> It certainly seems possible to write a Partitioner that does what you
>> describe. I started implementing one but didn't have time to finish it. I
>> think the main difficulty is in properly dealing with partition ownership
>> changes in Cassandra… if you are maintaining state in Flink and the
>> partitioning changes, your job might produce inaccurate output. If, on the
>> other hand, you are only using the partitioner just before the output,
>> dynamic partitioning changes might be ok.
>>
>>
>> From: kant kodali 
>> Date: Thursday, October 27, 2016 at 3:17 AM
>> To: 
>> Subject: Can we do batch writes on cassandra using flink while
>> leveraging the locality?
>>
>> locality? For example the batch writes in Cassandra will put pressure on
>> the coordinator but since the connectors are built by leveraging the
>> locality I was wondering if we could do batch of writes on a node where the
>> batch belongs?
>>
>
>


Re: Kinesis Connector Dependency Problems

2016-11-01 Thread Robert Metzger
Hi Justin,

thank you for sharing the classpath of the Flink container with us. It
contains what Till was already expecting: An older version of the AWS SDK.

If you have some spare time, could you quickly try to run your program with
a newer EMR version, just to validate our suspicion?
If the error doesn't occur on a more recent EMR version, then we know why
it's happening.

We'll then probably need to shade (relocate) the Kinesis code to make it
work with older EMR libraries.

Regards,
Robert


On Tue, Nov 1, 2016 at 6:27 PM, Justin Yan  wrote:

> Hi there,
>
> We're using EMR 4.4.0 -> I suppose this is a bit old, and I can migrate
> forward if you think that would be best.
>
> I've appended the classpath that the Flink cluster was started with at the
> end of this email (with a slight improvement to the formatting to make it
> readable).
>
> Willing to poke around or fiddle with this as necessary - thanks very much
> for the help!
>
> Justin
>
> Task Manager's classpath from logs:
>
> lib/flink-dist_2.11-1.1.3.jar
> lib/flink-python_2.11-1.1.3.jar
> lib/log4j-1.2.17.jar
> lib/slf4j-log4j12-1.7.7.jar
> logback.xml
> log4j.properties
> flink.jar
> flink-conf.yaml
> /etc/hadoop/conf
> /usr/lib/hadoop/hadoop-annotations-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-extras.jar
> /usr/lib/hadoop/hadoop-archives-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-aws-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-sls-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-auth-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-sls.jar
> /usr/lib/hadoop/hadoop-gridmix.jar
> /usr/lib/hadoop/hadoop-auth.jar
> /usr/lib/hadoop/hadoop-gridmix-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-rumen.jar
> /usr/lib/hadoop/hadoop-azure-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-common-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-azure.jar
> /usr/lib/hadoop/hadoop-datajoin-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-nfs.jar
> /usr/lib/hadoop/hadoop-aws.jar
> /usr/lib/hadoop/hadoop-streaming-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-archives.jar
> /usr/lib/hadoop/hadoop-openstack.jar
> /usr/lib/hadoop/hadoop-distcp.jar
> /usr/lib/hadoop/hadoop-annotations.jar
> /usr/lib/hadoop/hadoop-distcp-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-streaming.jar
> /usr/lib/hadoop/hadoop-rumen-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-common.jar
> /usr/lib/hadoop/hadoop-nfs-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-common-2.7.1-amzn-1-tests.jar
> /usr/lib/hadoop/hadoop-ant-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-datajoin.jar
> /usr/lib/hadoop/hadoop-ant.jar
> /usr/lib/hadoop/hadoop-extras-2.7.1-amzn-1.jar
> /usr/lib/hadoop/hadoop-openstack-2.7.1-amzn-1.jar
> /usr/lib/hadoop/lib/jackson-xc-1.9.13.jar
> /usr/lib/hadoop/lib/api-asn1-api-1.0.0-M20.jar
> /usr/lib/hadoop/lib/curator-client-2.7.1.jar
> /usr/lib/hadoop/lib/jackson-mapper-asl-1.9.13.jar
> /usr/lib/hadoop/lib/commons-io-2.4.jar
> /usr/lib/hadoop/lib/jackson-jaxrs-1.9.13.jar
> /usr/lib/hadoop/lib/log4j-1.2.17.jar
> /usr/lib/hadoop/lib/junit-4.11.jar
> /usr/lib/hadoop/lib/apacheds-i18n-2.0.0-M15.jar
> /usr/lib/hadoop/lib/commons-cli-1.2.jar
> /usr/lib/hadoop/lib/curator-recipes-2.7.1.jar
> /usr/lib/hadoop/lib/xmlenc-0.52.jar
> /usr/lib/hadoop/lib/zookeeper-3.4.6.jar
> /usr/lib/hadoop/lib/jsr305-3.0.0.jar
> /usr/lib/hadoop/lib/htrace-core-3.1.0-incubating.jar
> /usr/lib/hadoop/lib/httpclient-4.3.4.jar
> /usr/lib/hadoop/lib/jettison-1.1.jar
> /usr/lib/hadoop/lib/commons-beanutils-1.7.0.jar
> /usr/lib/hadoop/lib/commons-math3-3.1.1.jar
> /usr/lib/hadoop/lib/jersey-core-1.9.jar
> /usr/lib/hadoop/lib/httpcore-4.3.2.jar
> /usr/lib/hadoop/lib/commons-compress-1.4.1.jar
> /usr/lib/hadoop/lib/asm-3.2.jar
> /usr/lib/hadoop/lib/slf4j-api-1.7.10.jar
> /usr/lib/hadoop/lib/xz-1.0.jar
> /usr/lib/hadoop/lib/commons-collections-3.2.1.jar
> /usr/lib/hadoop/lib/commons-net-3.1.jar
> /usr/lib/hadoop/lib/commons-configuration-1.6.jar
> /usr/lib/hadoop/lib/jetty-util-6.1.26-emr.jar
> /usr/lib/hadoop/lib/commons-codec-1.4.jar
> /usr/lib/hadoop/lib/protobuf-java-2.5.0.jar
> /usr/lib/hadoop/lib/jetty-6.1.26-emr.jar
> /usr/lib/hadoop/lib/java-xmlbuilder-0.4.jar
> /usr/lib/hadoop/lib/apacheds-kerberos-codec-2.0.0-M15.jar
> /usr/lib/hadoop/lib/commons-logging-1.1.3.jar
> /usr/lib/hadoop/lib/jersey-json-1.9.jar
> /usr/lib/hadoop/lib/jackson-core-asl-1.9.13.jar
> /usr/lib/hadoop/lib/gson-2.2.4.jar
> /usr/lib/hadoop/lib/stax-api-1.0-2.jar
> /usr/lib/hadoop/lib/commons-digester-1.8.jar
> /usr/lib/hadoop/lib/servlet-api-2.5.jar
> /usr/lib/hadoop/lib/curator-framework-2.7.1.jar
> /usr/lib/hadoop/lib/commons-httpclient-3.1.jar
> /usr/lib/hadoop/lib/jets3t-0.9.0.jar
> /usr/lib/hadoop/lib/jaxb-api-2.2.2.jar
> /usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar
> /usr/lib/hadoop/lib/mockito-all-1.8.5.jar
> /usr/lib/hadoop/lib/snappy-java-1.0.4.1.jar
> /usr/lib/hadoop/lib/jaxb-impl-2.2.3-1.jar
> /usr/lib/hadoop/lib/paranamer-2.3.jar
> /usr/lib/hadoop/lib/avro-1.7.4.jar
> /usr/lib/hadoop/lib/commons-beanutils-core-1.8.0.jar
> 

Re: Kinesis Connector Dependency Problems

2016-11-01 Thread Justin Yan
Hi there,

We're using EMR 4.4.0 -> I suppose this is a bit old, and I can migrate
forward if you think that would be best.

I've appended the classpath that the Flink cluster was started with at the
end of this email (with a slight improvement to the formatting to make it
readable).

Willing to poke around or fiddle with this as necessary - thanks very much
for the help!

Justin

Task Manager's classpath from logs:

lib/flink-dist_2.11-1.1.3.jar
lib/flink-python_2.11-1.1.3.jar
lib/log4j-1.2.17.jar
lib/slf4j-log4j12-1.7.7.jar
logback.xml
log4j.properties
flink.jar
flink-conf.yaml
/etc/hadoop/conf
/usr/lib/hadoop/hadoop-annotations-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-extras.jar
/usr/lib/hadoop/hadoop-archives-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-aws-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-sls-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-auth-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-sls.jar
/usr/lib/hadoop/hadoop-gridmix.jar
/usr/lib/hadoop/hadoop-auth.jar
/usr/lib/hadoop/hadoop-gridmix-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-rumen.jar
/usr/lib/hadoop/hadoop-azure-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-common-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-azure.jar
/usr/lib/hadoop/hadoop-datajoin-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-nfs.jar
/usr/lib/hadoop/hadoop-aws.jar
/usr/lib/hadoop/hadoop-streaming-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-archives.jar
/usr/lib/hadoop/hadoop-openstack.jar
/usr/lib/hadoop/hadoop-distcp.jar
/usr/lib/hadoop/hadoop-annotations.jar
/usr/lib/hadoop/hadoop-distcp-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-streaming.jar
/usr/lib/hadoop/hadoop-rumen-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-common.jar
/usr/lib/hadoop/hadoop-nfs-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-common-2.7.1-amzn-1-tests.jar
/usr/lib/hadoop/hadoop-ant-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-datajoin.jar
/usr/lib/hadoop/hadoop-ant.jar
/usr/lib/hadoop/hadoop-extras-2.7.1-amzn-1.jar
/usr/lib/hadoop/hadoop-openstack-2.7.1-amzn-1.jar
/usr/lib/hadoop/lib/jackson-xc-1.9.13.jar
/usr/lib/hadoop/lib/api-asn1-api-1.0.0-M20.jar
/usr/lib/hadoop/lib/curator-client-2.7.1.jar
/usr/lib/hadoop/lib/jackson-mapper-asl-1.9.13.jar
/usr/lib/hadoop/lib/commons-io-2.4.jar
/usr/lib/hadoop/lib/jackson-jaxrs-1.9.13.jar
/usr/lib/hadoop/lib/log4j-1.2.17.jar
/usr/lib/hadoop/lib/junit-4.11.jar
/usr/lib/hadoop/lib/apacheds-i18n-2.0.0-M15.jar
/usr/lib/hadoop/lib/commons-cli-1.2.jar
/usr/lib/hadoop/lib/curator-recipes-2.7.1.jar
/usr/lib/hadoop/lib/xmlenc-0.52.jar
/usr/lib/hadoop/lib/zookeeper-3.4.6.jar
/usr/lib/hadoop/lib/jsr305-3.0.0.jar
/usr/lib/hadoop/lib/htrace-core-3.1.0-incubating.jar
/usr/lib/hadoop/lib/httpclient-4.3.4.jar
/usr/lib/hadoop/lib/jettison-1.1.jar
/usr/lib/hadoop/lib/commons-beanutils-1.7.0.jar
/usr/lib/hadoop/lib/commons-math3-3.1.1.jar
/usr/lib/hadoop/lib/jersey-core-1.9.jar
/usr/lib/hadoop/lib/httpcore-4.3.2.jar
/usr/lib/hadoop/lib/commons-compress-1.4.1.jar
/usr/lib/hadoop/lib/asm-3.2.jar
/usr/lib/hadoop/lib/slf4j-api-1.7.10.jar
/usr/lib/hadoop/lib/xz-1.0.jar
/usr/lib/hadoop/lib/commons-collections-3.2.1.jar
/usr/lib/hadoop/lib/commons-net-3.1.jar
/usr/lib/hadoop/lib/commons-configuration-1.6.jar
/usr/lib/hadoop/lib/jetty-util-6.1.26-emr.jar
/usr/lib/hadoop/lib/commons-codec-1.4.jar
/usr/lib/hadoop/lib/protobuf-java-2.5.0.jar
/usr/lib/hadoop/lib/jetty-6.1.26-emr.jar
/usr/lib/hadoop/lib/java-xmlbuilder-0.4.jar
/usr/lib/hadoop/lib/apacheds-kerberos-codec-2.0.0-M15.jar
/usr/lib/hadoop/lib/commons-logging-1.1.3.jar
/usr/lib/hadoop/lib/jersey-json-1.9.jar
/usr/lib/hadoop/lib/jackson-core-asl-1.9.13.jar
/usr/lib/hadoop/lib/gson-2.2.4.jar
/usr/lib/hadoop/lib/stax-api-1.0-2.jar
/usr/lib/hadoop/lib/commons-digester-1.8.jar
/usr/lib/hadoop/lib/servlet-api-2.5.jar
/usr/lib/hadoop/lib/curator-framework-2.7.1.jar
/usr/lib/hadoop/lib/commons-httpclient-3.1.jar
/usr/lib/hadoop/lib/jets3t-0.9.0.jar
/usr/lib/hadoop/lib/jaxb-api-2.2.2.jar
/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar
/usr/lib/hadoop/lib/mockito-all-1.8.5.jar
/usr/lib/hadoop/lib/snappy-java-1.0.4.1.jar
/usr/lib/hadoop/lib/jaxb-impl-2.2.3-1.jar
/usr/lib/hadoop/lib/paranamer-2.3.jar
/usr/lib/hadoop/lib/avro-1.7.4.jar
/usr/lib/hadoop/lib/commons-beanutils-core-1.8.0.jar
/usr/lib/hadoop/lib/jsp-api-2.1.jar
/usr/lib/hadoop/lib/api-util-1.0.0-M20.jar
/usr/lib/hadoop/lib/activation-1.1.jar
/usr/lib/hadoop/lib/emr-metrics-client-2.1.0.jar
/usr/lib/hadoop/lib/commons-lang-2.6.jar
/usr/lib/hadoop/lib/jersey-server-1.9.jar
/usr/lib/hadoop/lib/guava-11.0.2.jar
/usr/lib/hadoop/lib/jsch-0.1.42.jar
/usr/lib/hadoop/lib/netty-3.6.2.Final.jar
/usr/lib/hadoop/lib/hamcrest-core-1.3.jar
/usr/lib/hadoop-hdfs/hadoop-hdfs.jar
/usr/lib/hadoop-hdfs/hadoop-hdfs-nfs-2.7.1-amzn-1.jar
/usr/lib/hadoop-hdfs/hadoop-hdfs-2.7.1-amzn-1.jar
/usr/lib/hadoop-hdfs/hadoop-hdfs-2.7.1-amzn-1-tests.jar
/usr/lib/hadoop-hdfs/hadoop-hdfs-nfs.jar
/usr/lib/hadoop-hdfs/lib/jackson-mapper-asl-1.9.13.jar
/usr/lib/hadoop-hdfs/lib/commons-io-2.4.jar
/usr/lib/hadoop-hdfs/lib/log4j-1.2.17.jar

Re: watermark trigger doesn't check whether element's timestamp is passed

2016-11-01 Thread Aljoscha Krettek
Ah, I finally understand it. You would need a way to query the current watermark
in the window function to only emit those elements where the timestamp is
lower than the watermark.

When the window fires again, do you want to emit elements that you emitted
during the last firing again? If not, I think you also need to use an
evictor to evict the elements from the window where the timestamp is lower
than the watermark. With this FLIP
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
we
should be able to extend the WindowFunction Context to also provide the
current watermark. With this recent PR
https://github.com/apache/flink/pull/2736 you would be able to evict
elements from the window state after the window function was called.

Cheers,
Aljoscha

On Tue, 1 Nov 2016 at 02:27 Manu Zhang  wrote:

> Yes, here's the example
> https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala
>
> If you print and compare the timestamp of timer with that of "PageView" in
> the outputs, you could see what I mean.
>
> I think the recently introduced TimelyFlatMapFunction is close to what I
> want to achieve. It will be great if we can query time information in the
> window function so I filed
> https://issues.apache.org/jira/browse/FLINK-4953
>
> Thanks for your time.
>
> Manu
>
> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek 
> wrote:
>
> Hmm, I don't completely understand what's going on. Could you maybe post
> an example, with the trigger code that shows this behaviour?
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 17:12 Manu Zhang  wrote:
>
> Hi,
>
> It's what I'm seeing. If timers are not fired at the end of window, a
> state (in the window) whose timestamp is *after *the timer will also be
> emitted. That's a problem for event-time trigger.
>
> Thanks,
> Manu
>
>
> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek 
> wrote:
>
> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang  wrote:
>
> Hi Aljoscha,
>
> Thanks for your response.  My use case is to track user trajectory based
> on page view event when they visit a website.  The input would be like a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with some event time
> trigger. Note we can't wait for the end of session window due to latency.
> Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time is passed by watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets timer on each
> element's timestamp. For both triggers I've witnessed a problem like the
> following (e.g. a session gap of 5)
>
> PageView("user1", "http://foo;, 1)
> PageView("user1", "http://foo/bar;, 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar
> *", [1,6])
> PageView("user1", "http://foo/bar/foobar;, 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar ->
> *http://foo/bar/foobar *", [1, 10])
>
> The urls in bold should be included since there could be events before
> them not arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek 
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your example and what is the input stream (including watermarks)?
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 06:30 Manu Zhang  wrote:
>
> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>
> It seems that if *("c", 1:06) is processed before watermark(1:04)*
> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
> watermark(1:04). This is incorrect since there could be elements with
> timestamp between 1:04 and 1:06 that have not arrived yet.
>
> I guess this is because watermark trigger doesn't check whether element's
> timestamp has been passed.
>
> Please correct me if any of the above is not right.
>
> Thanks,
> Manu Zhang
>
>
>
>


Re: Looping over a DataSet and accesing another DataSet

2016-11-01 Thread Greg Hogan
By 'loop' do you refer to an iteration? The output of a bulk iteration is
processed as the input of the following iteration. Values updated in an
iteration are available in the next iteration just as values updated by an
operator are available to the following operator.
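
For reference, a minimal bulk-iteration sketch in the Scala DataSet API (the
numbers are made up): the DataSet returned by the step function is what the
next iteration receives as input.

import org.apache.flink.api.scala._

object BulkIterationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val initial: DataSet[Double] = env.fromElements(1024.0, 512.0, 256.0)

    // Ten iterations; each step's output becomes the next step's input.
    val result = initial.iterate(10) { current =>
      current.map(_ / 2.0)
    }

    result.print()
  }
}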

Your chosen algorithm may not be a good fit for distributed processing
frameworks like Flink, Spark, and Hadoop. You may need to recast your
problem into an appropriate, scalable algorithm. Both the Gelly and Machine
Learning libraries have good examples of efficient, scalable algorithms
(Flink's "examples" demonstrate specific functionality).

Greg

On Mon, Oct 31, 2016 at 8:52 AM, otherwise777 
wrote:

> Thank you for your reply, this is new information for me,
>
> Regarding the algorithm, I gave it a better look and I don't think it will
> work with joining. When looping over the edge set (u,v) we need to be able
> to write and read A[u] and A[v]. If I join them it will create new
> instances of that value, and it doesn't matter if it's changed in one
> instance.
>
> For example i have the following edges:
>  u v
>  1 2
>  1 3
>
> With vertices and values:
>  1 a
>  2 b
>  3 c
>
> If i join them i get:
>  u v u' v'
>  1 2 a b
>  1 3 a c
>
> If I loop over the joined set and change the u' value of the first instance
> to 'd', then in my next loop step it will be 'a'.
>
>
>
>
>


Re: Question about the checkpoint mechanism in Flink.

2016-11-01 Thread Till Rohrmann
The tuples are not buffered until the snapshot is globally complete (a
snapshot is globally complete iff all operators have successfully taken a
snapshot). They are only buffered until the corresponding checkpoint
barrier on the second input is received. Once this is the case, the
checkpoint barrier will directly be sent to the downstream operators. Next
a snapshot is taken. Depending on the state backend this can happen
asynchronously or synchronously. After this is done, the operator continues
processing elements (for the first input, the buffered elements are
consumed first).

With multiple inputs I referred to a coFlatMap operator or a join operator
which have both two inputs.

Cheers,
Till

On Tue, Nov 1, 2016 at 3:29 PM, Renjie Liu  wrote:

> Hi, Till:
> By operator with multiple inputs, do you mean inputs from multiple
> subtasks?
>
> On Tue, Nov 1, 2016 at 8:56 PM Till Rohrmann  wrote:
>
>> Hi Li,
>>
>> the statement refers to operators with multiple inputs (two in this
>> case). With the current implementation you will indeed block one of the
>> inputs after receiving a checkpoint barrier n until you've received the
>> corresponding checkpoint barrier n on the other input as well. This is what
>> we call checkpoint barrier alignment. If the processing time on both input
>> paths is similar and thus there is no back pressure on any of the inputs,
>> the alignment should not take too long. In the case where one of the inputs is
>> considerably slower than the other, you should see an additional delay.
>>
>> For single input operators, you don't have to align the checkpoint
>> barriers.
>>
>> The checkpoint barrier alignment is not strictly necessary, but it allows
>> us to avoid storing all in-flight records from the second input which
>> arrive between the checkpoint barrier on the first input and the
>> corresponding barrier on the second input. We might change this
>> implementation in the future, though.
>>
>> Cheers,
>> Till
>>
>> On Tue, Nov 1, 2016 at 8:05 AM, Li Wang  wrote:
>>
>> Hi all,
>>
>> I have a question regarding the state checkpoint mechanism in Flink. I
>> find the statement  "Once the last stream has received barrier n, the
>> operator emits all pending outgoing records, and then emits
>> snapshot n barriers itself” on the document https://ci.apache.org/
>> projects/flink/flink-docs-master/internals/stream_
>> checkpointing.html#exactly-once-vs-at-least-once.
>>
>> Does this mean that to achieve exactly-once semantic, instead of sending
>> tuples downstream immediately the operator buffers its outgoing tuples in a
>> pending queue until the current snapshot is committed? If yes, will this
>> introduce significant processing delay?
>>
>> Thanks,
>> Li
>>
>>
>> --
> Liu, Renjie
> Software Engineer, MVAD
>


Re: Question about the checkpoint mechanism in Flink.

2016-11-01 Thread Renjie Liu
Hi, Till:
By operator with multiple inputs, do you mean inputs from multiple
subtasks?

On Tue, Nov 1, 2016 at 8:56 PM Till Rohrmann  wrote:

> Hi Li,
>
> the statement refers to operators with multiple inputs (two in this case).
> With the current implementation you will indeed block one of the inputs
> after receiving a checkpoint barrier n until you've received the
> corresponding checkpoint barrier n on the other input as well. This is what
> we call checkpoint barrier alignment. If the processing time on both input
> paths is similar and thus there is no back pressure on any of the inputs,
> the alignment should not take too long. In the case where one of the inputs is
> considerably slower than the other, you should see an additional delay.
>
> For single input operators, you don't have to align the checkpoint
> barriers.
>
> The checkpoint barrier alignment is not strictly necessary, but it allows
> us to avoid storing all in-flight records from the second input which
> arrive between the checkpoint barrier on the first input and the
> corresponding barrier on the second input. We might change this
> implementation in the future, though.
>
> Cheers,
> Till
>
> On Tue, Nov 1, 2016 at 8:05 AM, Li Wang  wrote:
>
> Hi all,
>
> I have a question regarding the state checkpoint mechanism in Flink. I
> find the statement  "Once the last stream has received barrier n, the
> operator emits all pending outgoing records, and then emits
> snapshot n barriers itself” on the document
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#exactly-once-vs-at-least-once
> .
>
> Does this mean that to achieve exactly-once semantic, instead of sending
> tuples downstream immediately the operator buffers its outgoing tuples in a
> pending queue until the current snapshot is committed? If yes, will this
> introduce significant processing delay?
>
> Thanks,
> Li
>
>
> --
Liu, Renjie
Software Engineer, MVAD


Re: Question about the checkpoint mechanism in Flink.

2016-11-01 Thread Renjie Liu
Sorry for the incorrect reply, please ignore this.

On Tue, Nov 1, 2016 at 8:47 PM Renjie Liu  wrote:

> Essentially you are right, but the snapshot commit process is
> asynchronous. That's what you have to pay for exactly once semantics.
>
> Li Wang wrote on Tue, Nov 1, 2016 at 3:05 PM:
>
> Hi all,
>
> I have a question regarding the state checkpoint mechanism in Flink. I
> find the statement  "Once the last stream has received barrier n, the
> operator emits all pending outgoing records, and then emits
> snapshot n barriers itself” on the document
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#exactly-once-vs-at-least-once
> .
>
> Does this mean that to achieve exactly-once semantic, instead of sending
> tuples downstream immediately the operator buffers its outgoing tuples in a
> pending queue until the current snapshot is committed? If yes, will this
> introduce significant processing delay?
>
> Thanks,
> Li
>
> --
> Liu, Renjie
> Software Engineer, MVAD
>
-- 
Liu, Renjie
Software Engineer, MVAD


Re: Question about the checkpoint mechanism in Flink.

2016-11-01 Thread Renjie Liu
Essentially you are right, but the snapshot commit process is asynchronous.
That's what you have to pay for exactly-once semantics.

Li Wang wrote on Tue, Nov 1, 2016 at 3:05 PM:

> Hi all,
>
> I have a question regarding the state checkpoint mechanism in Flink. I
> find the statement  "Once the last stream has received barrier n, the
> operator emits all pending outgoing records, and then emits
> snapshot n barriers itself” on the document
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#exactly-once-vs-at-least-once
> .
>
> Does this mean that to achieve exactly-once semantic, instead of sending
> tuples downstream immediately the operator buffers its outgoing tuples in a
> pending queue until the current snapshot is committed? If yes, will this
> introduce significant processing delay?
>
> Thanks,
> Li
>
> --
Liu, Renjie
Software Engineer, MVAD


BoundedOutOfOrdernessTimestampExtractor and timestamps in the future

2016-11-01 Thread Dominik Bruhn

Hey,
I'm using a BoundedOutOfOrdernessTimestampExtractor for assigning my
timestamps and discarding events that are too old (which happens sometimes).


Now my problem is that some events, by accident, have timestamps in the
future. If a timestamp lies further in the future than my
`maxOutOfOrderness`, the watermark advances too far and I end up discarding
valid events. So I need a way of telling the
BoundedOutOfOrdernessTimestampExtractor to exclude timestamps from
the future for the watermark calculation. I still want to keep such events
and assign them their (future) timestamps.


How can I achieve this? I thought about checking whether the potential 
timestamp is in the future before considering it for a watermark. I 
cloned the BoundedOutOfOrdernessTimestampExtractor and added the idea

https://gist.github.com/theomega/090054032e0b3c3b9fb06767f0fec7e7
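
Roughly, the idea looks like the following sketch (a paraphrase, not the exact gist; the maxFutureDrift bound and the class name are made up for illustration):

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    // Sketch: assign every event its own timestamp (even if it lies in the future),
    // but only let timestamps that are not "too far" ahead of wall-clock time
    // advance the watermark. maxFutureDrift is an illustrative knob, not a Flink setting.
    public abstract class FutureTolerantOutOfOrdernessExtractor<T>
            implements AssignerWithPeriodicWatermarks<T> {

        private final long maxOutOfOrderness;
        private final long maxFutureDrift;
        private long currentMaxTimestamp;

        public FutureTolerantOutOfOrdernessExtractor(long maxOutOfOrderness, long maxFutureDrift) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            this.maxFutureDrift = maxFutureDrift;
            this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        }

        public abstract long extractTimestamp(T element);

        @Override
        public final long extractTimestamp(T element, long previousElementTimestamp) {
            long ts = extractTimestamp(element);
            if (ts <= System.currentTimeMillis() + maxFutureDrift && ts > currentMaxTimestamp) {
                currentMaxTimestamp = ts;  // only non-future timestamps drive the watermark
            }
            return ts;                     // the event keeps its original timestamp either way
        }

        @Override
        public final Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }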

Does this make sense? Or is there a better approach?

In general, how does Flink handle readings from the future?

Thanks,
Dominik

--
Dominik


Re: Question about the checkpoint mechanism in Flink.

2016-11-01 Thread Li Wang

Hi Till,

Thanks for your prompt reply. I understand that input streams should be aligned 
such that a consistent state snapshot can be generated. In my opinion, that 
statement indicates that an operator will buffer its output tuples until the 
snapshot is committed. I am wondering if my understanding of that statement 
is right. If yes, why should an operator hold back its output tuples? Is that for 
replaying output tuples during the state recovery of a downstream operator?

Thanks and regards,
Li


Sent from my iPhone

> On 1 Nov 2016, at 8:56 PM, Till Rohrmann  wrote:
> 
> Hi Li,
> 
> the statement refers to operators with multiple inputs (two in this case). 
> With the current implementation you will indeed block one of the inputs after 
> receiving a checkpoint barrier n until you've received the corresponding 
> checkpoint barrier n on the other input as well. This is what we call 
> checkpoint barrier alignment. If the processing time on both input paths is 
> similar and thus there is no back pressure on any of the inputs, the 
> alignment should not take too long. In the case where one of the inputs is 
> considerably slower than the other, you should see an additional delay.
> 
> For single input operators, you don't have to align the checkpoint barriers.
> 
> The checkpoint barrier alignment is not strictly necessary, but it allows us 
> to avoid storing all in-flight records from the second input which 
> arrive between the checkpoint barrier on the first input and the 
> corresponding barrier on the second input. We might change this 
> implementation in the future, though.
> 
> Cheers,
> Till
> 
>> On Tue, Nov 1, 2016 at 8:05 AM, Li Wang  wrote:
>> Hi all,
>> 
>> I have a question regarding the state checkpoint mechanism in Flink. I 
>> find the statement  "Once the last stream has received barrier n, the 
>> operator emits all pending outgoing records, and then emits snapshot n 
>> barriers itself” on the document 
>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#exactly-once-vs-at-least-once.
>> 
>> Does this mean that to achieve exactly-once semantic, instead of sending 
>> tuples downstream immediately the operator buffers its outgoing tuples in a 
>> pending queue until the current snapshot is committed? If yes, will this 
>> introduce significant processing delay?
>> 
>> Thanks,
>> Li
>> 
> 


Re: Elasticsearch sink: Java.lang.NoSuchMethodError: org.elasticsearch.common.settings.Settings.settingsBuilder

2016-11-01 Thread Till Rohrmann
Hi Pedro,

this looks like a version mismatch. Could you check which version of
Elasticsearch you have on your classpath and in your uber jar? It should be
version 2.3.5.
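
One quick way to see which version actually ends up on the classpath at runtime is to log it, e.g. once at job startup (a diagnostic sketch; org.elasticsearch.Version.CURRENT reports the loaded client version):

    // e.g. in the main() method of the job, or in a sink's open() method
    System.out.println("Elasticsearch client version: " + org.elasticsearch.Version.CURRENT);
    System.out.println("Loaded from: "
            + org.elasticsearch.Version.class.getProtectionDomain().getCodeSource().getLocation());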

Cheers,
Till

On Fri, Oct 28, 2016 at 6:59 PM, PedroMrChaves 
wrote:

> Hello,
>
> I am using Flink to write data to elasticsearch.
>
> Flink version : 1.1.3
> Elasticsearch version: 2.4.1
>
> But I am getting the following error:
>
> 10/28/2016 18:58:56 Job execution switched to status FAILING.
> java.lang.NoSuchMethodError:
> org.elasticsearch.common.settings.Settings.settingsBuilder()Lorg/
> elasticsearch/common/settings/Settings$Builder;
> at
> org.apache.flink.streaming.connectors.elasticsearch2.
> ElasticsearchSink.open(ElasticsearchSink.java:162)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(
> AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
> at java.lang.Thread.run(Thread.java:745)
>
> This is the code I use to configure the sink (similar to the taxi ride
> example in
> https://www.elastic.co/blog/building-real-time-dashboard-
> applications-with-apache-flink-elasticsearch-and-kibana)
>
> private void elasticSink() {
> Map<String, String> config = new HashMap<>();
> // This instructs the sink to emit after every element, otherwise
> // they would be buffered
> config.put("bulk.flush.max.actions", "10");
> config.put("cluster.name", "elasticdemo");
>
> List<InetSocketAddress> transports = new ArrayList<>();
> try {
> transports.add(new
> InetSocketAddress(InetAddress.getByName("localhost"), 9200));
> } catch (UnknownHostException ex) {
> Logger.getLogger(CEPEngine.class.getName()).log(Level.SEVERE,
> null, ex);
> }
>
> stream.addSink(new ElasticsearchSink<>(
> config,
> transports,
> new AccessDataInsert()));
>
> }
>
> What could be the problem?
>
> Regards,
> Pedro Chaves
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Elasticsearch-sink-Java-lang-
> NoSuchMethodError-org-elasticsearch-common-settings-
> Settings-settingsBur-tp9773.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Question about the checkpoint mechanism in Flink.

2016-11-01 Thread Till Rohrmann
Hi Li,

the statement refers to operators with multiple inputs (two in this case).
With the current implementation you will indeed block one of the inputs
after receiving a checkpoint barrier n until you've received the
corresponding checkpoint barrier n on the other input as well. This is what
we call checkpoint barrier alignment. If the processing time on both input
paths is similar and thus there is no back pressure on any of the inputs,
the alignment should not take too long. In the case where one of the inputs is
considerably slower than the other, you should see an additional delay.

For single input operators, you don't have to align the checkpoint barriers.

The checkpoint barrier alignment is not strictly necessary, but it allows
us to avoid storing all in-flight records from the second input which
arrive between the checkpoint barrier on the first input and the
corresponding barrier on the second input. We might change this
implementation in the future, though.

Cheers,
Till

On Tue, Nov 1, 2016 at 8:05 AM, Li Wang  wrote:

> Hi all,
>
> I have a question regarding the state checkpoint mechanism in Flink. I
> find the statement  "Once the last stream has received barrier n, the
> operator emits all pending outgoing records, and then emits
> snapshot n barriers itself” on the document https://ci.apache.org/
> projects/flink/flink-docs-master/internals/stream_
> checkpointing.html#exactly-once-vs-at-least-once.
>
> Does this mean that to achieve exactly-once semantic, instead of sending
> tuples downstream immediately the operator buffers its outgoing tuples in a
> pending queue until the current snapshot is committed? If yes, will this
> introduce significant processing delay?
>
> Thanks,
> Li
>
>


Re: Kinesis Connector Dependency Problems

2016-11-01 Thread Till Rohrmann
Hi Justin,

I think this might be a problem in Flink's Kinesis consumer. The Flink
Kinesis consumer uses aws-java-sdk version 1.10.71, which indeed contains the
aforementioned method. However, older versions such as 1.10.46 do not contain
it yet. Thus, I suspect that Yarn puts some older version of this jar onto the
classpath. For these cases, I think we have to shade our aws-java-sdk
dependency so that it also works with older versions of EMR.
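
As a quick diagnostic (a sketch, not part of the fix itself), you can log where the SDK class is actually loaded from at runtime, e.g. at the start of the job's main method:

    Class<?> sdkConfig = com.amazonaws.SDKGlobalConfiguration.class;
    System.out.println("aws-java-sdk loaded from: "
            + sdkConfig.getProtectionDomain().getCodeSource().getLocation());
    // VersionInfoUtils ships with aws-java-sdk-core and reports the SDK version string
    System.out.println("aws-java-sdk version: " + com.amazonaws.util.VersionInfoUtils.getVersion());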

In order to verify this, could you tell us which EMR version you're
running? Additionally, it would be helpful if you sent us the classpath
with which the Flink cluster was started on Yarn. You can find this
information at the beginning of your TaskManager log file. Thanks a lot.

Cheers,
Till

On Mon, Oct 31, 2016 at 8:22 PM, Justin Yan  wrote:

> Hi all - first time on the mailing list, so my apologies if I break
> protocol on anything.  Really excited to be using Flink, and hoping to be
> active here in the future!  Also, apologies for the length of this email -
> I tried to include details but may have gone overboard.
>
> The gist of my problem is an issue with packaging the Flink Kinesis
> Connector into my user code for execution on a YARN cluster in EMR -
> there's some dependency trouble happening, but after about 48 hours of
> attempts, I'm not sure how to make progress, and I'd really appreciate any
> ideas or assistance. Thank you in advance!
>
> ### First, Some Context.
>
> We're hoping to write our Flink jobs in scala 2.11.  The Flink JM/TMs
> currently run on an EMR cluster with Hadoop 2.7 as YARN containers.  We run
> our jobs via an Azkaban server, which has the Hadoop and Flink clients
> installed, and the configurations are set to point at the YARN master on
> our EMR cluster (with $HADOOP_HOME set so Flink can discover the hadoop
> configs).  We're using Java OpenJDK7 everywhere, and Maven 3.3.9 when
> building Flink from source.
>
> We use SBT and the assembly plugin to create an Uberjar of our code and
> its dependencies.  This gets uploaded to Azkaban, whereupon the following
> command is run on the azkaban server to execute a Flink job:
>
> flink run -c  usercodeuberjar-assembly-1.0.jar
>
> I've successfully run a few flink jobs that execute on our EMR cluster in
> this fashion (the WordCount example, etc.).
>
> ### The Problem
>
> We use AWS Kinesis, and are hoping to integrate Flink with it.  Naturally,
> we were hoping to use the Kinesis connector:
> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kinesis.html>.
>
> After following the instructions with some experimentation, I was able to
> run a Flink Kinesis application on my laptop in Local Cluster mode.
>  (Ubuntu 16.04, local cluster initiated with the `./start-local.sh`
> command, job submitted via `flink run -c 
> usercodeuberjar-assembly-1.0.jar`)
>
> I uploaded the same JAR to Azkaban and tried to run the same command to
> submit to our EMR cluster, and got a `java.lang.NoSuchMethodError:
> com.amazonaws.SDKGlobalConfiguration.isInRegionOptimizedModeEnabled()`
> (I've included the full stack trace at the bottom of this email).  I went
> to inspect the uploaded JAR with a `unzip usercodeuberjar-assembly-1.0.jar`,
> looked in `com/amazonaws` and found the SDKGlobalConfiguration.class file.
> I decompiled and inspected it, and the isInRegionOptimizedModeEnabled
> method that was purportedly missing was indeed present.
>
> I've included the steps I took to manifest this problem below, along with
> a variety of things that I tried to do to resolve the problem - any help or
> insight is greatly appreciated!
>
> ### Repro
>
> I'm not sure how to provide a clear repro, but I'll try to include as much
> detail as I can about the sequence of actions and commands I ran since
> there may be some obvious mistakes:
>
> Downloading the flink release to my laptop:
>
> wget http://www-us.apache.org/dist/flink/flink-1.1.3/flink-1.1.3-
> bin-hadoop27-scala_2.11.tgz
> tar xfzv flink-1.1.3-bin-hadoop27-scala_2.11.tgz
>
> I then SSH'd into Azkaban, and ran the same two commands, while adding the
> bin/ directory to my PATH and tweaking the config for fs.hdfs.hadoopconf.
> Next, after getting the flink binaries, I went to fetch the source code in
> order to follow the instructions here:
> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kinesis.html>
>
> wget https://github.com/apache/flink/archive/release-1.1.3.tar.gz
> tar xfzv release-1.1.3.tar.gz
>
> Here, I wanted to leverage our EMR instance profile Role instead of
> passing in credentials, hence I wanted the AUTO value for the
> "aws.credentials.provider" config, which seems to have been added after
> 1.1.3 - I made a couple of small tweaks to AWSConfigConstants.java and
> AWSUtil.java to allow for that AUTO value.
>
> Next, we're using Scala 2.11, so per the instructions here, I changed the
> scala version: 

Question about the checkpoint mechanism in Flink.

2016-11-01 Thread Li Wang
Hi all,

I have a question regarding the state checkpoint mechanism in Flink. I find 
the statement  "Once the last stream has received barrier n, the operator emits 
all pending outgoing records, and then emits snapshot n barriers itself” on the 
document 
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#exactly-once-vs-at-least-once.

Does this mean that to achieve exactly-once semantic, instead of sending tuples 
downstream immediately the operator buffers its outgoing tuples in a pending 
queue until the current snapshot is committed? If yes, will this introduce 
significant processing delay?

Thanks,
Li