Clarification in TumblingProcessingTimeWindow Documentation

2018-05-27 Thread Dhruv Kumar
Hi

I was looking at TumblingProcessingTimeWindows.java 
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java>
and was a bit confused by the documentation at the start of this class. It says 
the following:

/**
 * A {@link WindowAssigner} that windows elements into windows based on the current
 * system time of the machine the operation is running on. Windows cannot overlap.
 *
 * For example, in order to window into windows of 1 minute, every 10 seconds:
 * {@code
 * DataStream<Tuple2<String, Integer>> in = ...;
 * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
 * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
 *   keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
 * }
 */


It says one can have tumbling windows of 1 minute, every 10 seconds. Doesn’t 
that make it a sliding window? SlidingProcessingTimeWindows.java 
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java>
has exactly the same documentation with just one tiny change (“Windows can 
possibly overlap”). It seems to me that in the above documentation, the second 
Time argument of 10 seconds provides the window offset (as confirmed here 
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java#L92>)
and does not start a new tumbling window every 10 seconds.

Thanks


------
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me
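
For reference, a minimal sketch contrasting the two assigners, assuming the Flink 1.x 
DataStream API (the sample job and data are illustrative only): the second argument to 
TumblingProcessingTimeWindows.of(...) is an offset that merely shifts the window 
boundaries, while the second argument to SlidingProcessingTimeWindows.of(...) is the 
slide interval that makes windows overlap.

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowAssignerComparison {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> in =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

        // Tumbling: non-overlapping 1-minute windows. The second argument is an
        // *offset* that only shifts the boundaries, e.g. [0:10, 1:10), [1:10, 2:10), ...
        in.keyBy(t -> t.f0)
          .window(TumblingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10)))
          .sum(1)
          .print();

        // Sliding: 1-minute windows that start every 10 seconds, so windows overlap.
        in.keyBy(t -> t.f0)
          .window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10)))
          .sum(1)
          .print();

        env.execute("window assigner comparison");
    }
}
```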



Re: Replaying logs with microsecond delay

2018-05-15 Thread Dhruv Kumar
Yes, thanks!

--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On May 15, 2018, at 21:31, Xingcan Cui <xingc...@gmail.com> wrote:
> 
> Yes, that makes sense and maybe you could also generate dynamic intervals 
> according to the time spans.
> 
> Thanks,
> Xingcan
> 
>> On May 16, 2018, at 9:41 AM, Dhruv Kumar <gargdhru...@gmail.com 
>> <mailto:gargdhru...@gmail.com>> wrote:
>> 
>> As a part of my PhD research, I have been working on a few optimization 
>> algorithms which try to jointly optimize delay and traffic (WAN traffic) in 
>> a geo-distributed streaming analytics setting. So, to show that the 
>> optimization actually works in real life, I am trying to implement these 
>> optimization algorithms on top of Apache Flink. For emulating a real-life 
>> example, I need to generate a stream of records with some realistic delay 
>> (order of microseconds for a fast incoming stream) between any two records. 
>> This stream will then be ingested and processed by Flink. 
>> 
>> Using the timestamps as is, in the form of event timestamps, only proves the 
>> algorithms from a theoretical/simulation perspective. 
>> 
>> Hope this answers your question to some extent at least. Let me know. 
>> 
>> Thanks!
>> --
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>> 
>>> On May 15, 2018, at 20:29, Xingcan Cui <xingc...@gmail.com 
>>> <mailto:xingc...@gmail.com>> wrote:
>>> 
>>> Hi Dhruv,
>>> 
>>> since there are timestamps associated with each record, I was wondering why 
>>> you try to replay them with a fixed interval. Can you give a little 
>>> explanation about that?
>>> 
>>> Thanks,
>>> Xingcan
>>> 
>>>> On May 16, 2018, at 2:11 AM, Ted Yu <yuzhih...@gmail.com 
>>>> <mailto:yuzhih...@gmail.com>> wrote:
>>>> 
>>>> Please see the following:
>>>> 
>>>> http://www.rationaljava.com/2015/10/measuring-microsecond-in-java.html 
>>>> <http://www.rationaljava.com/2015/10/measuring-microsecond-in-java.html>
>>>> 
>>>> https://stackoverflow.com/questions/11498585/how-to-suspend-a-java-thread-for-a-small-period-of-time-like-100-nanoseconds
>>>>  
>>>> <https://stackoverflow.com/questions/11498585/how-to-suspend-a-java-thread-for-a-small-period-of-time-like-100-nanoseconds>
>>>> 
>>>> On Tue, May 15, 2018 at 10:40 AM, Dhruv Kumar <gargdhru...@gmail.com 
>>>> <mailto:gargdhru...@gmail.com>> wrote:
>>>> Hi
>>>> 
>>>> I am trying to replay a log file in which each record has a timestamp 
>>>> associated with it. The time difference between the records is of the 
>>>> order of microseconds. I am trying to replay this log maintaining the same 
>>>> delay between the records (using Thread.sleep()) and sending it to a 
>>>> socket. And then the Flink program reads the incoming data from this 
>>>> socket. Currently, replay of the entire log file takes much more time 
>>>> (about 3x) than the expected time (last_timestamp - first_timestamp).
>>>> 
>>>> I wanted to know what are the standard ways of replaying log files if one 
>>>> wants to maintain the same arrival delay between the records.
>>>> 
>>>> Let me know if I am not clear above.
>>>> 
>>>> Thanks 
>>>> --
>>>> Dhruv Kumar
>>>> PhD Candidate
>>>> Department of Computer Science and Engineering
>>>> University of Minnesota
>>>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>>>> 
>>> 
>> 
> 



Re: Replaying logs with microsecond delay

2018-05-15 Thread Dhruv Kumar
As a part of my PhD research, I have been working on a few optimization 
algorithms which try to jointly optimize delay and traffic (WAN traffic) in a 
geo-distributed streaming analytics setting. So, to show that the optimization 
actually works in real life, I am trying to implement these optimization 
algorithms on top of Apache Flink. For emulating a real-life example, I need to 
generate a stream of records with some realistic delay (order of microseconds 
for a fast incoming stream) between any two records. This stream will then be 
ingested and processed by Flink. 

Using the timestamps as is, in the form of event timestamps, only proves the 
algorithms from a theoretical/simulation perspective. 

Hope this answers your question to some extent at least. Let me know. 

Thanks!
--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On May 15, 2018, at 20:29, Xingcan Cui <xingc...@gmail.com> wrote:
> 
> Hi Dhruv,
> 
> since there are timestamps associated with each record, I was wondering why 
> you try to replay them with a fixed interval. Can you give a little 
> explanation about that?
> 
> Thanks,
> Xingcan
> 
>> On May 16, 2018, at 2:11 AM, Ted Yu <yuzhih...@gmail.com 
>> <mailto:yuzhih...@gmail.com>> wrote:
>> 
>> Please see the following:
>> 
>> http://www.rationaljava.com/2015/10/measuring-microsecond-in-java.html 
>> <http://www.rationaljava.com/2015/10/measuring-microsecond-in-java.html>
>> 
>> https://stackoverflow.com/questions/11498585/how-to-suspend-a-java-thread-for-a-small-period-of-time-like-100-nanoseconds
>>  
>> <https://stackoverflow.com/questions/11498585/how-to-suspend-a-java-thread-for-a-small-period-of-time-like-100-nanoseconds>
>> 
>> On Tue, May 15, 2018 at 10:40 AM, Dhruv Kumar <gargdhru...@gmail.com 
>> <mailto:gargdhru...@gmail.com>> wrote:
>> Hi
>> 
>> I am trying to replay a log file in which each record has a timestamp 
>> associated with it. The time difference between the records is of the order 
>> of microseconds. I am trying to replay this log maintaining the same delay 
>> between the records (using Thread.sleep()) and sending it to a socket. And 
>> then the Flink program reads the incoming data from this socket. Currently, 
>> replay of the entire log file takes much more time (about 3x) than the 
>> expected time (last_timestamp - first_timestamp).
>> 
>> I wanted to know what are the standard ways of replaying log files if one 
>> wants to maintain the same arrival delay between the records.
>> 
>> Let me know if I am not clear above.
>> 
>> Thanks 
>> --
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>> 
> 



Re: Replaying logs with microsecond delay

2018-05-15 Thread Dhruv Kumar
Thanks a lot, Ted. Appreciate your help!

The approaches described in the links below give a very good level of accuracy 
and solve my problem for now.

Thanks
--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On May 15, 2018, at 13:11, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Please see the following:
> 
> http://www.rationaljava.com/2015/10/measuring-microsecond-in-java.html 
> <http://www.rationaljava.com/2015/10/measuring-microsecond-in-java.html>
> 
> https://stackoverflow.com/questions/11498585/how-to-suspend-a-java-thread-for-a-small-period-of-time-like-100-nanoseconds
>  
> <https://stackoverflow.com/questions/11498585/how-to-suspend-a-java-thread-for-a-small-period-of-time-like-100-nanoseconds>
> 
> On Tue, May 15, 2018 at 10:40 AM, Dhruv Kumar <gargdhru...@gmail.com 
> <mailto:gargdhru...@gmail.com>> wrote:
> Hi
> 
> I am trying to replay a log file in which each record has a timestamp 
> associated with it. The time difference between the records is of the order 
> of microseconds. I am trying to replay this log maintaining the same delay 
> between the records (using Thread.sleep()) and sending it to a socket. And 
> then the Flink program reads the incoming data from this socket. Currently, 
> replay of the entire log file takes much more time (about 3x) than the 
> expected time (last_timestamp - first_timestamp).
> 
> I wanted to know what are the standard ways of replaying log files if one 
> wants to maintain the same arrival delay between the records.
> 
> Let me know if I am not clear above.
> 
> Thanks 
> --
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me <http://www.dhruvkumar.me/>
> 



Replaying logs with microsecond delay

2018-05-15 Thread Dhruv Kumar
Hi

I am trying to replay a log file in which each record has a timestamp 
associated with it. The time difference between the records is of the order of 
microseconds. I am trying to replay this log maintaining the same delay between 
the records (using Thread.sleep()) and sending it to a socket. And then the 
Flink program reads the incoming data from this socket. Currently, replay of 
the entire log file takes much more time (about 3x) than the expected time 
(last_timestamp - first_timestamp).

I wanted to know what are the standard ways of replaying log files if one wants 
to maintain the same arrival delay between the records.

Let me know if I am not clear above.

Thanks 
--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me
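
One approach suggested by the resources linked in the replies above is to busy-wait on 
System.nanoTime() rather than call Thread.sleep(), whose granularity is typically far 
coarser than a microsecond. Below is a hypothetical replayer along those lines; the 
record format (leading timestamp in microseconds, comma-separated) and the command-line 
arguments are assumptions, not taken from the thread.

```
import java.io.BufferedReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Paths;

public class LogReplayer {
    public static void main(String[] args) throws Exception {
        // Assumed record format: "<timestampMicros>,<payload>" per line.
        // Usage (assumed): LogReplayer <logFile> <host> <port>
        try (BufferedReader log = Files.newBufferedReader(Paths.get(args[0]));
             Socket socket = new Socket(args[1], Integer.parseInt(args[2]));
             PrintWriter out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true)) {

            long prevEventMicros = -1;
            long prevEmitNanos = -1;
            String line;
            while ((line = log.readLine()) != null) {
                long eventMicros = Long.parseLong(line.substring(0, line.indexOf(',')));
                if (prevEventMicros >= 0) {
                    // Busy-wait until the original inter-record gap has elapsed,
                    // measured against the previous emit time to avoid drift.
                    long gapNanos = (eventMicros - prevEventMicros) * 1_000L;
                    while (System.nanoTime() - prevEmitNanos < gapNanos) {
                        // spin; Thread.sleep() is too coarse for microsecond gaps
                    }
                }
                prevEmitNanos = System.nanoTime();
                prevEventMicros = eventMicros;
                out.println(line);
            }
        }
    }
}
```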



Re: Signal for End of Stream

2018-05-08 Thread Dhruv Kumar
Fabian, Thanks a lot for your continuous help! Really appreciate it.

Sent from Phone.

> On May 8, 2018, at 03:06, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Dhruv,
> 
> The changes look good to me.
> 
> Best, Fabian
> 
> 2018-05-08 5:37 GMT+02:00 Dhruv Kumar <gargdhru...@gmail.com>:
>> Thanks a lot, Fabian for your response.
>> 
>> What I understand is that if I write my own Sourcefunction such that it 
>> handles the "end of stream” record and make the source exit from run() 
>> method, the flink program will terminate. 
>> 
>> I have been using SocketTextStreamFunction till now.
>> So, I duplicated the SocketTextStreamFunction class into another class named 
>> CustomSocketTextStreamFunction which is exactly the same as 
>> SocketTextStreamFunction except for one change in the run() method. Change 
>> is highlighted in BOLD below. Can you take a look and let me know if this 
>> will work and it won’t have much of performance impact? I tested it on my 
>> machine locally and seems to work fine. But I just want to make sure that it 
>> won’t have any side effects/race conditions etc.
>> 
>> ```
>> @Override
>> public void run(SourceContext ctx) throws Exception {
>> final StringBuilder buffer = new StringBuilder();
>> long attempt = 0;
>> 
>> while (isRunning) {
>> 
>> try (Socket socket = new Socket()) {
>> currentSocket = socket;
>> 
>> LOG.info("Custom: Connecting to server socket " + hostname + 
>> ':' + port);
>> socket.connect(new InetSocketAddress(hostname, port), 
>> CONNECTION_TIMEOUT_TIME);
>> BufferedReader reader = new BufferedReader(new 
>> InputStreamReader(socket.getInputStream()));
>> 
>> char[] cbuf = new char[8192];
>> int bytesRead;
>> while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
>> buffer.append(cbuf, 0, bytesRead);
>> int delimPos;
>> while (buffer.length() >= delimiter.length() && 
>> (delimPos = buffer.indexOf(delimiter)) != -1) {
>> String record = buffer.substring(0, delimPos);
>> if(record.equals("END")) {
>> LOG.info("End of stream encountered");
>> isRunning = false;
>> buffer.delete(0, delimPos + delimiter.length());
>> break;
>> }
>> // truncate trailing carriage return
>> if (delimiter.equals("\n") && record.endsWith("\r")) 
>> {
>> record = record.substring(0, record.length() - 
>> 1);
>> }
>> ctx.collect(record);
>> buffer.delete(0, delimPos + delimiter.length());
>> }
>> }
>> }
>> 
>> // if we dropped out of this loop due to an EOF, sleep and retry
>> if (isRunning) {
>> attempt++;
>> if (maxNumRetries == -1 || attempt < maxNumRetries) {
>> LOG.warn("Lost connection to server socket. Retrying in 
>> " + delayBetweenRetries + " msecs...");
>> Thread.sleep(delayBetweenRetries);
>> }
>>         else {
>> // this should probably be here, but some examples 
>> expect simple exists of the stream source
>> // throw new EOFException("Reached end of stream and 
>> reconnects are not enabled.");
>> break;
>> }
>> }
>> }
>> 
>> // collect trailing data
>> if (buffer.length() > 0) {
>> ctx.collect(buffer.toString());
>> }
>> }
>> ```
>> 
>> 
>> --
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me
>> 
>>> On May 7, 2018, at 11:04, Fabian Hueske <fhue...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> Flink will automatically stop the execution of a DataStream prog

Re: Signal for End of Stream

2018-05-07 Thread Dhruv Kumar
Thanks a lot, Fabian, for your response.

What I understand is that if I write my own SourceFunction such that it handles 
the "end of stream" record and makes the source exit from the run() method, the 
Flink program will terminate. 

I have been using SocketTextStreamFunction till now.
So, I duplicated the SocketTextStreamFunction class into another class named 
CustomSocketTextStreamFunction, which is exactly the same as 
SocketTextStreamFunction except for one change in the run() method: the check 
for an "END" record, marked with a comment in the code below. Can you take a 
look and let me know if this will work and won't have much of a performance 
impact? I tested it locally and it seems to work fine, but I just want to make 
sure that it won't have any side effects, race conditions, etc.

```
@Override
public void run(SourceContext<String> ctx) throws Exception {
    final StringBuilder buffer = new StringBuilder();
    long attempt = 0;

    while (isRunning) {

        try (Socket socket = new Socket()) {
            currentSocket = socket;

            LOG.info("Custom: Connecting to server socket " + hostname + ':' + port);
            socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            char[] cbuf = new char[8192];
            int bytesRead;
            while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
                buffer.append(cbuf, 0, bytesRead);
                int delimPos;
                while (buffer.length() >= delimiter.length()
                        && (delimPos = buffer.indexOf(delimiter)) != -1) {
                    String record = buffer.substring(0, delimPos);
                    // CHANGE vs. SocketTextStreamFunction: stop the source when the
                    // "END" sentinel record arrives, so run() returns and the job finishes.
                    if (record.equals("END")) {
                        LOG.info("End of stream encountered");
                        isRunning = false;
                        buffer.delete(0, delimPos + delimiter.length());
                        break;
                    }
                    // truncate trailing carriage return
                    if (delimiter.equals("\n") && record.endsWith("\r")) {
                        record = record.substring(0, record.length() - 1);
                    }
                    ctx.collect(record);
                    buffer.delete(0, delimPos + delimiter.length());
                }
            }
        }

        // if we dropped out of this loop due to an EOF, sleep and retry
        if (isRunning) {
            attempt++;
            if (maxNumRetries == -1 || attempt < maxNumRetries) {
                LOG.warn("Lost connection to server socket. Retrying in "
                        + delayBetweenRetries + " msecs...");
                Thread.sleep(delayBetweenRetries);
            } else {
                // this should probably be here, but some examples expect simple exits of the stream source
                // throw new EOFException("Reached end of stream and reconnects are not enabled.");
                break;
            }
        }
    }

    // collect trailing data
    if (buffer.length() > 0) {
        ctx.collect(buffer.toString());
    }
}
```


--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On May 7, 2018, at 11:04, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi,
> 
> Flink will automatically stop the execution of a DataStream program once all 
> sources have finished to provide data, i.e., when all SourceFunction return 
> from the run() method.
> The DeserializationSchema.isEndOfStream() method can be used to tell a 
> built-in SourceFunction such as a KafkaConsumer that it should leave the 
> run() method.
> If you implement your own SourceFunction you can leave run() after you 
> ingested all data.
> 
> Note, that Flink won't wait for all processing time timers but will 
> immediately shutdown the program after the last in-flight record was 
> processed. 
> Event-time timers will be handled because each source emits a Long.MAX_VALUE 
> watermark after it emitted its last record.
> 
> Best, Fabian
> 
> 2018-05-07 17:18 GMT+02:00 Dhruv Kumar <gargdhru...@gmail.com 
> <mailto:gargdhru...@gmail.com>>:
> I notice that there is some DeserializationSchema in 
> org.apache.flink.api.common.serialization which has a function isEndOfStream 
> but I am not sure if I can use it in my use case. 
> 
> ------
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering

Re: Signal for End of Stream

2018-05-07 Thread Dhruv Kumar
I notice that there is some DeserializationSchema in 
org.apache.flink.api.common.serialization which has a function isEndOfStream 
but I am not sure if I can use it in my use case. 

--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On May 7, 2018, at 06:18, Dhruv Kumar <gargdhru...@gmail.com> wrote:
> 
> Hi
> 
> Is there a way I can capture the end of stream signal for streams which are 
> replayed from historical data? I need the end of stream signal to tell the 
> Flink program to finish its execution.
> 
> Below is the use case in detail:
> 1. An independent log replayer program sends the records to a socket 
> (identified by ip address and port).
> 2. Flink program reads the incoming records via socketTextStream from the 
> above mentioned socket, applies a KeyBy operator on the incoming records and 
> then does some processing, finally writing them to another socket.
> 
> How do I tell the Flink program to finish its execution? Is there any 
> information which I can add to the records while they are sent from the 
> replayer program and which can be parsed when the records arrive inside the 
> Flink program?
> 
> Let me know if anything is not clear.
> 
> Thanks
> 
> --
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me <http://www.dhruvkumar.me/>



Signal for End of Stream

2018-05-07 Thread Dhruv Kumar
Hi

Is there a way I can capture the end of stream signal for streams which are 
replayed from historical data? I need the end of stream signal to tell the 
Flink program to finish its execution.

Below is the use case in detail:
1. An independent log replayer program sends the records to a socket 
(identified by ip address and port).
2. Flink program reads the incoming records via socketTextStream from the above 
mentioned socket, applies a KeyBy operator on the incoming records and then 
does some processing, finally writing them to another socket.

How do I tell the Flink program to finish its execution? Is there any 
information which I can add to the records while they are sent from the 
replayer program and which can be parsed when the records arrive inside the 
Flink program?

Let me know if anything is not clear.

Thanks

--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me
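
As a concrete illustration of the DeserializationSchema.isEndOfStream() route mentioned 
above (Fabian notes it can tell a built-in source such as the Kafka consumer to leave 
its run() method), here is a minimal sketch of such a schema; the "END" sentinel value 
and the class name are assumptions for illustration.

```
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import java.nio.charset.StandardCharsets;

// Deserializes each record as a UTF-8 string and signals end-of-stream when the
// (assumed) sentinel record "END" is seen, so the consuming source can leave its
// run() method and let the job finish.
public class EndMarkerSchema extends AbstractDeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return "END".equals(nextElement);
    }
}
```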



Re: Apache Flink Examples

2018-04-27 Thread Dhruv Kumar
Thanks. Tests and the example folder will help.

--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On Apr 27, 2018, at 06:47, Hung <unicorn.bana...@gmail.com> wrote:
> 
> in my case I usually check the tests they write for each function I want to
> use. 
> 
> Take CountTrigger as an example: if I want to customize my own way of
> counting, I will have a look at the test they write:
> 
> https://github.com/apache/flink/blob/8dfb9d00653271ea4adbeb752da8f62d7647b6d8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
> 
> Then I understand how this function is expected to work, and then I write my
> own test with my expected  result.
> 
> Test is the best documentation I would say. 
> 
> Also there is an example folder in github.
> https://github.com/apache/flink/tree/master/flink-examples
> 
> Best,
> 
> Sendoh
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Measure End-to-End latency/delay for each record

2018-04-26 Thread Dhruv Kumar
Ok thanks Michael for all your help!

--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On Apr 26, 2018, at 19:24, TechnoMage <mla...@technomage.com> wrote:
> 
> Yes, Kafka for source and sink which makes monitoring the Flink in/out easy.
> 
> Michael
> 
>> On Apr 26, 2018, at 5:27 PM, Dhruv Kumar <gargdhru...@gmail.com 
>> <mailto:gargdhru...@gmail.com>> wrote:
>> 
>> Ok that answers my questions.
>> 
>> What are you keeping the source and sink as? Is it Kafka for both?
>> 
>> --
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>> 
>>> On Apr 26, 2018, at 16:37, TechnoMage <mla...@technomage.com 
>>> <mailto:mla...@technomage.com>> wrote:
>>> 
>>> Yes NTP can still have skew.  It may be measured in fractions of a second, 
>>> but with Flink that can be significant if you care about sub-second latency 
>>> accuracy.  Since I have a 20 stage stream with 0.002 second latency it can 
>>> matter.
>>> 
>>> Back pressure is the limiting of input due to the inability of down-stream 
>>> tasks to accept input.  For example if you have a map that reads from a 
>>> database to enhance an element, that may limit earlier steps performance as 
>>> they can not push elements to it faster than it can read from the database. 
>>>  This can flow all the way back to the source and slow records coming into 
>>> the system.
>>> 
>>> Michael
>>> 
>>>> On Apr 26, 2018, at 12:38 PM, Dhruv Kumar <gargdhru...@gmail.com 
>>>> <mailto:gargdhru...@gmail.com>> wrote:
>>>> 
>>>> What do you mean by the time skew from one machine(source) to 
>>>> another(sink)? Do you mean the system time clocks of the source and sink 
>>>> may not be in sync. If I regularly use NTP to keep the system clocks in 
>>>> sync, will time skew still happen?
>>>> 
>>>> Could you also elaborate on what do you mean by back pressure on source 
>>>> and how will it impact the latency calculations?
>>>> 
>>>> Sorry if these are trivial questions. I am a bit new to the real world 
>>>> streaming systems.
>>>> 
>>>> --
>>>> Dhruv Kumar
>>>> PhD Candidate
>>>> Department of Computer Science and Engineering
>>>> University of Minnesota
>>>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>>>> 
>>>>> On Apr 26, 2018, at 13:26, TechnoMage <mla...@technomage.com 
>>>>> <mailto:mla...@technomage.com>> wrote:
>>>>> 
>>>>> In a single machine system this may work ok.  In a multi-machine system 
>>>>> this is not as reliable as the time skew from one machine (source) to 
>>>>> another (sink) can impact the measurements.  This also does not account 
>>>>> for back presure on the source.  We are using an external process to in 
>>>>> parallel read the source and output of the sink to measure the latency on 
>>>>> a single system clock.  It does account for those issues, but of course 
>>>>> does not account for delivery delays in the messaging system (kafka in 
>>>>> our case).  But, does measure real world latency as seen by the rest of 
>>>>> the system which is ultimately what matters to us.
>>>>> 
>>>>> Michael
>>>>> 
>>>>>> On Apr 26, 2018, at 12:01 PM, Dhruv Kumar <gargdhru...@gmail.com 
>>>>>> <mailto:gargdhru...@gmail.com>> wrote:
>>>>>> 
>>>>>> Hi
>>>>>> 
>>>>>> I was trying to compute the end-to-end-latency for each record processed 
>>>>>> by Flink. By end-to-end latency, I mean the difference between the time 
>>>>>> at which the record entered the Flink system (came at source) and the 
>>>>>> time at which the record is finally emitted into the sink. What is the 
>>>>>> best way to measure this? I was thinking of doing the following:
>>>>>> 1. Add the current system timestamp to the record when the record 
>>>>>> arrives at Flink.
>>>>>> 2. Add the current system timestamp to the record when the record is 
>>>>>> finally being emitted into the sink.
>>>>>> 3. Take the difference between 2 and 1 offline when all the records have 
>>>>>> been written into the sink.
>>>>>> 
>>>>>> Does this sound ok?
>>>>>> 
>>>>>> Also, if I use Processing time characteristic for this 
>>>>>> end-to-end-latency, will it be fine?
>>>>>> 
>>>>>> Thanks
>>>>>> --
>>>>>> Dhruv Kumar
>>>>>> PhD Candidate
>>>>>> Department of Computer Science and Engineering
>>>>>> University of Minnesota
>>>>>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>>>>> 
>>>> 
>>> 
>> 
> 



Re: Measure End-to-End latency/delay for each record

2018-04-26 Thread Dhruv Kumar
Ok that answers my questions.

What are you keeping the source and sink as? Is it Kafka for both?

--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On Apr 26, 2018, at 16:37, TechnoMage <mla...@technomage.com> wrote:
> 
> Yes NTP can still have skew.  It may be measured in fractions of a second, 
> but with Flink that can be significant if you care about sub-second latency 
> accuracy.  Since I have a 20 stage stream with 0.002 second latency it can 
> matter.
> 
> Back pressure is the limiting of input due to the inability of down-stream 
> tasks to accept input.  For example if you have a map that reads from a 
> database to enhance an element, that may limit earlier steps performance as 
> they can not push elements to it faster than it can read from the database.  
> This can flow all the way back to the source and slow records coming into the 
> system.
> 
> Michael
> 
>> On Apr 26, 2018, at 12:38 PM, Dhruv Kumar <gargdhru...@gmail.com 
>> <mailto:gargdhru...@gmail.com>> wrote:
>> 
>> What do you mean by the time skew from one machine(source) to another(sink)? 
>> Do you mean the system time clocks of the source and sink may not be in 
>> sync. If I regularly use NTP to keep the system clocks in sync, will time 
>> skew still happen?
>> 
>> Could you also elaborate on what do you mean by back pressure on source and 
>> how will it impact the latency calculations?
>> 
>> Sorry if these are trivial questions. I am a bit new to the real world 
>> streaming systems.
>> 
>> --
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>> 
>>> On Apr 26, 2018, at 13:26, TechnoMage <mla...@technomage.com 
>>> <mailto:mla...@technomage.com>> wrote:
>>> 
>>> In a single machine system this may work ok.  In a multi-machine system 
>>> this is not as reliable as the time skew from one machine (source) to 
>>> another (sink) can impact the measurements.  This also does not account for 
>>> back presure on the source.  We are using an external process to in 
>>> parallel read the source and output of the sink to measure the latency on a 
>>> single system clock.  It does account for those issues, but of course does 
>>> not account for delivery delays in the messaging system (kafka in our 
>>> case).  But, does measure real world latency as seen by the rest of the 
>>> system which is ultimately what matters to us.
>>> 
>>> Michael
>>> 
>>>> On Apr 26, 2018, at 12:01 PM, Dhruv Kumar <gargdhru...@gmail.com 
>>>> <mailto:gargdhru...@gmail.com>> wrote:
>>>> 
>>>> Hi
>>>> 
>>>> I was trying to compute the end-to-end-latency for each record processed 
>>>> by Flink. By end-to-end latency, I mean the difference between the time at 
>>>> which the record entered the Flink system (came at source) and the time at 
>>>> which the record is finally emitted into the sink. What is the best way to 
>>>> measure this? I was thinking of doing the following:
>>>> 1. Add the current system timestamp to the record when the record arrives 
>>>> at Flink.
>>>> 2. Add the current system timestamp to the record when the record is 
>>>> finally being emitted into the sink.
>>>> 3. Take the difference between 2 and 1 offline when all the records have 
>>>> been written into the sink.
>>>> 
>>>> Does this sound ok?
>>>> 
>>>> Also, if I use Processing time characteristic for this end-to-end-latency, 
>>>> will it be fine?
>>>> 
>>>> Thanks
>>>> --
>>>> Dhruv Kumar
>>>> PhD Candidate
>>>> Department of Computer Science and Engineering
>>>> University of Minnesota
>>>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>>> 
>> 
> 



Apache Flink Examples

2018-04-26 Thread Dhruv Kumar
Hi

I have been exploring Apache Flink for some time now and I notice that although 
the documentation has a good amount of information, there may not be sufficient 
examples (code snippets) which quickly explain what a particular feature will 
do. A good example which comes to my mind is Plotly <https://plot.ly/>. Plotly 
has a large number of examples for each of their features, which helps the 
user quickly implement those features for his or her own use. I wanted to know 
if anyone has similar thoughts on this. 

I am happy to contribute if I can get some guidance/direction.

I may be absolutely wrong. Please correct me if I am.

Thanks.
--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me



Re: Measure End-to-End latency/delay for each record

2018-04-26 Thread Dhruv Kumar
What do you mean by the time skew from one machine (source) to another (sink)? Do 
you mean the system time clocks of the source and sink may not be in sync? If I 
regularly use NTP to keep the system clocks in sync, will time skew still 
happen?

Could you also elaborate on what do you mean by back pressure on source and how 
will it impact the latency calculations?

Sorry if these are trivial questions. I am a bit new to the real world 
streaming systems.

--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On Apr 26, 2018, at 13:26, TechnoMage <mla...@technomage.com> wrote:
> 
> In a single machine system this may work ok.  In a multi-machine system this 
> is not as reliable as the time skew from one machine (source) to another 
> (sink) can impact the measurements.  This also does not account for back 
> presure on the source.  We are using an external process to in parallel read 
> the source and output of the sink to measure the latency on a single system 
> clock.  It does account for those issues, but of course does not account for 
> delivery delays in the messaging system (kafka in our case).  But, does 
> measure real world latency as seen by the rest of the system which is 
> ultimately what matters to us.
> 
> Michael
> 
>> On Apr 26, 2018, at 12:01 PM, Dhruv Kumar <gargdhru...@gmail.com 
>> <mailto:gargdhru...@gmail.com>> wrote:
>> 
>> Hi
>> 
>> I was trying to compute the end-to-end-latency for each record processed by 
>> Flink. By end-to-end latency, I mean the difference between the time at 
>> which the record entered the Flink system (came at source) and the time at 
>> which the record is finally emitted into the sink. What is the best way to 
>> measure this? I was thinking of doing the following:
>> 1. Add the current system timestamp to the record when the record arrives at 
>> Flink.
>> 2. Add the current system timestamp to the record when the record is finally 
>> being emitted into the sink.
>> 3. Take the difference between 2 and 1 offline when all the records have 
>> been written into the sink.
>> 
>> Does this sound ok?
>> 
>> Also, if I use Processing time characteristic for this end-to-end-latency, 
>> will it be fine?
>> 
>> Thanks
>> --
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
> 



Measure End-to-End latency/delay for each record

2018-04-26 Thread Dhruv Kumar
Hi

I was trying to compute the end-to-end-latency for each record processed by 
Flink. By end-to-end latency, I mean the difference between the time at which 
the record entered the Flink system (came at source) and the time at which the 
record is finally emitted into the sink. What is the best way to measure this? 
I was thinking of doing the following:
1. Add the current system timestamp to the record when the record arrives at 
Flink.
2. Add the current system timestamp to the record when the record is finally 
being emitted into the sink.
3. Take the difference between 2 and 1 offline when all the records have been 
written into the sink.

Does this sound ok?

Also, if I use Processing time characteristic for this end-to-end-latency, will 
it be fine?

Thanks
--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me
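
A minimal sketch of steps 1 and 2 above, assuming a socket text source and a simple 
file sink for offline analysis; the source address, sink path, and record layout are 
illustrative assumptions, not part of the thread.

```
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EndToEndLatencyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 1: stamp each record with the current system time as it enters Flink.
        DataStream<Tuple2<String, Long>> stamped = env
                .socketTextStream("localhost", 9999)              // assumed source
                .map(line -> Tuple2.of(line, System.currentTimeMillis()))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // ... keyBy / windowing / aggregation would go here, carrying the ingest timestamp along ...

        // Step 2: just before the sink, compute emit time minus ingest time; the
        // resulting per-record latency can then be analyzed offline (step 3).
        stamped
                .map(t -> Tuple3.of(t.f0, t.f1, System.currentTimeMillis() - t.f1))
                .returns(Types.TUPLE(Types.STRING, Types.LONG, Types.LONG))
                .writeAsText("/tmp/latency-records");             // assumed sink path

        env.execute("end-to-end latency measurement");
    }
}
```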



Re: Custom Processing per window

2018-03-19 Thread Dhruv Kumar
Is there a way I can leverage OperatorState (instead of keyed state) to solve my 
issue?


> On Mar 19, 2018, at 09:00, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi,
> 
> Data is partitioned by key across machines and state is kept per key. It is 
> not possible to interact with two keys at the same time.
> 
> Best, Fabian
> 
> 2018-03-19 14:47 GMT+01:00 Dhruv Kumar <gargdhru...@gmail.com 
> <mailto:gargdhru...@gmail.com>>:
> In other words, while using the Flink streaming APIs, is it possible to take 
> a decision on emitting a particular key based on the state of some other key 
> present in the same window?
> 
> Thanks!
> --
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me <http://www.dhruvkumar.me/>
> 
>> On Mar 19, 2018, at 05:11, Dhruv Kumar <gargdhru...@gmail.com 
>> <mailto:gargdhru...@gmail.com>> wrote:
>> 
>> Task 1: I implemented it using a custom Trigger (see attached file). Looks 
>> like it is doing what I want it to. I copied the code from 
>> EventTimeTrigger.java and overwrote the onElement method. 
>> 
>> Task 2: I will need to maintain the state (this will be the LRU cache) for 
>> multiple keys in the same data structure. But it looks like that the Keyed 
>> states are on a per key basis. Should I use OperatorState in some way? Can I 
>> use a data structure not directly managed by Flink? What will happen in the 
>> case of keys across multiple machines?
>> 
>> 
>> 
>> 
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>> 
>>> On Mar 19, 2018, at 02:04, Jörn Franke <jornfra...@gmail.com 
>>> <mailto:jornfra...@gmail.com>> wrote:
>>> 
>>> How would you start implementing it? Where are you stuck?
>>> 
>>> Did you already try to implement this?
>>> 
>>> On 18. Mar 2018, at 04:10, Dhruv Kumar <gargdhru...@gmail.com 
>>> <mailto:gargdhru...@gmail.com>> wrote:
>>> 
>>>> Hi
>>>> 
>>>> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for 
>>>> implementing some very specific use-cases: (They may not seem relevant but 
>>>> I need to implement them or I at least need to know if it is possible to 
>>>> implement them in Flink)
>>>> 
>>>> Assumptions:
>>>> 1. Data stream is of the form (key, value). We achieve this by the .key 
>>>> operation provided by Flink API.
>>>> 2. By emitting a key, I mean sending/outputting its aggregated value to 
>>>> any data sink. 
>>>> 
>>>> 1. For each Tumbling window in the Event Time space, for each key, I would 
>>>> like to aggregate its value until it crosses a particular threshold (same 
>>>> threshold for all the keys). As soon as the key’s aggregated value crosses 
>>>> this threshold, I would like to emit this key. At the end of every 
>>>> tumbling window, all the (key, value) aggregated pairs  would be emitted 
>>>> irrespective of whether they have crossed the threshold or not.
>>>> 
>>>> 2. For each Tumbling window in the event time space, I would like to 
>>>> maintain a LRU cache which stores the keys along with their aggregated 
>>>> values and their latest arrival time. The least recently used (LRU) key 
>>>> would be the key whose latest arrival time is earlier than the latest 
>>>> arrival times of all the other keys present in the LRU cache. The LRU 
>>>> cache is of a limited size. So, it is possible that the number of unique 
>>>> keys in a particular window is greater than the size of LRU cache. 
>>>> Whenever any (key, value) pair arrives, if the key already exists, its 
>>>> aggregated value is updated with the value of the newly arrived value and 
>>>> its latest arrival time is updated with the current event time. If the key 
>>>> does not exist and there is some free slot in the LRU cache, it is added 
>>>> into the LRU. As soon as the LRU cache gets occupied fully and a new key 
>>>> comes in which does not exist in the LRU cache, we would like to emit the 
>>>> least recently used key to accommodate the newly arrived key. As in the 
>>>> case of 1, at the end of every tumbling window, all the (key, value) 
>>>> aggregated pairs in the LRU cache would be emitted.  
>>>> 
>>>> Would like to know how can we implement these algorithms using Flink. Any 
>>>> help would be greatly appreciated.
>>>> 
>>>> Dhruv Kumar
>>>> PhD Candidate
>>>> Department of Computer Science and Engineering
>>>> University of Minnesota
>>>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>> 
> 
> 



Re: Custom Processing per window

2018-03-19 Thread Dhruv Kumar
In other words, while using the Flink streaming APIs, is it possible to take a 
decision on emitting a particular key based on the state of some other key 
present in the same window?

Thanks!
--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On Mar 19, 2018, at 05:11, Dhruv Kumar <gargdhru...@gmail.com> wrote:
> 
> Task 1: I implemented it using a custom Trigger (see attached file). Looks 
> like it is doing what I want it to. I copied the code from 
> EventTimeTrigger.java and overwrote the onElement method. 
> 
> Task 2: I will need to maintain the state (this will be the LRU cache) for 
> multiple keys in the same data structure. But it looks like that the Keyed 
> states are on a per key basis. Should I use OperatorState in some way? Can I 
> use a data structure not directly managed by Flink? What will happen in the 
> case of keys across multiple machines?
> 
> 
> 
> 
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me <http://www.dhruvkumar.me/>
> 
>> On Mar 19, 2018, at 02:04, Jörn Franke <jornfra...@gmail.com 
>> <mailto:jornfra...@gmail.com>> wrote:
>> 
>> How would you start implementing it? Where are you stuck?
>> 
>> Did you already try to implement this?
>> 
>> On 18. Mar 2018, at 04:10, Dhruv Kumar <gargdhru...@gmail.com 
>> <mailto:gargdhru...@gmail.com>> wrote:
>> 
>>> Hi
>>> 
>>> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for 
>>> implementing some very specific use-cases: (They may not seem relevant but 
>>> I need to implement them or I at least need to know if it is possible to 
>>> implement them in Flink)
>>> 
>>> Assumptions:
>>> 1. Data stream is of the form (key, value). We achieve this by the .key 
>>> operation provided by Flink API.
>>> 2. By emitting a key, I mean sending/outputting its aggregated value to any 
>>> data sink. 
>>> 
>>> 1. For each Tumbling window in the Event Time space, for each key, I would 
>>> like to aggregate its value until it crosses a particular threshold (same 
>>> threshold for all the keys). As soon as the key’s aggregated value crosses 
>>> this threshold, I would like to emit this key. At the end of every tumbling 
>>> window, all the (key, value) aggregated pairs  would be emitted 
>>> irrespective of whether they have crossed the threshold or not.
>>> 
>>> 2. For each Tumbling window in the event time space, I would like to 
>>> maintain a LRU cache which stores the keys along with their aggregated 
>>> values and their latest arrival time. The least recently used (LRU) key 
>>> would be the key whose latest arrival time is earlier than the latest 
>>> arrival times of all the other keys present in the LRU cache. The LRU cache 
>>> is of a limited size. So, it is possible that the number of unique keys in 
>>> a particular window is greater than the size of LRU cache. Whenever any 
>>> (key, value) pair arrives, if the key already exists, its aggregated value 
>>> is updated with the value of the newly arrived value and its latest arrival 
>>> time is updated with the current event time. If the key does not exist and 
>>> there is some free slot in the LRU cache, it is added into the LRU. As soon 
>>> as the LRU cache gets occupied fully and a new key comes in which does not 
>>> exist in the LRU cache, we would like to emit the least recently used key 
>>> to accommodate the newly arrived key. As in the case of 1, at the end of 
>>> every tumbling window, all the (key, value) aggregated pairs in the LRU 
>>> cache would be emitted.  
>>> 
>>> Would like to know how can we implement these algorithms using Flink. Any 
>>> help would be greatly appreciated.
>>> 
>>> Dhruv Kumar
>>> PhD Candidate
>>> Department of Computer Science and Engineering
>>> University of Minnesota
>>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
> 



Re: Custom Processing per window

2018-03-19 Thread Dhruv Kumar
Task 1: I implemented it using a custom Trigger (see attached file). Looks like 
it is doing what I want it to. I copied the code from EventTimeTrigger.java and 
overwrote the onElement method.

Task 2: I will need to maintain the state (this will be the LRU cache) for 
multiple keys in the same data structure. But it looks like that the Keyed 
states are on a per key basis. Should I use OperatorState in some way? Can I use 
a data structure not directly managed by Flink? What will happen in the case of 
keys across multiple machines?

LazyAlgoTrigger.java
Description: Binary data

Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me


> On Mar 19, 2018, at 02:04, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> How would you start implementing it? Where are you stuck?
> 
> Did you already try to implement this?
> 
> On 18. Mar 2018, at 04:10, Dhruv Kumar <gargdhru...@gmail.com> wrote:
> 
>> Hi
>> 
>> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for 
>> implementing some very specific use-cases: (They may not seem relevant but 
>> I need to implement them or I at least need to know if it is possible to 
>> implement them in Flink)
>> 
>> Assumptions:
>> 1. Data stream is of the form (key, value). We achieve this by the .key 
>> operation provided by Flink API.
>> 2. By emitting a key, I mean sending/outputting its aggregated value to any 
>> data sink. 
>> 
>> 1. For each Tumbling window in the Event Time space, for each key, I would 
>> like to aggregate its value until it crosses a particular threshold (same 
>> threshold for all the keys). As soon as the key’s aggregated value crosses 
>> this threshold, I would like to emit this key. At the end of every tumbling 
>> window, all the (key, value) aggregated pairs would be emitted irrespective 
>> of whether they have crossed the threshold or not.
>> 
>> 2. For each Tumbling window in the event time space, I would like to 
>> maintain a LRU cache which stores the keys along with their aggregated 
>> values and their latest arrival time. The least recently used (LRU) key 
>> would be the key whose latest arrival time is earlier than the latest 
>> arrival times of all the other keys present in the LRU cache. The LRU cache 
>> is of a limited size. So, it is possible that the number of unique keys in 
>> a particular window is greater than the size of LRU cache. Whenever any 
>> (key, value) pair arrives, if the key already exists, its aggregated value 
>> is updated with the value of the newly arrived value and its latest arrival 
>> time is updated with the current event time. If the key does not exist and 
>> there is some free slot in the LRU cache, it is added into the LRU. As soon 
>> as the LRU cache gets occupied fully and a new key comes in which does not 
>> exist in the LRU cache, we would like to emit the least recently used key 
>> to accommodate the newly arrived key. As in the case of 1, at the end of 
>> every tumbling window, all the (key, value) aggregated pairs in the LRU 
>> cache would be emitted.
>> 
>> Would like to know how can we implement these algorithms using Flink. Any 
>> help would be greatly appreciated.
>> 
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me





Custom Processing per window

2018-03-17 Thread Dhruv Kumar
Hi

I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for 
implementing some very specific use-cases: (They may not seem relevant but I 
need to implement them or I at least need to know if it is possible to 
implement them in Flink)

Assumptions:
1. Data stream is of the form (key, value). We achieve this by the .keyBy 
operation provided by the Flink API.
2. By emitting a key, I mean sending/outputting its aggregated value to any 
data sink. 

1. For each Tumbling window in the Event Time space, for each key, I would like 
to aggregate its value until it crosses a particular threshold (same threshold 
for all the keys). As soon as the key’s aggregated value crosses this 
threshold, I would like to emit this key. At the end of every tumbling window, 
all the (key, value) aggregated pairs  would be emitted irrespective of whether 
they have crossed the threshold or not.

2. For each Tumbling window in the event time space, I would like to maintain a 
LRU cache which stores the keys along with their aggregated values and their 
latest arrival time. The least recently used (LRU) key would be the key whose 
latest arrival time is earlier than the latest arrival times of all the other 
keys present in the LRU cache. The LRU cache is of a limited size. So, it is 
possible that the number of unique keys in a particular window is greater than 
the size of LRU cache. Whenever any (key, value) pair arrives, if the key 
already exists, its aggregated value is updated with the value of the newly 
arrived value and its latest arrival time is updated with the current event 
time. If the key does not exist and there is some free slot in the LRU cache, 
it is added into the LRU. As soon as the LRU cache gets occupied fully and a 
new key comes in which does not exist in the LRU cache, we would like to emit 
the least recently used key to accommodate the newly arrived key. As in the 
case of 1, at the end of every tumbling window, all the (key, value) aggregated 
pairs in the LRU cache would be emitted.  

Would like to know how can we implement these algorithms using Flink. Any help 
would be greatly appreciated.

Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me
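
For task 1, the replies above describe copying EventTimeTrigger and overriding 
onElement. A minimal sketch of such a trigger is shown below; it assumes (key, count) 
records and keeps the per-key running sum in partitioned trigger state, firing early 
once the sum crosses the threshold. The class is illustrative only and is not the 
LazyAlgoTrigger.java attached in the thread.

```
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires a keyed event-time tumbling window early as soon as the running sum of the
// values seen for that key crosses a threshold; otherwise it behaves like
// EventTimeTrigger and fires when the watermark passes the end of the window.
public class ThresholdTrigger extends Trigger<Tuple2<String, Integer>, TimeWindow> {

    private static final long serialVersionUID = 1L;

    private final int threshold;
    private final ReducingStateDescriptor<Integer> sumDesc =
            new ReducingStateDescriptor<>("sum", (ReduceFunction<Integer>) Integer::sum, Types.INT);

    public ThresholdTrigger(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public TriggerResult onElement(Tuple2<String, Integer> element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());   // end-of-window firing, as in EventTimeTrigger
        ReducingState<Integer> sum = ctx.getPartitionedState(sumDesc);
        sum.add(element.f1);
        if (sum.get() >= threshold) {
            sum.clear();
            return TriggerResult.FIRE;                        // emit this key's aggregate early
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(sumDesc).clear();
    }
}
```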



Emulate Tumbling window in Event Time Space

2018-03-08 Thread Dhruv Kumar
Hi

I was trying to emulate a tumbling window in event time space. Here is the link 
to my code.
I am using a process function to do the custom processing which I want to do 
within every window. I am having an issue with how to emit results at the end 
of every window, since my watermark only gets emitted for each incoming event 
(and an incoming event will mostly not coincide with the end time of any 
window). It seems like I need to add a trigger somewhere which fires at the end 
of every window. Could anyone here help me? Sorry if anything above is unclear; 
I am quite new to Flink. 

Thanks
Dhruv
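
One common way to get the end-of-window firing described above is to register an 
event-time timer for the window end when a key first sees an element of that window; 
the timer fires once the watermark passes that boundary. Below is a minimal sketch 
along those lines; the window size, record type, and class name are assumptions, not 
taken from the linked code.

```
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emulates a per-key tumbling event-time window: values are summed per window, and an
// event-time timer registered for each window's end emits the aggregate when the
// watermark passes that boundary.
public class TumblingWindowEmulator
        extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple3<String, Long, Integer>> {

    private static final long serialVersionUID = 1L;
    private static final long WINDOW_SIZE_MS = 60_000L;   // assumed 1-minute windows

    // window end timestamp -> running sum for the current key
    private transient MapState<Long, Integer> sums;

    @Override
    public void open(Configuration parameters) {
        sums = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("window-sums", Types.LONG, Types.INT));
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx,
                               Collector<Tuple3<String, Long, Integer>> out) throws Exception {
        long ts = ctx.timestamp();                                   // requires event-time timestamps
        long windowEnd = (ts - (ts % WINDOW_SIZE_MS)) + WINDOW_SIZE_MS;
        Integer current = sums.get(windowEnd);
        sums.put(windowEnd, (current == null ? 0 : current) + value.f1);
        // Registering the same timer repeatedly is fine; Flink de-duplicates timers.
        ctx.timerService().registerEventTimeTimer(windowEnd - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple3<String, Long, Integer>> out) throws Exception {
        long windowEnd = timestamp + 1;
        Integer sum = sums.get(windowEnd);
        if (sum != null) {
            out.collect(Tuple3.of(ctx.getCurrentKey(), windowEnd, sum));
            sums.remove(windowEnd);
        }
    }
}
```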