Re: [SUGGESTION] Stack Overflow Documentation for Apache Flink

2016-09-05 Thread Vishnu Viswanath
Hi,

This is my understanding of SO Documentation:

1. Anyone in the community can contribute; others can validate and improve
2. Users can request a topic
3. As Max mentioned, the documentation is more example-oriented and it
doesn't have to be as comprehensive as Flink's primary documentation.
(Yes, there might be some overlap and duplication.)
E.g., here is one of the documentation topics for the apache-spark tag:
http://stackoverflow.com/documentation/apache-spark/833/introduction-to-apache-spark#t=201609060203001533016

Thanks,
Vishnu

On Mon, Sep 5, 2016 at 8:37 AM, Robert Metzger  wrote:

> It seems that the content on SO is licensed under CC BY-SA 3.0 with
> attribution required.
> The Apache Legal FAQ is not completely clear about that case:
> http://www.apache.org/legal/resolved.html#cc-sa
>
> But if we want, we could at least ask the legal PMC if we can add some of
> the content from SO into the Flink documentation.
>
>
> On Mon, Sep 5, 2016 at 1:44 PM, Matthias J. Sax  wrote:
>
>> I voted. It's live now.
>>
>> The advantage of SO documentation is also that people not familiar with
>> Apache might do some documentation (but would never open a PR). Of
>> course, as a community, we should put the focus on the web page docs. But
>> having something additional can't hurt.
>>
>> From my experience, it is also good if certain aspects are described by
>> different people and thus from different points of view. It often helps
>> users to understand better.
>>
>> Also, recurring SO questions can be handled nicely by SO documentation.
>>
>> -Matthias
>>
>> On 09/05/2016 01:25 PM, Till Rohrmann wrote:
>> > I've understood the SO documentation approach similarly to what Max has
>> > said. I see it as a source of code examples which illustrate Flink
>> > concepts and which is maintained by the SO community.
>> >
>> > On Mon, Sep 5, 2016 at 1:09 PM, Maximilian Michels wrote:
>> >
>> > I thought it is not about outsourcing but about providing an
>> > example-based documentation on SO which can be easily edited by the
>> SO
>> > community. The work can be fed back to the official Flink
>> > documentation which will always be on flink.apache.org.
>> >
>> > On Mon, Sep 5, 2016 at 12:42 PM, Fabian Hueske wrote:
>> > > Thanks for the suggestion Vishnu!
>> > > Stackoverflow documentation looks great. I like the easy
>> > contribution and
>> > > versioning features.
>> > >
>> > > However, I am a bit skeptical. IMO, Flink's primary documentation must be
>> > > hosted by Apache. Out-sourcing such an important aspect of a project to an
>> > > external service is not an option for me.
>> > > This would mean that documentation on SO would be an additional /
>> > > secondary documentation. I see two potential problems with that:
>> > >
>> > > - It is duplicate effort to keep two documentations up-to-date. Adding a
>> > > new feature or changing some behavior must be documented in two places.
>> > > - Efforts to improve documentation might split up, i.e., the primary
>> > > documentation might receive less improvements and contributions.
>> > >
>> > > Of course, this is just my opinion but I think it is worth mentioning
>> > > these points.
>> > >
>> > > Thanks,
>> > > Fabian
>> > >
>> > > 2016-09-05 12:22 GMT+02:00 Ravikumar Hawaldar:
>> > >>
>> > >> Hi,
>> > >>
>> > >>
>> > >> I just committed to apache-flink documentation on SO, one more
>> commit
>> > >> required. Nice idea to document on SO Vishnu.
>> > >>
>> > >>
>> > >>
>> > >> Regards,
>> > >>
>> > >> Ravikumar
>> > >>
>> > >> On 5 September 2016 at 14:22, Maximilian Michels wrote:
>> > >>>
>> > >>> Hi!
>> > >>>
>> > >>> This looks neat. Let's try it out. I just voted.
>> > >>>
>> > >>> Cheers,
>> > >>> Max
>> > >>>
>> > >>> On Sun, Sep 4, 2016 at 8:11 PM, Vishnu Viswanath wrote:
>> > >>> > Hi All,
>> > >>> >
>> > >>> > Why don't we make use of Stackoverflow's new documentation
>> > feature to
>> > >>> > do
>> > >>> > some documentation of Apache Flink.
>> > >>> >
>> > >>> > To start, at least 5 SO users should commit to document, who
>> > has at
>> > >>> > least150
>> > >>> > reputation and have at least 1 positively scored answer in
>> > Flink tag.
>> > >>> >
>> > >>> > http://stackoverflow.com/documentation/apache-flink
>> > 

Re: NoClassDefFoundError with ElasticsearchSink on Yarn

2016-09-05 Thread Steffen Hausmann

Thanks Aris for your explanation!

A guava version mismatch was indeed the problem. But in addition to 
shading the guava dependencies, I encountered another issue caused by 
conflicting files in META-INF/services:



RemoteTransportException[[Failed to deserialize response of type 
[org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse]]]; 
nested: TransportSerializationException[Failed to deserialize response of type 
[org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse]]; 
nested: ExceptionInInitializerError; nested: IllegalArgumentException[An SPI 
class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does 
not exist.  You need to add the corresponding JAR file supporting this SPI to 
your classpath.  The current classpath supports the following names: [es090, 
completion090, XBloomFilter]];


By adding the following bits to my pom.xml file, the conflicting files 
are appended instead of overwritten and hence the ElasticSearch sink 
works as expected:




<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.lucene.codecs.Codec</resource>
  </transformer>
  <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.lucene.codecs.DocValuesFormat</resource>
  </transformer>
  <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.lucene.codecs.PostingsFormat</resource>
  </transformer>
</transformers>



Maybe this is something that can be added to the documentation?

Thanks,
Steffen

On 01/09/2016 12:22, aris kol wrote:

Classic problem of every uber-jar containing Hadoop dependencies and
being deployed on Yarn.

What actually happens is that some Hadoop dependency relies on an
old version of guava (11 in this case), which doesn't have the method.
You may have assembled your fat-jar properly, but because Hadoop deps
get introduced to your classpath before your own, you invoke the method
using the guava 11 version of the class.

I fixed that by adding this line:


++ Seq(assemblyShadeRules in assembly :=
Seq(ShadeRule.rename("com.google.common.**" -> "shaded.@1").inAll))

on the artefact that gets deployed on flink.

What this practically does is shade the guava dependencies. Instead
of containing references to com.google.common, your build will reference
shaded.com.google.common and as a result it will use the proper class in
your fat jar.
Get a bit creative with the name (i.e. use shadedhausmann instead of
shaded) to avoid colliding with external deps that shade stuff (something
you have to do when using guava, joda, jackson etc.).

Let me know if this helped.

Aris


*From:* Steffen Hausmann 
*Sent:* Thursday, September 1, 2016 8:58 AM
*To:* user@flink.apache.org
*Subject:* NoClassDefFoundError with ElasticsearchSink on Yarn

Hi there,

I’m running a flink program that reads from a Kinesis stream and
eventually writes to an Elasticsearch2 sink. When I’m running the
program locally from the IDE, everything seems to work fine, but when
I’m executing the same program on an EMR cluster with Yarn, a
NoClassDefFoundError occurs:

java.lang.NoSuchMethodError:

com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
at
org.elasticsearch.threadpool.ThreadPool.(ThreadPool.java:190)
at
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
at
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

I’ve installed flink on an EMR cluster from the binary distribution
flink-1.1.1-bin-hadoop27-scala_2.10.tgzand the jar file that is executed
on the cluster is build with mvn clean package(I’ve attached the pom.xml
for reference).

There is a thread on this list that seems to be related, but I’m afraid
I couldn’t draw any conclusions from it:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/classpath-issue-on-yarn-tt6442.html#none

Any idea, what’s wrong?

Thanks,
Steffen



Sharing Java Collections within Flink Cluster

2016-09-05 Thread Chakravarthy varaga
Hi Team,

I'm working on a Flink streaming application. The data is injected
through Kafka connectors. The payload volume is roughly 100K events/sec and
the event payload is a string. Let's call this DataStream1.
The application also uses another DataStream, call it DataStream2, which
consumes events off a second Kafka topic. The elements of DataStream2 go
through a certain transformation that finally updates a HashMap (a java.util
collection). The Flink application should share this HashMap across the
Flink cluster so that the DataStream1 part of the job can check the state of
the values in this collection. Is there a way to do this in Flink?

I don't see any notion of a shared collection within the cluster. Is there one?

Best Regards
CVP


Re: emit a single Map<String, T> per window

2016-09-05 Thread Luis Mariano Guerra
On Mon, Sep 5, 2016 at 12:30 PM, Aljoscha Krettek 
wrote:

> Hi,
> for this you would have to use a non-parallel window, i.e. something like
> stream.windowAll().apply(...). This does not compute per key but
> has the drawback that computation does not happen in parallel. If you only
> use it to combine the pre-aggregated maps it could be OK, though.
>
> Cheers,
> Aljoscha
>

hi,

thanks for the tip, it works. I was aware of the non-parallel nature of
what I want to do. After seeing it work I tried this:

input.flatMap(new LineSplitter()).keyBy(0)
.timeWindow(Time.of(5, TimeUnit.SECONDS))
.apply(new HashMap(), timeWindowFold,
timeWindowMerge)
.windowAll(TumblingEventTimeWindows.of(Time.of(5,
TimeUnit.SECONDS)))
.apply(new HashMap(), windowAllFold,
windowAllMerge);

and it seems to work, but each timeWindowFold appears to accumulate a single
key. I was expecting one or more keys per fold, depending on which processing
node handled the computation. I don't care if I emit one event per key, but I
want to know if this is OK and whether I'm missing something (maybe I have to
add a line to specify the partitioning?).


> On Fri, 2 Sep 2016 at 18:26 Luis Mariano Guerra 
> wrote:
>
>> On Fri, Sep 2, 2016 at 5:24 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> from this I would expect to get as many HashMaps as you have keys. The
>>> winFunction is also executed per-key so it cannot combine the HashMaps of
>>> all keys.
>>>
>>> Does this describe the behavior that you're seeing?
>>>
>>
>> yes, that's the behaviour I'm seeing. I'm looking for a way to merge those
>> HashMaps from the same window into a single one, but I can't find how.
>>
>>
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 2 Sep 2016 at 17:37 Luis Mariano Guerra <
>>> mari...@event-fabric.com> wrote:
>>>
 hi!

 I'm trying to collect some metrics by key per window and emitting the
 full result at the end of the window to Kafka. I started with a simple
 count by key to test it, but my requirements are a little more complex than
 that.

 what I want to do is to fold the stream events as they come and then, at
 the end of the window, merge them together and emit a single result. I don't
 want to accumulate all the events and calculate at the end of the window.
 From my understanding of fold in other languages/libraries, this would be
 what I need, using it via apply(stateIn, foldFun, windowFun), but it's not
 working:

 the basic is:

 input
 .flatMap(new LineSplitter())
 .keyBy(0)
 .timeWindow(Time.of(5, TimeUnit.SECONDS))
 .apply(new HashMap(), foldFunction,
 winFunction);

 where foldFunction accumulates by key and winFunction iterate over the
 hasmaps and merges them into a single result hashmap and emits that one at
 the end.

 this emits many one-key hash maps instead of only one with all the
 keys. I tried setting setParallelism(1) in multiple places but it still
 doesn't work. More confusingly, in one run it emitted a single map, but after
 I ran it again it went back to the previous behavior.

 What am I doing wrong? Is there any other approach?

 I can provide the implementation of foldFunction and winFunction if
 required but I think it doesn't change much.

 Reading the source code I see:

 Applies the given window function to each window. The window
 function is called for each evaluation of the window for each key
 individually. The output of the window function is interpreted as a regular
 non-windowed stream.

 emphasis on " for each key individually", the return type of apply is
 SingleOutputStreamOperator which doesn't provide many operations to group
 the emited values.

 thanks in advance.

>>>


Re: RawSchema as deserialization schema

2016-09-05 Thread Maximilian Michels
Just implement DeserializationSchema and return the byte array from
Kafka. Byte array serialization poses no problem to the Flink
serialization.
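
A minimal sketch of such a pass-through schema (untested; the class name
RawBytesSchema is made up, and the package names follow the Flink 1.1 API that
appears elsewhere in this digest):

import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// Hands the raw Kafka record bytes to the downstream operators unchanged.
public class RawBytesSchema implements DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] message) {
        return message; // no decoding, just forward the bytes
    }

    @Override
    public boolean isEndOfStream(byte[] nextElement) {
        return false; // never end the stream based on the payload
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}

It should then be usable in place of SimpleStringSchema when constructing the
FlinkKafkaConsumer08.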

On Mon, Sep 5, 2016 at 3:50 PM, Swapnil Chougule
 wrote:
> I am using the Kafka consumer in Flink 1.1.1 with Kafka 0.8.2. I want to read
> the byte array as-is into a DataStream. I tried to use RawSchema as the
> deserialization schema but couldn't find it in 1.1.1.
> Do I have to write my own implementation for this?
> Can somebody help me sort this out?
>
> Also, is passing byte[] to the next operator supported as far as
> serialization is concerned?


Re: Kafka SimpleStringConsumer NPE

2016-09-05 Thread Maximilian Michels
Your Kafka topic seems to contain null values. By default, Flink will
just forward null values to the DeserializationSchema which has to
take care of null values. The SimpleStringSchema doesn't do that and
fails with a NullPointerException. Thus, you need an additional check
in your DeserializationSchema to handle null values.
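
For example, a null-tolerant variant of SimpleStringSchema could look roughly
like this (a sketch only; NullSafeStringSchema is a made-up name, and mapping
null records to an empty string is just one possible choice):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// Like SimpleStringSchema, but maps null Kafka records to an empty string
// instead of failing with a NullPointerException.
public class NullSafeStringSchema implements DeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) {
        return message == null ? "" : new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}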

On Sun, Sep 4, 2016 at 2:46 PM, Tzu-Li (Gordon) Tai  wrote:
> Hi David,
>
> Is it possible that your Kafka installation is an older version than 0.9? Or
> you may have used a different Kafka client major version in your job jar's
> dependency?
> This seems like an odd incompatible protocol with the Kafka broker to me, as
> the client in the Kafka consumer is reading null record bytes.
>
> Regards,
> Gordon
>
>
> On September 4, 2016 at 7:17:04 AM, dbciar (da...@dbciar.co.uk) wrote:
>
> Hello Everyone,
>
> I was wondering if anyone could help shed light on where I have introduced
> an error into my code to get the following error:
>
> java.lang.NullPointerException
> at java.lang.String.(String.java:556)
> at
> org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:34)
> at
> org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:27)
> at
> org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
> at java.lang.Thread.run(Thread.java:745)
>
> I get this error while running a job that connects to kafka from a local
> deployment. Could it be to do with how I'm packaging the Jar before
> uploading it to the cluster?
>
> The job plan is created and deployed OK via the management website, but as
> soon as data is added to Kafka I get the above and the job stops. Using
> Kafka's own console consumer script, I validated the kafka queue and the
> data looks exactly like the testing data I used when reading from local
> files.
>
> Any help as always appreciated,
> Cheers,
> David
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-SimpleStringConsumer-NPE-tp.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.


Re: Storing JPMML-Model Object as a Variable Closure?

2016-09-05 Thread Simone Robutti
>The only drawback is that the Evaluator gets initialized once per
Function-Instance.

I believe that reducing the number of function instances is something that
should be handled by Flink's runtime and that's why I've implemented the
solution this way. In our tests the number of instances was minimal but
this is still extremely experimental so take it with a grain of salt.

I believe that this is highly dependent on the expected size of the PMML
models though.

2016-09-05 16:33 GMT+02:00 Bauss, Julian :

> Hi Simone,
>
>
>
> thank you for your feedback!
>
> The code snippet you provided works fine.
>
>
>
> The only drawback is that the Evaluator gets initialized once per
> Function-Instance.
>
> That could lead to high memory consumption depending on the level of
> parallelism
>
> and the size of the PMML-Model (which can get quite big).
>
>
>
> The „obvious“ optimization would be to initialize and hide the Evaluator
> behind a singleton since it
>
> is thread safe. (Which is what I wanted to avoid in the first place. But
> maybe that is the best solution
>
> at the moment?)
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
>
> *Von:* Simone Robutti [mailto:simone.robu...@radicalbit.io]
> *Gesendet:* Montag, 5. September 2016 15:42
>
> *An:* user@flink.apache.org
> *Betreff:* Re: Storing JPMML-Model Object as a Variable Closure?
>
>
>
> Yes, sorry but it's private and I just discovered we don't want to release
> it as public yet. This piece of code could help you though:
> https://gist.github.com/chobeat/f07221357a2e3f9efa377e4cb0479f92
>
>
>
> Ignore all the stuff about the strategies. The important part is the
> `open` method and the transient var. This is used to load the PMML file and
> instance all the JPMML stuff when you instance the Flink operator. The
> variable `pmmlSource` is a string but you can replace that part with a load
> from HDFS or other FS if you want every node to load the .jpmml file in
> parallel and be in control of that part.
>
>
>
>
>
>
>
> 2016-09-05 15:24 GMT+02:00 Bauss, Julian :
>
> Hi Simone,
>
>
>
> that sounds promising!
>
> Unfortunately your link leads to a 404 page.
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
>
> *Von:* Simone Robutti [mailto:simone.robu...@radicalbit.io]
> *Gesendet:* Montag, 5. September 2016 14:59
>
>
> *An:* user@flink.apache.org
> *Betreff:* Re: Storing JPMML-Model Object as a Variable Closure?
>
>
>
> I think you could make use of this small component I've developed:
> https://gitlab.com/chobeat/Flink-JPMML
>
>
>
> It's specifically for using JPMML on Flink. Maybe there's too much stuff
> for what you need but you could reuse the code of the Operator to do what
> you need.
>
>
>
> 2016-09-05 14:11 GMT+02:00 Bauss, Julian :
>
> Hi Stephan,
>
>
>
> thanks for your reply!
>
> It seems as if I can only use broadcast variables on DataSet-Operators
> (using myFunc.withBroadcastSet(…))
>
> Is that right?
>
>
>
> I am working on a DataStream, though. Do streams offer similar
> functionality?
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
>
> *Von:* Stephan Ewen [mailto:se...@apache.org]
> *Gesendet:* Freitag, 2. September 2016 15:27
> *An:* user@flink.apache.org
> *Betreff:* Re: Storing JPMML-Model Object as a Variable Closure?
>
>
>
> How about using a source and broadcast variable?
>
>
>
> You could write the model to storage (DFS), then read it with a source
> and use a broadcast variable to send it to all tasks.
>
> A single record can be very large, so it should work even if your model is
> quite big.
>
>
>
> Does that sound feasible?
>
>
>
> In future versions of flink, you may be able to skip the "write to DFS"
> step and simply have the model in a collection source (when large RPC
> messages are supported).
>
>
>
> Best,
>
> Stephan
>
>
>
>
>
>
>
> On Fri, Sep 2, 2016 at 11:20 AM, Bauss, Julian 
> wrote:
>
> Hello Everybody,
>
>
>
> I’m currently refactoring some code and am looking for a better
> alternative to handle
>
> JPMML-Models in data streams. At the moment the flink job I’m working on
> references a model-object
>
> as a Singleton which I want to change because static references tend to
> cause problems in distributed systems.
>
>
>
> I thought about handing the model-object to the function that uses it as a
> variable closure. The object
>
> can be between 16MB and 250MB in size (depending on the depth of the
> decision tree).
>
>
>
> According to https://cwiki.apache.org/confluence/display/FLINK/
> Variables+Closures+vs.+Broadcast+Variables that’s way too large though.
>
> Are there any viable alternatives or would this be the „right way“ to
> handle this situation?
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
> 

AW: Storing JPMML-Model Object as a Variable Closure?

2016-09-05 Thread Bauss, Julian
Hi Simone,

thank you for your feedback!
The code snippet you provided works fine.

The only drawback is that the Evaluator gets initialized once per 
Function-Instance.
That could lead to high memory consumption depending on the level of parallelism
and the size of the PMML-Model (which can get quite big).

The „obvious“ optimization would be to initialize and hide the Evaluator behind 
a singleton since it
is thread safe. (Which is what I wanted to avoid in the first place. But maybe 
that is the best solution
at the moment?)

Best Regards,

Julian

Von: Simone Robutti [mailto:simone.robu...@radicalbit.io]
Gesendet: Montag, 5. September 2016 15:42
An: user@flink.apache.org
Betreff: Re: Storing JPMML-Model Object as a Variable Closure?

Yes, sorry but it's private and I just discovered we don't want to release it 
as public yet. This piece of code could help you though: 
https://gist.github.com/chobeat/f07221357a2e3f9efa377e4cb0479f92

Ignore all the stuff about the strategies. The important part is the `open` 
method and the transient var. This is used to load the PMML file and instance 
all the JPMML stuff when you instance the Flink operator. The variable 
`pmmlSource` is a string but you can replace that part with a load from HDFS or 
other FS if you want every node to load the .jpmml file in parallel and be in 
control of that part.



2016-09-05 15:24 GMT+02:00 Bauss, Julian:
Hi Simone,

that sounds promising!
Unfortunately your link leads to a 404 page.

Best Regards,

Julian

Von: Simone Robutti 
[mailto:simone.robu...@radicalbit.io]
Gesendet: Montag, 5. September 2016 14:59

An: user@flink.apache.org
Betreff: Re: Storing JPMML-Model Object as a Variable Closure?

I think you could make use of this small component I've developed: 
https://gitlab.com/chobeat/Flink-JPMML

It's specifically for using JPMML on Flink. Maybe there's too much stuff for 
what you need but you could reuse the code of the Operator to do what you need.

2016-09-05 14:11 GMT+02:00 Bauss, Julian:
Hi Stephan,

thanks for your reply!
It seems as if I can only use broadcast variables on DataSet-Operators (using 
myFunc.withBroadcastSet(…))
Is that right?

I am working on a DataStream, though. Do streams offer similar functionality?

Best Regards,

Julian

Von: Stephan Ewen [mailto:se...@apache.org]
Gesendet: Freitag, 2. September 2016 15:27
An: user@flink.apache.org
Betreff: Re: Storing JPMML-Model Object as a Variable Closure?

How about using a source and broadcast variable?

You could write the model to storage (DFS), then read it with a source and
use a broadcast variable to send it to all tasks.
A single record can be very large, so it should work even if your model is 
quite big.

Does that sound feasible?

In future versions of flink, you may be able to skip the "write to DFS" step 
and simply have the model in a collection source (when large RPC messages are 
supported).

Best,
Stephan



On Fri, Sep 2, 2016 at 11:20 AM, Bauss, Julian wrote:
Hello Everybody,

I’m currently refactoring some code and am looking for a better alternative to 
handle
JPMML-Models in data streams. At the moment the flink job I’m working on 
references a model-object
as a Singleton which I want to change because static references tend to cause 
problems in distributed systems.

I thought about handing the model-object to the function that uses it as a 
variable closure. The object
can be between 16MB and 250MB in size (depending on the depth of the decision 
tree).

According to 
https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
 that’s way too large though.
Are there any viable alternatives or would this be the „right way“ to handle 
this situation?

Best Regards,

Julian


**

bonprix Handelsgesellschaft mbH
Sitz der Gesellschaft: Hamburg

Geschäftsführung:
Dr. Marcus Ackermann (Vorsitzender)
Dr. Kai Heck
Rien Jansen
Markus Fuchshofen
Beiratsvorsitzender: Alexander Birken

Handelsregister AG Hamburg HR B 36 455

Adresse:

bonprix Handelsgesellschaft mbH

Haldesdorfer Str. 61
22179 Hamburg


Re: Firing windows multiple times

2016-09-05 Thread Aljoscha Krettek
I forgot to mention the FLIP that would basically provide the functionality
that we need (without handling of late elements):
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata.
I just need to find some time to implement this or find someone who would
be willing to implement it.

You're right, the "allowed lateness" feature was newly introduced in Flink
1.1. You're also mostly right about the possibilities it opens up.
With the addition there are basically two knobs now that can be used to
tune the behavior of Flink when it comes to event-time, watermarks and
lateness. Having a bit of allowed lateness allows the watermark to be a bit
more aggressive in when it updates the time. If you don't allow any
lateness the watermark better be pretty close to correct, otherwise you
might lose data. I agree that this is not really intuitive for everyone and
I myself don't really know what would be good settings in production for
all cases.
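
As a rough illustration of the two knobs (a sketch with a made-up
Tuple2<String, Long> stream, not code from this thread):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// input: (key, event timestamp in millis) pairs from some source
DataStream<Tuple2<String, Long>> input = ...;

input
    // knob 1: the watermark trails the highest seen timestamp by 10 seconds,
    // so events up to 10 seconds out of order are not considered late at all
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element) {
                return element.f1;
            }
        })
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    // knob 2: keep the window state around for another 10 minutes after the
    // watermark passes the end of the window; elements that arrive within
    // that bound re-fire the window instead of being dropped
    .allowedLateness(Time.minutes(10))
    .sum(1);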

How are you dealing with (or planning to deal with) elements that arrive
behind the watermark? Is it ok for you to completely drop them? I'm trying
to learn what the requirements of different folks are.

Best,
Aljoscha

On Fri, 2 Sep 2016 at 19:44 Shannon Carey  wrote:

> Of course! I really appreciate your interest & attention. I hope we will
> figure out solutions that other people can use.
>
> I agree with your analysis. Your triggering syntax is particularly nice. I
> wrote a custom trigger which does exactly that but without the nice fluent
> API. As I considered the approach you mentioned, it was clear that I would
> not be able to easily solve the problem of multiple windows with
> early-firing events causing over-counting. Modifying the windowing system
> as you describe would be helpful. Events could either be filtered out, as
> you describe, or perhaps the windows themselves could be muted/un-muted
> depending on whether they are the closest window (by end time) to the
> current watermark.
>
> I'm not clear on the purpose of the late firing you describe. I believe
> that was added in Flink 1.1 and it's a new concept to me. I thought late
> events were completely handled by decisions made in the watermark &
> timestamp assigner. Does this feature allow events after the watermark to
> still be incorporated into windows that have already been closed by a
> watermark? Perhaps it's intended to allow window-specific lateness
> allowance, rather than the stream-global watermarker? That does sound
> problematic. I assume there's a reason for closing the window before the
> allowed lateness has elapsed? Otherwise, the window (trigger, really) could
> just add the lateness to the watermark and pretend that the watermark
> hadn't been reached until the lateness had already passed.
>
> I agree that your idea is potentially a lot better than the approach I
> described, if it can be implemented! You are right that the approach I
> described requires that all the events be retained in the window state so
> that aggregation can be done repeatedly from the raw events as new events
> come in and old events are evicted. In practice, we are currently writing
> the first aggregations (day-level) to an external database and then
> querying that time-series from the second-level (year) aggregation so that
> we don't actually need to keep all that data around in Flink state.
> Obviously, that approach can have an impact on the processing guarantees
> when a failure/recovery occurs if we don't do it carefully. Also, we're not
> particularly sophisticated yet with regard to avoiding unnecessary queries
> to the time series data.
>
> -Shannon
>
>
> From: Aljoscha Krettek 
> Date: Friday, September 2, 2016 at 4:02 AM
>
> To: "user@flink.apache.org" 
> Subject: Re: Firing windows multiple times
>
> I see, I didn't forget about this, it's just that I'm thinking hard.
>
> I think in your case (which I imagine some other people to also have) we
> would need an addition to the windowing system that the original Google
> Dataflow paper called retractions. The problem is best explained with an
> example. Say you have this program:
>
> DataStream input = ...
>
> DataStream firstAggregate = input
>   .keyBy(...)
>   .window(TumblingTimeWindow(1 Day))
>
> .trigger(EventTime.afterEndOfWindow().withEarlyTrigger(Repeatedly.forever(ProcessingTime.afterFirstElement(Time.seconds(30)
>   .reduce(new SomeAggregate())
>
> DataStream secondAggregate = firstAggregate
>   .keyBy(...)
>   .window(TumblingTimeWindow(5 Days)
>
> .trigger(EventTime.afterEndOfWindow().withEarlyTrigger(Repeatedly.forever(ProcessingTime.afterFirstElement(Time.seconds(30)
>   .reduce(new SomeAggregate())
>
> The problem here is that the second windowing operation sees all the
> incremental early-firing updates from the first window operation, it would
> thus over count. This problem could be overcome by introducing meta data in

Re: Flink Iterations vs. While loop

2016-09-05 Thread Theodore Vasiloudis
Hello Dan,

are you broadcasting the 85GB of data then? I don't get why you wouldn't
store that file on HDFS so it's accessible by your workers.


If you have the full code available somewhere we might be able to help
better.

For L-BFGS you should only be broadcasting the model (i.e. the weight
vector), and yes that would happen at each iteration, since you are
updating the model at each iteration.
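
In case a sketch helps, the retrieval side of a broadcast set looks roughly
like this (illustrative only, with made-up names; the real gradient
computation is elided behind a placeholder):

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;

// The small weight vector is broadcast; the large data set stays distributed.
DataSet<double[]> weights = ...;   // the model, small
DataSet<double[]> data = ...;      // the training data, never broadcast

DataSet<double[]> gradients = data
    .map(new RichMapFunction<double[], double[]>() {
        private double[] model;

        @Override
        public void open(Configuration parameters) {
            List<double[]> broadcast =
                getRuntimeContext().getBroadcastVariable("model");
            model = broadcast.get(0);
        }

        @Override
        public double[] map(double[] point) {
            // placeholder: a real implementation would compute the loss
            // gradient of `point` with respect to `model` here
            return new double[model.length];
        }
    })
    .withBroadcastSet(weights, "model");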

On Fri, Sep 2, 2016 at 5:30 PM, Dan Drewes 
wrote:

> Hi Greg,
>
> thanks for your response!
>
> I just had a look and realized that it's just about 85 GB of data. Sorry
> about that wrong information.
>
> It's read from a csv file on the master node's local file system. The 8
> nodes have more than 40 GB available memory each and since the data is
> equally distributed I assume there should be no need to spill anything on
> disk.
>
> There are 9 iterations.
>
> Is it possible that also with Flink Iterations the data is repeatedly
> distributed? Or the other way around: Might it be that flink "remembers"
> somehow that the data is already distributed even for the while loop?
>
> -Dan
>
>
>
> Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>
> Hi Dan,
>
> Where are you reading the 200 GB "data" from? How much memory per node? If
> the DataSet is read from a distributed filesystem and if with iterations
> Flink must spill to disk then I wouldn't expect much difference. About how
> many iterations are run in the 30 minutes? I don't know that this is
> reported explicitly, but if your convergence function only has one input
> record per iteration then the reported total is the iteration count.
>
> One other thought, we should soon have support for object reuse with
> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
> ValueArray rather than double[] but it would be interesting to
> test for a change in performance.
>
> Greg
>
> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes 
> wrote:
>
>> Hi,
>>
>> for my bachelor thesis I'm testing an implementation of L-BFGS algorithm
>> with Flink Iterations against a version without Flink Iterations but a
>> casual while loop instead. Both programs use the same Map and Reduce
>> transformations in each iteration. It was expected that the performance of
>> the Flink Iterations would scale better with increasing size of the input
>> data set. However, the measured results on an ibm-power-cluster are very
>> similar for both versions, e.g. around 30 minutes for 200 GB data. The
>> cluster has 8 nodes, was configured with 4 slots per node and I used a
>> total parallelism of 32.
>> In every iteration of the while loop a new Flink job is started, and I
>> thought that the data would also be distributed over the network again in
>> each iteration, which should consume a significant and measurable amount of
>> time. Is that assumption wrong, or what is the computational overhead of the
>> Flink iterations that is equalizing this disadvantage?
>> I include the relevant part of both programs and also attach the
>> generated execution plans.
>> Thank you for any ideas as I could not find much about this issue in the
>> flink docs.
>>
>> Best, Dan
>>
>> *Flink Iterations:*
>>
>> DataSet data = ...
>>
>> State state = initialState(m, initweights,0,new double[initweights.length]);
>> DataSet statedataset = env.fromElements(state);
>> //start of iteration section
>> IterativeDataSet loop = statedataset.iterate(niter);
>>
>>
>> DataSet statewithnewlossgradient = 
>> data.map(difffunction).withBroadcastSet(loop, "state")
>>   .reduce(accumulate)
>>   .map(new NormLossGradient(datasize))
>>   .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>>   .map(new LBFGS());
>>
>>
>> DataSet converged = statewithnewlossgradient.filter(
>>new FilterFunction() {
>>   @Override  public boolean filter(State value) throws Exception {
>>  if(value.getIflag()[0] == 0){
>> return false;
>>  }
>>  return true;
>>   }
>>}
>> );
>>
>> DataSet finalstate = 
>> loop.closeWith(statewithnewlossgradient,converged);
>>
>> *While loop: *
>>
>> DataSet data =...
>> State state = initialState(m, initweights,0,new double[initweights.length]);
>> int cnt=0;do{
>>LBFGS lbfgs = new LBFGS();
>>statedataset=data.map(difffunction).withBroadcastSet(statedataset, 
>> "state")
>>   .reduce(accumulate)
>>   .map(new NormLossGradient(datasize))
>>   .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>   .map(lbfgs);
>>cnt++;
>> }while (cnt>
>>


RawSchema as deserialization schema

2016-09-05 Thread Swapnil Chougule
I am using the Kafka consumer in Flink 1.1.1 with Kafka 0.8.2. I want to read
the byte array as-is into a DataStream. I tried to use RawSchema as the
deserialization schema but couldn't find it in 1.1.1.
Do I have to write my own implementation for this?
Can somebody help me sort this out?

Also, is passing byte[] to the next operator supported as far as serialization
is concerned?


Re: Storing JPMML-Model Object as a Variable Closure?

2016-09-05 Thread Simone Robutti
Yes, sorry but it's private and I just discovered we don't want to release
it as public yet. This piece of code could help you though:
https://gist.github.com/chobeat/f07221357a2e3f9efa377e4cb0479f92

Ignore all the stuff about the strategies. The important part is the `open`
method and the transient var. They are used to load the PMML file and
instantiate all the JPMML objects when the Flink operator is instantiated. The
variable `pmmlSource` is a string, but you can replace that part with a load
from HDFS or another FS if you want every node to load the .jpmml file in
parallel and be in control of that part.
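
For readers without access to the gist, the pattern boils down to something
like the following sketch (illustrative only: the class name is made up and
the JPMML calls follow the JPMML-Evaluator README of that time, not the gist
itself):

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.dmg.pmml.FieldName;
import org.dmg.pmml.PMML;
import org.jpmml.evaluator.Evaluator;
import org.jpmml.evaluator.FieldValue;
import org.jpmml.evaluator.ModelEvaluatorFactory;
import org.jpmml.model.PMMLUtil;

// Builds the (non-serializable) JPMML evaluator once per parallel task
// instance inside open(), instead of shipping it in a closure or a singleton.
public class PmmlScoringFunction extends RichMapFunction<double[], Map<FieldName, ?>> {

    private final String pmmlSource;        // the PMML document as a string
    private transient Evaluator evaluator;  // rebuilt on every task instance

    public PmmlScoringFunction(String pmmlSource) {
        this.pmmlSource = pmmlSource;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        PMML pmml = PMMLUtil.unmarshal(
            new ByteArrayInputStream(pmmlSource.getBytes(StandardCharsets.UTF_8)));
        evaluator = ModelEvaluatorFactory.newInstance().newModelEvaluator(pmml);
    }

    @Override
    public Map<FieldName, ?> map(double[] features) {
        // map the raw input values onto the model's active fields
        Map<FieldName, FieldValue> arguments = new LinkedHashMap<>();
        List<FieldName> activeFields = evaluator.getActiveFields();
        for (int i = 0; i < activeFields.size(); i++) {
            FieldName field = activeFields.get(i);
            arguments.put(field, evaluator.prepare(field, features[i]));
        }
        return evaluator.evaluate(arguments);
    }
}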



2016-09-05 15:24 GMT+02:00 Bauss, Julian :

> Hi Simone,
>
>
>
> that sounds promising!
>
> Unfortunately your link leads to a 404 page.
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
>
> *Von:* Simone Robutti [mailto:simone.robu...@radicalbit.io]
> *Gesendet:* Montag, 5. September 2016 14:59
>
> *An:* user@flink.apache.org
> *Betreff:* Re: Storing JPMML-Model Object as a Variable Closure?
>
>
>
> I think you could make use of this small component I've developed:
> https://gitlab.com/chobeat/Flink-JPMML
>
>
>
> It's specifically for using JPMML on Flink. Maybe there's too much stuff
> for what you need but you could reuse the code of the Operator to do what
> you need.
>
>
>
> 2016-09-05 14:11 GMT+02:00 Bauss, Julian :
>
> Hi Stephan,
>
>
>
> thanks for your reply!
>
> It seems as if I can only use broadcast variables on DataSet-Operators
> (using myFunc.withBroadcastSet(…))
>
> Is that right?
>
>
>
> I am working on a DataStream, though. Do streams offer similar
> functionality?
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
>
> *Von:* Stephan Ewen [mailto:se...@apache.org]
> *Gesendet:* Freitag, 2. September 2016 15:27
> *An:* user@flink.apache.org
> *Betreff:* Re: Storing JPMML-Model Object as a Variable Closure?
>
>
>
> How about using a source and broadcast variable?
>
>
>
> You could write the model to storage (DFS), then read it with a source
> and use a broadcast variable to send it to all tasks.
>
> A single record can be very large, so it should work even if your model is
> quite big.
>
>
>
> Does that sound feasible?
>
>
>
> In future versions of flink, you may be able to skip the "write to DFS"
> step and simply have the model in a collection source (when large RPC
> messages are supported).
>
>
>
> Best,
>
> Stephan
>
>
>
>
>
>
>
> On Fri, Sep 2, 2016 at 11:20 AM, Bauss, Julian 
> wrote:
>
> Hello Everybody,
>
>
>
> I’m currently refactoring some code and am looking for a better
> alternative to handle
>
> JPMML-Models in data streams. At the moment the flink job I’m working on
> references a model-object
>
> as a Singleton which I want to change because static references tend to
> cause problems in distributed systems.
>
>
>
> I thought about handing the model-object to the function that uses it as a
> variable closure. The object
>
> can be between 16MB and 250MB in size (depending on the depth of the
> decision tree).
>
>
>
> According to https://cwiki.apache.org/confluence/display/FLINK/
> Variables+Closures+vs.+Broadcast+Variables that’s way too large though.
>
> Are there any viable alternatives or would this be the „right way“ to
> handle this situation?
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
> 
> **
>
> bonprix Handelsgesellschaft mbH
> Sitz der Gesellschaft: Hamburg
>
> Geschäftsführung:
> Dr. Marcus Ackermann (Vorsitzender)
> Dr. Kai Heck
> Rien Jansen
> Markus Fuchshofen
> Beiratsvorsitzender: Alexander Birken
>
> Handelsregister AG Hamburg HR B 36 455
>
> Adresse:
>
> bonprix Handelsgesellschaft mbH
>
> Haldesdorfer Str. 61
> 22179 Hamburg
>

Re: [SUGGESTION] Stack Overflow Documentation for Apache Flink

2016-09-05 Thread Matthias J. Sax
I voted. It's live now.

The advantage of SO documentation is also that people not familiar with
Apache might do some documentation (but would never open a PR). Of
course, as a community, we should put the focus on the web page docs. But
having something additional can't hurt.

From my experience, it is also good if certain aspects are described by
different people and thus from different points of view. It often helps
users to understand better.

Also, recurring SO questions can be handled nicely by SO documentation.

-Matthias

On 09/05/2016 01:25 PM, Till Rohrmann wrote:
> I've understood the SO documentation approach similarly to what Max has
> said. I see it as a source of code examples which illustrate Flink
> concepts and which is maintained by the SO community.
> 
> On Mon, Sep 5, 2016 at 1:09 PM, Maximilian Michels  > wrote:
> 
> I thought it is not about outsourcing but about providing an
> example-based documentation on SO which can be easily edited by the SO
> community. The work can be fed back to the official Flink
> documentation which will always be on flink.apache.org.
> 
> On Mon, Sep 5, 2016 at 12:42 PM, Fabian Hueske  > wrote:
> > Thanks for the suggestion Vishnu!
> > Stackoverflow documentation looks great. I like the easy
> contribution and
> > versioning features.
> >
> > However, I am a bit skeptical. IMO, Flink's primary documentation
> must be
> > hosted by Apache. Out-sourcing such an important aspect of a
> project to an
> > external service is not an option for me.
> > This would mean, that documentation on SO would be an additional /
> secondary
> > documentation. I see two potential problems with that:
> >
> > - It is duplicate effort to keep two documentations up-to-date.
> Adding a new
> > feature of changing some behavior must be documented in two places.
> > - Efforts to improve documentation might split up, i.e., the primary
> > documentation might receive less improvements and contributions.
> >
> > Of course, this is just my opinion but I think it is worth to
> mention these
> > points.
> >
> > Thanks,
> > Fabian
> >
> > 2016-09-05 12:22 GMT+02:00 Ravikumar Hawaldar:
> >>
> >> Hi,
> >>
> >>
> >> I just committed to apache-flink documentation on SO, one more commit
> >> required. Nice idea to document on SO Vishnu.
> >>
> >>
> >>
> >> Regards,
> >>
> >> Ravikumar
> >>
> >> On 5 September 2016 at 14:22, Maximilian Michels  > wrote:
> >>>
> >>> Hi!
> >>>
> >>> This looks neat. Let's try it out. I just voted.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On Sun, Sep 4, 2016 at 8:11 PM, Vishnu Viswanath
> >>>  > wrote:
> >>> > Hi All,
> >>> >
> >>> > Why don't we make use of Stackoverflow's new documentation
> feature to
> >>> > do
> >>> > some documentation of Apache Flink.
> >>> >
> >>> > To start, at least 5 SO users should commit to document, who
> has at
> >>> > least150
> >>> > reputation and have at least 1 positively scored answer in
> Flink tag.
> >>> >
> >>> > http://stackoverflow.com/documentation/apache-flink
> 
> >>> >
> >>> > Regards,
> >>> > Vishnu Viswanath
> >>
> >>
> >
> 
> 





Re: [SUGGESTION] Stack Overflow Documentation for Apache Flink

2016-09-05 Thread Fabian Hueske
Thanks for the suggestion Vishnu!
Stackoverflow documentation looks great. I like the easy contribution and
versioning features.

However, I am a bit skeptical. IMO, Flink's primary documentation must be
hosted by Apache. Out-sourcing such an important aspect of a project to an
external service is not an option for me.
This would mean that documentation on SO would be an additional /
secondary documentation. I see two potential problems with that:

- It is duplicate effort to keep two documentations up-to-date. Adding a
new feature or changing some behavior must be documented in two places.
- Efforts to improve documentation might split up, i.e., the primary
documentation might receive less improvements and contributions.

Of course, this is just my opinion but I think it is worth mentioning these
points.

Thanks,
Fabian

2016-09-05 12:22 GMT+02:00 Ravikumar Hawaldar:

> Hi,
>
>
> I just committed to apache-flink documentation on SO, one more commit
> required. Nice idea to document on SO Vishnu.
>
>
>
> Regards,
>
> Ravikumar
>
> On 5 September 2016 at 14:22, Maximilian Michels  wrote:
>
>> Hi!
>>
>> This looks neat. Let's try it out. I just voted.
>>
>> Cheers,
>> Max
>>
>> On Sun, Sep 4, 2016 at 8:11 PM, Vishnu Viswanath
>>  wrote:
>> > Hi All,
>> >
>> > Why don't we make use of Stack Overflow's new documentation feature to do
>> > some documentation of Apache Flink?
>> >
>> > To start, at least 5 SO users who have at least 150 reputation and at
>> > least 1 positively scored answer in the Flink tag should commit to document.
>> >
>> > http://stackoverflow.com/documentation/apache-flink
>> >
>> > Regards,
>> > Vishnu Viswanath
>>
>
>


Re: checkpoints not removed on hdfs.

2016-09-05 Thread Aljoscha Krettek
Hi,
which version of Flink are you using? Are the checkpoints being reported as
successful in the Web Frontend, i.e. in the "checkpoints" tab of the
running job?

Cheers,
Aljoscha

On Fri, 2 Sep 2016 at 12:17 Dong-iL, Kim  wrote:

> Hi,
>
> I’m using HDFS as state backend.
> The checkpoints folder grows bigger every moment.
> What shall I do?
>
> Regards.


Re: Apache Flink: How does it handle the backpressure?

2016-09-05 Thread Aljoscha Krettek
That's true. The reason why it works in Flink is that a slow downstream
operator will back pressure an upstream operator which will then slow down.
The technical implementation of this relies on the fact that Flink uses a
bounded pool of network buffers. A sending operator writes data to network
buffers and they are free for reuse once the data was sent. If a downstream
operator is slow in processing received network buffers then the upstream
operator will block until more network buffers become available.

Cheers,
Aljoscha

On Fri, 2 Sep 2016 at 21:57 rss rss  wrote:

> Hi,
>
>   some time ago I found a problem with backpressure in Spark and prepared
> a simple test to check it and compare with Flink.
> https://github.com/rssdev10/spark-kafka-streaming
>
>
> +
> https://mail-archives.apache.org/mod_mbox/spark-user/201607.mbox/%3CCA+AWphp=2VsLrgSTWFFknw_KMbq88fZhKfvugoe4YYByEt7a=w...@mail.gmail.com%3E
>
> In the case of Flink it works. In the case of Spark it works if you set up
> limits on the input rates per data source. See the source code for an example.
> And actually the backpressure detector in Spark works very badly.
>
> Best regards
>
> 2016-09-02 15:07 GMT+03:00 jiecxy <253441...@qq.com>:
>
>> For an operator, the input stream is faster than its output stream, so its
>> input buffer will block the previous operator's output thread that
>> transfers
>> the data to this operator. Right?
>>
>> Do Flink and Spark both handle backpressure by blocking the
>> thread? So what's the difference between them?
>>
>> For the data source, it is continuously producing the data, what if its
>> output thread is blocked? Would the buffer overflow?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-How-does-it-handle-the-backpressure-tp8866.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: fromParallelCollection

2016-09-05 Thread Maximilian Michels
Please give us a bit more insight on what you're trying to do.

On Sat, Sep 3, 2016 at 5:01 AM,   wrote:
> Hi,
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tr = env.fromParallelCollection(data)
>
> I do not know how to initialize the data; can someone tell me how?
> 
>
>
>
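
In case it helps while you gather more details: fromParallelCollection expects
a SplittableIterator rather than a plain collection. A rough Java sketch (the
Scala API is analogous; this is an illustration, not code from the original
message):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

public class ParallelCollectionExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // NumberSequenceIterator is a SplittableIterator, so the elements can
        // be split across all parallel instances of the source.
        DataStream<Long> numbers =
            env.fromParallelCollection(new NumberSequenceIterator(1L, 1000L), Long.class);

        numbers.print();
        env.execute("fromParallelCollection example");
    }
}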


Re: Flink Kafka more consumers than partitions

2016-09-05 Thread Maximilian Michels
Thanks for letting us know!

On Sat, Sep 3, 2016 at 12:42 PM, neo21 zerro  wrote:
> Hi all,
>
> It turns out that there were other factors influencing my performance tests.
> (actually hbase)
> Hence, more consumers than partitions in Flink was not the problem.
> Thanks for the help!
>
>
> On Wednesday, August 3, 2016 5:42 PM, neo21 zerro 
> wrote:
>
>
> Hello,
>
> I've tried to increase the network buffers but I didn't get any performance
> improvement.
> However, I have to re-run some tests just to be sure that the testing was
> not influenced by other factors.
> Will get back with more info.
> Thanks for the help for now.
>
>
> On Wednesday, August 3, 2016 12:58 PM, neo21 zerro 
> wrote:
>
>
> It's the default, ProcessingTime.
>
>
> On Wednesday, August 3, 2016 12:07 PM, Stephan Ewen 
> wrote:
>
>
> Hi!
>
> Are you running on ProcessingTime or on EventTime?
>
> Thanks,
> Stephan
>
>
> On Wed, Aug 3, 2016 at 11:57 AM, neo21 zerro  wrote:
>
> Hi guys,
>
> Thanks for getting back to me.
>
> So to clarify:
> Topology wise flink kafka source (does avro deserialization and small
> map) -> window operator which does batching for 3 seconds -> hbase sink
>
> Experiments:
>
> 1. flink source: parallelism 40 (20 idle tasks) -> window operator:
> parallelism 160 -> hbase sink: parallelism 160
> - roughly 10.000 requests/sec on hbase
> 2. flink source: parallelism 20 -> window operator: parallelism 160 -> hbase
> sink: parallelism 160
> - roughly 100.000 requests/sec on hbase (10x more)
>
> @Stephan as described below the parallelism of the sink was kept the same. I
> agree with you that there is nothing to backpressue on the source ;)
> However, my understanding right now is that only backpressure can be the
> explanation for this situation. Since we just change the source parallelism,
> other things like hbase parallelism  are kept the same.
>
> @Sameer all of those things are valid points. We make sure that we reduce
> the row locking by partitioning the data on the hbase sinks. We are just
> after why this limitation is happening. And since the same setup is used but
> just the source parallelism is changed I don't expect this to be a hbase
> issue.
>
> Thanks guys!
>
>
>
> On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar 
> wrote:
> What is the parallelism of the sink or the operator which writes to the
> sinks in the first case. HBase puts are constrained by the following:
> 1. How your regions are distributed. Are you pre-splitting your regions for
> the table. Do you know the number of regions your Hbase tables are split
> into.
> 2. Are all the sinks writing to all the regions. Meaning are you getting
> records in the sink operator which could potentially go to any region. This
> can become a big bottleneck if you have 40 sinks talking to all regions. I
> pre-split my regions based on key salting and use custom partitioning to
> ensure each sink operator writes to only a few regions and my performance
> shot up by several orders.
> 3. You are probably doing this but adding puts in batches helps in general
> but having each batch contain puts for too many regions hurts.
>
> If the source parallelism is the same as the parallelism of other operators
> the 40 sinks communicating to all regions might be a problem. When you go
> down to 20 sinks you actually might be getting better performance due to
> lesser resource contention on HBase.
>
> Sent from my iPhone
>
>
>> On Aug 3, 2016, at 4:14 AM, neo21 zerro  wrote:
>>
>> Hello everybody,
>>
>> I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on
>> YARN.
>> In kafka I have a topic which have 20 partitions and my flink topology
>> reads from kafka (source) and writes to hbase (sink).
>>
>> when:
>> 1. flink source has parallelism set to 40 (20 of the tasks are idle) I
>> see 10.000 requests/sec on hbase
>> 2. flink source has parallelism set to 20 (exact number of partitions)
>> I see 100.000 requests/sec on hbase (so a 10x improvement)
>>
>>
>> It's clear that hbase is not the limiting factor in my topology.
>> Assumption: Flink backpressure mechanism kicks in in the 1. case more
>> aggressively and it's limiting the ingestion of tuples in the topology.
>>
>> The question: In the first case, why are those 20 sources which are
>> sitting idle contributing so much to the backpressure?
>>
>>
>> Thanks guys!
>
>
>
>
>
>
>
>


Re: [SUGGESTION] Stack Overflow Documentation for Apache Flink

2016-09-05 Thread Maximilian Michels
Hi!

This looks neat. Let's try it out. I just voted.

Cheers,
Max

On Sun, Sep 4, 2016 at 8:11 PM, Vishnu Viswanath
 wrote:
> Hi All,
>
> Why don't we make use of Stack Overflow's new documentation feature to do
> some documentation of Apache Flink?
>
> To start, at least 5 SO users who have at least 150 reputation and at least 1
> positively scored answer in the Flink tag should commit to document.
>
> http://stackoverflow.com/documentation/apache-flink
>
> Regards,
> Vishnu Viswanath