Re: Monitoring Flink on Yarn

2016-12-19 Thread Shannon Carey
Check out Logstash or Splunk. Those can pipe your logs into an external 
database which can be used by a purpose-built UI for examining logs, and as a 
result it doesn't matter if the original files or machines are still around or 
not.


From: Lydia Ickler
Date: Monday, December 19, 2016 at 7:38 AM
Subject: Monitoring Flink on Yarn

Hi all,

I am using Flink 1.1.3 on Yarn and I wanted to ask how I can save the 
monitoring logs, e.g. for I/O or network, to HDFS or local FS?
Since Yarn closes the Flink session after finishing the job I can't access the 
log via REST API.

I am looking forward to your answer!
Best regards,
Lydia


Serializing NULLs

2016-12-19 Thread Matt
Hello list,

I'm getting this error:


java.lang.RuntimeException: Could not forward element to next operator
...
Caused by: java.lang.NullPointerException: in com.entities.Sector in map
in double null of double of map in field properties of com.entities.Sector
...
Caused by: java.lang.NullPointerException

The field mentioned is a HashMap, and some keys are mapped
to null values.
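
To make the shape concrete, a purely illustrative sketch of an element like the one
the error describes (the class and field names are taken from the stack trace;
everything else is an assumption, not the actual com.entities.Sector):

import java.util.{HashMap => JHashMap}

// Illustrative only: modeled on the stack trace above.
class Sector {
  var properties: JHashMap[String, java.lang.Double] = new JHashMap()
}

val sector = new Sector()
sector.properties.put("price", 42.0)   // a normal entry
sector.properties.put("volume", null)  // a key mapped to a null value, as described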

Why isn't it possible to forward/serialize those elements with null values?
What do you do when your elements may contain nulls?

Regards,
Matt


Re: Updating a Tumbling Window every second?

2016-12-19 Thread Matt
Fabian,

Thanks for your answer. Since elements in B are expensive to create, I
wanted to reuse them. I understand I can plug two consumers into stream A,
but in that case (if I'm not wrong) I would have to create duplicate
elements of B: one set to save into stream B and one to create C objects
for stream C.

Anyway, I already solved this problem a few days ago.

Regards,
Matt

On Mon, Dec 19, 2016 at 5:57 AM, Fabian Hueske  wrote:

> Hi Matt,
>
> the combination of a tumbling time window and a count window is one way to
> define a sliding window.
> In your example of a 30 secs tumbling window and a (3,1) count window
> results in a time sliding window of 90 secs width and 30 secs slide.
>
> You could define a time sliding window of 90 secs width and 1 secs slide
> on stream A to get a stream C with faster updates.
> If you still need stream B with the 30 secs tumbling window, you can have
> both windows defined on stream A.
>
> Hope this helps,
> Fabian
>
> 2016-12-16 12:58 GMT+01:00 Matt :
>
>> I have reduced the problem to a simple image [1].
>>
>> Those shown on the image are the streams I have, and the problem now is
>> how to create a custom window assigner such that objects in B that *don't
>> share* elements in A, are put together in the same window.
>>
>> Why? Because in order to create elements in C (triangles), I have to
>> process n *independent* elements of B (n=2 in the example).
>>
>> Maybe there's a better or simpler way to do this. Any idea is appreciated!
>>
>> Regards,
>> Matt
>>
>> [1] http://i.imgur.com/dG5AkJy.png
>>
>> On Thu, Dec 15, 2016 at 3:22 AM, Matt  wrote:
>>
>>> Hello,
>>>
>>> I have a rather simple problem with a difficult explanation...
>>>
>>> I have 3 streams, one of objects of class A (stream A), one of class B
>>> (stream B) and one of class C (stream C). The elements of A are generated
>>> at a rate of about 3 times every second. Elements of type B encapsulates
>>> some key features of the stream A (like the number of elements of A in the
>>> window) during the last 30 seconds (tumbling window 30s). Finally, the
>>> elements of type C contains statistics (for simplicity let's say the
>>> average of elements processed by each element in B) of the last 3 elements
>>> in B and are produced on every new element of B (count window 3, 1).
>>>
>>> Illustrative example, () and [] denotes windows:
>>>
>>> ... [a10 a9 a8] [a7 a6] [a5 a4] [a3 a2 a1]
>>> ... (b4 [b3 b2) b1]
>>> ... [c2] [c1]
>>>
>>> This works fine, except for a dashboard that depends on the elements of
>>> C to be updated, and 30s is way too big of a delay. I thought I could
>>> change the tumbling window for a sliding window of size 30s and a slide of
>>> 1s, but this doesn't work.
>>>
>>> If I use a sliding window to create elements of B as mentioned, each
>>> count window would contain 3 elements of B, and I would get one element of
>>> C every second as intended, but those elements in B encapsulates almost the
>>> same elements of A. This results in stats that are wrong.
>>>
>>> For instance, c1 may have the average of b1, b2 and b3. But b1, b2 and
>>> b3 share most of the elements from stream A.
>>>
>>> Question: is there any way to create a count window with the last 3
>>> elements of B that would have gone into the same tumbling window, not with
>>> the last 3 consecutive elements?
>>>
>>> I hope the problem is clear, don't hesitate to ask for further
>>> clarification!
>>>
>>> Regards,
>>> Matt
>>>
>>>
>>
>


Re: Reg Checkpoint size using RocksDb

2016-12-19 Thread Anirudh Mallem
Hi Stephan,
Thanks for your response. I shall try switching to the fully async mode and see.
On another note, is there any option available to configure compaction 
capabilities using the default checkpointing mode? Thanks.

From: Stephan Ewen
Reply-To: "user@flink.apache.org"
Date: Monday, December 19, 2016 at 11:51 AM
To: "user@flink.apache.org"
Subject: Re: Reg Checkpoint size using RocksDb

Hi!

If you use the default checkpoint mode, Flink will checkpoint the current 
RocksDB instance. It may be that there simply has not been a compaction in 
RocksDB when checkpointing, so the checkpoint contains some "old data" as well.

If you switch to the "fully async" mode, it should always only checkpoint the 
latest state of RocksDB.

Best,
Stephan


On Mon, Dec 19, 2016 at 10:47 AM, Anirudh Mallem wrote:
Hi,
I was experimenting with using RocksDb as the state backend for my job and to 
test its behavior I modified the socket word count program to store states. I 
also wrote a RichMapFunction which stores the states as a ValueState with 
default value as null.
What the job does basically is, for every word received if the current state is 
null then it updates the state with a fixed value say “abc” and in case the 
state is nonNull then it is cleared.
So ideally if my input stream has the word “foo” twice then the corresponding 
state is first set to “abc” and then cleared at the second “foo”. I see that 
this behavior is occurring as expected but the checkpointed size keeps 
increasing! Is this expected? I believe the checkpointed size as shown on the 
dashboard should decrease when some of the states are cleared right?
In this case if each of the “foo” word come in successive checkpointing 
intervals then we should observe rise and one fall in the checkpointing size 
right? I see the checkpointed size is increasing in both cases!!!

Any ideas as to what is happening here? My checkpoint duration is 5 secs. 
Thanks.

Regards,
Anirudh





Re: Reg Checkpoint size using RocksDb

2016-12-19 Thread Stephan Ewen
Hi!

If you use the default checkpoint mode, Flink will checkpoint the current
RocksDB instance. It may be that there simply has not been a compaction in
RocksDB when checkpointing, so the checkpoint contains some "old data" as
well.

If you switch to the "fully async" mode, it should always only checkpoint
the latest state of RocksDB.
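
For reference, switching modes looks roughly like the following (a sketch against
the Flink 1.1.x API; the checkpoint URI is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Placeholder checkpoint location.
val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints")
// Switch from the default (semi-async) snapshot mode to fully asynchronous snapshots.
backend.enableFullyAsyncSnapshots()
env.setStateBackend(backend)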

Best,
Stephan


On Mon, Dec 19, 2016 at 10:47 AM, Anirudh Mallem  wrote:

> Hi,
> I was experimenting with using RocksDb as the state backend for my job and
> to test its behavior I modified the socket word count program to store
> states. I also wrote a RichMapFunction which stores the states as a
> ValueState with default value as null.
> What the job does basically is, for every word received if the current
> state is null then it updates the state with a fixed value say “abc” and in
> case the state is nonNull then it is cleared.
> So ideally if my input stream has the word “foo” twice then the
> corresponding state is first set to “abc” and then cleared at the second
> “foo”. I see that this behavior is occurring as expected but the
> checkpointed size keeps increasing! Is this expected? I believe the
> checkpointed size as shown on the dashboard should decrease when some of
> the states are cleared right?
> In this case if each of the “foo” word come in successive checkpointing
> intervals then we should observe rise and one fall in the checkpointing
> size right? I see the checkpointed size is increasing in both cases!!!
>
> Any ideas as to what is happening here? My checkpoint duration is 5 secs.
> Thanks.
>
> Regards,
> Anirudh
>
>
>


Re: Alert on state change.

2016-12-19 Thread Scott Kidder
Hi Rudra,

You could accomplish this with a rolling fold on the stream of stock
prices. The accumulator argument to the fold can track the last price that
triggered an alert and the timestamp of that alert. When evaluating a new
stock price, it can compare the price against the last one that
triggered an alert. If it meets your alerting criteria, then send an alert
and update the accumulator state to reflect the most recent stock price &
timestamp.
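
A minimal sketch of such a fold in the Scala API (the types, field names and the
20% threshold are illustrative assumptions, not code from this thread):

import org.apache.flink.streaming.api.scala._

case class PriceUpdate(symbol: String, price: Double, timestamp: Long)
case class AlertState(symbol: String, lastAlertPrice: Double, lastAlertTime: Long, triggered: Boolean)

def alerts(prices: DataStream[PriceUpdate]): DataStream[AlertState] =
  prices
    .keyBy(_.symbol)
    // Rolling fold: the accumulator remembers the last price that triggered an alert.
    .fold(AlertState("", Double.NaN, 0L, triggered = false)) { (state, update) =>
      if (state.lastAlertPrice.isNaN)
        // First price for this key becomes the baseline; no alert yet.
        AlertState(update.symbol, update.price, update.timestamp, triggered = false)
      else if (update.price >= state.lastAlertPrice * 1.2)
        // 20% or more above the last alerted price: fire and reset the baseline.
        AlertState(update.symbol, update.price, update.timestamp, triggered = true)
      else
        state.copy(triggered = false)
    }
    // Keep only the fold outputs that actually fired an alert.
    .filter(_.triggered)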

Best,

--Scott Kidder



On Mon, Dec 19, 2016 at 7:15 AM, Rudra Tripathy  wrote:

> Hi All,
>
> I have a use case where I am monitoring price change.
>
> Lets s say the original price is $100
> in case of 20% rise , send the alert.
>
> In the stream I am getting updated prices.
>
> If in the next data $200 comes, send the alerts.
>
> Next I am getting 230 I would keep it but no alert
> . When I would get 240., I would send the alert.
>
> Is it possible to achieve using Flink.
>
> Thanks in advance,
> Rudra
>
>


Calculating stateful counts per key

2016-12-19 Thread Mäki Hanna
Hi,

I'm trying to calculate stateful counts per key with checkpoints following the 
example in 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#checkpointing-instance-fields.
 I would expect my test program to calculate the counts per key, but it seems 
to group the data by task rather than by key. Is this a Flink bug or have I 
misunderstood something?

The output of inputData.keyBy(0).flatMap(new TestCounters).print is

1> (A,count=1)
1> (F,count=2)
2> (B,count=1)
2> (C,count=2)
2> (D,count=3)
2> (E,count=4)
2> (E,count=5)
2> (E,count=6)
2> (H,count=7)
4> (G,count=1)

while the output of inputData.keyBy(0).flatMapWithState(...).print is (as I 
would expect)

2> (B,1)
4> (G,1)
1> (A,1)
2> (C,1)
1> (F,1)
2> (D,1)
2> (E,1)
2> (E,2)
2> (E,3)
2> (H,1)

I would expect both to give the same results.

The full code:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.util.Collector

object FlinkStreamingTest {

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.createLocalEnvironment()

    val checkpointIntervalMillis = 1
    env.enableCheckpointing(checkpointIntervalMillis)

    val inputData = env.fromElements(("A",0),("B",0),("C",0),("D",0),
      ("E",0),("E",0),("E",0),
      ("F",0),("G",0),("H",0))

    inputData.keyBy(0).flatMap(new TestCounters).print

    /*
    inputData.keyBy(0).flatMapWithState((keyAndCount: (String, Int), count: Option[Int]) =>
      count match {
        case None => (Iterator((keyAndCount._1, 1)), Some(1))
        case Some(c) => (Iterator((keyAndCount._1, c+1)), Some(c+1))
      }).print
    */

    env.execute("Counters test")
  }
}

case class CounterClass(var count: Int)

class TestCounters extends RichFlatMapFunction[(String, Int), (String, String)]
    with Checkpointed[CounterClass] {

  var counterValue: CounterClass = null

  override def flatMap(in: (String, Int), out: Collector[(String, String)]) = {
    counterValue.count = counterValue.count + 1
    out.collect((in._1, "count=" + counterValue.count))
  }

  override def open(config: Configuration): Unit = {
    if (counterValue == null) {
      counterValue = new CounterClass(0)
    }
  }

  override def snapshotState(l: Long, l1: Long): CounterClass = {
    counterValue
  }

  override def restoreState(state: CounterClass): Unit = {
    counterValue = state
  }
}
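
For comparison, the same per-key count can be expressed with Flink's keyed
ValueState instead of a plain instance field plus Checkpointed; a sketch (the
state name is arbitrary):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class KeyedTestCounters extends RichFlatMapFunction[(String, Int), (String, String)] {

  private var counter: ValueState[Int] = _

  override def open(config: Configuration): Unit = {
    // Keyed state: Flink keeps one value per key of the keyBy and checkpoints it automatically.
    counter = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("count", classOf[Int], 0))
  }

  override def flatMap(in: (String, Int), out: Collector[(String, String)]): Unit = {
    val newCount = counter.value() + 1
    counter.update(newCount)
    out.collect((in._1, "count=" + newCount))
  }
}

The plain counterValue field in TestCounters above is one object per parallel task
instance, so it counts everything that task processes, which matches the per-task
grouping visible in the first output listing.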


Alert on state change.

2016-12-19 Thread Rudra Tripathy
Hi All,

I have a use case where I am monitoring price change.

Let's say the original price is $100;
in case of a 20% rise, send an alert.

In the stream I am getting updated prices.

If $200 comes in the next data, send the alert.

If I next get 230, I would keep it but send no alert.
When I get 240, I would send the alert.

Is it possible to achieve this using Flink?

Thanks in advance,
Rudra


Monitoring Flink on Yarn

2016-12-19 Thread Lydia Ickler
Hi all, 

I am using Flink 1.1.3 on Yarn and I wanted to ask how I can save the 
monitoring logs, e.g. for I/O or network, to HDFS or local FS?
Since Yarn closes the Flink session after finishing the job I can't access the 
log via REST API.

I am looking forward to your answer!
Best regards,
Lydia 

Re: High virtual memory usage

2016-12-19 Thread Stephan Ewen
Hi Paulo!

Hmm, interesting. The high discrepancy between virtual and physical memory
usually means that the process either maps large files into memory, or that
it pre-allocates a lot of memory without immediately using it.
Neither of these things is done by Flink.

Could this be an effect of either the Docker environment (mapping certain
kernel spaces / libraries / whatever) or a result of one of the libraries
(gRPC or so)?

Stephan


On Mon, Dec 19, 2016 at 12:32 PM, Paulo Cezar  wrote:

>   - Are you using RocksDB?
>
> No.
>
>
>   - What is your flink configuration, especially around memory settings?
>
> I'm using default config with 2GB for jobmanager and 5GB for taskmanagers.
> I'm starting flink via "./bin/yarn-session.sh -d -n 5 -jm 2048 -tm 5120 -s
> 4 -nm 'Flink'"
>
>   - What do you use for TaskManager heap size? Any manual value, or do you
> let Flink/Yarn set it automatically based on container size?
>
> No manual values here. YARN config is pretty much default with maximum
> allocation of 12GB of physical memory and ratio between virtual memory to
> physical memory 2.1 (via yarn.nodemanager.vmem-pmem-ratio).
>
>
>   - Do you use any libraries or connectors in your program?
>
> I'm using  flink-connector-kafka-0.10_2.11, a MongoDB client, a gRPC
> client and some http libraries like unirest and Apache HttpClient.
>
>   - Also, can you tell us what OS you are running on?
>
> My YARN cluster runs on Docker containers (docker version 1.12) with
> images based on Ubuntu 14.04. Host OS is Ubuntu 14.04.4 LTS (GNU/Linux
> 3.19.0-65-generic x86_64).
>
>


Re: High virtual memory usage

2016-12-19 Thread Paulo Cezar
  - Are you using RocksDB?

No.


  - What is your flink configuration, especially around memory settings?

I'm using default config with 2GB for jobmanager and 5GB for taskmanagers.
I'm starting flink via "./bin/yarn-session.sh -d -n 5 -jm 2048 -tm 5120 -s
4 -nm 'Flink'"

  - What do you use for TaskManager heap size? Any manual value, or do you
let Flink/Yarn set it automatically based on container size?

No manual values here. YARN config is pretty much default with maximum
allocation of 12GB of physical memory and ratio between virtual memory to
physical memory 2.1 (via yarn.nodemanager.vmem-pmem-ratio).


  - Do you use any libraries or connectors in your program?

I'm using  flink-connector-kafka-0.10_2.11, a MongoDB client, a gRPC client
and some http libraries like unirest and Apache HttpClient.

  - Also, can you tell us what OS you are running on?

My YARN cluster runs on Docker containers (docker version 1.12) with images
based on Ubuntu 14.04. Host OS is Ubuntu 14.04.4 LTS (GNU/Linux
3.19.0-65-generic x86_64).


Reg Checkpoint size using RocksDb

2016-12-19 Thread Anirudh Mallem
Hi,
I was experimenting with RocksDB as the state backend for my job and, to
test its behavior, I modified the socket word count program to store state. I
also wrote a RichMapFunction which stores the state as a ValueState with a
default value of null.
What the job basically does is: for every word received, if the current state
is null it updates the state with a fixed value, say “abc”; if the state is
non-null, the state is cleared.
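
A sketch of the described toggle, assuming a keyed ValueState[String] with a null
default (names are illustrative, not the original code):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

class ToggleState extends RichMapFunction[String, String] {

  private var state: ValueState[String] = _

  override def open(config: Configuration): Unit = {
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("word-state", classOf[String], null))
  }

  override def map(word: String): String = {
    if (state.value() == null) state.update("abc")  // first occurrence: set the fixed value
    else state.clear()                              // second occurrence: clear the state
    word
  }
}
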
So ideally, if my input stream has the word “foo” twice, then the corresponding
state is first set to “abc” and then cleared at the second “foo”. I see this
behavior occurring as expected, but the checkpointed size keeps
increasing! Is this expected? I believe the checkpointed size shown on the
dashboard should decrease when some of the states are cleared, right?
If the two “foo” words come in successive checkpointing intervals, then we
should observe one rise and one fall in the checkpointed size, right? I see
the checkpointed size increasing in both cases!

Any ideas as to what is happening here? My checkpoint duration is 5 secs. 
Thanks.

Regards,
Anirudh




Re: Blocking RichFunction.open() and backpressure

2016-12-19 Thread Yury Ruchin
Thanks Fabian, that quite explains what's going on.

2016-12-19 12:19 GMT+03:00 Fabian Hueske :

> Hi Yury,
>
> Flink's operators start processing as soon as they receive data. If an
> operator produces more data than its successor task can process, the data
> is buffered in Flink's network stack, i.e., its network buffers.
> The backpressure mechanism kicks in when all network buffers are in use
> and no more data can be buffered. In this case, a producing task will block
> until a network buffer becomes available.
>
> If the window operator in your job aggregates the data, only the
> aggregates will be buffered.
> This might explain why the first operators of job are able to start
> processing while the FlatMap operator is still setting up itself.
>
> Best,
> Fabian
>
> 2016-12-17 13:42 GMT+01:00 Yury Ruchin :
>
>> Hi all,
>>
>> I have a streaming job that essentially looks like this: KafkaSource ->
>> Map -> EventTimeWindow -> RichFlatMap -> CustomSink. The RichFlatMap part
>> does some heavy lifting in open(), so that the open() call blocks for
>> several minutes. I assumed that until open() returns the backpressure
>> mechanism would slow down the entire upstream up to the KafkaSource, so
>> that no new records would be emitted to the pipeline until the RichFlatMap
>> is ready. What I actually observe is that the KafkaSource, Map and
>> EventTimeWindow continue processing - the in/out records, in/out MB
>> counters keep increasing. The RichFlatMap and its downstream CustomSink
>> have 0 as expected, until the RichFlatMap is actually done with open(). The
>> backpressure monitor in Flink UI shows "OK" for all operators.
>>
>> Why doesn't backpressure mechanism work in this case?
>>
>> Thanks,
>> Yury
>>
>
>


Re: How to analyze space usage of Flink algorithms

2016-12-19 Thread Fabian Hueske
Your functions do not need to implement RichFunction (although each
function can be a RichFunction, and it should not be a problem to adapt the
job).
The system metrics are collected automatically and exposed via a
Reporter [1].
So you do not need to take care of the collection, but rather specify where
the collected metrics should be reported.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html#reporter
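
For illustration, configuring a reporter boils down to a couple of lines in
flink-conf.yaml. The exact keys are version-dependent (the named-reporter scheme
shown here is taken from later 1.x docs, so check [1] for the 1.1 variant), and
the reporter name is arbitrary:

metrics.reporters: my_jmx_reporter
metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter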

2016-12-19 9:59 GMT+01:00 otherwise777 :

> Thank you for your reply,
> I'm afraid i still don't understand it, the part i don't understand is how
> to actually analyze it. It's ok if i can just analyze the system instead of
> the actual job, but how would i actually do that?
> I don't have any function in my program that extends the richfunction
> afaik,
> so how would i call the getRuntimeContext() to print or store it?
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/How-to-analyze-
> space-usage-of-Flink-algorithms-tp10555p10686.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Blocking RichFunction.open() and backpressure

2016-12-19 Thread Fabian Hueske
Hi Yury,

Flink's operators start processing as soon as they receive data. If an
operator produces more data than its successor task can process, the data
is buffered in Flink's network stack, i.e., its network buffers.
The backpressure mechanism kicks in when all network buffers are in use and
no more data can be buffered. In this case, a producing task will block
until a network buffer becomes available.

If the window operator in your job aggregates the data, only the aggregates
will be buffered.
This might explain why the first operators of the job are able to start
processing while the FlatMap operator is still setting itself up.

Best,
Fabian

2016-12-17 13:42 GMT+01:00 Yury Ruchin :

> Hi all,
>
> I have a streaming job that essentially looks like this: KafkaSource ->
> Map -> EventTimeWindow -> RichFlatMap -> CustomSink. The RichFlatMap part
> does some heavy lifting in open(), so that the open() call blocks for
> several minutes. I assumed that until open() returns the backpressure
> mechanism would slow down the entire upstream up to the KafkaSource, so
> that no new records would be emitted to the pipeline until the RichFlatMap
> is ready. What I actually observe is that the KafkaSource, Map and
> EventTimeWindow continue processing - the in/out records, in/out MB
> counters keep increasing. The RichFlatMap and its downstream CustomSink
> have 0 as expected, until the RichFlatMap is actually done with open(). The
> backpressure monitor in Flink UI shows "OK" for all operators.
>
> Why doesn't backpressure mechanism work in this case?
>
> Thanks,
> Yury
>


Re: How to analyze space usage of Flink algorithms

2016-12-19 Thread otherwise777
Thank you for your reply,
I'm afraid I still don't understand it; the part I don't understand is how
to actually analyze it. It's OK if I can just analyze the system instead of
the actual job, but how would I actually do that?
I don't have any function in my program that extends RichFunction, afaik,
so how would I call getRuntimeContext() to print or store it?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-analyze-space-usage-of-Flink-algorithms-tp10555p10686.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Updating a Tumbling Window every second?

2016-12-19 Thread Fabian Hueske
Hi Matt,

the combination of a tumbling time window and a count window is one way to
define a sliding window.
In your example, a 30 secs tumbling window combined with a (3,1) count window
results in a sliding time window of 90 secs width and a 30 secs slide.

You could define a sliding time window of 90 secs width and a 1 sec slide on
stream A to get a stream C with faster updates.
If you still need stream B with the 30 secs tumbling window, you can have
both windows defined on stream A.
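
A minimal sketch of that faster window in the Scala API (assuming stream A is
keyed by some field; the counting fold only stands in for the real B/C computation
from the thread):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Assumption: streamA is a DataStream[(String, Double)] keyed by its first field.
def fastStats(streamA: DataStream[(String, Double)]): DataStream[(String, Long)] =
  streamA
    .keyBy(_._1)
    // 90 secs width, 1 sec slide: one updated result per second.
    .timeWindow(Time.seconds(90), Time.seconds(1))
    .fold(("", 0L)) { (acc, in) => (in._1, acc._2 + 1) }

// The original 30 secs tumbling window for stream B can stay defined on the same stream A:
//   streamA.keyBy(_._1).timeWindow(Time.seconds(30)). ...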

Hope this helps,
Fabian

2016-12-16 12:58 GMT+01:00 Matt :

> I have reduced the problem to a simple image [1].
>
> Those shown on the image are the streams I have, and the problem now is
> how to create a custom window assigner such that objects in B that *don't
> share* elements in A, are put together in the same window.
>
> Why? Because in order to create elements in C (triangles), I have to
> process n *independent* elements of B (n=2 in the example).
>
> Maybe there's a better or simpler way to do this. Any idea is appreciated!
>
> Regards,
> Matt
>
> [1] http://i.imgur.com/dG5AkJy.png
>
> On Thu, Dec 15, 2016 at 3:22 AM, Matt  wrote:
>
>> Hello,
>>
>> I have a rather simple problem with a difficult explanation...
>>
>> I have 3 streams, one of objects of class A (stream A), one of class B
>> (stream B) and one of class C (stream C). The elements of A are generated
>> at a rate of about 3 times every second. Elements of type B encapsulates
>> some key features of the stream A (like the number of elements of A in the
>> window) during the last 30 seconds (tumbling window 30s). Finally, the
>> elements of type C contains statistics (for simplicity let's say the
>> average of elements processed by each element in B) of the last 3 elements
>> in B and are produced on every new element of B (count window 3, 1).
>>
>> Illustrative example, () and [] denotes windows:
>>
>> ... [a10 a9 a8] [a7 a6] [a5 a4] [a3 a2 a1]
>> ... (b4 [b3 b2) b1]
>> ... [c2] [c1]
>>
>> This works fine, except for a dashboard that depends on the elements of C
>> to be updated, and 30s is way too big of a delay. I thought I could change
>> the tumbling window for a sliding window of size 30s and a slide of 1s, but
>> this doesn't work.
>>
>> If I use a sliding window to create elements of B as mentioned, each
>> count window would contain 3 elements of B, and I would get one element of
>> C every second as intended, but those elements in B encapsulates almost the
>> same elements of A. This results in stats that are wrong.
>>
>> For instance, c1 may have the average of b1, b2 and b3. But b1, b2 and b3
>> share most of the elements from stream A.
>>
>> Question: is there any way to create a count window with the last 3
>> elements of B that would have gone into the same tumbling window, not with
>> the last 3 consecutive elements?
>>
>> I hope the problem is clear, don't hesitate to ask for further
>> clarification!
>>
>> Regards,
>> Matt
>>
>>
>