MaxPermSize on yarn

2015-11-16 Thread Gwenhael Pasquiers
Hi,

We're having some OOM permgen exceptions when running on yarn.

We're not yet sure whether it is a consequence or a cause of our crashes, but 
we've been trying to increase that value... and we did not find how to do it.

I've seen that the yarn-daemon.sh sets a 256m value.
It looks like it's also possible to customize the YarnClient JVM args, 
but that will only apply to the client, not to the TaskManagers.

Do you know of a way to do it?

B.R.

Gwenhaël PASQUIERS
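
A minimal sketch of one thing worth trying, assuming the env.java.opts key in
flink-conf.yaml is supported by the release in use -- it is appended to the JVM
options of the started Flink processes (the 512m value below is a placeholder,
not a recommendation):

# flink-conf.yaml
env.java.opts: -XX:MaxPermSize=512m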


Re: Creating a representative streaming workload

2015-11-16 Thread Nick Dimiduk
Why not use an existing benchmarking tool -- is there one? Perhaps you'd
like to build something like YCSB [0] but for streaming workloads?

Apache Storm is the OSS framework that's been around the longest. Search
for "apache storm benchmark" and you'll get some promising hits. Looks like
IBMStreams has a tool [1] and the Ericsson research blog has a detailed
post [2] as well.

[0]: https://github.com/brianfrankcooper/YCSB
[1]:
https://github.com/IBMStreams/benchmarks/wiki/Running-Apache-Storm-benchmark
[2]:
http://www.ericsson.com/research-blog/data-knowledge/trident-benchmarking-performance/

On Mon, Nov 16, 2015 at 6:21 AM, Vasiliki Kalavri  wrote:

> Hello squirrels,
>
> with some colleagues and students here at KTH, we have started 2 projects
> to evaluate (1) performance and (2) behavior in the presence of memory
> interference in cloud environments, for Flink and other systems. We want to
> provide our students with a workload of representative applications for
> testing.
>
> While for batch applications, it is quite clear to us what classes of
> applications are widely used and how to create a workload of different
> types of applications, we are not quite sure about the streaming workload.
>
> That's why, we'd like your opinions! If you're using Flink streaming in
> your company or your project, we'd love your input even more :-)
>
> What kind of applications would you consider as "representative" of a
> streaming workload? Have you run any experiments to evaluate Flink versus
> Spark, Storm etc.? If yes, would you mind sharing your code with us?
>
> We will of course be happy to share our results with everyone after we
> have completed our study.
>
> Thanks a lot!
> -Vasia.
>


Re: Multilang Support on Flink

2015-11-16 Thread Maximilian Michels
Hi Welly,

It's in the main Flink repository. Actually, this has just been
integrated with the Python API, see
https://github.com/apache/flink/blob/master/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java

Before it was independent
https://github.com/apache/flink/blob/release-0.10/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java

I think the creators decided it was too specific to the Python API to
actually serve as a generic interface. You might want to ask Chesnay
(in CC) for the details.

Cheers,
Max

On Sat, Nov 14, 2015 at 12:52 AM, Welly Tambunan  wrote:
> Hi Max,
>
> Do you know where the repo is ?
>
> I tried to search in flink-staging but it seems it's not there anymore (via
> Google)
>
> Cheers
>
> On Fri, Nov 13, 2015 at 5:07 PM, Maximilian Michels  wrote:
>>
>> Hi Welly,
>>
>> There is a protocol for communicating with other processes. This is
>> reflected in the flink-language-binding-generic module. I'm not aware of how
>> the Spark or Storm communication protocols work, but this protocol is
>> rather low-level.
>>
>> Cheers,
>> Max
>>
>> On Fri, Nov 13, 2015 at 9:49 AM, Welly Tambunan  wrote:
>> > Hi All,
>> >
>> > I want to ask if there's multilang support (like in Storm, or pipeTo in
>> > Spark) in Flink?
>> >
>> > I tried to find it in the docs but couldn't find it.
>> >
>> > Any link or direction would be really appreciated.
>> >
>> >
>> > Cheers
>> >
>> > --
>> > Welly Tambunan
>> > Triplelands
>> >
>> > http://weltam.wordpress.com
>> > http://www.triplelands.com
>
>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com
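
Absent a general multilang API, one workaround in the meantime is to pipe
records through an external process from inside a UDF, similar in spirit to
Spark's pipe(). A minimal sketch, assuming a line-oriented String-in/String-out
framing (this is not Flink's language-binding protocol; error handling and
process supervision are omitted):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;

// Pipes each record through an external command, one line in, one line out.
public class PipeMapFunction extends RichMapFunction<String, String> {

    private final String[] command;  // e.g. {"python", "transform.py"}
    private transient Process process;
    private transient BufferedWriter stdin;
    private transient BufferedReader stdout;

    public PipeMapFunction(String... command) {
        this.command = command;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // One external process per parallel task instance.
        process = new ProcessBuilder(command).start();
        stdin = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
        stdout = new BufferedReader(new InputStreamReader(process.getInputStream()));
    }

    @Override
    public String map(String value) throws Exception {
        stdin.write(value);
        stdin.newLine();
        stdin.flush();
        return stdout.readLine();  // assumes exactly one output line per input line
    }

    @Override
    public void close() throws Exception {
        if (process != null) {
            process.destroy();
        }
    }
}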


Re: Apache Flink Operator State as Query Cache

2015-11-16 Thread Anwar Rizal
Stephan,

Having a look at the brand new 0.10 release, I noticed that OperatorState
is not implemented for ConnectedStream, which is quite the opposite of what
you said below.

Or maybe I misunderstood your sentence here ?

Thanks,
Anwar.


On Wed, Nov 11, 2015 at 10:49 AM, Stephan Ewen  wrote:

> Hi!
>
> In general, if you can keep state in Flink, you get better
> throughput/latency/consistency and have one less system to worry about
> (external k/v store). State outside means that the Flink processes can be
> slimmer and need fewer resources and as such recover a bit faster. There
> are use cases for that as well.
>
> Storing the model in OperatorState is a good idea, if you can. On the
> roadmap is to migrate the operator state to managed memory as well, so that
> should take care of the GC issues.
>
> We are just adding functionality to make the Key/Value operator state
> usable in CoMap/CoFlatMap as well (currently it only works in windows and
> in Map/FlatMap/Filter functions over the KeyedStream).
> Until then, you should be able to use a simple Java HashMap and use the
> "Checkpointed" interface to get it persistent.
>
> Greetings,
> Stephan
>
>
> On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan  wrote:
>
>> Thanks for the answer.
>>
>> Currently the approach I'm using is creating a base/marker
>> interface to stream different types of messages to the same operator. Not
>> sure about the performance hit of this compared to the CoFlatMap
>> function.
>>
>> Basically this one is providing a query cache, so I'm thinking that instead
>> of using an in-memory cache like Redis, Ignite etc., I can just use operator
>> state for this one.
>>
>> I just want to gauge whether I need to use a memory cache or whether
>> operator state would be just fine.
>>
>> However, I'm concerned about Gen 2 garbage collection when caching our
>> own state without using operator state. Is there any clarification on that
>> one?
>>
>>
>>
>> On Sat, Nov 7, 2015 at 12:38 AM, Anwar Rizal  wrote:
>>
>>>
>>> Let me understand your case better here. You have a stream of model and
>>> stream of data. To process the data, you will need a way to access your
>>> model from the subsequent stream operations (map, filter, flatmap, ..).
>>> I'm not sure in which case Operator State is a good choice, but I think
>>> you can also live without it.
>>>
>>> val modelStream =  // get the model stream
>>> val dataStream   =  // get the data stream
>>>
>>> modelStream.broadcast.connect(dataStream).coFlatMap( ... )
>>>
>>> Then you can keep the latest model in a RichCoFlatMapFunction, not
>>> necessarily as Operator State, although maybe OperatorState is a good
>>> choice too.
>>>
>>> Does it make sense to you ?
>>>
>>> Anwar
>>>
>>> On Fri, Nov 6, 2015 at 10:21 AM, Welly Tambunan 
>>> wrote:
>>>
 Hi All,

 We have high-density data that requires downsampling. However, this
 downsampling model is very flexible, depending on the client device and user
 interaction, so it would be wasteful to precompute and store it in a db.

 So we want to use Apache Flink to do downsampling and cache the result
 for subsequent query.

 We are considering using Flink Operator state for that one.

 Is using operator state the right approach for a memory cache? Or is it
 preferable to use a memory cache like Redis etc.?

 Any comments will be appreciated.


 Cheers
 --
 Welly Tambunan
 Triplelands

 http://weltam.wordpress.com
 http://www.triplelands.com 

>>>
>>>
>>
>>
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>
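
Pulling the two suggestions above together, a minimal sketch against the
0.10-era API: the latest model per key is kept in a plain Java HashMap and made
fault tolerant via the Checkpointed interface. Model, Event, and Result (and
their accessors) are hypothetical user types; Model must be Serializable.

import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;

public class ModelJoinFunction
        extends RichCoFlatMapFunction<Model, Event, Result>
        implements Checkpointed<HashMap<String, Model>> {

    private HashMap<String, Model> models = new HashMap<>();

    @Override
    public void flatMap1(Model model, Collector<Result> out) {
        // Model stream: remember the newest model for its key.
        models.put(model.getKey(), model);
    }

    @Override
    public void flatMap2(Event event, Collector<Result> out) {
        // Data stream: serve the query from the cached model, if present.
        Model model = models.get(event.getKey());
        if (model != null) {
            out.collect(model.apply(event));
        }
    }

    @Override
    public HashMap<String, Model> snapshotState(long checkpointId, long timestamp) {
        return models;
    }

    @Override
    public void restoreState(HashMap<String, Model> state) {
        this.models = state;
    }
}

// Wiring (model updates reach all parallel instances via broadcast):
// modelStream.broadcast().connect(dataStream).flatMap(new ModelJoinFunction());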


Apache Flink 0.10.0 released

2015-11-16 Thread Fabian Hueske
Hi everybody,

The Flink community is excited to announce that Apache Flink 0.10.0 has
been released.
Please find the release announcement here:

-->  http://flink.apache.org/news/2015/11/16/release-0.10.0.html

Best,
Fabian


Re: Apache Flink 0.10.0 released

2015-11-16 Thread Slim Baltagi
Hi

I’m very pleased to be the first to tweet about the release of Apache Flink 0.10.0 
just after receiving Fabian’s email :)
Flink 1.0 is around the corner now!

Slim Baltagi

On Nov 16, 2015, at 7:53 AM, Fabian Hueske  wrote:

> Hi everybody, 
> 
> The Flink community is excited to announce that Apache Flink 0.10.0 has been 
> released.
> Please find the release announcement here:
> 
> -->  http://flink.apache.org/news/2015/11/16/release-0.10.0.html
> 
> Best, 
> Fabian



Re: Apache Flink Operator State as Query Cache

2015-11-16 Thread Stephan Ewen
Hi Anwar!

0.10.0 was feature frozen at that time already and under testing. Key/value
state on connected streams will have to go into the next release...

Stephan


On Mon, Nov 16, 2015 at 3:00 PM, Anwar Rizal  wrote:

> Stephan,
>
> Having a look at the brand new 0.10 release, I noticed that OperatorState
> is not implemented for ConnectedStream, which is quite the opposite of what
> you said below.
>
> Or maybe I misunderstood your sentence here ?
>
> Thanks,
> Anwar.
>
>
> On Wed, Nov 11, 2015 at 10:49 AM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> In general, if you can keep state in Flink, you get better
>> throughput/latency/consistency and have one less system to worry about
>> (external k/v store). State outside means that the Flink processes can be
>> slimmer and need fewer resources and as such recover a bit faster. There
>> are use cases for that as well.
>>
>> Storing the model in OperatorState is a good idea, if you can. On the
>> roadmap is to migrate the operator state to managed memory as well, so that
>> should take care of the GC issues.
>>
>> We are just adding functionality to make the Key/Value operator state
>> usable in CoMap/CoFlatMap as well (currently it only works in windows and
>> in Map/FlatMap/Filter functions over the KeyedStream).
>> Until then, you should be able to use a simple Java HashMap and use the
>> "Checkpointed" interface to get it persistent.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan 
>> wrote:
>>
>>> Thanks for the answer.
>>>
>>> Currently the approach I'm using is creating a
>>> base/marker interface to stream different types of messages to the same
>>> operator. Not sure about the performance hit of this compared to the
>>> CoFlatMap function.
>>>
>>> Basically this one is providing a query cache, so I'm thinking that instead
>>> of using an in-memory cache like Redis, Ignite etc., I can just use operator
>>> state for this one.
>>>
>>> I just want to gauge whether I need to use a memory cache or whether
>>> operator state would be just fine.
>>>
>>> However, I'm concerned about Gen 2 garbage collection when caching our
>>> own state without using operator state. Is there any clarification on that
>>> one?
>>>
>>>
>>>
>>> On Sat, Nov 7, 2015 at 12:38 AM, Anwar Rizal 
>>> wrote:
>>>

 Let me understand your case better here. You have a stream of model and
 stream of data. To process the data, you will need a way to access your
 model from the subsequent stream operations (map, filter, flatmap, ..).
 I'm not sure in which case Operator State is a good choice, but I think
 you can also live without it.

 val modelStream =  // get the model stream
 val dataStream   =  // get the data stream

 modelStream.broadcast.connect(dataStream).coFlatMap( ... )

 Then you can keep the latest model in a RichCoFlatMapFunction, not
 necessarily as Operator State, although maybe OperatorState is a good
 choice too.

 Does it make sense to you ?

 Anwar

 On Fri, Nov 6, 2015 at 10:21 AM, Welly Tambunan 
 wrote:

> Hi All,
>
> We have high-density data that requires downsampling. However, this
> downsampling model is very flexible, depending on the client device and user
> interaction, so it would be wasteful to precompute and store it in a db.
>
> So we want to use Apache Flink to do downsampling and cache the result
> for subsequent query.
>
> We are considering using Flink Operator state for that one.
>
> Is using operator state the right approach for a memory cache? Or is it
> preferable to use a memory cache like Redis etc.?
>
> Any comments will be appreciated.
>
>
> Cheers
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


>>>
>>>
>>> --
>>> Welly Tambunan
>>> Triplelands
>>>
>>> http://weltam.wordpress.com
>>> http://www.triplelands.com 
>>>
>>
>>
>


Re: Error handling

2015-11-16 Thread Stephan Ewen
Makes sense. The class of operations that work "per-tuple" before the data
is forwarded to the network stack could be extended to have error traps.

@Nick: Is that what you had in mind?

On Mon, Nov 16, 2015 at 7:27 PM, Aljoscha Krettek 
wrote:

> Hi,
> I don’t think that alleviates the problem. Sometimes you might want the
> system to continue even if stuff outside the UDF fails. For example, if a
> serializer does not work because of a null value somewhere. You would,
> however, like to get a message about this somewhere, I assume.
>
> Cheers,
> Aljoscha
> > On 16 Nov 2015, at 19:22, Stephan Ewen  wrote:
> >
> > Hi Nick!
> >
> > The errors outside your UDF (such as network problems) will be handled
> by Flink and cause the job to go into recovery. They should be
> transparently handled.
> >
> > Just make sure you activate checkpointing for your job!
> >
> > Stephan
> >
> >
> > On Mon, Nov 16, 2015 at 6:18 PM, Nick Dimiduk 
> wrote:
> > I have been thinking about this: maybe we can add a special output
> stream (for example Kafka, but it can be generic) that would get
> errors/exceptions that were thrown during processing. The actual
> processing would not stop and the messages in this special stream would
> contain some information about the current state of processing, the input
> element(s) and the machine/VM where computation is happening.
> >
> > Yes, this is precisely what I have in mind. The goal is (1) to not lose
> input data, and (2) to make errors available for operator visibility.
> >
> > It's not very portable, but I was able to implement my Maybe<T, Throwable> type. I can now use it as the output of all my source streams,
> and split those streams on the presence of the Throwable. With this, I'm
> able to trap certain forms of invalid input and send it to an errors sink.
> However, there are still some error cases that cause exceptions,
> apparently, outside of my UDF try block that cause the whole streaming job
> to terminate.
> >
> > > On 11 Nov 2015, at 21:49, Nick Dimiduk  wrote:
> > >
> > > Heya,
> > >
> > > I don't see a section in the online manual dedicated to this topic, so
> I want to raise the question here: How should errors be handled?
> Specifically I'm thinking about streaming jobs, which are expected to
> "never go down". For example, errors can be raised at the point where
> objects are serialized to/from sources/sinks, and UDFs. Cascading provides
> failure traps [0] where erroneous tuples are saved off for post-processing.
> Is there any such functionality in Flink?
> > >
> > > I started down the road of implementing a Maybe/Optional type, a POJO
> Generic triple of  for capturing errors at each stage
> of a pipeline. However, Java type erasure means even though it compiles,
> the job is rejected at submission time.
> > >
> > > How are other people handling errors in their stream processing?
> > >
> > > Thanks,
> > > Nick
> > >
> > > [0]:
> http://docs.cascading.org/cascading/1.2/userguide/html/ch06s03.html
> >
> >
> >
>
>
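
A rough sketch of what such a per-tuple error trap could look like today,
implemented at the user level rather than in Flink itself. TaggedRecord is a
hypothetical wrapper type, and, as Nick notes above, generic POJOs like this
may need explicit type hints (e.g. via returns(...)) to get past type erasure:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.util.Collector;

// Hypothetical Either-style wrapper: holds a result on success, or the
// failed input plus the error on failure.
class TaggedRecord<IN, OUT> {
    public OUT result;
    public IN failedInput;
    public Throwable error;

    public static <IN, OUT> TaggedRecord<IN, OUT> ok(OUT result) {
        TaggedRecord<IN, OUT> r = new TaggedRecord<>();
        r.result = result;
        return r;
    }

    public static <IN, OUT> TaggedRecord<IN, OUT> error(IN input, Throwable t) {
        TaggedRecord<IN, OUT> r = new TaggedRecord<>();
        r.failedInput = input;
        r.error = t;
        return r;
    }
}

// Applies the wrapped function per tuple; instead of failing the job on an
// exception, it emits a record carrying the offending input and the error.
public class TrappingFlatMap<IN, OUT> implements FlatMapFunction<IN, TaggedRecord<IN, OUT>> {

    private final MapFunction<IN, OUT> wrapped;

    public TrappingFlatMap(MapFunction<IN, OUT> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void flatMap(IN value, Collector<TaggedRecord<IN, OUT>> out) {
        try {
            out.collect(TaggedRecord.ok(wrapped.map(value)));
        } catch (Exception e) {
            out.collect(TaggedRecord.error(value, e));  // trap instead of crash
        }
    }
}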


Re: Error handling

2015-11-16 Thread Aljoscha Krettek
Hi,
I don’t think that alleviates the problem. Sometimes you might want the system 
to continue even if stuff outside the UDF fails. For example, if a serializer 
does not work because of a null value somewhere. You would, however, like to 
get a message about this somewhere, I assume.

Cheers,
Aljoscha
> On 16 Nov 2015, at 19:22, Stephan Ewen  wrote:
> 
> Hi Nick!
> 
> The errors outside your UDF (such as network problems) will be handled by 
> Flink and cause the job to go into recovery. They should be transparently 
> handled.
> 
> Just make sure you activate checkpointing for your job!
> 
> Stephan
> 
> 
> On Mon, Nov 16, 2015 at 6:18 PM, Nick Dimiduk  wrote:
> I have been thinking about this: maybe we can add a special output stream 
> (for example Kafka, but it can be generic) that would get errors/exceptions that 
> were thrown during processing. The actual processing would not stop and the 
> messages in this special stream would contain some information about the 
> current state of processing, the input element(s) and the machine/VM where 
> computation is happening.
> 
> Yes, this is precisely what I have in mind. The goal is (1) to not lose input 
> data, and (2) to make errors available for operator visibility.
> 
> It's not very portable, but I was able to implement my Maybe<T, Throwable> type. I can now use it as the output of all my source streams, and 
> split those streams on the presence of the Throwable. With this, I'm able to 
> trap certain forms of invalid input and send it to an errors sink. However, 
> there are still some error cases that cause exceptions, apparently, outside 
> of my UDF try block that cause the whole streaming job to terminate.
>  
> > On 11 Nov 2015, at 21:49, Nick Dimiduk  wrote:
> >
> > Heya,
> >
> > I don't see a section in the online manual dedicated to this topic, so I 
> > want to raise the question here: How should errors be handled? Specifically 
> > I'm thinking about streaming jobs, which are expected to "never go down". 
> > For example, errors can be raised at the point where objects are serialized 
> > to/from sources/sinks, and UDFs. Cascading provides failure traps [0] where 
> > erroneous tuples are saved off for post-processing. Is there any such 
> > functionality in Flink?
> >
> > I started down the road of implementing a Maybe/Optional type, a POJO 
> > Generic triple of  for capturing errors at each stage 
> > of a pipeline. However, Java type erasure means even though it compiles, 
> > the job is rejected at submission time.
> >
> > How are other people handling errors in their stream processing?
> >
> > Thanks,
> > Nick
> >
> > [0]: http://docs.cascading.org/cascading/1.2/userguide/html/ch06s03.html
> 
> 
> 



Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-16 Thread Konstantin Knauf
Hi Aljoscha,

I changed the TimestampExtractor to save the lastSeenTimestamp and only
emit with getCurrentWatermark [1] as you suggested. So basically I do
the opposite of before (watermarks only per event vs. watermarks only
per auto-watermark interval). And now it works :). The question remains why it
did not work before. As far as I can see, it is an issue with the first
TimestampExtractor itself?!

Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?

Cheers,

Konstantin

[1]

public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {

    private final long maxDelay;
    private long lastTimestamp = Long.MIN_VALUE;

    public PojoTimestampExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public long extractTimestamp(Pojo pojo, long l) {
        lastTimestamp = pojo.getTime();
        return pojo.getTime();
    }

    @Override
    public long extractWatermark(Pojo pojo, long l) {
        return Long.MIN_VALUE;
    }

    @Override
    public long getCurrentWatermark() {
        return lastTimestamp - maxDelay;
    }
}
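
For completeness, a sketch of how this extractor is wired up, assuming the
setup described earlier in the thread (the 500 ms auto-watermark interval and
the window size mirror the values mentioned there; the sum field is
illustrative, since the original snippet left it as .sum(..)):

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().enableTimestamps();
env.getConfig().setAutoWatermarkInterval(500);  // getCurrentWatermark() is polled at this rate

stream
    .assignTimestamps(new PojoTimestampExtractor(6000))
    .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
    .sum("time")  // illustrative field
    .print();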


On 16.11.2015 13:37, Aljoscha Krettek wrote:
> Hi,
> yes, at your data-rate emitting a watermark for every element should not be a 
> problem. It could become a problem with higher data-rates since the system 
> can get overwhelmed if every element also generates a watermark. In that case 
> I would suggest storing the latest element timestamp in an internal field 
> and only emitting it in getCurrentWatermark(), since then the watermark 
> interval can be tuned using the auto-watermark interval setting.
> 
> But that should not be the cause of the problem that you currently have. 
> Would you maybe be willing to send me some (mock) example data and the code 
> so that I can reproduce the problem and have a look at it? to aljoscha at 
> apache.org.
> 
> Cheers,
> Aljoscha
>> On 16 Nov 2015, at 13:05, Konstantin Knauf  
>> wrote:
>>
>> Hi Aljoscha,
>>
>> ok, now I at least understand why it works with fromElements(...). For
>> the rest I am not so sure.
>>
>>> What this means in your case is that the watermark can only advance if
>> a new element arrives, because only then is the watermark updated.
>>
>> But new elements arrive all the time, about 50/s, or do you mean
>> something else?
>>
>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>> choice, if i understand the semantics correctly. It just affects
>> watermarking in the absence of events, right?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>> Hi,
>>> it could be what Gyula mentioned. Let me first go a bit into how the 
>>> TimestampExtractor works internally.
>>>
>>> First, the timestamp extractor internally keeps the value of the last 
>>> emitted watermark. Then, the semantics of the TimestampExtractor are as 
>>> follows :
>>> - the result of extractTimestamp is taken and it replaces the internal 
>>> timestamp of the element
>>> - if the result of extractWatermark is larger than the last watermark the 
>>> new value is emitted as a watermark and the value is stored
>>> - getCurrentWatermark is called on the specified auto-watermark interval, 
>>> if the returned value is larger than the last watermark it is emitted and 
>>> stored as last watermark
>>>
>>> What this means in your case is that the watermark can only advance if a 
>>> new element arrives, because only then is the watermark updated.
>>>
>>> The reason why you see results if you use fromElements is that the 
>>> window-operator also emits all the windows that it currently has buffered 
>>> if the program closes. This happens in the case of fromElements because 
>>> only a finite number of elements is emitted, after which the source closes, 
>>> thereby finishing the whole program.
>>>
>>> Cheers,
>>> Aljoscha
 On 16 Nov 2015, at 10:42, Gyula Fóra  wrote:

 Could this part of the extractor be the problem, Aljoscha?

 @Override
 public long getCurrentWatermark() {
     return Long.MIN_VALUE;
 }

 Gyula

 Konstantin Knauf wrote (on Mon, 16 Nov 2015, 10:39):
 Hi Aljoscha,

 thanks for your answer. Yes I am using the same TimestampExtractor-Class.

 The timestamps look good to me. Here an example.

 {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00

 The order now is

 stream
 .map(dummyMapper)
 .assignTimestamps(...)
 .timeWindow(...)

 Is there a way to print out the assigned timestamps after
 stream.assignTimestamps(...)?

 Cheers,

 Konstantin


 On 16.11.2015 10:31, Aljoscha Krettek wrote:
> Hi,
> are you also using the timestamp extractor when you are using 
> env.fromCollection()?
>
> Could you maybe insert a dummy mapper after the Kafka source that just 
> prints the element and forwards it? To see if the elements come with a good 
> timestamp from Kafka.

Re: Creating a representative streaming workload

2015-11-16 Thread Vasiliki Kalavri
Hi,

thanks Nick and Ovidiu for the links!

Just to clarify, we're not looking into creating a generic streaming
benchmark. We have quite limited time and resources for this project. What
we want is to decide on a set of 3-4 _common_ streaming applications. To
give you an idea, for the batch workload, we will pick something like a
grep, one relational application, a graph algorithm, and an ML algorithm.

Cheers,
-Vasia.

On 16 November 2015 at 19:25, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Regarding Flink vs Spark / Storm you can check here:
> http://www.sparkbigdata.com/102-spark-blog-slim-baltagi/14-results-of-a-benchmark-between-apache-flink-and-apache-spark
>
> Best regards,
> Ovidiu
>
> On 16 Nov 2015, at 15:21, Vasiliki Kalavri 
> wrote:
>
> Hello squirrels,
>
> with some colleagues and students here at KTH, we have started 2 projects
> to evaluate (1) performance and (2) behavior in the presence of memory
> interference in cloud environments, for Flink and other systems. We want to
> provide our students with a workload of representative applications for
> testing.
>
> While for batch applications, it is quite clear to us what classes of
> applications are widely used and how to create a workload of different
> types of applications, we are not quite sure about the streaming workload.
>
> That's why, we'd like your opinions! If you're using Flink streaming in
> your company or your project, we'd love your input even more :-)
>
> What kind of applications would you consider as "representative" of a
> streaming workload? Have you run any experiments to evaluate Flink versus
> Spark, Storm etc.? If yes, would you mind sharing your code with us?
>
> We will of course be happy to share our results with everyone after we
> have completed our study.
>
> Thanks a lot!
> -Vasia.
>
>
>


Re: Error handling

2015-11-16 Thread Nick Dimiduk
>
> I have been thinking about this: maybe we can add a special output stream
> (for example Kafka, but it can be generic) that would get errors/exceptions
> that were thrown during processing. The actual processing would not stop
> and the messages in this special stream would contain some information
> about the current state of processing, the input element(s) and the
> machine/VM where computation is happening.
>

Yes, this is precisely what I have in mind. The goal is (1) to not lose
input data, and (2) to make errors available for operator visibility.

It's not very portable, but I was able to implement my Maybe<T, Throwable> type. I can now use it as the output of all my source streams,
and split those streams on the presence of the Throwable. With this, I'm
able to trap certain forms of invalid input and send it to an errors sink.
However, there are still some error cases that cause exceptions,
apparently, outside of my UDF try block that cause the whole streaming job
to terminate.


> > On 11 Nov 2015, at 21:49, Nick Dimiduk  wrote:
> >
> > Heya,
> >
> > I don't see a section in the online manual dedicated to this topic, so I
> want to raise the question here: How should errors be handled? Specifically
> I'm thinking about streaming jobs, which are expected to "never go down".
> For example, errors can be raised at the point where objects are serialized
> to/from sources/sinks, and UDFs. Cascading provides failure traps [0] where
> erroneous tuples are saved off for post-processing. Is there any such
> functionality in Flink?
> >
> > I started down the road of implementing a Maybe/Optional type, a POJO
> Generic triple of  for capturing errors at each stage
> of a pipeline. However, Java type erasure means even though it compiles,
> the job is rejected at submission time.
> >
> > How are other people handling errors in their stream processing?
> >
> > Thanks,
> > Nick
> >
> > [0]: http://docs.cascading.org/cascading/1.2/userguide/html/ch06s03.html
>
>
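
A sketch of the splitting step described above, assuming a hypothetical
wrapper type Wrapped with an isError() accessor; split/select is the
stream-forking mechanism in this generation of the API (SplitStream on 0.10):

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;

import java.util.Collections;

public class ErrorRouting {

    // Forks a stream of wrapped records into "ok" and "errors" branches,
    // based on whether a Throwable was captured for the record.
    public static SplitStream<Wrapped> fork(DataStream<Wrapped> input) {
        return input.split(new OutputSelector<Wrapped>() {
            @Override
            public Iterable<String> select(Wrapped value) {
                return Collections.singletonList(value.isError() ? "errors" : "ok");
            }
        });
    }
}

// Usage:
// SplitStream<Wrapped> branches = ErrorRouting.fork(stream);
// DataStream<Wrapped> good = branches.select("ok");       // continue the pipeline
// branches.select("errors").addSink(errorSink);           // e.g. a Kafka sink for inspection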


Re: Creating a representative streaming workload

2015-11-16 Thread Nick Dimiduk
All those should apply for streaming too...

On Mon, Nov 16, 2015 at 11:06 AM, Vasiliki Kalavri <
vasilikikala...@gmail.com> wrote:

> Hi,
>
> thanks Nick and Ovidiu for the links!
>
> Just to clarify, we're not looking into creating a generic streaming
> benchmark. We have quite limited time and resources for this project. What
> we want is to decide on a set of 3-4 _common_ streaming applications. To
> give you an idea, for the batch workload, we will pick something like a
> grep, one relational application, a graph algorithm, and an ML algorithm.
>
> Cheers,
> -Vasia.
>
> On 16 November 2015 at 19:25, Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr> wrote:
>
>> Regarding Flink vs Spark / Storm you can check here:
>> http://www.sparkbigdata.com/102-spark-blog-slim-baltagi/14-results-of-a-benchmark-between-apache-flink-and-apache-spark
>>
>> Best regards,
>> Ovidiu
>>
>> On 16 Nov 2015, at 15:21, Vasiliki Kalavri 
>> wrote:
>>
>> Hello squirrels,
>>
>> with some colleagues and students here at KTH, we have started 2 projects
>> to evaluate (1) performance and (2) behavior in the presence of memory
>> interference in cloud environments, for Flink and other systems. We want to
>> provide our students with a workload of representative applications for
>> testing.
>>
>> While for batch applications, it is quite clear to us what classes of
>> applications are widely used and how to create a workload of different
>> types of applications, we are not quite sure about the streaming workload.
>>
>> That's why, we'd like your opinions! If you're using Flink streaming in
>> your company or your project, we'd love your input even more :-)
>>
>> What kind of applications would you consider as "representative" of a
>> streaming workload? Have you run any experiments to evaluate Flink versus
>> Spark, Storm etc.? If yes, would you mind sharing your code with us?
>>
>> We will of course be happy to share our results with everyone after we
>> have completed our study.
>>
>> Thanks a lot!
>> -Vasia.
>>
>>
>>
>


Re: Different CoGroup behavior inside DeltaIteration

2015-11-16 Thread Stephan Ewen
It is actually very important that the CoGroup in delta iterations works
like that.
If the CoGroup touched every element in the solution set, the "decreasing
work" effect would not happen.

The delta iterations are designed for cases where specific updates to the
solution are made, driven by the workset.
Driving an operator by solution set contents would result in a "bulk
iteration" style pattern, so the idea would be to use a proper bulk
iteration for those cases.

Does that make sense?



On Mon, Nov 16, 2015 at 10:54 PM, Fabian Hueske  wrote:

> Hi,
>
> this is an artifact of how the solution set is internally implemented.
> Usually, a CoGroup is executed using a sort-merge strategy, i.e., both
> input are sorted, merged, and handed to the CoGroup function in a streaming
> fashion. Both inputs are treated equally, and if one of both inputs does
> not contain a key which is contained in the other input, the CoGroup
> function is called with an empty iterator.
>
> The solution set of a delta iteration is stored in a hash table (with only
> one entry per key). When a solution set is coGrouped with another data set,
> the other input is sorted and probed against the hash table. The solution
> set iterator of the CoGroup function will contain one element if the hash
> table contains an element and be empty if the hash table doesn't contain an
> entry for the key. However, the hash table will not check whether all of its
> elements have been looked up, so it cannot identify elements of the
> solution set for which no corresponding element was present in the other
> data set.
>
> So, the CoGroup with a solution set works only in one direction as stated
> in the documentation. This is kind of intended by the way the solution set
> CoGroup is implemented, but we should definitely update the documentation
> to cover this case!
>
> If you have a use case that requires a solution set CoGroup with the
> missing behavior you should open a JIRA issue.
> Otherwise it would be great if you could also open a JIRA issue to extend
> the documentation.
>
> Thank you, Fabian
>
> 2015-11-16 1:02 GMT+01:00 Truong Duc Kien :
>
>> Hi,
>>
>> When running CoGroup between the solution set and a different dataset
> inside a DeltaIteration, the CoGroupFunction only gets called for items
> that exist in the other dataset, similar to an inner join. This is not
>> the documented behavior for CoGroup:
>>
>> If a DataSet has a group with no matching key in the other DataSet,
>>> the CoGroupFunction is called with an empty group for the non-existing
>>> group.
>>>
>>
>> The following code shows the problem.
>>
>> import org.apache.flink.api.scala._
>> import org.apache.flink.util.Collector
>>
>> object CoGroupExample {
>>
>>   def coGroupFuntion(first: Iterator[(Int, Int)],
>>  second: Iterator[(Int, Int)],
>>  out: Collector[(Int, Int)]): Unit = {
>> if (second.hasNext) {
>>   out.collect(second.next)
>> } else {
>>   printf("Not in second set: %s\n", first.next)
>>   println("These two lines doesn't appear when " +
>> "running cogroup on solution set")
>> }
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> env.getConfig.disableSysoutLogging()
>>
>> val d1 = env.fromElements(
>>   new Tuple2(1, 1),
>>   new Tuple2(2, 1) ,
>>   new Tuple2(3, 1)
>> )
>>
>> d1.iterateDelta(d1, 1, Array{0}) {
>>   (solutionSet, workSet) => {
>> val f = workSet.filter(_._1 != 1)
>> println("Cogroup on solution set with delta iteration")
>> val newSolutionSet = solutionSet.coGroup(f)
>>   .where(0)
>>   .equalTo(0)
>>   .apply(coGroupFuntion _)
>> (newSolutionSet, newSolutionSet)
>>   }
>> }.print()
>>
>> println("Normal cogroup")
>> val d2 = d1.filter(_._1 != 1)
>> d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFuntion _).print()
>>   }
>> }
>>
>>
>>
>> Is this the expected behavior or should I file a bug about this?
>>
>> Best regards,
>> Kien Truong
>>
>
>


Re: Different CoGroup behavior inside DeltaIteration

2015-11-16 Thread Duc Kien Truong

Hi,
Thanks for the suggestion. I'm trying to use the delta iteration so that I
can get the empty-workset convergence criterion for free. But since doing an
outer join between the work set and the solution set is not possible using
cogroup, I will try to adapt my algorithm to use the bulk iteration.

Best, Kien Truong

On Mon, Nov 16, 2015 at 11:02 PM, Stephan Ewen <se...@apache.org> wrote:
It is actually very important that the CoGroup in delta iterations works like
that. If the CoGroup touched every element in the solution set, the
"decreasing work" effect would not happen.

The delta iterations are designed for cases where specific updates to the
solution are made, driven by the workset. Driving an operator by solution set
contents would result in a "bulk iteration" style pattern, so the idea would
be to use a proper bulk iteration for those cases.

Does that make sense?


On Mon, Nov 16, 2015 at 10:54 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

this is an artifact of how the solution set is internally implemented.
Usually, a CoGroup is executed using a sort-merge strategy, i.e., both inputs
are sorted, merged, and handed to the CoGroup function in a streaming fashion.
Both inputs are treated equally, and if one of the inputs does not contain a
key which is contained in the other input, the CoGroup function is called with
an empty iterator.

The solution set of a delta iteration is stored in a hash table (with only one
entry per key). When a solution set is coGrouped with another data set, the
other input is sorted and probed against the hash table. The solution set
iterator of the CoGroup function will contain one element if the hash table
contains an element and be empty if the hash table doesn't contain an entry
for the key. However, the hash table will not check whether all of its
elements have been looked up, so it cannot identify elements of the solution
set for which no corresponding element was present in the other data set.

So, the CoGroup with a solution set works only in one direction as stated in
the documentation. This is kind of intended by the way the solution set
CoGroup is implemented, but we should definitely update the documentation to
cover this case!

If you have a use case that requires a solution set CoGroup with the missing
behavior you should open a JIRA issue.
Otherwise it would be great if you could also open a JIRA issue to extend the
documentation.

Thank you, Fabian

2015-11-16 1:02 GMT+01:00 Truong Duc Kien <duckientru...@gmail.com>:

Hi,

When running CoGroup between the solution set and a different dataset
inside a DeltaIteration, the CoGroupFunction only gets called for items
that exist in the other dataset, similar to an inner join. This is not
the documented behavior for CoGroup:

If a DataSet has a group with no matching key in the other DataSet,
the CoGroupFunction is called with an empty group for the non-existing
group.

The following code shows the problem.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object CoGroupExample {

  def coGroupFuntion(first: Iterator[(Int, Int)],
                     second: Iterator[(Int, Int)],
                     out: Collector[(Int, Int)]): Unit = {
    if (second.hasNext) {
      out.collect(second.next)
    } else {
      printf("Not in second set: %s\n", first.next)
      println("These two lines doesn't appear when " +
        "running cogroup on solution set")
    }
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()

    val d1 = env.fromElements(
      new Tuple2(1, 1),
      new Tuple2(2, 1),
      new Tuple2(3, 1)
    )

    d1.iterateDelta(d1, 1, Array{0}) {
      (solutionSet, workSet) => {
        val f = workSet.filter(_._1 != 1)
        println("Cogroup on solution set with delta iteration")
        val newSolutionSet = solutionSet.coGroup(f)
          .where(0)
          .equalTo(0)
          .apply(coGroupFuntion _)
        (newSolutionSet, newSolutionSet)
      }
    }.print()

    println("Normal cogroup")
    val d2 = d1.filter(_._1 != 1)
    d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFuntion _).print()
  }
}



Is this the expected behavior or should I file a bug about this?

Best regards,
Kien Truong
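
For reference, a skeletal bulk iteration in the Java DataSet API with an
explicit termination criterion standing in for the empty-workset convergence;
the step function and the convergence condition below are placeholders, not
part of the thread:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class BulkIterationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, Integer>> initial = env.fromElements(
                new Tuple2<>(1, 1), new Tuple2<>(2, 1), new Tuple2<>(3, 1));

        // Upper bound on the number of iterations.
        IterativeDataSet<Tuple2<Integer, Integer>> loop = initial.iterate(100);

        // Placeholder step function: the real algorithm's update goes here.
        DataSet<Tuple2<Integer, Integer>> updated = loop.map(
                new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> t) {
                        return new Tuple2<>(t.f0, t.f1 + 1);
                    }
                });

        // The loop terminates when this DataSet becomes empty -- the bulk
        // analogue of an empty workset (placeholder condition).
        DataSet<Tuple2<Integer, Integer>> stillChanging = updated.filter(
                new FilterFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public boolean filter(Tuple2<Integer, Integer> t) {
                        return t.f1 < 5;
                    }
                });

        loop.closeWith(updated, stillChanging).print();
    }
}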

Re: Apache Flink Operator State as Query Cache

2015-11-16 Thread Welly Tambunan
Hi Stephan,

So that will be in Flink 1.0, right?

Cheers

On Mon, Nov 16, 2015 at 9:06 PM, Stephan Ewen  wrote:

> Hi Anwar!
>
> 0.10.0 was feature frozen at that time already and under testing.
> Key/value state on connected streams will have to go into the next
> release...
>
> Stephan
>
>
> On Mon, Nov 16, 2015 at 3:00 PM, Anwar Rizal  wrote:
>
>> Stephan,
>>
>> Having a look at the brand new 0.10 release, I noticed that OperatorState
>> is not implemented for ConnectedStream, which is quite the opposite of what
>> you said below.
>>
>> Or maybe I misunderstood your sentence here ?
>>
>> Thanks,
>> Anwar.
>>
>>
>> On Wed, Nov 11, 2015 at 10:49 AM, Stephan Ewen  wrote:
>>
>>> Hi!
>>>
>>> In general, if you can keep state in Flink, you get better
>>> throughput/latency/consistency and have one less system to worry about
>>> (external k/v store). State outside means that the Flink processes can be
>>> slimmer and need fewer resources and as such recover a bit faster. There
>>> are use cases for that as well.
>>>
>>> Storing the model in OperatorState is a good idea, if you can. On the
>>> roadmap is to migrate the operator state to managed memory as well, so that
>>> should take care of the GC issues.
>>>
>>> We are just adding functionality to make the Key/Value operator state
>>> usable in CoMap/CoFlatMap as well (currently it only works in windows and
>>> in Map/FlatMap/Filter functions over the KeyedStream).
>>> Until then, you should be able to use a simple Java HashMap and use the
>>> "Checkpointed" interface to get it persistent.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan 
>>> wrote:
>>>
 Thanks for the answer.

 Currently the approach I'm using is creating a
 base/marker interface to stream different types of messages to the same
 operator. Not sure about the performance hit of this compared to the
 CoFlatMap function.

 Basically this one is providing a query cache, so I'm thinking that instead
 of using an in-memory cache like Redis, Ignite etc., I can just use operator
 state for this one.

 I just want to gauge whether I need to use a memory cache or whether
 operator state would be just fine.

 However, I'm concerned about Gen 2 garbage collection when caching our
 own state without using operator state. Is there any clarification on that
 one?



 On Sat, Nov 7, 2015 at 12:38 AM, Anwar Rizal 
 wrote:

>
> Let me understand your case better here. You have a stream of model
> and stream of data. To process the data, you will need a way to access
> your model from the subsequent stream operations (map, filter, flatmap, ..).
> I'm not sure in which case Operator State is a good choice, but I
> think you can also live without it.
>
> val modelStream =  // get the model stream
> val dataStream   =  // get the data stream
>
> modelStream.broadcast.connect(dataStream).coFlatMap( ... )
>
> Then you can keep the latest model in a RichCoFlatMapFunction, not
> necessarily as Operator State, although maybe OperatorState is a good
> choice too.
>
> Does it make sense to you ?
>
> Anwar
>
> On Fri, Nov 6, 2015 at 10:21 AM, Welly Tambunan 
> wrote:
>
>> Hi All,
>>
>> We have high-density data that requires downsampling. However, this
>> downsampling model is very flexible, depending on the client device and user
>> interaction, so it would be wasteful to precompute and store it in a db.
>>
>> So we want to use Apache Flink to do downsampling and cache the
>> result for subsequent query.
>>
>> We are considering using Flink Operator state for that one.
>>
>> Is using operator state the right approach for a memory cache? Or is it
>> preferable to use a memory cache like Redis etc.?
>>
>> Any comments will be appreciated.
>>
>>
>> Cheers
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>


 --
 Welly Tambunan
 Triplelands

 http://weltam.wordpress.com
 http://www.triplelands.com 

>>>
>>>
>>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Apache Flink 0.10.0 released

2015-11-16 Thread Welly Tambunan
Great Job guys,

So this is the first production-ready release of the Streaming API!

Cool !

Cheers

On Mon, Nov 16, 2015 at 9:02 PM, Leonard Wolters  wrote:

> congrats!
>
> L.
>
>
> On 16-11-15 14:53, Fabian Hueske wrote:
>
> Hi everybody,
>
> The Flink community is excited to announce that Apache Flink 0.10.0 has
> been released.
> Please find the release announcement here:
>
> -->  http://flink.apache.org/news/2015/11/16/release-0.10.0.html
>
> Best,
> Fabian
>
>
> --
> Leonard Wolters
> Chief Product Manager
> *M*: +31 (0)6 55 53 04 01 | *T*: +31 (0)88 10 44 555
> *E*: leon...@sagent.io | *W*: sagent.io | Disclaimer | Sagent BV
> Herengracht 504 | 1017CB Amsterdam | Netherlands
>



-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-16 Thread Konstantin Knauf
Hi Aljoscha,

thanks for your answer. Yes I am using the same TimestampExtractor-Class.

The timestamps look good to me. Here an example.

{"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00

The order now is

stream
.map(dummyMapper)
.assignTimestamps(...)
.timeWindow(...)

Is there a way to print out the assigned timestamps after
stream.assignTimestamps(...)?

Cheers,

Konstantin


On 16.11.2015 10:31, Aljoscha Krettek wrote:
> Hi,
> are you also using the timestamp extractor when you are using 
> env.fromCollection()?
> 
> Could you maybe insert a dummy mapper after the Kafka source that just prints 
> the element and forwards it? To see if the elements come with a good 
> timestamp from Kafka.
> 
> Cheers,
> Aljoscha
>> On 15 Nov 2015, at 22:55, Konstantin Knauf  
>> wrote:
>>
>> Hi everyone,
>>
>> I have the following issue with Flink (0.10) and Kafka.
>>
>> I am using a very simple TimestampExtractor like [1], which just
>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>
>> stream = env.addSource(new FlinkKafkaConsumer082<Pojo>(
>>     parameterTool.getRequired("topic"),
>>     new AvroPojoDeserializationSchema(),
>>     parameterTool.getProperties()));
>>
>> I have timestamps enabled, the TimeCharacteristic is EventTime, and the
>> AutoWatermarkInterval is 500.
>>
>> The problem is, when I do something like:
>>
>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>> .sum(..)
>> .print()
>>
>> env.execute();
>>
>> the windows never get triggered.
>>
>> If I use ProcessingTime it works.
>> If I use env.fromCollection(...) instead of the KafkaSource it works
>> with EventTime, too.
>>
>> Any ideas what I could be doing wrong are highly appreciated.
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1]:
>>
>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>
>>     private final long maxDelay;
>>
>>     public PojoTimestampExtractor(long maxDelay) {
>>         this.maxDelay = maxDelay;
>>     }
>>
>>     @Override
>>     public long extractTimestamp(Pojo pojo, long l) {
>>         return pojo.getTime();
>>     }
>>
>>     @Override
>>     public long extractWatermark(Pojo pojo, long l) {
>>         return pojo.getTime() - maxDelay;
>>     }
>>
>>     @Override
>>     public long getCurrentWatermark() {
>>         return Long.MIN_VALUE;
>>     }
>> }
> 
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082