Kafka+Flink

2016-01-27 Thread Vinaya M S
Hi,

I have a 3-node Kafka cluster. In the server.properties file of each of them
I'm setting
advertised.host.name to its own IP.

I have a 4-node Flink cluster. On each of Kafka's nodes I'm setting
host.name to one of Flink's worker nodes.

like,

in kafka1:
host.name: flink-data1 IP

in kafka2:
host.name: flink-data2 IP

in kafka3:
host.name: flink-data3 IP

My question is,
In order to connect to Kafka, what should my conf/producer.yaml look like?

kafka.brokers: ??

zookeeper.servers: ??

kafka.port: 9092
zookeeper.port: 2181

kafka.topic: ""
kafka.partitions: 1

Thanks,
Vinaya
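
For reference, one plausible shape for such a file — purely a hedged sketch
with placeholder addresses; typically kafka.brokers and zookeeper.servers list
the advertised addresses of all nodes, comma-separated:

kafka.brokers: "kafka1-ip,kafka2-ip,kafka3-ip"
zookeeper.servers: "zk1-ip,zk2-ip,zk3-ip"

kafka.port: 9092
zookeeper.port: 2181

kafka.topic: "events"
kafka.partitions: 1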


Re: rowmatrix equivalent

2016-01-27 Thread Chiwan Park
There is a JIRA issue (FLINK-1873, [1]) that covers the distributed matrix 
implementation.

[1]: https://issues.apache.org/jira/browse/FLINK-1873

Regards,
Chiwan Park

> On Jan 27, 2016, at 5:21 PM, Chiwan Park  wrote:
> 
> I hope to see a distributed matrix and vector implementation on Flink. :)
> 
> Regards,
> Chiwan Park
> 
>> On Jan 27, 2016, at 2:29 AM, Lydia Ickler  wrote:
>> 
>> Hi Till,
>> 
>> maybe I will do that :) 
>> If I have some other questions I will let you know!
>> 
>> Best regards,
>> Lydia
>> 
>> 
>>> Am 24.01.2016 um 17:33 schrieb Till Rohrmann :
>>> 
>>> Hi Lydia,
>>> 
>>> Flink does not yet come with a distributed matrix implementation as Spark 
>>> does with its RowMatrix. However, you can easily implement it yourself. 
>>> This would also be a good contribution to the project if you want to tackle 
>>> the problem.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Sun, Jan 24, 2016 at 4:03 PM, Lydia Ickler  
>>> wrote:
>>> Hi all,
>>> 
>>> this is maybe a stupid question, but what within Flink is the equivalent to 
>>> Spark’s RowMatrix?
>>> 
>>> Thanks in advance,
>>> Lydia
>>> 
>> 
> 
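
To make the suggestion concrete, a minimal sketch of the RowMatrix idea on
Flink — a distributed matrix as a DataSet of indexed rows, with operations as
DataSet transformations. All names are illustrative; FLINK-1873 tracks a
proper implementation.

import org.apache.flink.api.scala._

case class IndexedRow(index: Long, values: Array[Double])

class RowMatrix(val rows: DataSet[IndexedRow]) {
  // Distributed matrix-vector product against a small local vector v:
  // each row contributes one entry (i, row_i . v) of the result.
  def multiply(v: Array[Double]): DataSet[(Long, Double)] =
    rows.map(r => (r.index, r.values.zip(v).map(p => p._1 * p._2).sum))
}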



about blob.storage.dir and .buffer files

2016-01-27 Thread Gwenhael Pasquiers
Hello,

We have a question about blob.storage.dir and its .buffer files:

What are they? Are they cleaned up, or is there a way to limit their size and 
to estimate the necessary space?
We had a node's root volume filled by those files (~20 GB) and it crashed.

Well, the root was filled because we changed the path to /tmp_flink and it was 
on the root filesystem, our bad. We changed it in a hurry because the default 
path (/tmp) was being periodically cleaned by the OS, and that made Flink crash.

B.R.
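
For reference, the directory is configured in flink-conf.yaml; a one-line
sketch with a placeholder path on a volume with enough headroom (the key is
spelled blob.storage.directory in the 0.10 docs):

blob.storage.directory: /data/flink/blob-store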



Re: continuous time trigger

2016-01-27 Thread Brian Chhun
Hi Aljoscha,

No problem with the change. I think it's more what a user would expect as
well.

On Wed, Jan 27, 2016 at 3:27 AM, Aljoscha Krettek 
wrote:

> Hi Brian,
> you are right about changing the behavior of windows when closing. Would
> this be a problem for you?
>
> Cheers,
> Aljoscha
> > On 26 Jan 2016, at 17:53, Radu Tudoran  wrote:
> >
> > Hi,
> >
> > Thank you for sharing your experience and also to Till for the advice.
> > What I would like to do is to be able to fire the window potentially
> multiple times, even if an event did not arrive. I will look more into how
> dealing with processing time could help with this.
> >
> > Dr. Radu Tudoran
> > Research Engineer - Big Data Expert
> > IT R&D Division
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > European Research Center
> >
> > From: Brian Chhun [mailto:brian.ch...@getbraintree.com]
> > Sent: Tuesday, January 26, 2016 5:28 PM
> > To: user@flink.apache.org
> > Subject: Re: continuous time trigger
> >
> > For what it's worth, we have a trigger that fires once a day for a
> recurring calculation. When an element comes in, we set the trigger
> context's processing time timer to the exact millisecond of the desired
> time. The predefined triggers were useful to look at to achieve this:
> https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers
> >
> > Some things I discovered along the way, particularly using processing
> time, which may be useful:
> > - registering a time that's already passed will cause the timer callback
> to be called
> > - when the system shuts down, the window is fired even though the
> trigger has not gone off (this sounds subject to change though)
> >
> > On Tue, Jan 26, 2016 at 3:47 AM, Till Rohrmann 
> wrote:
> > Hi Radu,
> >
> > you can register processing time and event time timers using the
> TriggerContext which is given to the onElement, onProcessingTime and
> onEventTime methods of Trigger. In case you register a processing time
> timer, the onProcessingTime method will be called once the system clock has
> passed the timer time. In case of an event time timer, the
> onEventTime method is called once a watermark has been received that is
> higher than the timer's time.
> >
> > I hope this helps you to solve your problem.
> >
> > Cheers,
> > Till
> >
> >
> > On Mon, Jan 25, 2016 at 9:25 PM, Radu Tudoran 
> wrote:
> > Re-Hi,
> >
> > I have another question regarding the triggering of the processing of a
> window. Can this be done in some way at specific time intervals,
> independent of whether an event has been received or not, via a trigger?
> >
> > The reason why I am considering a trigger rather than timeWindow(All) is
> that timeWindow will end up generating multiple windows and duplicating
> data, while having the option from the trigger to actually fire the
> processing at certain times, independent of when the events arrived, would
> enable operating with a single window.
> >
> > Regards,
> >
> > Dr. Radu Tudoran
> > Research Engineer - Big Data Expert
> > IT R&D Division
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > European Research Center
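
A sketch of the approach Brian describes — a trigger that registers a
processing-time timer for the next day boundary and re-arms itself when it
fires. The class name and day arithmetic are illustrative; the method
signatures follow the Trigger API of this era and may differ in detail.

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class DailyTrigger[T] extends Trigger[T, TimeWindow] {
  private val DayMillis = 24 * 60 * 60 * 1000L

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // Register the next day boundary; per Brian's note, a timer registered
    // in the past fires immediately.
    ctx.registerProcessingTimeTimer((System.currentTimeMillis() / DayMillis + 1) * DayMillis)
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.registerProcessingTimeTimer(time + DayMillis) // re-arm for the next day
    TriggerResult.FIRE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
}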

Mixing Batch & Streaming

2016-01-27 Thread Don Frascuchon
Hi everyone,

Is there any way of mixing DataStreams and DataSets? For example, enriching
messages from a DataStream with precalculated info in a DataSet.

Thanks in advance!
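
There is no direct mixing in the APIs of this era; one common workaround,
sketched below under stated assumptions (the case classes, file path, and CSV
layout are all hypothetical), is to materialize the DataSet with the batch
job and load the result in open() of a rich function in the streaming job:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.io.Source

case class Event(key: String, payload: String)
case class EnrichedEvent(event: Event, info: Option[String])

class EnrichFn(path: String) extends RichMapFunction[Event, EnrichedEvent] {
  private var lookup: Map[String, String] = _

  override def open(conf: Configuration): Unit = {
    // Load the precalculated table once per task, e.g. "key,value" lines.
    lookup = Source.fromFile(path).getLines()
      .map(_.split(',')).map(a => a(0) -> a(1)).toMap
  }

  override def map(e: Event): EnrichedEvent =
    EnrichedEvent(e, lookup.get(e.key))
}

A stream would then be enriched with stream.map(new EnrichFn("/path/to/lookup.csv")).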


Maven artifacts scala 2.11 bug?

2016-01-27 Thread David Kim
Hello again,

I saw the recent change to Flink 1.0-SNAPSHOT that explicitly adds the
Scala version as a suffix.

I have an sbt project that fails. I don't believe it's a misconfiguration
error on my end, because I do see in the logs that it tries to resolve
everything with _2.11.

Could this possibly be a bug in the Flink build pipeline for these new
names?

Here's the error along with the resolve logs:

[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-scala_2.11/1.0-SNAPSHOT/flink-scala_2.11-1.0-20160127.155017-227.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-scala_2.11;1.0-SNAPSHOT!flink-scala_2.11.jar (4733ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-clients_2.11/1.0-SNAPSHOT/flink-clients_2.11-1.0-20160127.154808-232.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-clients_2.11;1.0-SNAPSHOT!flink-clients_2.11.jar
(3677ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-streaming-scala_2.11/1.0-SNAPSHOT/flink-streaming-scala_2.11-1.0-20160127.155621-189.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-streaming-scala_2.11;1.0-SNAPSHOT!flink-streaming-scala_2.11.jar
(3832ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-connector-kafka-0.8_2.11/1.0-SNAPSHOT/flink-connector-kafka-0.8_2.11-1.0-20160127.155721-25.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-connector-kafka-0.8_2.11;1.0-SNAPSHOT!flink-connector-kafka-0.8_2.11.jar
(3422ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-core/1.0-SNAPSHOT/flink-core-1.0-20160127.154317-233.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-core;1.0-SNAPSHOT!flink-core.jar (3624ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-core/1.0-SNAPSHOT/flink-core-1.0-20160127.154317-233-tests.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-core;1.0-SNAPSHOT!flink-core.jar(test-jar) (2376ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-java/1.0-SNAPSHOT/flink-java-1.0-20160127.154408-233.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-java;1.0-SNAPSHOT!flink-java.jar (3164ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-optimizer_2.11/1.0-SNAPSHOT/flink-optimizer_2.11-1.0-20160127.154739-232.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-optimizer_2.11;1.0-SNAPSHOT!flink-optimizer_2.11.jar
(4014ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-annotations/1.0-SNAPSHOT/flink-annotations-1.0-20160127.153952-64.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-annotations;1.0-SNAPSHOT!flink-annotations.jar
(1511ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-shaded-hadoop2/1.0-SNAPSHOT/flink-shaded-hadoop2-1.0-20160127.154029-239.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-shaded-hadoop2;1.0-SNAPSHOT!flink-shaded-hadoop2.jar
(7671ms)
[info] downloading
https://jcenter.bintray.com/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar
...
[info]  [SUCCESSFUL ] javax.servlet#servlet-api;2.5!servlet-api.jar (433ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-runtime_2.11/1.0-SNAPSHOT/flink-runtime_2.11-1.0-20160127.154708-232.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-runtime_2.11;1.0-SNAPSHOT!flink-runtime_2.11.jar
(4392ms)
[info] downloading
https://jcenter.bintray.com/io/netty/netty-all/4.0.27.Final/netty-all-4.0.27.Final.jar
...
[info]  [SUCCESSFUL ] io.netty#netty-all;4.0.27.Final!netty-all.jar (6098ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-streaming-java_2.11/1.0-SNAPSHOT/flink-streaming-java_2.11-1.0-20160127.155106-226.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-streaming-java_2.11;1.0-SNAPSHOT!flink-streaming-java_2.11.jar
(3794ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-connector-kafka-base_2.11/1.0-SNAPSHOT/flink-connector-kafka-base_2.11-1.0-20160127.155702-25.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-connector-kafka-base_2.11;1.0-SNAPSHOT!flink-connector-kafka-base_2.11.jar
(3608ms)
[info] downloading
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-shaded-curator-recipes/1.0-SNAPSHOT/flink-shaded-curator-recipes-1.0-20160127.154128-236.jar
...
[info]  [SUCCESSFUL ]
org.apache.flink#flink-shaded-curator-recipes;1.0-SNAPSHOT!flink-shaded-curator-recipes.jar
(3803ms)
[info] downloading

Re: Maven artifacts scala 2.11 bug?

2016-01-27 Thread Robert Metzger
Hi David,

can you post your SBT build file as well?

On Wed, Jan 27, 2016 at 7:52 PM, David Kim 
wrote:

> Hello again,
>
> I saw the recent change to Flink 1.0-SNAPSHOT that explicitly adds the
> Scala version as a suffix.
>
> I have an sbt project that fails. I don't believe it's a misconfiguration
> error on my end, because I do see in the logs that it tries to resolve
> everything with _2.11.
>
> Could this possibly be a bug in the Flink build pipeline for these new
> names?
>
> Here's the error along with the resolve logs:
>
> [resolve logs snipped]

Reading ORC format on Flink

2016-01-27 Thread Philip Lee
Hello,

Question about reading ORC format on Flink.

I want to use a dataset after converting CSV to ORC format with Hive.
Does Flink support reading the ORC format?

If so, please let me know how to use such a dataset in Flink.

Best,
Phil


Re: Maven artifacts scala 2.11 bug?

2016-01-27 Thread Stephan Ewen
Good to hear!

Sorry for the hassle you have to go through. There is a lot of
restructuring to make it clean for 1.0.

Greetings,
Stephan


On Wed, Jan 27, 2016 at 9:16 PM, David Kim 
wrote:

> Hi Stephan, Robert,
>
> Yes, I found a solution. Turns out that I shouldn't specify a suffix for
> flink-core. I changed flink-core to not have any suffix.
>
> "org.apache.flink" %% "flink-core" % flinkVersion % "it,test" classifier "
> tests",
> "org.apache.flink" % "flink-core" % flinkVersion % "it,test" classifier "
> tests"
>
>
> Once I did that I was able to resolve and build. Thanks for the help!
>
> Cheers,
> David
>
> On Wed, Jan 27, 2016 at 2:12 PM, Stephan Ewen  wrote:
>
>> Hi David!
>>
>> The dependencies that SBT marks as wrong 
>> (org.apache.flink:flink-shaded-hadoop2,
>> org.apache.flink:flink-core, org.apache.flink:flink-annotations) are
>> actually those that are Scala-independent, and have no suffix at all.
>>
>> Is it possible your SBT file does not like mixing dependencies with and
>> without a suffix?
>>
>> Greetings,
>> Stephan
>>
>>
>>
>> On Wed, Jan 27, 2016 at 8:40 PM, David Kim <
>> david@braintreepayments.com> wrote:
>>
>>> Hi Robert,
>>>
>>> Here's the relevant snippet for my sbt config.
>>>
>>>
>>> My dependencies are listed in a file called Dependencies.scala.
>>>
>>> object Dependencies {
>>>
>>>   val flinkVersion = "1.0-SNAPSHOT"
>>>
>>>   val flinkDependencies = Seq(
>>> "org.apache.flink" %% "flink-scala" % flinkVersion,
>>> "org.apache.flink" %% "flink-clients" % flinkVersion,
>>> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>>> "org.apache.flink" %% "flink-connector-kafka-0.8" % flinkVersion
>>>   )
>>>
>>>   val testDependencies = Seq(
>>> "org.apache.flink" %% "flink-core" % flinkVersion % "it,test" 
>>> classifier "tests",
>>> "org.apache.flink" %% "flink-test-utils" % flinkVersion % "it,test",
>>> "org.apache.flink" %% "flink-streaming-contrib" % flinkVersion % 
>>> "it,test",
>>> "org.scalatest" %% "scalatest" % "2.2.4" % "it,test",
>>> "org.scalacheck" %% "scalacheck" % "1.12.5" % "it,test",
>>> "org.scalamock" %% "scalamock-scalatest-support" % "3.2" % "it,test",
>>> "net.manub" %% "scalatest-embedded-kafka" % "0.4.1" % "it,test"
>>>   )
>>>
>>>
>>> My project settings are in a file called MyBuild.scala
>>>
>>> object MyBuild extends Build {
>>>   override lazy val settings = super.settings ++ Seq(
>>> scalaVersion := "2.11.7",
>>> scalacOptions += "-target:jvm-1.8",
>>> javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
>>>   )
>>>
>>> Thanks,
>>> David
>>>
>>> On Wed, Jan 27, 2016 at 1:30 PM, Robert Metzger 
>>> wrote:
>>>
 Hi David,

 can you post your SBT build file as well?

 On Wed, Jan 27, 2016 at 7:52 PM, David Kim <
 david@braintreepayments.com> wrote:

> Hello again,
>
> I saw the recent change to Flink 1.0-SNAPSHOT that explicitly adds the
> Scala version as a suffix.
>
> I have an sbt project that fails. I don't believe it's a
> misconfiguration error on my end, because I do see in the logs that it
> tries to resolve everything with _2.11.
>
> Could this possibly be a bug in the Flink build pipeline for these new
> names?
>
> Here's the error along with the resolve logs:
>
> [resolve logs snipped]
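
For reference, the sbt rule at work here: the %% operator appends the
project's Scala version to the artifact name, while the single % uses the
name verbatim — so Scala-free artifacts such as flink-core must be declared
with %:

"org.apache.flink" %% "flink-clients" % flinkVersion, // resolves flink-clients_2.11
"org.apache.flink" %  "flink-core"    % flinkVersion  // resolves flink-core (no suffix)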

Re: Maven artifacts scala 2.11 bug?

2016-01-27 Thread David Kim
Hi Stephan, Robert,

Yes, I found a solution. Turns out that I shouldn't specify a suffix for
flink-core. I changed flink-core to not have any suffix.

"org.apache.flink" %% "flink-core" % flinkVersion % "it,test" classifier "
tests",
"org.apache.flink" % "flink-core" % flinkVersion % "it,test" classifier "
tests"


Once I did that I was able to resolve and build. Thanks for the help!

Cheers,
David

On Wed, Jan 27, 2016 at 2:12 PM, Stephan Ewen  wrote:

> Hi David!
>
> The dependencies that SBT marks as wrong 
> (org.apache.flink:flink-shaded-hadoop2,
> org.apache.flink:flink-core, org.apache.flink:flink-annotations) are
> actually those that are Scala-independent, and have no suffix at all.
>
> Is it possible your SBT file does not like mixing dependencies with and
> without a suffix?
>
> Greetings,
> Stephan
>
>
>
> On Wed, Jan 27, 2016 at 8:40 PM, David Kim <
> david@braintreepayments.com> wrote:
>
>> Hi Robert,
>>
>> Here's the relevant snippet for my sbt config.
>>
>>
>> My dependencies are listed in a file called Dependencies.scala.
>>
>> object Dependencies {
>>
>>   val flinkVersion = "1.0-SNAPSHOT"
>>
>>   val flinkDependencies = Seq(
>> "org.apache.flink" %% "flink-scala" % flinkVersion,
>> "org.apache.flink" %% "flink-clients" % flinkVersion,
>> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>> "org.apache.flink" %% "flink-connector-kafka-0.8" % flinkVersion
>>   )
>>
>>   val testDependencies = Seq(
>> "org.apache.flink" %% "flink-core" % flinkVersion % "it,test" classifier 
>> "tests",
>> "org.apache.flink" %% "flink-test-utils" % flinkVersion % "it,test",
>> "org.apache.flink" %% "flink-streaming-contrib" % flinkVersion % 
>> "it,test",
>> "org.scalatest" %% "scalatest" % "2.2.4" % "it,test",
>> "org.scalacheck" %% "scalacheck" % "1.12.5" % "it,test",
>> "org.scalamock" %% "scalamock-scalatest-support" % "3.2" % "it,test",
>> "net.manub" %% "scalatest-embedded-kafka" % "0.4.1" % "it,test"
>>   )
>>
>>
>> My project settings are in a file called MyBuild.scala
>>
>> object MyBuild extends Build {
>>   override lazy val settings = super.settings ++ Seq(
>> scalaVersion := "2.11.7",
>> scalacOptions += "-target:jvm-1.8",
>> javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
>>   )
>>
>> Thanks,
>> David
>>
>> On Wed, Jan 27, 2016 at 1:30 PM, Robert Metzger 
>> wrote:
>>
>>> Hi David,
>>>
>>> can you post your SBT build file as well?
>>>
>>> On Wed, Jan 27, 2016 at 7:52 PM, David Kim <
>>> david@braintreepayments.com> wrote:
>>>
 Hello again,

 I saw the recent change to Flink 1.0-SNAPSHOT that explicitly adds the
 Scala version as a suffix.

 I have an sbt project that fails. I don't believe it's a
 misconfiguration error on my end, because I do see in the logs that it tries
 to resolve everything with _2.11.

 Could this possibly be a bug in the Flink build pipeline for these new
 names?

 Here's the error along with the resolve logs:

 [resolve logs snipped]

Flume source support

2016-01-27 Thread Alexandr Dzhagriev
Hello,

At the moment the master branch contains the commented-out class FlumeSource.
Unfortunately, I can't find any information regarding its future support.
Can anyone please shed some light on it?

Thanks, Alex.


Re: Maven artifacts scala 2.11 bug?

2016-01-27 Thread David Kim
Hi Robert,

Here's the relevant snippet for my sbt config.


My dependencies are listed in a file called Dependencies.scala.

object Dependencies {

  val flinkVersion = "1.0-SNAPSHOT"

  val flinkDependencies = Seq(
    "org.apache.flink" %% "flink-scala" % flinkVersion,
    "org.apache.flink" %% "flink-clients" % flinkVersion,
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
    "org.apache.flink" %% "flink-connector-kafka-0.8" % flinkVersion
  )

  val testDependencies = Seq(
    "org.apache.flink" %% "flink-core" % flinkVersion % "it,test" classifier "tests",
    "org.apache.flink" %% "flink-test-utils" % flinkVersion % "it,test",
    "org.apache.flink" %% "flink-streaming-contrib" % flinkVersion % "it,test",
    "org.scalatest" %% "scalatest" % "2.2.4" % "it,test",
    "org.scalacheck" %% "scalacheck" % "1.12.5" % "it,test",
    "org.scalamock" %% "scalamock-scalatest-support" % "3.2" % "it,test",
    "net.manub" %% "scalatest-embedded-kafka" % "0.4.1" % "it,test"
  )
}


My project settings are in a file called MyBuild.scala

object MyBuild extends Build {
  override lazy val settings = super.settings ++ Seq(
    scalaVersion := "2.11.7",
    scalacOptions += "-target:jvm-1.8",
    javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
  )
}

Thanks,
David

On Wed, Jan 27, 2016 at 1:30 PM, Robert Metzger  wrote:

> Hi David,
>
> can you post your SBT build file as well?
>
> On Wed, Jan 27, 2016 at 7:52 PM, David Kim <
> david@braintreepayments.com> wrote:
>
>> Hello again,
>>
>> I saw the recent change to Flink 1.0-SNAPSHOT that explicitly adds the
>> Scala version as a suffix.
>>
>> I have an sbt project that fails. I don't believe it's a misconfiguration
>> error on my end, because I do see in the logs that it tries to resolve
>> everything with _2.11.
>>
>> Could this possibly be a bug in the Flink build pipeline for these new
>> names?
>>
>> Here's the error along with the resolve logs:
>>
>> [resolve logs snipped]

Re: Imbalanced workload between workers

2016-01-27 Thread Pieter Hameete
Hi Till,

I've upgraded to Flink 0.10.1 and ran the job again without any changes to
the code to see the bytes input and output of the operators for the
different workers. To my surprise it is very well balanced across all
workers, and because of this the job completed much faster.

Are there any changes/fixes between Flink 0.9.1 and 0.10.1 that could cause
this to be better for me now?

Thanks,

Pieter

2016-01-27 14:10 GMT+01:00 Pieter Hameete :

>
> Cheers for the quick reply Till.
>
> That would be very useful information to have! I'll upgrade my project to
> Flink 0.10.1 tonight and let you know if I can find out if there's a skew in
> the data :-)
>
> - Pieter
>
>
> 2016-01-27 13:49 GMT+01:00 Till Rohrmann :
>
>> Could it be that your data is skewed? This could lead to different loads
>> on different task managers.
>>
>> With the latest Flink version, the web interface should show you how many
>> bytes each operator has written and received. There you could see if one
>> operator receives more elements than the others.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jan 27, 2016 at 1:35 PM, Pieter Hameete 
>> wrote:
>>
>>> Hi guys,
>>>
>>> Currently I am running a job in the GCloud in a configuration with 4
>>> task managers that each have 4 CPUs (for a total parallelism of 16).
>>>
>>> However, I noticed my job is running much slower than expected and after
>>> some more investigation I found that one of the workers is doing a majority
>>> of the work (its CPU load was at 100% while the others were almost idle).
>>>
>>> My job execution plan can be found here: http://i.imgur.com/fHKhVFf.png
>>>
>>> The input is split into multiple files so loading the data is properly
>>> distributed over the workers.
>>>
>>> I am wondering if you can provide me with some tips on how to figure out
>>> what is going wrong here:
>>>
>>>- Could this imbalance in workload be the result of an imbalance in
>>>the hash partitioning?
>>>- Is there a convenient way to see how many elements each worker
>>>   gets to process? Would it work to write the output of the CoGroup to 
>>> disk
>>>   because each worker writes to its own output file and investigate the
>>>   differences?
>>>- Is there something strange about the execution plan that could
>>>cause this?
>>>
>>> Thanks and kind regards,
>>>
>>> Pieter
>>>
>>
>>
>
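
A quick way to test the hash-partitioning suspicion raised above — a hedged
sketch in which Record and its key field stand in for the actual data type
and CoGroup key — is to count elements per key and look for dominant keys:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

case class Record(key: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val input: DataSet[Record] =
  env.fromElements(Record("a"), Record("a"), Record("b")) // placeholder data

val topKeys = input
  .map(r => (r.key, 1L))
  .groupBy(0)
  .sum(1)
  .sortPartition(1, Order.DESCENDING).setParallelism(1)
  .first(20)
topKeys.print() // a few keys with huge counts would explain one hot worker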


Re: Streaming left outer join

2016-01-27 Thread Stephan Ewen
Hi!

I think this pull request may be implementing what you are looking for:
https://github.com/apache/flink/pull/1527

Stephan


On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov 
wrote:

> Hello Aljoscha,
>
> Indeed, it seems like I'd need a custom operator. I imagine this involves
> implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? 
> Could you provide those pointers please?
>
> Alex
>
> On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> I’m afraid there is currently no way to do what you want with the
>> builtin window primitives. Each of the slices of the sliding windows is
>> essentially evaluated independently. Therefore, there cannot be effects in
>> one slice that influence processing of another slice.
>>
>> What you could do is switch to tumbling windows, then each element would
>> only be in one window. That probably won’t fit your use case anymore. The
>> alternative I see to that is to implement everything in a custom operator
>> where you deal with window states and triggering on time yourself. Let me
>> know if you need some pointers about that one.
>>
>> Cheers,
>> Aljoscha
>> > On 26 Jan 2016, at 19:32, Alexander Gryzlov 
>> wrote:
>> >
>> > Hello,
>> >
>> > I'm trying to implement a left outer join of two Kafka streams within a
>> sliding window. So far I have the following code:
>> >
>> > foos
>> >   .coGroup(bars)
>> >   .where(_.baz).equalTo(_.baz)
>> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES),
>> Time.of(1, TimeUnit.SECONDS)))
>> >   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
>> >fs.foreach(f =>
>> > if (bs.isEmpty)
>> >   o.collect(FooBar(f, None))
>> > else
>> >   bs.foreach(b => o.collect(FooBar(f, Some(b))))
>> >)
>> >   )
>> >
>> > However, this results in the pair being emitted from every window
>> slide, regardless of the match. The desired behaviour would be:
>> > * emit the match as soon as it's found, don't emit any more pairs
>> for it,
>> > * otherwise, emit the empty match, when the left side element leaves
>> the last of its windows
>> >
>> > What would be the idiomatic/efficient way to implement such behaviour?
>> Is it possible at all with the coGroup/window mechanism, or some other way
>> is necessary?
>> >
>> > Alex
>>
>>
>


Re: Reading ORC format on Flink

2016-01-27 Thread Chiwan Park
Hi Phil,

I think that you can read ORC files using OrcInputFormat [1] with the 
readHadoopFile method.

There is an example for MapReduce [2] on Stack Overflow. The approach also 
works on Flink. You may have to use a RichMapFunction [3] to initialize the 
OrcSerde and StructObjectInspector objects.

Regards,
Chiwan Park

[1]: 
https://hive.apache.org/javadocs/r0.13.1/api/ql/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.html
[2]: 
http://stackoverflow.com/questions/22673222/how-do-you-use-orcfile-input-output-format-in-mapreduce
[3]: 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/functions/RichMapFunction.html

> On Jan 28, 2016, at 4:44 AM, Philip Lee  wrote:
> 
> Hello, 
> 
> Question about reading ORC format on Flink.
> 
> I want to use a dataset after converting CSV to ORC format with Hive.
> Does Flink support reading the ORC format?
> 
> If so, please let me know how to use such a dataset in Flink.
> 
> Best,
> Phil
> 
> 
> 
> 
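
A rough sketch of the suggestion above — untested, with a placeholder path.
OrcInputFormat implements the old mapred interface, so it plugs into
readHadoopFile directly; field extraction would then happen in a
RichMapFunction as described:

import org.apache.flink.api.scala._
import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcStruct}
import org.apache.hadoop.io.NullWritable

val env = ExecutionEnvironment.getExecutionEnvironment

// Yields (NullWritable, OrcStruct) pairs via the Hadoop compatibility layer.
val rows = env.readHadoopFile(
  new OrcInputFormat,
  classOf[NullWritable],
  classOf[OrcStruct],
  "hdfs:///warehouse/my_table") // placeholder path

// rows.map(new MyOrcDecoder) — a (hypothetical) RichMapFunction that builds
// an OrcSerde / StructObjectInspector in open() and pulls typed fields out
// of each OrcStruct.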



Re: [NOTICE] Maven artifacts names now suffixed with Scala version

2016-01-27 Thread Ufuk Celebi
Thanks for the notice. I’ve added a warning to the snapshot docs and created a 
Wiki page with the changes: 
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version

– Ufuk

> On 27 Jan 2016, at 12:20, Maximilian Michels  wrote:
> 
> Dear users and developers,
> 
> We have merged changes [1] that will affect how you build Flink
> programs with the latest snapshot version of Flink and with future
> releases. Maven artifacts which depend on Scala are now suffixed with
> the Scala major version, e.g. "2.10" or "2.11".
> 
> While some of the Maven modules are Scala-free, e.g. "flink-java" or
> "flink-core", most of the artifacts now carry the default Scala 2.10
> suffix, e.g. "flink-streaming-java_2.10" or "flink-clients_2.10". This
> way of suffixing artifact names is common practice and inevitable to
> avoid Scala version conflicts [2].
> 
> Please note that you have to update artifact names in your project pom
> if you work with 1.0-SNAPSHOT. For example, if you use
> "flink-streaming-java", please update the dependency as follows:
> 
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-java_2.10</artifactId>
>   <version>1.0-SNAPSHOT</version>
> </dependency>
> 
> The documentation has also been updated to indicate the suffixes where 
> required.
> 
> Best,
> Max
> 
> [1] https://issues.apache.org/jira/browse/FLINK-2933
> [2] Scala code is only compatible across minor versions, e.g. 2.11.x
> is compatible with 2.11.y but not 2.10.x with 2.11.x.



Re: continuous time trigger

2016-01-27 Thread Aljoscha Krettek
Hi Brian,
you are right about changing the behavior of windows when closing. Would this 
be a problem for you?

Cheers,
Aljoscha
> On 26 Jan 2016, at 17:53, Radu Tudoran  wrote:
> 
> Hi,
>  
> Thank you for sharing your experience and also to Till for the advice.
> What I would like to do is to be able to fire the window potentially multiple 
> times, even if an event did not arrive. I will look more into how dealing 
> with processing time could help with this.
>  
> Dr. Radu Tudoran
> Research Engineer - Big Data Expert
> IT R&D Division
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
>  
> From: Brian Chhun [mailto:brian.ch...@getbraintree.com] 
> Sent: Tuesday, January 26, 2016 5:28 PM
> To: user@flink.apache.org
> Subject: Re: continuous time trigger
>  
> For what it's worth, we have a trigger that fires once a day for a recurring 
> calculation. When an element comes in, we set the trigger context's 
> processing time timer to the exact millisecond of the desired time. The 
> predefined triggers were useful to look at to achieve this: 
> https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers
>  
> Some things I discovered along the way, particularly using processing time, 
> which may be useful:
> - registering a time that's already passed will cause the timer callback to 
> be called
> - when the system shuts down, the window is fired even though the trigger has 
> not gone off (this sounds subject to change though)
>  
> On Tue, Jan 26, 2016 at 3:47 AM, Till Rohrmann  wrote:
> Hi Radu,
> 
> you can register processing time and event time timers using the 
> TriggerContext which is given to the onElement, onProcessingTime and 
> onEventTime methods of Trigger. In case you register a processing time timer, 
> the onProcessingTime method will be called once the system clock has passed 
> the timer time. In case of an event time timer, the onEventTime method is 
> called once a watermark has been received that is higher than the timer's 
> time.
> 
> I hope this helps you to solve your problem.
> 
> Cheers,
> Till
> 
>  
> On Mon, Jan 25, 2016 at 9:25 PM, Radu Tudoran  wrote:
> Re-Hi,
>  
> I have another question regarding the triggering of the processing of a 
> window. Can this be done in some way at specific time intervals, independent 
> of whether an event has been received or not, via a trigger?
>  
> The reason why I am considering a trigger rather than timeWindow(All) is that 
> timeWindow will end up generating multiple windows and duplicating data, 
> while having the option from the trigger to actually fire the processing at 
> certain times, independent of when the events arrived, would enable 
> operating with a single window.
>  
> Regards,
>  
> Dr. Radu Tudoran
> Research Engineer - Big Data Expert
> IT R&D Division
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center



Re: Task Manager metrics per job on Flink 0.9.1

2016-01-27 Thread Fabian Hueske
Hi,

it is correct that the metrics are collected from the task managers.
In Flink 0.9.1 the metrics are visualized as charts in the web dashboard.
This visualization was removed when the dashboard was redesigned and
updated for 0.10, but it will hopefully be added again.

For Flink 0.9.1, the metrics are cached in memory for plotting and not
persisted anywhere. The JobManager exposes the stats via a REST-like
interface. You would need to check the code of the web dashboard to get the
correct URL.
With the updated web UI of 0.10.1 all job information and stats are exposed
via well defined REST interfaces:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/monitoring_rest_api.html

In both cases, you can periodically poll the interfaces to collect stats.

Best, Fabian




2016-01-26 21:22 GMT+01:00 Ritesh Kumar Singh :

> I didn't know these stats were collected. Thanks for telling :)
> In that case, it should definitely be a feature which can be enabled via
> config files.
>
> *Ritesh Kumar Singh,*
> *https://riteshtoday.wordpress.com/* 
>
> On Tue, Jan 26, 2016 at 8:22 PM, Pieter Hameete 
> wrote:
>
>> Hi Ritesh,
>>
>> thanks for the response! The metrics are already being gathered though,
>> so I think it would be nice to have a configuration/option to log them
>> somewhere. It doesnt have to be enabled by default, and I dont think it
>> should degrade the performance very much. It looks like the metrics are
>> currently sent with each heartbeat by default already. Your Web UI probably
>> hangs because it has to update all the graphs on every heartbeat; when you
>> have many task managers that will be heavy on your computer :-)
>>
>> - Pieter
>>
>> 2016-01-26 20:17 GMT+01:00 Ritesh Kumar Singh <
>> riteshoneinamill...@gmail.com>:
>>
>>> Going by the list in the latest documentation for the Flink 0.10.1
>>> release, memory and cpu stats are not stored. Neither is
>>> the time spent on garbage collection stored anywhere.
>>>
>>> In my opinion, trying to store these metrics will degrade the
>>> performance of jobs too. And so it's basically a trade-off between
>>> performance and computation cost. For me, the web UI hangs even for the
>>> current set of parameters :(
>>>
>>> *Ritesh Kumar Singh,*
>>> *https://riteshtoday.wordpress.com/*
>>> 
>>>
>>> On Tue, Jan 26, 2016 at 7:16 PM, Pieter Hameete 
>>> wrote:
>>>
 Hi people!

 A lot of metrics are gathered for each TaskManager every few seconds.
 The web UI shows nice graphs for some of these metrics too.

 I would like to make graphs of the memory and cpu usage, and the time
 spent on garbage collection for each job. Because of this I am wondering if
 the metrics are also stored somewhere, or if there is an option to enable
 storing the metrics per job.

 In the configuration documentation I could not find such an option. Is
 this possible in version 0.9.1 of Flink? If not: is it possible in Flink
 0.10.1 or is it possible to request or develop such a feature?

 Thank you for your help and kind regards,

 Pieter



>>>
>>
>
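
A minimal polling sketch along the lines suggested above — the port,
endpoint path, and job ID handling are placeholders; consult the linked REST
API docs for the exact URLs:

import scala.io.Source

object StatsPoller extends App {
  val jobId = args(0) // hypothetical: job ID passed on the command line
  while (true) {
    // e.g. http://<jobmanager>:8081/jobs/<jobid> per the monitoring REST API
    val json = Source.fromURL(s"http://localhost:8081/jobs/$jobId").mkString
    println(json) // in real use: parse and persist the metrics instead
    Thread.sleep(5000)
  }
}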


Re: Streaming left outer join

2016-01-27 Thread Aljoscha Krettek
Hi,
I’m afraid there is currently no way to do what you want with the builtin 
window primitives. Each of the slices of the sliding windows is essentially 
evaluated independently. Therefore, there cannot be effects in one slice that 
influence processing of another slice.

What you could do is switch to tumbling windows, then each element would only 
be in one window. That probably won’t fit your use case anymore. The 
alternative I see to that is to implement everything in a custom operator where 
you deal with window states and triggering on time yourself. Let me know if you 
need some pointers about that one.

Cheers,
Aljoscha
> On 26 Jan 2016, at 19:32, Alexander Gryzlov  wrote:
> 
> Hello, 
> 
> I'm trying to implement a left outer join of two Kafka streams within a 
> sliding window. So far I have the following code:
> 
> foos
>   .coGroup(bars)
>   .where(_.baz).equalTo(_.baz)
>   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, 
> TimeUnit.SECONDS)))
>   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
>fs.foreach(f =>
> if (bs.isEmpty)
>   o.collect(FooBar(f, None))
> else
>   bs.foreach(b => o.collect(FooBar(f, Some(b))))
>)
>   )
> 
> However, this results in the pair being emitted from every window slide, 
> regardless of the match. The desired behaviour would be:
> * emit the match as soon as it's found, don't emit any more pairs for it,
> * otherwise, emit the empty match, when the left side element leaves the last 
> of its windows
> 
> What would be the idiomatic/efficient way to implement such behaviour? Is it 
> possible at all with the coGroup/window mechanism, or some other way is 
> necessary?
> 
> Alex



Re: Imbalanced workload between workers

2016-01-27 Thread Till Rohrmann
Could it be that your data is skewed? This could lead to different loads on
different task managers.

With the latest Flink version, the web interface should show you how many
bytes each operator has written and received. There you could see if one
operator receives more elements than the others.

Cheers,
Till

On Wed, Jan 27, 2016 at 1:35 PM, Pieter Hameete  wrote:

> Hi guys,
>
> Currently I am running a job in the GCloud in a configuration with 4 task
> managers that each have 4 CPUs (for a total parallelism of 16).
>
> However, I noticed my job is running much slower than expected and after
> some more investigation I found that one of the workers is doing a majority
> of the work (its CPU load was at 100% while the others were almost idle).
>
> My job execution plan can be found here: http://i.imgur.com/fHKhVFf.png
>
> The input is split into multiple files so loading the data is properly
> distributed over the workers.
>
> I am wondering if you can provide me with some tips on how to figure out
> what is going wrong here:
>
>- Could this imbalance in workload be the result of an imbalance in
>the hash partitioning?
>- Is there a convenient way to see how many elements each worker gets
>   to process? Would it work to write the output of the CoGroup to disk
>   because each worker writes to its own output file and investigate the
>   differences?
>- Is there something strange about the execution plan that could cause
>this?
>
> Thanks and kind regards,
>
> Pieter
>


Re: Streaming left outer join

2016-01-27 Thread Alexander Gryzlov
Hello Aljoscha,

Indeed, it seems like I'd need a custom operator. I imagine this involves
implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator?
Could you provide those pointers please?

Alex

On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek 
wrote:

> Hi,
> I’m afraid there is currently no way to do what you want with the builtin
> window primitives. Each of the slices of the sliding windows is essentially
> evaluated independently. Therefore, there cannot be effects in one slice
> that influence processing of another slice.
>
> What you could do is switch to tumbling windows, then each element would
> only be in one window. That probably won’t fit your use case anymore. The
> alternative I see to that is to implement everything in a custom operator
> where you deal with window states and triggering on time yourself. Let me
> know if you need some pointers about that one.
>
> Cheers,
> Aljoscha
> > On 26 Jan 2016, at 19:32, Alexander Gryzlov 
> wrote:
> >
> > Hello,
> >
> > I'm trying to implement a left outer join of two Kafka streams within a
> sliding window. So far I have the following code:
> >
> > foos
> >   .coGroup(bars)
> >   .where(_.baz).equalTo(_.baz)
> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1,
> TimeUnit.SECONDS)))
> >   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
> >fs.foreach(f =>
> > if (bs.isEmpty)
> >   o.collect(FooBar(f, None))
> > else
> >   bs.foreach(b => o.collect(FooBar(f, Some(b))))
> >)
> >   )
> >
> > However, this results in the pair being emitted from every window slide,
> regardless of the match. The desired behaviour would be:
> > * emit the match as soon as it's found, don't emit any more pairs
> for it,
> > * otherwise, emit the empty match, when the left side element leaves the
> last of its windows
> >
> > What would be the idiomatic/efficient way to implement such behaviour?
> Is it possible at all with the coGroup/window mechanism, or some other way
> is necessary?
> >
> > Alex
>
>
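
A bare skeleton of the custom-operator route, to give those pointers a
concrete shape — the method names follow the TwoInputStreamOperator
interface of this era; everything else (class name, buffering, expiry) is
illustrative and ignores keying and checkpointed state for brevity:

import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TwoInputStreamOperator}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

class LeftOuterJoinOperator[L, R, O](join: (L, Option[R]) => O)
    extends AbstractStreamOperator[O] with TwoInputStreamOperator[L, R, O] {

  // Left elements still waiting for a match; a real operator would keep this
  // in operator state and expire entries by time.
  private val pendingLeft = scala.collection.mutable.ArrayBuffer[L]()

  override def processElement1(element: StreamRecord[L]): Unit =
    pendingLeft += element.getValue

  override def processElement2(element: StreamRecord[R]): Unit = {
    // Emit a match as soon as it is found, then stop tracking those elements.
    pendingLeft.foreach(l =>
      output.collect(new StreamRecord(join(l, Some(element.getValue)))))
    pendingLeft.clear()
  }

  override def processWatermark1(mark: Watermark): Unit = {
    // Once time passes a left element's last window, emit join(l, None) here.
  }

  override def processWatermark2(mark: Watermark): Unit = ()
}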


Re: Imbalanced workload between workers

2016-01-27 Thread Pieter Hameete
Cheers for the quick reply Till.

That would be very useful information to have! I'll upgrade my project to
Flink 0.10.1 tonight and let you know if I can find out if there's a skew in
the data :-)

- Pieter


2016-01-27 13:49 GMT+01:00 Till Rohrmann :

> Could it be that your data is skewed? This could lead to different loads
> on different task managers.
>
> With the latest Flink version, the web interface should show you how many
> bytes each operator has written and received. There you could see if one
> operator receives more elements than the others.
>
> Cheers,
> Till
>
> On Wed, Jan 27, 2016 at 1:35 PM, Pieter Hameete 
> wrote:
>
>> Hi guys,
>>
>> Currently I am running a job in the GCloud in a configuration with 4 task
>> managers that each have 4 CPUs (for a total parallelism of 16).
>>
>> However, I noticed my job is running much slower than expected and after
>> some more investigation I found that one of the workers is doing a majority
>> of the work (its CPU load was at 100% while the others were almost idle).
>>
>> My job execution plan can be found here: http://i.imgur.com/fHKhVFf.png
>>
>> The input is split into multiple files so loading the data is properly
>> distributed over the workers.
>>
>> I am wondering if you can provide me with some tips on how to figure out
>> what is going wrong here:
>>
>>- Could this imbalance in workload be the result of an imbalance in
>>the hash partitioning?
>>- Is there a convenient way to see how many elements each worker gets
>>   to process? Would it work to write the output of the CoGroup to disk
>>   because each worker writes to its own output file and investigate the
>>   differences?
>>- Is there something strange about the execution plan that could
>>cause this?
>>
>> Thanks and kind regards,
>>
>> Pieter
>>
>
>