Re: Logical plan optimization with Calcite

2016-07-21 Thread gallenvara
Thanks Max and Timo for the explanation. :)





Re: flink1.0 DataStream groupby

2016-07-21 Thread Suneel Marthi
It should be keyBy(0) for the DataStream API (since Flink 0.9), i.e. val dskeyby = ds.keyBy(0) in the snippet below.

It's groupBy() in the DataSet API.
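For illustration, a minimal sketch of the same thing in the Java API (class name and element values are just placeholders; the Scala API mirrors the keyBy(0) call):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        DataStream<Tuple2<String, String>> ds = env.fromElements(
                new Tuple2<>("1", "a"), new Tuple2<>("2", "b"), new Tuple2<>("1", "c"));

        // DataStream API: keyBy(0) partitions the stream by the first tuple field;
        // on a DataSet the equivalent grouping call would be groupBy(0)
        ds.keyBy(0).print();

        env.execute("keyBy example");
    }
}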

On Fri, Jul 22, 2016 at 1:27 AM,  wrote:

> Hi,
> today,I use flink to rewrite my spark project,in spark ,data is
> rdd,and it have much transformations and actions,but in flink,the
> DataStream does not have groupby and foreach,
>   for example,
>val env=StreamExecutionEnvironment.createLocalEnvironment()
>   val data=List(("1"->"a"),("2"->"b"),("1"->"c"),("2"->"f"))
>   val ds=env.fromCollection(data)
>   val dskeyby=ds.groupBy(0)
>   ds.print()
>  env.execute()
>
> the code "val dskeyby=ds.groupBy(0)" is error,say "value groupBy is not a
> member of org.apache.flink.streaming.api.scala.DataStream"
> so , the solution is?
> 
>
>
>
>
>


flink1.0 DataStream groupby

2016-07-21 Thread rimin515
Hi,
Today I am using Flink to rewrite my Spark project. In Spark the data is an RDD,
with many transformations and actions, but in Flink the DataStream does not have
groupBy and foreach. For example:

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val data = List(("1" -> "a"), ("2" -> "b"), ("1" -> "c"), ("2" -> "f"))
    val ds = env.fromCollection(data)
    val dskeyby = ds.groupBy(0)
    ds.print()
    env.execute()

The line "val dskeyby = ds.groupBy(0)" fails with the error "value groupBy is not a
member of org.apache.flink.streaming.api.scala.DataStream". What is the solution?









Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

2016-07-21 Thread Clifford Resnick
I took another look at this and it occurred to me that the S3a directory issue
is actually localized to Cloudera's hadoop-aws version, which is stuck at
2.6.0. Apparently the zeroed-out directory timestamps are present in the Flink
recommended version, so Flink/Yarn/S3a will work, just not with CDH5. I'll
verify and submit the original PR tomorrow morning EST.

On Jul 22, 2016 12:26 AM, Clifford Resnick  wrote:
I have a fix and test for a recursive HDFSCopyToLocal. I also added similar 
code to Yarn application staging. However, even though all files and resources 
now copy correctly, S3A still fails on Flink session creation. The failure 
stems from the lib folder being registered as an application resource (as 
opposed to its contained contents). Since there is no such thing as a directory 
in S3, there is no file creation timestamp, and the local folder resource fails 
with the following error:

java.io.IOException: Resource 
s3a://mm-dev-flink-savepoints/user/ec2-user/.flink/application_1469150519301_0014/lib
 changed on src filesystem (expected 0, was 1469155595413
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
at java.security.AccessController.doPrivileged(Native Method)..

I was stumped by why S3NativeFilesystem does _not_ fail, but reading this 
perhaps answers why: 
http://stackoverflow.com/questions/15619712/newly-created-s3-directory-has-1969-12-31-as-timestamp

Everything works for all FileSystem implementations if I change the staging 
code to expand directory resources to file resources, effectively flattening 
all resources into the base directory. The only issue with this approach would 
be if there are like-named classpath resources in nested directories, but 
apparently the current implementation only copies jars so perhaps that’s a 
non-issue.

Short of altering S3A to perform the linked “hack”, I don’t see how 
Flink/Yarn/S3a can work as currently implemented. I can add the resource 
directory flattening to my impending PR but I just want to be sure to first 
mention the risk (like-named nested resources).

BTW, if anyone is wondering why I’m interested in S3a over S3n, it’s for this:
https://issues.apache.org/jira/browse/HADOOP-11183. In-memory multipart
writes would be a great way to use the rolling file appender.

-Cliff




On 7/19/16, 4:00 AM, "Ufuk Celebi"  wrote:

Feel free to do the contribution at any time you like. We can also
always make it part of a bugfix release if it does not make it into
the upcoming 1.1 RC (probably end of this week or beginning of next).
Feel free to ping me if you need any feed back or pointers.

– Ufuk


On Mon, Jul 18, 2016 at 9:52 PM, Clifford Resnick
 wrote:
> In 1.1, AbstractYarnClusterDescriptor pushes contents of flink/lib (local 
to where the yarn app is launched) to Yarn with a single directory copy. In 
1.0.3 it looked like it was copying the individual jars.
>
> So, yes I did actually change HDFSCopyToLocal, which was easy, but the 
job staging in the above class also needs altering. I’m happy to contribute on 
both though I won’t be able to get to it until later this week.
>
> -Cliff
>
>
>
> On 7/18/16, 3:38 PM, "Ufuk Celebi"  wrote:
>
> Hey Cliff! Good to see that we came to the same conclusion :-) What do
> you mean with copying of the "lib" folder? This issue should be the
> same for both 1.0 and 1.1. Another work around could be to use the
> fully async RocksDB snapshots with Flink 1.1-SNAPSHOT.
>
> If you like, you could also work on the issue I've created by
> implementing the recursive File copy in Flink (in HDFSCopyToLocal) and
> contribute this via a pull request.
>
> – Ufuk
>
>
> On Mon, Jul 18, 2016 at 7:22 PM, Clifford Resnick
>  wrote:
> > Hi Ufuk,
> >
> > My mail was down, so I missed this response. Thanks for that.
> >
> > On 7/18/16, 10:38 AM, "Ufuk Celebi"  wrote:
> >
> > Hey Cliff!
> >
> > I was able to reproduce this by locally running a job and 
RocksDB semi
> > asynchronous checkpoints (current default) to S3A. I've created 
an
> > issue here: https://issues.apache.org/jira/browse/FLINK-4228.
> >
> > Running with S3N it is working as expected. You can use that
> > implementation as a work around. I don't know whether it's 
possible to
> > disable creation of MD5 hashes for S3A.
> >
> > – Ufuk
> >
> > On Sat, Jul 16, 2016 at 6:26 PM, Clifford Resnick
> >  wrote:
> > > Using Flink 

Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

2016-07-21 Thread Clifford Resnick
I have a fix and test for a recursive HDFSCopyToLocal. I also added similar 
code to Yarn application staging. However, even though all files and resources 
now copy correctly, S3A still fails on Flink session creation. The failure 
stems from the lib folder being registered as an application resource (as 
opposed to its contained contents). Since there is no such thing as a directory 
in S3, there is no file creation timestamp, and the local folder resource fails 
with the following error:

java.io.IOException: Resource 
s3a://mm-dev-flink-savepoints/user/ec2-user/.flink/application_1469150519301_0014/lib
 changed on src filesystem (expected 0, was 1469155595413
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
at java.security.AccessController.doPrivileged(Native Method)..

I was stumped by why S3NativeFilesystem does _not_ fail, but reading this 
perhaps answers why: 
http://stackoverflow.com/questions/15619712/newly-created-s3-directory-has-1969-12-31-as-timestamp

Everything works for all FileSystem implementations if I change the staging 
code to expand directory resources to file resources, effectively flattening 
all resources into the base directory. The only issue with this approach would 
be if there are like-named classpath resources in nested directories, but 
apparently the current implementation only copies jars so perhaps that’s a 
non-issue.
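For illustration, a rough sketch of what that flattening could look like (a hypothetical helper, not the actual staging code; it assumes the Hadoop FileSystem API and that only the first directory level needs expanding):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ResourceFlattener {

    /**
     * Expands any directory among the given paths into its contained files, so that
     * only plain files (which carry real timestamps) get registered as YARN resources.
     */
    public static List<Path> flatten(FileSystem fs, List<Path> resources) throws IOException {
        List<Path> flattened = new ArrayList<>();
        for (Path p : resources) {
            FileStatus status = fs.getFileStatus(p);
            if (status.isDirectory()) {
                // register the directory's files individually instead of the directory itself
                for (FileStatus child : fs.listStatus(p)) {
                    if (!child.isDirectory()) {
                        flattened.add(child.getPath());
                    }
                }
            } else {
                flattened.add(p);
            }
        }
        return flattened;
    }
}

This is also where the like-named-resources caveat shows up: two files with the same name in different directories would collide once flattened.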

Short of altering S3A to perform the linked “hack”, I don’t see how 
Flink/Yarn/S3a can work as currently implemented. I can add the resource 
directory flattening to my impending PR but I just want to be sure to first 
mention the risk (like-named nested resources). 

BTW, if anyone is wondering why I’m interested in S3a over S3n, it’s for this:
https://issues.apache.org/jira/browse/HADOOP-11183. In-memory multipart
writes would be a great way to use the rolling file appender.


-Cliff




On 7/19/16, 4:00 AM, "Ufuk Celebi"  wrote:

Feel free to do the contribution at any time you like. We can also
always make it part of a bugfix release if it does not make it into
the upcoming 1.1 RC (probably end of this week or beginning of next).
Feel free to ping me if you need any feed back or pointers.

– Ufuk


On Mon, Jul 18, 2016 at 9:52 PM, Clifford Resnick
 wrote:
> In 1.1, AbstractYarnClusterDescriptor pushes contents of flink/lib (local 
to where the yarn app is launched) to Yarn with a single directory copy. In 
1.0.3 it looked like it was copying the individual jars.
>
> So, yes I did actually change HDFSCopyToLocal, which was easy, but the 
job staging in the above class also needs altering. I’m happy to contribute on 
both though I won’t be able to get to it until later this week.
>
> -Cliff
>
>
>
> On 7/18/16, 3:38 PM, "Ufuk Celebi"  wrote:
>
> Hey Cliff! Good to see that we came to the same conclusion :-) What do
> you mean with copying of the "lib" folder? This issue should be the
> same for both 1.0 and 1.1. Another work around could be to use the
> fully async RocksDB snapshots with Flink 1.1-SNAPSHOT.
>
> If you like, you could also work on the issue I've created by
> implementing the recursive File copy in Flink (in HDFSCopyToLocal) and
> contribute this via a pull request.
>
> – Ufuk
>
>
> On Mon, Jul 18, 2016 at 7:22 PM, Clifford Resnick
>  wrote:
> > Hi Ufuk,
> >
> > My mail was down, so I missed this response. Thanks for that.
> >
> > On 7/18/16, 10:38 AM, "Ufuk Celebi"  wrote:
> >
> > Hey Cliff!
> >
> > I was able to reproduce this by locally running a job and 
RocksDB semi
> > asynchronous checkpoints (current default) to S3A. I've created 
an
> > issue here: https://issues.apache.org/jira/browse/FLINK-4228.
> >
> > Running with S3N it is working as expected. You can use that
> > implementation as a work around. I don't know whether it's 
possible to
> > disable creation of MD5 hashes for S3A.
> >
> > – Ufuk
> >
> > On Sat, Jul 16, 2016 at 6:26 PM, Clifford Resnick
> >  wrote:
> > > Using Flink 1.1-SNAPSHOT, Hadoop-aws 2.6.4
> > >
> > >
> > >
> > > The error I’m getting is :
> > >
> > >
> > >
> > > 11:05:44,425 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask
> > > - Caught exception while materializing asynchronous 
checkpoints.
>

Re: add FLINK_LIB_DIR to classpath on yarn -> add properties file to class path on yarn

2016-07-21 Thread 김동일
I saw the source code of
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java.
Flink ships the FLINK_LIB_DIR contents but adds only the jar files to the classpath.
I want to know how to add a resource file to the classpath.

Best Regards, 
Dong-iL, Kim.

> On Jul 22, 2016, at 4:28 AM, Dong iL, Kim  wrote:
> 
> Hello.
> I have a flink cluster on yarn.
> I wanna add FLINK_LIB_DIR to classpath.
> because hibernate.cfg.xml need to be on the classpath.
> when i'm using stand alone cluster, just add FLINK_LIB_DIR to FLINK_CLASSPATH.
> but on yarn, Fixing config.sh, yarn-session.sh and flink-daemon.sh is not 
> working.
> 
> Best Regards,
> Dong-iL, Kim.
> -- 



Re: counting words (not frequency)

2016-07-21 Thread hrajaram
Can't you use a KeyedStream, i.e. keyBy with the same key for every element? Something like
this:
source.flatMap(new Tokenizer()).keyBy(0).sum(2).project(2).print();

Assuming the Tokenizer emits a Tuple3 whose fields (0-based, as used by keyBy/sum/project) are:

field 0 -> always the same key, say "test"
field 1 -> the actual word
field 2 -> 1
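A rough, runnable sketch of that idea (the constant key, the field layout, and the socket source on localhost:9999 are all illustrative assumptions):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TotalWordCount {

    // Emits ("total", word, 1) for every word, so sum(2) yields a running total count.
    public static class Tokenizer implements FlatMapFunction<String, Tuple3<String, String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple3<String, String, Integer>> out) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple3<>("total", word, 1));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        text.flatMap(new Tokenizer())
            .keyBy(0)      // constant key: everything goes to one group
            .sum(2)        // running total of the count field
            .project(2)    // keep only the running count
            .print();

        env.execute("total word count");
    }
}

Because every element carries the same key, sum(2) maintains one continuously updated total for the whole stream rather than a per-word frequency.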



There might be some other good choices, but this is the first thing that
quickly came to mind :-)

Hari






counting words (not frequency)

2016-07-21 Thread Roshan Naik
Was trying to write a simple streaming Flink program that counts the total
words (not the frequency) in a file.
I was thinking along the lines of:

counts = text.flatMap(new Tokenizer())
.count(); // count() isn't part of the streaming API (but supported for batch)

Any suggestions on how to do this? I just want a continuous count (not a
windowed count).
-roshan


Re: Processing windows in event time order

2016-07-21 Thread Sameer W
Aljoscha - Thanks, it works exactly as you said. I found out why my windows
were firing twice. I was making the error of adding the
AutoWatermarkInterval to the existing watermark each time the watermark was
sampled from the source, just to fire a window if one of the sources was
delayed substantially.

But doesn't this mean that if one of the sources stops sending data (device
lost internet connectivity temporarily), then such a pipeline would just
freeze and windows would keep accumulating on the reduce side, as the other
sources (all except one) would keep sending data and their watermarks? Isn't
this a risk of a possible out-of-memory error? Should one always use a
RocksDB alternative to mitigate such risks?

Sameer



On Thu, Jul 21, 2016 at 7:52 AM, Aljoscha Krettek 
wrote:

> Yes, that is to be expected. Stream 2 should only send the watermark once
> the elements with a timestamp lower than the watermark have been sent as
> well.
>
> On Thu, 21 Jul 2016 at 13:10 Sameer W  wrote:
>
>> Thanks, Aljoscha,
>>
>> This what I am seeing when I use Ascending timestamps as watermarks-
>>
>> Consider a window if 1-5 seconds
>> Stream 1- Sends Elements A,B
>>
>> Stream 2 (20 seconds later) - Sends Elements C,D
>>
>> I see Window (1-5) fires first with just A,B. After 20 seconds Window
>> (1-5) fires again but this time with only C,D. If I add a delay where I lag
>> the watermarks by 20 seconds, then only one instance of the Window (1-5)
>> fires with elements A,B,C,D.
>>
>> Sameer
>>
>> On Thu, Jul 21, 2016 at 5:17 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi David,
>>> windows are being processed in order of their end timestamp. So if you
>>> specify an allowed lateness of zero (which will only be possible on Flink
>>> 1.1 or by using a custom trigger) you should be able to sort the elements.
>>> The ordering is only valid within one key, though, since windows for
>>> different keys with the same end timestamp will be processed in an
>>> arbitrary order.
>>>
>>> @Sameer If both sources emit watermarks that are correct for the
>>> elements that they are emitting the Trigger should only fire when both
>>> sources progressed their watermarks sufficiently far. Could you maybe give
>>> a more detailed example of the problem that you described?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>> On Thu, 21 Jul 2016 at 04:03 Sameer Wadkar  wrote:
>>>
 Hi,

 If watermarks arriving from multiple sources, how long does the Event
 Time Trigger wait for the slower source to send its watermarks before
 triggering only from the faster source? I have seen that if one of the
 sources is really slow then the elements of the faster source fires and
 when the elements arrive from the slower source, the same window fires
 again with the new elements only. I can work around this by adding delays
 but does merging watermarks require that both have arrived by the time the
 watermarks progress to the point where a window can be triggered? Is
 applying a delay in the watermark the only way to solve this.

 Sameer

 Sent from my iPhone

 On Jul 20, 2016, at 9:41 PM, Vishnu Viswanath <
 vishnu.viswanat...@gmail.com> wrote:

 Hi David,

 You are right, the events in the window are not sorted according to the
 EventTime hence the processing is not done in an increasing order of
 timestamp.
 As you said, you will have to do the sorting yourself in your window
 function to make sure that you are processing the events in order.

 What Flink does is (when EventTime is set and timestamp is assigned),
 it will assign the elements to the Windows based on the EventTime, which
 otherwise (if using ProcessingTime) might have ended up in a different
 Window. (as per the ProcessingTime).

 This is as per my limited knowledge, other Flink experts can correct me
 if this is wrong.

 Thanks,
 Vishnu

 On Wed, Jul 20, 2016 at 9:30 PM, David Desberg 
 wrote:

> Hi all,
>
> In Flink, after setting the time characteristic to event time and
> properly assigning timestamps/watermarks, time-based windows will be
> created based upon event time. If we need to process events within a 
> window
> in event time order, we can sort the windowed values and process as
> necessary by applying a WindowFunction. However, as I understand it, there
> is no guarantee that time-based windows will be processed in time order. 
> Is
> this correct? Or, if we assume a watermarking system that (for example's
> sake) does not allow any late events, is there a way within Flink to
> guarantee that windows will be processed (via an applied WindowFunction) 
> in
> strictly increasing time order?
>
> If necessary, I can provide a more concrete explanation of what I
> mean/am looking for.
>
> Thanks!
> David



>>


add FLINK_LIB_DIR to classpath on yarn

2016-07-21 Thread Dong iL, Kim
Hello.
I have a Flink cluster on YARN.
I want to add FLINK_LIB_DIR to the classpath,
because hibernate.cfg.xml needs to be on the classpath.
When I'm using a standalone cluster, I just add FLINK_LIB_DIR to
FLINK_CLASSPATH,
but on YARN, fixing config.sh, yarn-session.sh and flink-daemon.sh is not
working.

Best Regards,
Dong-iL, Kim.
-- 


Re: Processing windows in event time order

2016-07-21 Thread David Desberg
Aljoscha,

Awesome. Exactly the behavior I was hoping would be exhibited. Thank you
for the quick answer :)

Thanks,
David

On Thu, Jul 21, 2016 at 2:17 AM, Aljoscha Krettek 
wrote:

> Hi David,
> windows are being processed in order of their end timestamp. So if you
> specify an allowed lateness of zero (which will only be possible on Flink
> 1.1 or by using a custom trigger) you should be able to sort the elements.
> The ordering is only valid within one key, though, since windows for
> different keys with the same end timestamp will be processed in an
> arbitrary order.
>
> @Sameer If both sources emit watermarks that are correct for the elements
> that they are emitting the Trigger should only fire when both sources
> progressed their watermarks sufficiently far. Could you maybe give a more
> detailed example of the problem that you described?
>
> Cheers,
> Aljoscha
>
>
> On Thu, 21 Jul 2016 at 04:03 Sameer Wadkar  wrote:
>
>> Hi,
>>
>> If watermarks arriving from multiple sources, how long does the Event
>> Time Trigger wait for the slower source to send its watermarks before
>> triggering only from the faster source? I have seen that if one of the
>> sources is really slow then the elements of the faster source fires and
>> when the elements arrive from the slower source, the same window fires
>> again with the new elements only. I can work around this by adding delays
>> but does merging watermarks require that both have arrived by the time the
>> watermarks progress to the point where a window can be triggered? Is
>> applying a delay in the watermark the only way to solve this.
>>
>> Sameer
>>
>> Sent from my iPhone
>>
>> On Jul 20, 2016, at 9:41 PM, Vishnu Viswanath <
>> vishnu.viswanat...@gmail.com> wrote:
>>
>> Hi David,
>>
>> You are right, the events in the window are not sorted according to the
>> EventTime hence the processing is not done in an increasing order of
>> timestamp.
>> As you said, you will have to do the sorting yourself in your window
>> function to make sure that you are processing the events in order.
>>
>> What Flink does is (when EventTime is set and timestamp is assigned), it
>> will assign the elements to the Windows based on the EventTime, which
>> otherwise (if using ProcessingTime) might have ended up in a different
>> Window. (as per the ProcessingTime).
>>
>> This is as per my limited knowledge, other Flink experts can correct me
>> if this is wrong.
>>
>> Thanks,
>> Vishnu
>>
>> On Wed, Jul 20, 2016 at 9:30 PM, David Desberg 
>> wrote:
>>
>>> Hi all,
>>>
>>> In Flink, after setting the time characteristic to event time and
>>> properly assigning timestamps/watermarks, time-based windows will be
>>> created based upon event time. If we need to process events within a window
>>> in event time order, we can sort the windowed values and process as
>>> necessary by applying a WindowFunction. However, as I understand it, there
>>> is no guarantee that time-based windows will be processed in time order. Is
>>> this correct? Or, if we assume a watermarking system that (for example's
>>> sake) does not allow any late events, is there a way within Flink to
>>> guarantee that windows will be processed (via an applied WindowFunction) in
>>> strictly increasing time order?
>>>
>>> If necessary, I can provide a more concrete explanation of what I
>>> mean/am looking for.
>>>
>>> Thanks!
>>> David
>>
>>
>>


Re: Using Kafka and Flink for batch processing of a batch data source

2016-07-21 Thread Suneel Marthi
I meant to respond to this thread yesterday, but got busy with work and it
slipped my mind.

This is possibly doable using Flink Streaming; others can correct me here.

*Assumption:* Both the batch and streaming processes are reading from a
single Kafka topic, and by "batched data" I am assuming it's the same data
that's being fed to streaming but aggregated over a longer time period.

This could be done using a Lambda-like architecture.

1. A Kafka topic that's ingesting data to be distributed to various
consumers.
2. A Flink Streaming process with a small time window (minutes/seconds)
that's ingesting from Kafka and handles data over this small window.
3. Another Flink Streaming process with a very long time window (few hrs ?)
that's also ingesting from Kafka and is munging over large time periods of
data (think mini-batch that extends Streaming).

This should work and you don't need a separate batch process. A similar
architecture using Spark Streaming (for both batch and streaming) is
demonstrated by Cloudera's Oryx 2.0 project - see http://oryx.io
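To make point 2 concrete, here is a minimal sketch of one such streaming job (the small-window variant); the Kafka 0.9 connector, the topic name "events", the "key,value" record format, and the local broker address are all assumptions:

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SmallWindowJob {

    // Hypothetical parser: treats each record as "key,value" and counts one occurrence per key.
    public static class Parse implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String record) {
            return new Tuple2<>(record.split(",")[0], 1);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "small-window-job");

        DataStream<String> events =
                env.addSource(new FlinkKafkaConsumer09<>("events", new SimpleStringSchema(), props));

        // Aggregate over a small window; the "mini-batch" job in point 3 would be the same
        // pipeline deployed separately with a much larger window, e.g. Time.hours(6).
        events.map(new Parse())
              .keyBy(0)
              .timeWindow(Time.minutes(1))
              .sum(1)
              .print();

        env.execute("small-window aggregation over Kafka");
    }
}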


On Thu, Jul 21, 2016 at 12:41 PM, milind parikh 
wrote:

> At this point in time, imo, batch processing is not why you should be
> considering Flink.
>
> That said, I predict that the stream processing (and event processing)
> will become the dominant methodology; as we begin to gravitate towards  "I
> can't wait; I want it now" phenomenon. In that methodology,  I believe
> Flink represents the cutting edge of what is possible; at this point in
> time.
>
> Regards
> Milind
>
> On Jul 20, 2016 4:57 PM, "Leith Mudge"  wrote:
>
> Thanks Milind & Till,
>
>
>
> This is what I thought from my reading of the documentation but it is nice
> to have it confirmed by people more knowledgeable.
>
>
>
> Supplementary to this question is whether Flink is the best choice for
> batch processing at this point in time or whether I would be better to look
> at a more mature and dedicated batch processing engine such as Spark? I do
> like the choices that adopting the unified programming model outlined in
> Apache Beam/Google Cloud Dataflow SDK and this purports to have runners for
> both Flink and Spark.
>
>
>
> Regards,
>
>
>
> Leith
>
> *From: *Till Rohrmann 
> *Date: *Wednesday, 20 July 2016 at 5:05 PM
> *To: *
> *Subject: *Re: Using Kafka and Flink for batch processing of a batch data
> source
>
>
>
> At the moment there is also no batch source for Kafka. I'm also not so
> sure how you would define a batch given a Kafka stream. Only reading till a
> certain offset? Or maybe until one has read n messages?
>
>
>
> I think it's best to write the batch data to HDFS or another batch data
> store.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Jul 20, 2016 at 8:08 AM, milind parikh 
> wrote:
>
> It likely does not make sense to publish a file ( "batch data") into
> Kafka; unless the file is very small.
>
> An improvised pub-sub mechanism for Kafka could be to (a) write the file
> into a persistent store outside of kafka (b) publishing of a message into
> Kafka about that write so as to enable processing of that file.
>
> If you really needed to have provenance around processing, you could route
> data processing through Nifi before Flink.
>
> Regards
> Milind
>
>
>
> On Jul 19, 2016 9:37 PM, "Leith Mudge"  wrote:
>
> I am currently working on an architecture for a big data streaming and
> batch processing platform. I am planning on using Apache Kafka for a
> distributed messaging system to handle data from streaming data sources and
> then pass on to Apache Flink for stream processing. I would also like to
> use Flink's batch processing capabilities to process batch data.
>
> Does it make sense to pass the batched data through Kafka on a periodic
> basis as a source for Flink batch processing (is this even possible?) or
> should I just write the batch data to a data store and then process by
> reading into Flink?
>
>
> --
>
>

Re: Using Kafka and Flink for batch processing of a batch data source

2016-07-21 Thread milind parikh
At this point in time, imo, batch processing is not why you should be
considering Flink.

That said, I predict that stream processing (and event processing) will
become the dominant methodology, as we begin to gravitate towards the "I can't
wait; I want it now" phenomenon. In that methodology, I believe Flink
represents the cutting edge of what is possible at this point in time.

Regards
Milind

On Jul 20, 2016 4:57 PM, "Leith Mudge"  wrote:

Thanks Milind & Till,



This is what I thought from my reading of the documentation but it is nice
to have it confirmed by people more knowledgeable.



Supplementary to this question is whether Flink is the best choice for
batch processing at this point in time or whether I would be better to look
at a more mature and dedicated batch processing engine such as Spark? I do
like the choices that adopting the unified programming model outlined in
Apache Beam/Google Cloud Dataflow SDK and this purports to have runners for
both Flink and Spark.



Regards,



Leith

*From: *Till Rohrmann 
*Date: *Wednesday, 20 July 2016 at 5:05 PM
*To: *
*Subject: *Re: Using Kafka and Flink for batch processing of a batch data
source



At the moment there is also no batch source for Kafka. I'm also not so sure
how you would define a batch given a Kafka stream. Only reading till a
certain offset? Or maybe until one has read n messages?



I think it's best to write the batch data to HDFS or another batch data
store.



Cheers,

Till



On Wed, Jul 20, 2016 at 8:08 AM, milind parikh 
wrote:

It likely does not make sense to publish a file ( "batch data") into Kafka;
unless the file is very small.

An improvised pub-sub mechanism for Kafka could be to (a) write the file
into a persistent store outside of kafka (b) publishing of a message into
Kafka about that write so as to enable processing of that file.

If you really needed to have provenance around processing, you could route
data processing through Nifi before Flink.

Regards
Milind



On Jul 19, 2016 9:37 PM, "Leith Mudge"  wrote:

I am currently working on an architecture for a big data streaming and
batch processing platform. I am planning on using Apache Kafka for a
distributed messaging system to handle data from streaming data sources and
then pass on to Apache Flink for stream processing. I would also like to
use Flink's batch processing capabilities to process batch data.

Does it make sense to pass the batched data through Kafka on a periodic
basis as a source for Flink batch processing (is this even possible?) or
should I just write the batch data to a data store and then process by
reading into Flink?


--







Re: taskmanager memory leak

2016-07-21 Thread 김동일
I think so. 
I’ll test it on EMR and then reply.

I am truly grateful for your support.

> On Jul 21, 2016, at 8:49 PM, Stephan Ewen  wrote:
> 
> I don't know that answer, sorry. Maybe one of the others can chime in here.
> 
> Did you deactivate checkpointing (then it should not write to S3) and did 
> that resolve the leak?
> 
> Best,
> Stephan
> 
> 
> On Thu, Jul 21, 2016 at 12:52 PM, 김동일  > wrote:
> Dear Stephan.
> 
> I also suspect the s3. 
> I’ve tried s3n, s3a.
> what is suitable library? I’m using aws-java-sdk-1.7.4 and hadoop-aws-2.7.2.
> 
> Best regards.
> 
>> On Jul 21, 2016, at 5:54 PM, Stephan Ewen > > wrote:
>> 
>> Hi!
>> 
>> There is a memory debugging logger, you can activate it like that:
>> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#memory-and-performance-debugging
>>  
>> 
>> 
>> It will print which parts of the memory are growing.
>> 
>> What you can also try is to deactivate checkpointing for one run and see if 
>> that solves it. If yes, then I suspect there is a memory leak in the s3 
>> library (are you using s3, s3a, or s3n?).
>> 
>> Can you also check what libraries you are using? We have seen cases of 
>> memory leaks in the libraries people used.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> 
>> On Thu, Jul 21, 2016 at 5:13 AM, 김동일 > > wrote:
>> hi. stephan. 
>> 
>> - Did you submit any job to the cluster, or is the memory just growing even 
>> on an idle TaskManager?
>> 
>> I have some stream job. 
>> 
>> - If you are running a job, do you use the RocksDB state backend, of the 
>> FileSystem state backend?
>> 
>> file state backend. i use s3.
>> 
>> - Does it grow infinitely, or simply up a certain point and then goes down 
>> again?
>> 
>> I think it infinitely. kernel kills the process , oom.
>> 
>> 
>> 
>> On Thu, Jul 21, 2016 at 3:52 AM Stephan Ewen > > wrote:
>> Hi!
>> 
>> In order to answer this, we need a bit more information. Here are some 
>> followup questions:
>> 
>>   - Did you submit any job to the cluster, or is the memory just growing 
>> even on an idle TaskManager?
>>   - If you are running a job, do you use the RocksDB state backend, of the 
>> FileSystem state backend?
>>   - Does it grow infinitely, or simply up a certain point and then goes down 
>> again?
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> On Wed, Jul 20, 2016 at 5:58 PM, 김동일 > > wrote:
>> oh. my flink version is 1.0.3.
>> 
>> 
>> -- Forwarded message --
>> From: 김동일 mailto:kim.s...@gmail.com>>
>> Date: Thu, Jul 21, 2016 at 12:52 AM
>> Subject: taskmanager memory leak
>> To: user@flink.apache.org 
>> 
>> 
>> I've set up cluster(stand alone).
>> Taskmanager consumes memory over the Xmx property and it grows up 
>> continuously.
>> I saw this 
>> link(http://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccak2vtervsw4muboc4swix0mr6y9bijznjuypf6_f9f0g9-_...@mail.gmail.com%3E
>>  
>> ).
>> So i set the taskmanager.memory.preallocation value to true but that is not 
>> solution.
>> 
>> my java version is
>> java version "1.8.0_20"
>> Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
>> Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)
>> 
>> and my flink-conf.yaml 
>> 
>>  
>> 
>>  
>> 
>>  
>> 
>>  
>> 
>>  
>> 
>>  
>> 
>>  
>> 
>>  
>> 
>>  
>> 

Re: Processing windows in event time order

2016-07-21 Thread Sameer W
Stream 2 does send watermarks only after it sees elements C,D. It sends the
watermark (5) 20 seconds after Stream 1 sends it.

From what I understand, Flink merges watermarks from both streams on the
reduce side. But does it wait a certain pre-configured amount of time (for
watermarks from both streams to arrive) before it finally fires the first
stream?



On Thu, Jul 21, 2016 at 7:52 AM, Aljoscha Krettek 
wrote:

> Yes, that is to be expected. Stream 2 should only send the watermark once
> the elements with a timestamp lower than the watermark have been sent as
> well.
>
> On Thu, 21 Jul 2016 at 13:10 Sameer W  wrote:
>
>> Thanks, Aljoscha,
>>
>> This what I am seeing when I use Ascending timestamps as watermarks-
>>
>> Consider a window if 1-5 seconds
>> Stream 1- Sends Elements A,B
>>
>> Stream 2 (20 seconds later) - Sends Elements C,D
>>
>> I see Window (1-5) fires first with just A,B. After 20 seconds Window
>> (1-5) fires again but this time with only C,D. If I add a delay where I lag
>> the watermarks by 20 seconds, then only one instance of the Window (1-5)
>> fires with elements A,B,C,D.
>>
>> Sameer
>>
>> On Thu, Jul 21, 2016 at 5:17 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi David,
>>> windows are being processed in order of their end timestamp. So if you
>>> specify an allowed lateness of zero (which will only be possible on Flink
>>> 1.1 or by using a custom trigger) you should be able to sort the elements.
>>> The ordering is only valid within one key, though, since windows for
>>> different keys with the same end timestamp will be processed in an
>>> arbitrary order.
>>>
>>> @Sameer If both sources emit watermarks that are correct for the
>>> elements that they are emitting the Trigger should only fire when both
>>> sources progressed their watermarks sufficiently far. Could you maybe give
>>> a more detailed example of the problem that you described?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>> On Thu, 21 Jul 2016 at 04:03 Sameer Wadkar  wrote:
>>>
 Hi,

 If watermarks arriving from multiple sources, how long does the Event
 Time Trigger wait for the slower source to send its watermarks before
 triggering only from the faster source? I have seen that if one of the
 sources is really slow then the elements of the faster source fires and
 when the elements arrive from the slower source, the same window fires
 again with the new elements only. I can work around this by adding delays
 but does merging watermarks require that both have arrived by the time the
 watermarks progress to the point where a window can be triggered? Is
 applying a delay in the watermark the only way to solve this.

 Sameer

 Sent from my iPhone

 On Jul 20, 2016, at 9:41 PM, Vishnu Viswanath <
 vishnu.viswanat...@gmail.com> wrote:

 Hi David,

 You are right, the events in the window are not sorted according to the
 EventTime hence the processing is not done in an increasing order of
 timestamp.
 As you said, you will have to do the sorting yourself in your window
 function to make sure that you are processing the events in order.

 What Flink does is (when EventTime is set and timestamp is assigned),
 it will assign the elements to the Windows based on the EventTime, which
 otherwise (if using ProcessingTime) might have ended up in a different
 Window. (as per the ProcessingTime).

 This is as per my limited knowledge, other Flink experts can correct me
 if this is wrong.

 Thanks,
 Vishnu

 On Wed, Jul 20, 2016 at 9:30 PM, David Desberg 
 wrote:

> Hi all,
>
> In Flink, after setting the time characteristic to event time and
> properly assigning timestamps/watermarks, time-based windows will be
> created based upon event time. If we need to process events within a 
> window
> in event time order, we can sort the windowed values and process as
> necessary by applying a WindowFunction. However, as I understand it, there
> is no guarantee that time-based windows will be processed in time order. 
> Is
> this correct? Or, if we assume a watermarking system that (for example's
> sake) does not allow any late events, is there a way within Flink to
> guarantee that windows will be processed (via an applied WindowFunction) 
> in
> strictly increasing time order?
>
> If necessary, I can provide a more concrete explanation of what I
> mean/am looking for.
>
> Thanks!
> David



>>


Re: Processing windows in event time order

2016-07-21 Thread Aljoscha Krettek
Yes, that is to be expected. Stream 2 should only send the watermark once
the elements with a timestamp lower than the watermark have been sent as
well.

On Thu, 21 Jul 2016 at 13:10 Sameer W  wrote:

> Thanks, Aljoscha,
>
> This what I am seeing when I use Ascending timestamps as watermarks-
>
> Consider a window if 1-5 seconds
> Stream 1- Sends Elements A,B
>
> Stream 2 (20 seconds later) - Sends Elements C,D
>
> I see Window (1-5) fires first with just A,B. After 20 seconds Window
> (1-5) fires again but this time with only C,D. If I add a delay where I lag
> the watermarks by 20 seconds, then only one instance of the Window (1-5)
> fires with elements A,B,C,D.
>
> Sameer
>
> On Thu, Jul 21, 2016 at 5:17 AM, Aljoscha Krettek 
> wrote:
>
>> Hi David,
>> windows are being processed in order of their end timestamp. So if you
>> specify an allowed lateness of zero (which will only be possible on Flink
>> 1.1 or by using a custom trigger) you should be able to sort the elements.
>> The ordering is only valid within one key, though, since windows for
>> different keys with the same end timestamp will be processed in an
>> arbitrary order.
>>
>> @Sameer If both sources emit watermarks that are correct for the elements
>> that they are emitting the Trigger should only fire when both sources
>> progressed their watermarks sufficiently far. Could you maybe give a more
>> detailed example of the problem that you described?
>>
>> Cheers,
>> Aljoscha
>>
>>
>> On Thu, 21 Jul 2016 at 04:03 Sameer Wadkar  wrote:
>>
>>> Hi,
>>>
>>> If watermarks arriving from multiple sources, how long does the Event
>>> Time Trigger wait for the slower source to send its watermarks before
>>> triggering only from the faster source? I have seen that if one of the
>>> sources is really slow then the elements of the faster source fires and
>>> when the elements arrive from the slower source, the same window fires
>>> again with the new elements only. I can work around this by adding delays
>>> but does merging watermarks require that both have arrived by the time the
>>> watermarks progress to the point where a window can be triggered? Is
>>> applying a delay in the watermark the only way to solve this.
>>>
>>> Sameer
>>>
>>> Sent from my iPhone
>>>
>>> On Jul 20, 2016, at 9:41 PM, Vishnu Viswanath <
>>> vishnu.viswanat...@gmail.com> wrote:
>>>
>>> Hi David,
>>>
>>> You are right, the events in the window are not sorted according to the
>>> EventTime hence the processing is not done in an increasing order of
>>> timestamp.
>>> As you said, you will have to do the sorting yourself in your window
>>> function to make sure that you are processing the events in order.
>>>
>>> What Flink does is (when EventTime is set and timestamp is assigned), it
>>> will assign the elements to the Windows based on the EventTime, which
>>> otherwise (if using ProcessingTime) might have ended up in a different
>>> Window. (as per the ProcessingTime).
>>>
>>> This is as per my limited knowledge, other Flink experts can correct me
>>> if this is wrong.
>>>
>>> Thanks,
>>> Vishnu
>>>
>>> On Wed, Jul 20, 2016 at 9:30 PM, David Desberg 
>>> wrote:
>>>
 Hi all,

 In Flink, after setting the time characteristic to event time and
 properly assigning timestamps/watermarks, time-based windows will be
 created based upon event time. If we need to process events within a window
 in event time order, we can sort the windowed values and process as
 necessary by applying a WindowFunction. However, as I understand it, there
 is no guarantee that time-based windows will be processed in time order. Is
 this correct? Or, if we assume a watermarking system that (for example's
 sake) does not allow any late events, is there a way within Flink to
 guarantee that windows will be processed (via an applied WindowFunction) in
 strictly increasing time order?

 If necessary, I can provide a more concrete explanation of what I
 mean/am looking for.

 Thanks!
 David
>>>
>>>
>>>
>


Re: taskmanager memory leak

2016-07-21 Thread Stephan Ewen
I don't know that answer, sorry. Maybe one of the others can chime in here.

Did you deactivate checkpointing (then it should not write to S3) and did
that resolve the leak?

Best,
Stephan


On Thu, Jul 21, 2016 at 12:52 PM, 김동일  wrote:

> Dear Stephan.
>
> I also suspect the s3.
> I’ve tried s3n, s3a.
> what is suitable library? I’m using aws-java-sdk-1.7.4 and
> hadoop-aws-2.7.2.
>
> Best regards.
>
> On Jul 21, 2016, at 5:54 PM, Stephan Ewen  wrote:
>
> Hi!
>
> There is a memory debugging logger, you can activate it like that:
>
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#memory-and-performance-debugging
>
> It will print which parts of the memory are growing.
>
> What you can also try is to deactivate checkpointing for one run and see
> if that solves it. If yes, then I suspect there is a memory leak in the s3
> library (are you using s3, s3a, or s3n?).
>
> Can you also check what libraries you are using? We have seen cases of
> memory leaks in the libraries people used.
>
> Greetings,
> Stephan
>
>
>
> On Thu, Jul 21, 2016 at 5:13 AM, 김동일  wrote:
>
>> hi. stephan.
>>
>> - Did you submit any job to the cluster, or is the memory just growing
>> even on an idle TaskManager?
>>
>> I have some stream job.
>>
>> - If you are running a job, do you use the RocksDB state backend, of the
>> FileSystem state backend?
>>
>> file state backend. i use s3.
>>
>> - Does it grow infinitely, or simply up a certain point and then goes
>> down again?
>>
>> I think it infinitely. kernel kills the process , oom.
>>
>>
>>
>> On Thu, Jul 21, 2016 at 3:52 AM Stephan Ewen  wrote:
>>
>>> Hi!
>>>
>>> In order to answer this, we need a bit more information. Here are some
>>> followup questions:
>>>
>>>   - Did you submit any job to the cluster, or is the memory just growing
>>> even on an idle TaskManager?
>>>   - If you are running a job, do you use the RocksDB state backend, of
>>> the FileSystem state backend?
>>>   - Does it grow infinitely, or simply up a certain point and then goes
>>> down again?
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Jul 20, 2016 at 5:58 PM, 김동일  wrote:
>>>
 oh. my flink version is 1.0.3.


 -- Forwarded message --
 From: 김동일 
 Date: Thu, Jul 21, 2016 at 12:52 AM
 Subject: taskmanager memory leak
 To: user@flink.apache.org


 I've set up cluster(stand alone).
 Taskmanager consumes memory over the Xmx property and it grows up
 continuously.
 I saw this link(
 http://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccak2vtervsw4muboc4swix0mr6y9bijznjuypf6_f9f0g9-_...@mail.gmail.com%3E
 ).
 So i set the taskmanager.memory.preallocation value to true but that is
 not solution.

 my java version is

 java version "1.8.0_20"
 Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
 Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)

 and my flink-conf.yaml
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Re: Processing windows in event time order

2016-07-21 Thread Sameer W
Thanks, Aljoscha,

This is what I am seeing when I use Ascending timestamps as watermarks -

Consider a window of 1-5 seconds
Stream 1 - Sends Elements A,B

Stream 2 (20 seconds later) - Sends Elements C,D

I see Window (1-5) fires first with just A,B. After 20 seconds Window (1-5)
fires again, but this time with only C,D. If I add a delay where I lag the
watermarks by 20 seconds, then only one instance of the Window (1-5) fires
with elements A,B,C,D.
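For reference, a minimal sketch of lagging the watermark behind the highest timestamp seen so far (assuming the periodic watermark assigner API and a hypothetical Tuple2<Long, String> element type whose first field is the event timestamp in milliseconds; the 20-second lag is just the value from the example above):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Emits watermarks that trail the maximum timestamp seen so far by 20 seconds,
// so the window is not fired before slightly delayed sources have caught up.
public class LaggingWatermarkAssigner implements AssignerWithPeriodicWatermarks<Tuple2<Long, String>> {

    private static final long LAG_MS = 20_000L;
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Tuple2<Long, String> element, long previousElementTimestamp) {
        // assumption: field 0 of the tuple holds the event-time timestamp in milliseconds
        maxTimestamp = Math.max(maxTimestamp, element.f0);
        return element.f0;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - LAG_MS);
    }
}

It would be attached with stream.assignTimestampsAndWatermarks(new LaggingWatermarkAssigner()) on each source stream, before the window operator.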

Sameer

On Thu, Jul 21, 2016 at 5:17 AM, Aljoscha Krettek 
wrote:

> Hi David,
> windows are being processed in order of their end timestamp. So if you
> specify an allowed lateness of zero (which will only be possible on Flink
> 1.1 or by using a custom trigger) you should be able to sort the elements.
> The ordering is only valid within one key, though, since windows for
> different keys with the same end timestamp will be processed in an
> arbitrary order.
>
> @Sameer If both sources emit watermarks that are correct for the elements
> that they are emitting the Trigger should only fire when both sources
> progressed their watermarks sufficiently far. Could you maybe give a more
> detailed example of the problem that you described?
>
> Cheers,
> Aljoscha
>
>
> On Thu, 21 Jul 2016 at 04:03 Sameer Wadkar  wrote:
>
>> Hi,
>>
>> If watermarks arriving from multiple sources, how long does the Event
>> Time Trigger wait for the slower source to send its watermarks before
>> triggering only from the faster source? I have seen that if one of the
>> sources is really slow then the elements of the faster source fires and
>> when the elements arrive from the slower source, the same window fires
>> again with the new elements only. I can work around this by adding delays
>> but does merging watermarks require that both have arrived by the time the
>> watermarks progress to the point where a window can be triggered? Is
>> applying a delay in the watermark the only way to solve this.
>>
>> Sameer
>>
>> Sent from my iPhone
>>
>> On Jul 20, 2016, at 9:41 PM, Vishnu Viswanath <
>> vishnu.viswanat...@gmail.com> wrote:
>>
>> Hi David,
>>
>> You are right, the events in the window are not sorted according to the
>> EventTime hence the processing is not done in an increasing order of
>> timestamp.
>> As you said, you will have to do the sorting yourself in your window
>> function to make sure that you are processing the events in order.
>>
>> What Flink does is (when EventTime is set and timestamp is assigned), it
>> will assign the elements to the Windows based on the EventTime, which
>> otherwise (if using ProcessingTime) might have ended up in a different
>> Window. (as per the ProcessingTime).
>>
>> This is as per my limited knowledge, other Flink experts can correct me
>> if this is wrong.
>>
>> Thanks,
>> Vishnu
>>
>> On Wed, Jul 20, 2016 at 9:30 PM, David Desberg 
>> wrote:
>>
>>> Hi all,
>>>
>>> In Flink, after setting the time characteristic to event time and
>>> properly assigning timestamps/watermarks, time-based windows will be
>>> created based upon event time. If we need to process events within a window
>>> in event time order, we can sort the windowed values and process as
>>> necessary by applying a WindowFunction. However, as I understand it, there
>>> is no guarantee that time-based windows will be processed in time order. Is
>>> this correct? Or, if we assume a watermarking system that (for example's
>>> sake) does not allow any late events, is there a way within Flink to
>>> guarantee that windows will be processed (via an applied WindowFunction) in
>>> strictly increasing time order?
>>>
>>> If necessary, I can provide a more concrete explanation of what I
>>> mean/am looking for.
>>>
>>> Thanks!
>>> David
>>
>>
>>


Re: taskmanager memory leak

2016-07-21 Thread 김동일
Dear Stephan.

I also suspect the s3. 
I’ve tried s3n, s3a.
What is a suitable library? I’m using aws-java-sdk-1.7.4 and hadoop-aws-2.7.2.

Best regards.

> On Jul 21, 2016, at 5:54 PM, Stephan Ewen  wrote:
> 
> Hi!
> 
> There is a memory debugging logger, you can activate it like that:
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#memory-and-performance-debugging
>  
> 
> 
> It will print which parts of the memory are growing.
> 
> What you can also try is to deactivate checkpointing for one run and see if 
> that solves it. If yes, then I suspect there is a memory leak in the s3 
> library (are you using s3, s3a, or s3n?).
> 
> Can you also check what libraries you are using? We have seen cases of memory 
> leaks in the libraries people used.
> 
> Greetings,
> Stephan
> 
> 
> 
> On Thu, Jul 21, 2016 at 5:13 AM, 김동일  > wrote:
> hi. stephan. 
> 
> - Did you submit any job to the cluster, or is the memory just growing even 
> on an idle TaskManager?
> 
> I have some stream job. 
> 
> - If you are running a job, do you use the RocksDB state backend, of the 
> FileSystem state backend?
> 
> file state backend. i use s3.
> 
> - Does it grow infinitely, or simply up a certain point and then goes down 
> again?
> 
> I think it infinitely. kernel kills the process , oom.
> 
> 
> 
> On Thu, Jul 21, 2016 at 3:52 AM Stephan Ewen  > wrote:
> Hi!
> 
> In order to answer this, we need a bit more information. Here are some 
> followup questions:
> 
>   - Did you submit any job to the cluster, or is the memory just growing even 
> on an idle TaskManager?
>   - If you are running a job, do you use the RocksDB state backend, of the 
> FileSystem state backend?
>   - Does it grow infinitely, or simply up a certain point and then goes down 
> again?
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Jul 20, 2016 at 5:58 PM, 김동일  > wrote:
> oh. my flink version is 1.0.3.
> 
> 
> -- Forwarded message --
> From: 김동일 mailto:kim.s...@gmail.com>>
> Date: Thu, Jul 21, 2016 at 12:52 AM
> Subject: taskmanager memory leak
> To: user@flink.apache.org 
> 
> 
> I've set up cluster(stand alone).
> Taskmanager consumes memory over the Xmx property and it grows up 
> continuously.
> I saw this 
> link(http://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccak2vtervsw4muboc4swix0mr6y9bijznjuypf6_f9f0g9-_...@mail.gmail.com%3E
>  
> ).
> So i set the taskmanager.memory.preallocation value to true but that is not 
> solution.
> 
> my java version is
> java version "1.8.0_20"
> Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
> Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)
> 
> and my flink-conf.yaml 
> 
>  
> 
>  
> 
>  
> 
>  
> 
>  
> 
>  
> 
>  
> 
>  
> 
>  
> 
>  
> 
>  
> 
>  
> 
>  
> 

DataStreamUtils conversion problem, showing varied results for same code

2016-07-21 Thread subash basnet
Hello all,

My task is to cluster the stream of points around the centroids. I am using
DataStreamUtils to collect the stream and pass it on to the map function to
perform the necessary action. Below is the code:

DataStream<Point> points = newDataStream.map(new getPoints());
DataStream<Centroid> centroids = newCentroidDataStream.map(new
TupleCentroidConverter());

Iterator<Centroid> iter = *DataStreamUtils*.collect(centroids);
Collection<Centroid> collectionCentroids = Lists.newArrayList(iter);
DataStream<Centroid> newCentroids = points.map(new
SelectNearestCenter(collectionCentroids))
    .map(new CountAppender()).keyBy(0).reduce(new
CentroidAccumulator()).map(new CentroidAverager());

Iterator<Centroid> iter1 = *DataStreamUtils*.collect(newCentroids);
Collection<Centroid> finalCentroidsCollection = Lists.newArrayList(iter1);
DataStream<Tuple2<String, Point>> clusteredPoints = points
    // assign points to final clusters
    .map(new SelectNearestCenter(finalCentroidsCollection));
*clusteredPoints*.print();

public static final class SelectNearestCenter extends
        RichMapFunction<Point, Tuple2<String, Point>> {
    private Collection<Centroid> centroids;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }
    public SelectNearestCenter(Collection<Centroid> centroids) {
        this.centroids = centroids;
    }
    @Override
    public Tuple2<String, Point> map(Point p) throws Exception {

        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        ..
        return new Tuple2<>(closestCentroidId, p);
    }
}

Cases:
1. Waited for around 10 mins, and the *clusteredPoints* got printed, but with
the centroid id as '-1' for all the points. The execution then ends after a
certain time, due to the multiple executions (there is already one execution
inside DataStreamUtils):
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution
failed. How can I get rid of this exception?

1> (-1, 121.86 121.87 121.8149 121.8149 60600.0)
4> (-1, 121.52 121.52 121.45 121.485 28800.0)
.

2. Waited for around 10 mins; the *clusteredPoints* got printed with the
centroid id as desired, shown below. The *clusteredPoints* also get printed
in the console in a streaming manner, and no exception is thrown at all. The
streaming continues.

1> (Wed Jul 20 16:45:01 CEST 2016, 121.555 121.56 121.53 121.5385 69300.0)
1> (Wed Jul 20 18:19:00 CEST 2016, 121.8699 121.89 121.86 121.86 25700.0)
3> (Wed Jul 20 16:41:59 CEST 2016, 121.415 121.47 121.41 121.4658 38400.0)
1> (Wed Jul 20 18:13:59 CEST 2016, 121.86 121.87 121.8149 121.8149 60600.0)
4> (Wed Jul 20 16:43:59 CEST 2016, 121.52 121.52 121.45 121.485 28800.0)
3> (Wed Jul 20 18:16:59 CEST 2016, 121.8716 121.92 121.85 121.9141 64500.0)
4> (Wed Jul 20 18:15:00 CEST 2016, 121.92 121.92 121.88 121.88 53500.0)
4> (Wed Jul 20 18:12:04 CEST 2016, 121.82 121.82 121.74 121.74 43600.0)
..

3. The *clusteredPoints* are printed with the centroid id as desired in a
streaming manner, but after a certain duration the same exception as in case
1 is thrown and the program ends abruptly.

Why is there so much variation in the result when executing the same code? With
the centroid id of '-1' in case 1, I would not be able to perform operations
later on, as all the *clusteredPoints* have the same centroid id '-1', which
should rather have been the timestamp, as shown in case 2. What could be the
solution to this issue?

Best Regards,
Subash Basnet


Re: Processing windows in event time order

2016-07-21 Thread Aljoscha Krettek
Hi David,
windows are being processed in order of their end timestamp. So if you
specify an allowed lateness of zero (which will only be possible on Flink
1.1 or by using a custom trigger) you should be able to sort the elements.
The ordering is only valid within one key, though, since windows for
different keys with the same end timestamp will be processed in an
arbitrary order.

@Sameer If both sources emit watermarks that are correct for the elements
that they are emitting the Trigger should only fire when both sources
progressed their watermarks sufficiently far. Could you maybe give a more
detailed example of the problem that you described?

Cheers,
Aljoscha


On Thu, 21 Jul 2016 at 04:03 Sameer Wadkar  wrote:

> Hi,
>
> If watermarks are arriving from multiple sources, how long does the Event Time
> Trigger wait for the slower source to send its watermarks before triggering
> only from the faster source? I have seen that if one of the sources is
> really slow, then the elements of the faster source fire, and when the
> elements arrive from the slower source, the same window fires again with
> the new elements only. I can work around this by adding delays, but does
> merging watermarks require that both have arrived by the time the
> watermarks progress to the point where a window can be triggered? Is
> applying a delay in the watermark the only way to solve this?
>
> Sameer
>
> Sent from my iPhone
>
> On Jul 20, 2016, at 9:41 PM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
> Hi David,
>
> You are right, the events in the window are not sorted according to the
> EventTime, hence the processing is not done in increasing order of
> timestamp.
> As you said, you will have to do the sorting yourself in your window
> function to make sure that you are processing the events in order.
>
> What Flink does (when EventTime is set and timestamps are assigned) is
> assign the elements to the windows based on the EventTime; otherwise (if
> using ProcessingTime) they might have ended up in a different window.
>
> This is as per my limited knowledge, other Flink experts can correct me if
> this is wrong.
>
> Thanks,
> Vishnu
>
> On Wed, Jul 20, 2016 at 9:30 PM, David Desberg 
> wrote:
>
>> Hi all,
>>
>> In Flink, after setting the time characteristic to event time and
>> properly assigning timestamps/watermarks, time-based windows will be
>> created based upon event time. If we need to process events within a window
>> in event time order, we can sort the windowed values and process as
>> necessary by applying a WindowFunction. However, as I understand it, there
>> is no guarantee that time-based windows will be processed in time order. Is
>> this correct? Or, if we assume a watermarking system that (for example's
>> sake) does not allow any late events, is there a way within Flink to
>> guarantee that windows will be processed (via an applied WindowFunction) in
>> strictly increasing time order?
>>
>> If necessary, I can provide a more concrete explanation of what I mean/am
>> looking for.
>>
>> Thanks!
>> David
>
>
>


Re: Guava immutable collection kryo serialization

2016-07-21 Thread Stephan Ewen
Hi!

Custom Kryo serializers can be shipped either as objects (they must be
serializable) or as classes (they can be non-serializable, but must have a
default constructor).

For non-serializable serializers, try to use:
ExecutionConfig.registerTypeWithKryoSerializer(Class<?> type,
Class<? extends Serializer> serializerClass)
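
A minimal sketch of that class-based registration for the case below, assuming
the ImmutableMapSerializer from the kryo-serializers library is on the
classpath (illustrative only, not tested code from this thread):

import com.google.common.collect.ImmutableMap;
import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
import org.apache.flink.api.java.ExecutionEnvironment;

public class KryoRegistrationExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Register the serializer by class: Flink instantiates it itself via the
        // default constructor, so nothing needs to be Java-serializable.
        env.getConfig().registerTypeWithKryoSerializer(
                ImmutableMap.class, ImmutableMapSerializer.class);

        // ... define the rest of the job and call env.execute() as usual ...
    }
}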

Stephan


On Thu, Jul 21, 2016 at 10:36 AM, Stefan Richter <
s.rich...@data-artisans.com> wrote:

> Hi,
>
> to answer this question, it would be helpful if you could provide the
> stacktrace of your exception and the code you use to register the
> serializer.
>
> Best,
> Stefan
>
> On 21.07.2016 at 05:28, Shaosu Liu wrote:
>
>
> Hi,
>
> How do I do Guava Immutable collections serialization in Flink?
>
> I am getting error
>
> Caused by: java.io.NotSerializableException:
> de.javakaffee.kryoserializers.guava.ImmutableMapSerializer
>
> when I register ImmutableMap to be serialized by the
> ImmutableMapSerializer. I am using the latest version of 0.38.
>
> --
> Cheers,
> Shaosu
>
>
>


Re: taskmanager memory leak

2016-07-21 Thread Stephan Ewen
Hi!

There is a memory debugging logger; you can activate it like this:
https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#memory-and-performance-debugging

It will print which parts of the memory are growing.
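
For reference, a sketch of the flink-conf.yaml entries described on that page
(key names as I recall them from the linked docs, so please double-check them
against your Flink version):

# start the memory usage logging thread on each TaskManager
taskmanager.debug.memory.startLogThread: true

# interval (in milliseconds) at which the memory usage is logged
taskmanager.debug.memory.logIntervalMs: 5000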

What you can also try is to deactivate checkpointing for one run and see if
that solves it. If yes, then I suspect there is a memory leak in the s3
library (are you using s3, s3a, or s3n?).

Can you also check what libraries you are using? We have seen cases of
memory leaks in the libraries people used.

Greetings,
Stephan



On Thu, Jul 21, 2016 at 5:13 AM, 김동일  wrote:

> Hi Stephan,
>
> - Did you submit any job to the cluster, or is the memory just growing
> even on an idle TaskManager?
>
> I have some streaming jobs running.
>
> - If you are running a job, do you use the RocksDB state backend, of the
> FileSystem state backend?
>
> The FileSystem state backend. I use S3.
>
> - Does it grow infinitely, or simply up to a certain point and then goes down
> again?
> 
> I think it grows infinitely. The kernel kills the process (OOM).
>
>
>
> On Thu, Jul 21, 2016 at 3:52 AM Stephan Ewen  wrote:
>
>> Hi!
>>
>> In order to answer this, we need a bit more information. Here are some
>> followup questions:
>>
>>   - Did you submit any job to the cluster, or is the memory just growing
>> even on an idle TaskManager?
>>   - If you are running a job, do you use the RocksDB state backend, of
>> the FileSystem state backend?
>>   - Does it grow infinitely, or simply up to a certain point and then goes
>> down again?
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Jul 20, 2016 at 5:58 PM, 김동일  wrote:
>>
>>> Oh, my Flink version is 1.0.3.
>>>
>>>
>>> -- Forwarded message --
>>> From: 김동일 
>>> Date: Thu, Jul 21, 2016 at 12:52 AM
>>> Subject: taskmanager memory leak
>>> To: user@flink.apache.org
>>>
>>>
>>> I've set up a cluster (standalone).
>>> The TaskManager consumes memory beyond the Xmx property, and it grows
>>> continuously.
>>> I saw this link(
>>> http://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccak2vtervsw4muboc4swix0mr6y9bijznjuypf6_f9f0g9-_...@mail.gmail.com%3E
>>> ).
>>> So I set the taskmanager.memory.preallocation value to true, but that is
>>> not the solution.
>>>
>>> my java version is
>>>
>>> java version "1.8.0_20"
>>> Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
>>> Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)
>>>
>>> and my flink-conf.yaml
>>>
>>> env.java.home: /usr/java/default
>>> jobmanager.rpc.address: internal.stream01.denma.ggportal.net
>>> jobmanager.rpc.port: 6123
>>> jobmanager.heap.mb: 

Re: Guava immutable collection kryo serialization

2016-07-21 Thread Stefan Richter
Hi,

to answer this question, it would be helpful if you could provide the 
stacktrace of your exception and the code you use to register the serializer.

Best,
Stefan

> On 21.07.2016 at 05:28, Shaosu Liu wrote:
> 
> 
> Hi,
> 
> How do I do Guava Immutable collections serialization in Flink?
> 
> I am getting error 
> 
> Caused by: java.io.NotSerializableException: 
> de.javakaffee.kryoserializers.guava.ImmutableMapSerializer
> 
> when I register ImmutableMap to be serialized by the ImmutableMapSerializer. 
> I am using the latest version of 0.38.
> 
> -- 
> Cheers,
> Shaosu



Re: Running multiple Flink Streaming Jobs, one by one

2016-07-21 Thread Stefan Richter
Hi,

the answer to this question depends on how you are starting the jobs. Do you
have a Java program that submits jobs in a loop by repeatedly calling
StreamExecutionEnvironment.execute(), or a shell script that submits jobs
through the CLI? In both cases, the process should block (either on
StreamExecutionEnvironment.execute() or on the CLI) until one job has
terminated, and then your loop can simply continue to submit the next job.
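
As a rough illustration of the first variant, a driver that submits the jobs
one after another (the parameter handling and the buildJob topology here are
purely hypothetical placeholders):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SequentialJobsDriver {

    public static void main(String[] args) throws Exception {
        // Hypothetical list of parameter sets, one streaming job per entry.
        String[] parameterSets = {"k=3", "k=5", "k=10"};

        for (String params : parameterSets) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Build the streaming topology for this parameter set.
            buildJob(env, params);

            // execute() blocks until the job terminates (for example because the
            // source stopped emitting), then the loop submits the next job.
            env.execute("Test run with " + params);
        }
    }

    private static void buildJob(StreamExecutionEnvironment env, String params) {
        // Placeholder topology; a real test would build the pipeline from params.
        env.fromElements(1, 2, 3).print();
    }
}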

Best,
Stefan

> On 20.07.2016 at 23:15, Biplob Biswas wrote:
> 
> Hi,
> 
> I want to test my Flink streaming code, and thus I want to run Flink
> streaming jobs with different parameters one by one.
> 
> So, when one job finishes after it doesn't receive new data points for
> some time, the next job with a different set of parameters should start.
> 
> For this, I am already stopping my iteration automatically if it doesn't
> receive data points for 5 seconds, but when the job terminates, how can I
> know it has terminated so that the next job starts automatically?
> 
> Can this be done? 
> 
> Thanks
> Biplob
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Running-multiple-Flink-Streaming-Jobs-one-by-one-tp8075.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.