Working with the Windowing functionality

2015-11-26 Thread Niels Basjes
Hi,

I'm trying to build something in Flink that relies heavily on the Windowing
features.

In essence what I want to build:
I have clickstream data coming in via Kafka. Each record (click) has a
sessionid and a timestamp.
I want to create a window for each session and after 30 minutes idle I want
all events for that session (visit) to be written to disk.
This should result in the effect that a specific visit exists in exactly
one file.
Since HDFS does not like 'small files' I want to create a (set of) files
every 15 minutes that contain several complete visits.
So I need to buffer the 'completed visits' and flush them to disk in 15
minute batches.

What I think I need to get this is:
1) A map function that assigns the visit-id (i.e. new id after 30 minutes
idle)
2) A window per visit-id (close the window 30 minutes after the last click)
3) A window per 15 minutes that only contains windows of visits that are
complete
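
The visit-id assignment in step 1 can be sketched in plain Java without any Flink API (class and method names here are invented for illustration): a new visit-id starts whenever the gap to the previous click of the same session exceeds 30 minutes.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: assign a new visit-id whenever a session is idle for more than
// 30 minutes. Timestamps are epoch millis, assumed sorted per session-id.
public class VisitIdAssigner {
    static final long IDLE_MS = 30 * 60 * 1000L;

    // Returns the visit-id (0, 1, 2, ...) for each click of one session.
    public static List<Integer> assignVisitIds(List<Long> timestamps) {
        List<Integer> visitIds = new ArrayList<>();
        int visitId = 0;
        long lastTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (lastTs != Long.MIN_VALUE && ts - lastTs > IDLE_MS) {
                visitId++; // idle gap exceeded: start a new visit
            }
            visitIds.add(visitId);
            lastTs = ts;
        }
        return visitIds;
    }

    public static void main(String[] args) {
        // clicks at t=0, t=10min, t=50min -> the 40-minute gap starts visit 1
        List<Long> ts = List.of(0L, 600_000L, 3_000_000L);
        System.out.println(assignVisitIds(ts)); // [0, 0, 1]
    }
}
```

In a Flink job this logic would live in a stateful map keyed by sessionid, with the last-seen timestamp kept as per-key state.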

Today I've been trying to get this setup and I think I have some parts that
are in the right direction.

I have some questions and I'm hoping you guys can help me:

1) I have trouble understanding exactly how a windowed stream works.
As a consequence I'm having a hard time verifying that my code does what I
think it should do.
What would really help me is a very simple example of how to
unit-test such a window.

2) Has what I describe above perhaps been done before? If so, any
pointers are really appreciated.

3) Am I working in the right direction for what I'm trying to achieve; or
should I use a different API? a different approach?

Thanks

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Working with State example /flink streaming

2015-11-26 Thread Lopez, Javier
Hi, thanks for the answer. It worked but not in the way we expected. We
expect to have only one sum per ID and we are getting all the consecutive
sums, for example:

We expect this: (11,6) but we get this (11,1) (11,3) (11,6) (the initial
values are ID -> 11, values -> 1,2,3). Here is the code we are using for
our test:

DataStream<Tuple2<Long, Double>> stream = ...;
DataStream<Tuple4<Long, Double, Long, Double>> result =
    stream.keyBy(0).map(new RollingSum());

public static class RollingSum extends
        RichMapFunction<Tuple2<Long, Double>, Tuple4<Long, Double, Long, Double>> {

    // persistent counters
    private OperatorState<Double> sum;
    private OperatorState<Long> count;

    @Override
    public Tuple4<Long, Double, Long, Double> map(Tuple2<Long, Double> value1) {
        try {
            Double newSum = sum.value() + value1.f1;

            sum.update(newSum);
            count.update(count.value() + 1);
            return new Tuple4<>(value1.f0, sum.value(), count.value(),
                    sum.value() / count.value());
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }

    @Override
    public void open(Configuration config) {
        sum = getRuntimeContext().getKeyValueState("mySum",
                Double.class, 0D);
        count = getRuntimeContext().getKeyValueState("myCounter",
                Long.class, 0L);
    }
}


We are using a Tuple4 because we want to calculate the sum and the average
(so our Tuple is ID, SUM, Count, AVG). Do we need to add another step to
get a single value out of it, or is this the expected behavior?
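
The per-record output can be reproduced without Flink at all. A plain-Java sketch (class name invented; not the Flink API) of what a keyed rolling map does, showing why one record is emitted per input element:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: a keyed rolling map holds per-key state and emits one
// (id, sum, count, avg) record for EVERY input element, which is why
// all the partial sums appear downstream.
public class RollingSumSimulation {

    // input records are [id, value]; output mirrors the Tuple4 in the thread
    public static List<String> process(List<long[]> input) {
        Map<Long, Double> sum = new HashMap<>();
        Map<Long, Long> count = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (long[] rec : input) {
            double s = sum.merge(rec[0], (double) rec[1], Double::sum);
            long c = count.merge(rec[0], 1L, Long::sum);
            // one output per input element
            emitted.add("(" + rec[0] + "," + s + "," + c + "," + (s / c) + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<long[]> in = List.of(
                new long[]{11, 1}, new long[]{11, 2}, new long[]{11, 3});
        System.out.println(process(in));
        // [(11,1.0,1,1.0), (11,3.0,2,1.5), (11,6.0,3,2.0)]
    }
}
```

Getting only the final `(11,6.0,3,2.0)` requires deciding when the stream is "done" for a key, which is what a window provides.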

Thanks again for your help.

On 25 November 2015 at 17:19, Stephan Ewen  wrote:

> Hi Javier!
>
> You can solve this both using windows, or using manual state.
>
> What is better depends a bit on when you want to have the result (the
> sum). Do you want a result emitted after each update (or do some other
> operation with that value) or do you want only the final sum after a
> certain time?
>
> For the second variant, I would use a window, for the first variant, you
> could use custom state as follows:
>
> For each element, you take the current state for the key, add the value to
> get the new sum. Then you update the state with the new sum and emit the
> value as well...
>
> Java:
>
> DataStream<Tuple2<String, Long>> stream = ...;
> DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new RollingSum());
>
> public class RollingSum extends RichMapFunction<Tuple2<String, Long>,
>         Tuple2<String, Long>> {
>
>     private OperatorState<Long> sum;
>
>     @Override
>     public Tuple2<String, Long> map(Tuple2<String, Long> value) {
>         long newSum = sum.value() + value.f1;
>         sum.update(newSum);
>         return new Tuple2<>(value.f0, newSum);
>     }
>
>     @Override
>     public void open(Configuration config) {
>         sum = getRuntimeContext().getKeyValueState("myCounter",
>                 Long.class, 0L);
>     }
> }
>
>
>
> In Scala, you can write this briefly as:
>
> val stream: DataStream[(String, Int)] = ...
> val counts: DataStream[(String, Int)] = stream
>   .keyBy(_._1)
>   .mapWithState((in: (String, Int), sum: Option[Int]) => {
>     val newSum = in._2 + sum.getOrElse(0)
>     ((in._1, newSum), Some(newSum))
>   })
>
>
> Does that help?
>
> Thanks also for pointing out the error in the sample code...
>
> Greetings,
> Stephan
>
>
> On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier 
> wrote:
>
>> Hi,
>>
>> We are trying to do a test using States but we have not been able to
>> achieve our desired result. Basically we have a data stream with data as
>> [{"id":"11","value":123}] and we want to calculate the sum of all values
>> grouping by ID. We were able to achieve this using windows but not with
>>  states. The example that is in the documentation (
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state)
>> is not very clear and even has some errors, for example:
>>
>> public class CounterSum implements RichReduceFunction
>>
>> should be
>>
>> public class CounterSum extends RichReduceFunction
>>
>> as RichReduceFunction is a class, not an interface.
>>
>> We wanted to ask you if you have an example of how to use States with
>> Flink.
>>
>> Thanks in advance for your help.
>>
>>
>>
>
>


Re: Working with State example /flink streaming

2015-11-26 Thread Stephan Ewen
Hi!

In streaming, there is no "end" of the stream when you would emit the final
sum. That's why there are windows.

If you do not want the partial sums, but only the final sum, you need to
define the window in which the sum is computed. At the end of that window,
that value is emitted. The window can be based on time, counts, or other
measures.

Greetings,
Stephan
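
The window-based alternative can be sketched in plain Java (class name invented; not the Flink API): elements are grouped into fixed time buckets, and exactly one sum per window is emitted when the window closes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch: a tumbling window groups elements by time bucket and emits
// exactly one aggregate per window, instead of one per element.
public class TumblingWindowSum {

    // events are [timestampMs, value]; windowMs is the window size
    public static List<Long> windowSums(List<long[]> events, long windowMs) {
        TreeMap<Long, Long> perWindow = new TreeMap<>();
        for (long[] e : events) {
            long bucket = e[0] / windowMs; // which window this event falls into
            perWindow.merge(bucket, e[1], Long::sum);
        }
        // one sum per window, in window order
        return new ArrayList<>(perWindow.values());
    }

    public static void main(String[] args) {
        List<long[]> ev = List.of(
                new long[]{100, 1}, new long[]{900, 2}, new long[]{1200, 3});
        System.out.println(windowSums(ev, 1000)); // [3, 3]
    }
}
```

In Flink the equivalent would be a keyed window (time-, count-, or session-based) with a sum aggregation, which fires once per window rather than per record.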


On Thu, Nov 26, 2015 at 4:07 PM, Lopez, Javier 
wrote:


Re: Doubt about window and count trigger

2015-11-26 Thread Matthias J. Sax
Hi,

a Trigger is an *additional* condition for intermediate (early)
evaluation of the window. Thus, it is not "or-ed" to the basic window
definition.

If you want to have an or-ed window condition, you can customize it by
specifying your own window definition.

> dataStream.window(new MyOwnWindow() { /* put your code here */ });
> // where MyOwnWindow extends WindowAssigner

-Matthias


On 11/26/2015 11:40 PM, Anwar Rizal wrote:
> Hi all,
> 
> From the documentation:
> "The Trigger specifies when the function that comes after the window
> clause (e.g., sum, count) is evaluated ("fires") for each window."
> 
> So, basically, if I specify:
> 
> keyedStream
>     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
>     .trigger(CountTrigger.of(100))
> 
> The execution of the window function is triggered when the count reaches 100
> within the 5-second time window. If you have a system that never reaches 100
> in 5 seconds, you will basically never have the window fire.
> 
> My question is: what would be the best option to get the following behavior?
> 
> The execution of the window function is triggered when 5 seconds have passed
> or 100 events have been received, whichever comes first.
> 
> 
> I think of implementing my own trigger that looks like CountTrigger, but that 
> will fire also when the end of time window is reached (at the moment, it just 
> returns Continue, instead of Fired). But maybe there's a better way ? 
> 
> Is there a reason why CountTrigger is implemented as it is implemented today, 
> and not as I described above (5 seconds or 100 events reached, whichever 
> comes first).
> 
> 
> Thanks,
> 
> Anwar.
> 
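
The "count or time, whichever comes first" firing logic can be sketched as a plain-Java state machine (class and method names invented; this is not Flink's Trigger interface, just the decision logic a custom trigger would encode):

```java
// Sketch: fire early when maxCount elements arrive, and always fire when
// the window's end time is reached; the count resets on every fire so the
// next window (or pane) starts fresh.
public class CountOrTimeTrigger {
    private final int maxCount;
    private int count = 0;

    public CountOrTimeTrigger(int maxCount) {
        this.maxCount = maxCount;
    }

    // Called per element; returns true if the window should fire early.
    public boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;
            return true;
        }
        return false;
    }

    // Called when the window's end time is reached; always fires.
    public boolean onTime() {
        count = 0;
        return true;
    }

    public static void main(String[] args) {
        CountOrTimeTrigger t = new CountOrTimeTrigger(3);
        System.out.println(t.onElement()); // false
        System.out.println(t.onElement()); // false
        System.out.println(t.onElement()); // true  (count reached)
        System.out.println(t.onTime());    // true  (time reached)
    }
}
```

A real Flink trigger would express `onTime()` via the window's time callback and `onElement()` via the per-element callback, returning fire/continue results instead of booleans.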


