Re: Understanding Sliding Windows

2016-04-26 Thread Piyush Shrivastava
Hello Dominik,
Thanks for the information. Since my window is triggered every 10 seconds,
the results I get before the first 5 minutes are irrelevant, as I need to
consider data from the past 5 minutes. Is there a way I can skip the
results that are output before the first 5 minutes?

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com
 

On Tuesday, 26 April 2016 8:54 PM, Dominik Choma  
wrote:
 

 Piyush,

You created a sliding window which is triggered every 10 seconds.
Flink fires this window every 10 seconds, without waiting for the 5-minute
buffer to be filled up.
It seems to me that the first argument is rather the "maximum data buffer
retention" than "the initial threshold".

Dominik

2016-04-26 12:16 GMT+02:00 Piyush Shrivastava :

Hi all,
I wanted to know how exactly sliding windows produce results in Flink.
Suppose I create a sliding window of 5 minutes which is refreshed every 10
seconds:

.timeWindow(Time.minutes(5), Time.seconds(10))

So every 10 seconds we are looking at data from the past 5 minutes. But what
happens before the initial 5 minutes have passed? Suppose we start the
computation at 10:00. At 10:05 we will get the result for 10:00-10:05. But what
are the results we get in between, i.e. at 10:00:10, 10:00:20 and so on?
Basically, why does Flink start producing results before the initial threshold
has passed? What do these results signify?

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com




  

Wildcards with --classpath parameter in CLI

2016-04-26 Thread Ken Krugler
Hi all,

If I want to include all of the jars in a directory, I thought I could do
--classpath file://

http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Re: Discarding header from CSV file

2016-04-26 Thread nsengupta
Chiwan and other Flinksters,

I am stuck with the following. Somehow, I am unable to spot the error, if any!
Please help.

*I have this case class*:

case class BuildingInformation(buildingID: Int, buildingManager: Int,
  buildingAge: Int, productID: String, country: String)

*I intend to read from a CSV file which has a one-line header*:

BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country

*I attempt to read the file in this manner*:

private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) = {
  env.readCsvFile[BuildingInformation](
    inputPath,
    ignoreFirstLine = true,
    pojoFields = Array("buildingID", "buildingManager", "buildingAge",
      "productID", "country")
  )
}

*Then, I use this function in the driver's main()*:

val envDefault = ExecutionEnvironment.getExecutionEnvironment
val buildings =
  readBuildingInfo(envDefault, "./SensorFiles/building.csv").collect().toList

The 'buildings' list is always *empty*! I fail to figure out why. I have checked
that the path of the CSV file is correct and accessible. Also, I can read
the same stuff by following the usual method of reading as a text-line,
parsing the commas and creating the POJOs (case-classes).

-- Nirmalya




Re: Discarding header from CSV file

2016-04-26 Thread nsengupta
Hello Chiwan,

I was just about to post to declare my ignorance, because I searched again
and realized that I had failed to spot readCsvFile! :-) You have been faster
than me!

Yes, I should use readCsvFile so that I get all the facilities built in.

Many thanks for pointing out.

-- N


Nirmalya Sengupta
about.me/sengupta.nirmalya



On Wed, Apr 27, 2016 at 7:19 AM, Chiwan Park-2 [via Apache Flink User
Mailing List archive.]  wrote:

> Hi, Nirmalya
>
> I recommend the readCsvFile() method rather than readTextFile() to read CSV
> files. readCsvFile() provides some features for CSV files such as
> ignoreFirstLine() (what you are looking for), ignoreComments(), etc.
>
> If you have to use the readTextFile() method, I think you can ignore column
> headers by calling the zipWithIndex method and filtering based on the index.
>
> Regards,
> Chiwan Park
>
> > On Apr 27, 2016, at 10:32 AM, nsengupta <[hidden email]
> > wrote:
> >
> > What is the recommended way of discarding the Column Header(s) from a
> CSV
> > file, if I am using
> >
> > /environment.readTextFile()
> > /
> > facility? Obviously, we don't know beforehand, which of the nodes will
> read
> > the Header(s)? So, we cannot use usual tricks like drop(1)?
> >
> > I don't recall well: has this been discussed and closed earlier in this
> > forum? If so, can someone point that out to me please?
> >
> > -- Nirmalya
> >
> >
> >
>
>
>



-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."





Re: Discarding header from CSV file

2016-04-26 Thread Chiwan Park
Hi, Nirmalya

I recommend the readCsvFile() method rather than readTextFile() to read CSV
files. readCsvFile() provides some features for CSV files such as
ignoreFirstLine() (what you are looking for), ignoreComments(), etc.

If you have to use the readTextFile() method, I think you can ignore column
headers by calling the zipWithIndex method and filtering based on the index.
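For illustration, a minimal sketch of both approaches in the Scala API (the
path and record type are borrowed from the example earlier in this thread and
are assumptions, not tested code):

```
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

object CsvHeaderSketch {
  // Record type and field names are assumptions for illustration only.
  case class Building(buildingID: Int, buildingManager: Int, buildingAge: Int,
                      productID: String, country: String)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Option 1: readCsvFile() drops the header via ignoreFirstLine.
    val viaCsv = env.readCsvFile[Building](
      "./SensorFiles/building.csv",
      ignoreFirstLine = true)

    // Option 2: readTextFile() plus zipWithIndex, filtering out the first
    // indexed line (the header in this sketch).
    val viaText = env.readTextFile("./SensorFiles/building.csv")
      .zipWithIndex
      .filter(_._1 > 0)
      .map(_._2)

    viaCsv.print()
    viaText.print()
  }
}
```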

Regards,
Chiwan Park

> On Apr 27, 2016, at 10:32 AM, nsengupta  wrote:
> 
> What is the recommended way of discarding the Column Header(s) from a CSV
> file, if I am using
> 
> /environment.readTextFile()
> /
> facility? Obviously, we don't know beforehand, which of the nodes will read
> the Header(s)? So, we cannot use usual tricks like drop(1)?
> 
> I don't recall well: has this been discussed and closed earlier in this
> forum? If so, can someone point that out to me please?
> 
> -- Nirmalya
> 
> 
> 



Discarding header from CSV file

2016-04-26 Thread nsengupta
What is the recommended way of discarding the Column Header(s) from a CSV
file, if I am using

/environment.readTextFile()
/
facility? Obviously, we don't know beforehand, which of the nodes will read
the Header(s)? So, we cannot use usual tricks like drop(1)?

I don't recall well: has this been discussed and closed earlier in this
forum? If so, can someone point that out to me please?

-- Nirmalya





Re: "No more bytes left" at deserialization

2016-04-26 Thread Timur Fayruzov
I built master with Scala 2.11 and Hadoop 2.7.1, and now get a different
exception (still serialization-related though):

java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
at
com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162))
-> Filter (Filter at
com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))'
, caused an error: Error obtaining the sorted input: Thread 'SortMerger
Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger Reading Thread' terminated due to an exception: Index:
97, Size: 11
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at
org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: Index: 97, Size: 11
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at
org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
at
org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
at
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)



On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann  wrote:

> Then let's keep finger crossed that we've found the culprit :-)
>
> On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov 
> wrote:
>
>> Thank you Till.
>>
>> I will try to run with new binaries today. As I have mentioned, the error
>> is reproducible only on a full dataset, so coming up with sample input data
>> may be problematic (not to mention that the real data can't be shared).
>> I'll see if I can replicate it, but could take a bit longer. Thank you very
>> much for your effort.
>>
>> On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Timur,
>>>
>>> I’ve got good and not so good news. Let’s start with the not so good
>>> news. I couldn’t reproduce your problem but the good news is that I found a
>>> bug in the duplication logic of the OptionSerializer. I’ve already
>>> committed a patch to the master to fix it.
>>>
>>> Thus, I wanted to ask you, whether you could try out the latest master
>>> and check whether your problem still persists. If that’s the case, could
>>> you send me your complete code with sample input data which reproduces your
>>> problem?
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek 
>>> wrote:
>>>
 Could this be caused by the disabled reference tracking in our Kryo
 serializer? From the stack trace it looks like it's failing when trying to
 deserialize the traits that are wrapped in Options.

Re: Job hangs

2016-04-26 Thread Timur Fayruzov
Robert, Ufuk, logs, execution plan and a screenshot of the console are in
the archive:
https://www.dropbox.com/s/68gyl6f3rdzn7o1/debug-stuck.tar.gz?dl=0

Note that when I looked in the backpressure view I saw back pressure 'high'
on following paths:

Input->code_line:123,124->map->join
Input->code_line:134,135->map->join
Input->code_line:121->map->join

Unfortunately, I was not able to take thread dumps nor heap dumps (neither
kill -3, jstack nor jmap worked, some Amazon AMI problem I assume).

Hope that helps.

Please, let me know if I can assist you in any way. Otherwise, I probably
would not be actively looking at this problem.

Thanks,
Timur


On Tue, Apr 26, 2016 at 8:11 AM, Ufuk Celebi  wrote:

> Can you please further provide the execution plan via
>
> env.getExecutionPlan()
>
>
>
> On Tue, Apr 26, 2016 at 4:23 PM, Timur Fayruzov
>  wrote:
> > Hello Robert,
> >
> > I observed progress for 2 hours(meaning numbers change on dashboard), and
> > then I waited for 2 hours more. I'm sure it had to spill at some point,
> but
> > I figured 2h is enough time.
> >
> > Thanks,
> > Timur
> >
> > On Apr 26, 2016 1:35 AM, "Robert Metzger"  wrote:
> >>
> >> Hi Timur,
> >>
> >> thank you for sharing the source code of your job. That is helpful!
> >> Its a large pipeline with 7 joins and 2 co-groups. Maybe your job is
> much
> >> more IO heavy with the larger input data because all the joins start
> >> spilling?
> >> Our monitoring, in particular for batch jobs is really not very
> advanced..
> >> If we had some monitoring showing the spill status, we would maybe see
> that
> >> the job is still running.
> >>
> >> How long did you wait until you declared the job hanging?
> >>
> >> Regards,
> >> Robert
> >>
> >>
> >> On Tue, Apr 26, 2016 at 10:11 AM, Ufuk Celebi  wrote:
> >>>
> >>> No.
> >>>
> >>> If you run on YARN, the YARN logs are the relevant ones for the
> >>> JobManager and TaskManager. The client log submitting the job should
> >>> be found in /log.
> >>>
> >>> – Ufuk
> >>>
> >>> On Tue, Apr 26, 2016 at 10:06 AM, Timur Fayruzov
> >>>  wrote:
> >>> > I will do it my tomorrow. Logs don't show anything unusual. Are there
> >>> > any
> >>> > logs besides what's in flink/log and yarn container logs?
> >>> >
> >>> > On Apr 26, 2016 1:03 AM, "Ufuk Celebi"  wrote:
> >>> >
> >>> > Hey Timur,
> >>> >
> >>> > is it possible to connect to the VMs and get stack traces of the
> Flink
> >>> > processes as well?
> >>> >
> >>> > We can first have a look at the logs, but the stack traces will be
> >>> > helpful if we can't figure out what the issue is.
> >>> >
> >>> > – Ufuk
> >>> >
> >>> > On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann  >
> >>> > wrote:
> >>> >> Could you share the logs with us, Timur? That would be very helpful.
> >>> >>
> >>> >> Cheers,
> >>> >> Till
> >>> >>
> >>> >> On Apr 26, 2016 3:24 AM, "Timur Fayruzov"  >
> >>> >> wrote:
> >>> >>>
> >>> >>> Hello,
> >>> >>>
> >>> >>> Now I'm at the stage where my job seem to completely hang. Source
> >>> >>> code is
> >>> >>> attached (it won't compile but I think gives a very good idea of
> what
> >>> >>> happens). Unfortunately I can't provide the datasets. Most of them
> >>> >>> are
> >>> >>> about
> >>> >>> 100-500MM records, I try to process on EMR cluster with 40 tasks
> 6GB
> >>> >>> memory
> >>> >>> for each.
> >>> >>>
> >>> >>> It was working for smaller input sizes. Any idea on what I can do
> >>> >>> differently is appreciated.
> >>> >>>
> >>> >>> Thans,
> >>> >>> Timur
> >>
> >>
> >
>


Re: [External] Re: Consuming Messages from Kafka

2016-04-26 Thread Robert Metzger
Hi Josh,

The JobManager log won't contain this output.

Check out these slides I did a while ago, they explain how you can retrieve
the logs from the TaskManagers:
http://www.slideshare.net/robertmetzger1/apache-flink-hands-on#14



On Tue, Apr 26, 2016 at 9:41 PM, Conlin, Joshua [USA]  wrote:

> “StringLogSink” just looks like:
>
> System.out.println(msg);
>
> LOG.info("Logging message: " + msg);
>
>
> And LOG is from slf4j.  In the Flink UI that is running on Yarn, I see no
> counts, nor log statements or stdout under JobManager.  It seems to make no
> difference if I submit the job through yarn via command line or the Flink
> UI session already running under yarn.  Where would you recommend I look in
> the Yarn containers?
>
>
> Thanks again for your help.
>
>
> Josh
>
> From: Robert Metzger 
> Reply-To: "user@flink.apache.org" 
> Date: Tuesday, April 26, 2016 at 3:30 PM
> To: "user@flink.apache.org" 
> Subject: [External] Re: Consuming Messages from Kafka
>
> Hi,
>
> the web interface is a good idea for checking if everything is working as
> expected. However, in this case I expect the counts for the task to be 0
> because the source and sink are chained together into one task (upcoming
> Flink releases will fix this behavior).
>
> I assume the "StringLogSink" is logging all incoming events. How do you do
> that? Using slf4j ? our by System.out.println?
> I'm asking to make sure you're looking at the right place to capture the
> output. It will be at the YARN containers.
>
> Regards,
> Robert
>
>
> On Tue, Apr 26, 2016 at 8:34 PM, Dominik Choma 
> wrote:
>
>> Hi,
>>
>> You can check if any messages are going through dataflow on flink web
>> dashboard
>> https://flink.apache.org/img/blog/new-dashboard-screenshot.png
>>
>>
>>
>> Dominik Choma
>>
>> Message written by Conlin, Joshua [USA] on 26 Apr 2016 at 20:16:
>>
>> There are messages being sent to Kafka on that topic, I just never see anything
>> in Flink.  Any help/insight you could provide would be greatly
>> appreciated.  If it makes a difference this is running on YARN.  Also,
>> here’s what I see in the logs:
>>
>>
>>
>


Re: [External] Re: Consuming Messages from Kafka

2016-04-26 Thread Conlin, Joshua [USA]
"StringLogSink" just looks like:


System.out.println(msg);

LOG.info("Logging message: " + msg);


And LOG is from slf4j.  In the Flink UI that is running on Yarn, I see no 
counts, nor log statements or stdout under JobManager.  It seems to make no 
difference if I submit the job through yarn via command line or the Flink UI 
session already running under yarn.  Where would you recommend I look in the 
Yarn containers?


Thanks again for your help.


Josh

From: Robert Metzger >
Reply-To: "user@flink.apache.org" 
>
Date: Tuesday, April 26, 2016 at 3:30 PM
To: "user@flink.apache.org" 
>
Subject: [External] Re: Consuming Messages from Kafka

Hi,

The web interface is a good idea for checking if everything is working as
expected. However, in this case I expect the counts for the task to be 0 because
the source and sink are chained together into one task (upcoming Flink releases
will fix this behavior).
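As a side note, a minimal sketch (with a toy source standing in for the Kafka
source and StringLogSink, so only an assumption about the actual job) of how
operator chaining could be disabled so that source and sink show up as
separate tasks with their own counts:

```
import org.apache.flink.streaming.api.scala._

object ChainingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // With chaining disabled, the source and the sink run as separate tasks,
    // so records passed between them show up as counts in the web UI.
    env.disableOperatorChaining()

    // Toy source and sink standing in for the Kafka source and StringLogSink.
    env.fromElements("a", "b", "c").print()

    env.execute("chaining sketch")
  }
}
```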

I assume the "StringLogSink" is logging all incoming events. How do you do 
that? Using slf4j ? our by System.out.println?
I'm asking to make sure you're looking at the right place to capture the 
output. It will be at the YARN containers.

Regards,
Robert


On Tue, Apr 26, 2016 at 8:34 PM, Dominik Choma 
> wrote:
Hi,

You can check if any messages are going through dataflow on flink web dashboard
https://flink.apache.org/img/blog/new-dashboard-screenshot.png



Dominik Choma

Message written by Conlin, Joshua [USA] on 26 Apr 2016 at 20:16:

There are messages being sent to Kafka on that topic, I just never see anything in 
Flink.  Any help/insight you could provide would be greatly appreciated.  If it 
makes a difference this is running on YARN.  Also, here's what I see in the 
logs:




Re: Consuming Messages from Kafka

2016-04-26 Thread Dominik Choma
Hi,

You can check if any messages are going through dataflow on flink web dashboard
https://flink.apache.org/img/blog/new-dashboard-screenshot.png 




Dominik Choma

> Message written by Conlin, Joshua [USA] on 26 Apr 2016 at 20:16:
> 
> There are messages being sent to Kafka on that topic, I just never see anything in 
> Flink.  Any help/insight you could provide would be greatly appreciated.  If 
> it makes a difference this is running on YARN.  Also, here’s what I see in 
> the logs:



Consuming Messages from Kafka

2016-04-26 Thread Conlin, Joshua [USA]
Hello,

I am new to Flink and trying to learn this framework. Seems great so far. I am
trying to translate my existing Storm topology to a Flink job and I am having
trouble consuming data from Kafka. Here's what my job looks like:


public static void main(String[] args) throws Exception {

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "hostname:port");
    properties.setProperty("group.id", "stream-test");
    properties.setProperty("client.id", "test-flink");

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<String> kafkaStream = env.addSource(
        new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties));

    kafkaStream.addSink(new StringLogSink());

    env.execute();
}


There are messages being sent to Kafka on that topic, I just never see anything 
in Flink.  Any help/insight you could provide would be greatly appreciated.  If 
it makes a difference this is running on YARN.  Also, here's what I see in the 
logs:


2016-04-26 18:02:38,707 INFO  org.apache.kafka.common.utils.AppInfoParser   
- Kafka version : 0.9.0.1
2016-04-26 18:02:38,707 INFO  org.apache.kafka.common.utils.AppInfoParser   
- Kafka commitId : 23c69d62a0cabf06
2016-04-26 18:02:38,708 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Trying to 
get partitions for topic test
2016-04-26 18:02:38,854 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 1 
partitions from these topics: [test]
2016-04-26 18:02:38,854 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer 
is going to read the following topics (with number of partitions): test (1),
2016-04-26 18:02:38,933 INFO  org.apache.flink.yarn.YarnJobManager  
- Submitting job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,934 INFO  org.apache.flink.yarn.YarnJobManager  
- Using restart strategy NoRestartStrategy for 
0ab4248d8917e707a8f297420e4c564d.
2016-04-26 18:02:38,935 INFO  org.apache.flink.yarn.YarnJobManager  
- Scheduling job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,935 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from 
CREATED to SCHEDULED
2016-04-26 18:02:38,935 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from 
SCHEDULED to DEPLOYING
2016-04-26 18:02:38,935 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Deploying 
Source: Custom Source -> Sink: Unnamed (1/1) (attempt #0) to ip-10-167-233-231
2016-04-26 18:02:38,936 INFO  org.apache.flink.yarn.YarnJobManager  
- Status of job 0ab4248d8917e707a8f297420e4c564d () changed to 
RUNNING.
2016-04-26 18:02:39,151 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from 
DEPLOYING to RUNNING

Thanks,

Josh


Re: Initializing global data

2016-04-26 Thread Nirmalya Sengupta
Hello Stefano ,

Thanks for sharing your views. Now that you have made me think about it, I see
that your recommendation works well.

I will go ahead, following your suggestions.

-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Nirmalya Sengupta
about.me/sengupta.nirmalya



Re: classpath issue on yarn

2016-04-26 Thread Robert Metzger
Hi Aris,

Did you build the 1.0.2 flink-dist yourself?
If not, which exact version did you download?
For example this file:
http://www.apache.org/dyn/closer.lua/flink/flink-1.0.2/flink-1.0.2-bin-hadoop2-scala_2.11.tgz
has a clean flink-dist jar.



On Tue, Apr 26, 2016 at 12:28 PM, aris kol  wrote:

> Hi guys,
>
> I ran into a weird classpath issue while running a streaming job on a yarn
> cluster.
> I have a relatively simple flow that reads data from kafka, does a few
> manipulations and then indexes them on Elasticsearch (2.3).
>
> I use the elasticsearch2 connector (1.1-SNAPSHOT) (bleeding edge, I know).
>
> The stream works fine in a local flink node (1.0.2) (reading from remote
> kafka and writing to remote es).
> However, when deployed to the remote YARN cluster (again, flink 1.0.2) the
> following exception is thrown:
> ```
> 04/26/2016 10:07:30 Source: Custom Source -> Flat Map -> Sink:
> Unnamed(1/8) switched to FAILED
> java.lang.NoSuchMethodError:
> com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
> at org.elasticsearch.threadpool.ThreadPool.<init>(ThreadPool.java:190)
> at
> org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
> at
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
>
> 04/26/2016 10:07:30 Job execution switched to status FAILING.
> java.lang.NoSuchMethodError:
> com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
> at org.elasticsearch.threadpool.ThreadPool.<init>(ThreadPool.java:190)
> at
> org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
> at
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> 04/26/2016 10:07:30 Source: Custom Source -> Flat Map -> Sink:
> Unnamed(7/8) switched to FAILED
> java.lang.NoClassDefFoundError: Could not initialize class
> org.elasticsearch.threadpool.ThreadPool
> at
> org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
> at
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> ```
>
> I rebuilt the fat jar (I use sbt) many times, and in my fat jar there is no
> trace of the old guava `MoreExecutors` class that lacks the `directExecutor`
> method the transport client needs.
> `lib/flink-dist_2.11-1.0.2.jar` unfortunately contains both: the newest
> class coming from guava 18 and an old one introduced probably by some
> ancient hadoop dependency. For some reason the old version takes precedence.
>
> In Spark, I used to configure spark.driver.userClassPathFirst true
> and those problems were usually dealt with. Is there anything similar?
> Any ideas?
>
> Thanks,
> Aris
>


Re: Getting java.lang.Exception when try to fetch data from Kafka

2016-04-26 Thread prateek arora
Hi Robert,

I have a Java program to send data into a Kafka topic. Below is the code for
this:

private Producer<String, byte[]> producer = null;

Serializer<String> keySerializer = new StringSerializer();
Serializer<byte[]> valueSerializer = new ByteArraySerializer();
producer = new KafkaProducer<String, byte[]>(props, keySerializer,
    valueSerializer);

ProducerRecord<String, byte[]> imageRecord;
imageRecord = new ProducerRecord<String, byte[]>(streamInfo.topic,
    Integer.toString(messageKey), imageData);

producer.send(imageRecord);


Then I am trying to fetch the data in Apache Flink.

Regards
Prateek

On Mon, Apr 25, 2016 at 2:42 AM, Robert Metzger  wrote:

> Hi Prateek,
>
> were the messages written to the Kafka topic by Flink, using the
> TypeInformationKeyValueSerializationSchema ? If not, maybe the Flink
> deserializers expect a different data format of the messages in the topic.
>
> How are the messages written into the topic?
>
>
> On Fri, Apr 22, 2016 at 10:21 PM, prateekarora  > wrote:
>
>>
>> Hi
>>
>> I am sending data using kafkaProducer API
>>
>> imageRecord = new ProducerRecord<String, byte[]>(topic, messageKey, imageData);
>> producer.send(imageRecord);
>>
>>
>> And in the Flink program I try to fetch the data using FlinkKafkaConsumer08.
>> Below is the sample code:
>>
>> def main(args: Array[String]) {
>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>   val properties = new Properties()
>>   properties.setProperty("bootstrap.servers", ":9092")
>>   properties.setProperty("zookeeper.connect", ":2181")
>>   properties.setProperty("group.id", "test")
>>
>>   val readSchema = new
>>     TypeInformationKeyValueSerializationSchema[String, Array[Byte]](
>>       classOf[String], classOf[Array[Byte]], env.getConfig)
>>     .asInstanceOf[KeyedDeserializationSchema[(String, Array[Byte])]]
>>
>>   val stream: DataStream[(String, Array[Byte])] =
>>     env.addSource(new FlinkKafkaConsumer08[(String, Array[Byte])](
>>       "a-0", readSchema, properties))
>>
>>   stream.print
>>   env.execute("Flink Kafka Example")
>>   }
>>
>>
>> but getting  below error :
>>
>> 16/04/22 13:43:39 INFO ExecutionGraph: Source: Custom Source -> Sink:
>> Unnamed (1/4) (d7a151560f6eabdc587a23dc0975cb84) switched from RUNNING to
>> FAILED
>> 16/04/22 13:43:39 INFO ExecutionGraph: Source: Custom Source -> Sink:
>> Unnamed (2/4) (d43754a27e402ed5b02a73d1c9aa3125) switched from RUNNING to
>> CANCELING
>>
>> java.lang.Exception
>> at
>>
>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
>> at
>>
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
>> at
>>
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.EOFException
>> at
>>
>> org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:298)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:771)
>> at
>>
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>> at
>>
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> at
>>
>> org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:105)
>> at
>>
>> org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:39)
>> at
>>
>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:657)
>>
>>
>> Regards
>> Prateek
>>
>>
>>
>>
>>
>
>


Re: Return unique counter using groupReduceFunction

2016-04-26 Thread Fabian Hueske
Hi Biplob,

Flink is a distributed, data-parallel system, which means that there are
several instances of your ReduceFunction running in parallel, each with its
own timestamp counter.
If you want to have a unique timestamp, you have to set the parallelism of
the reduce operator to 1, but then the program might become inefficient.

Maybe DataSetUtils.zipWithIndex() or DataSetUtils.zipWithUniqueId() are
helpful for your use case.
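For illustration, a minimal Scala sketch of both utilities (the input data set
here is just a stand-in for the aggregated output, not your actual data):

```
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

object UniqueIdSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the aggregated output of the group reduce.
    val aggregates = env.fromElements((1, 10.0), (2, 20.0), (7, 70.0))

    // zipWithUniqueId attaches an identifier that is unique across all
    // parallel instances, without forcing the parallelism down to 1.
    val withIds: DataSet[(Long, (Int, Double))] = aggregates.zipWithUniqueId

    // zipWithIndex assigns consecutive indices 0..n-1 instead (at the cost
    // of an extra pass that counts the elements per partition).
    val withIndex: DataSet[(Long, (Int, Double))] = aggregates.zipWithIndex

    withIds.print()
    withIndex.print()
  }
}
```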

Best, Fabian


2016-04-26 17:12 GMT+02:00 Biplob Biswas :

> Hi,
>
> I am using a groupreduce function to aggregate the content of the objects
> but at the same time i need to return a unique counter from the function
> but
> my attempts are failing and the identifiers are somehow very random and
> getting duplicated.
>
> Following is the part of my code which is supposed to generate a unique
> counter and return it with out.collect.
>
>
> public static class sumReducer implements
>         GroupReduceFunction<Tuple2<Integer, Point>,
>                             Tuple5<Integer, Point, Point, Long, Long>> {
>
>     double sum[] = null;
>     double sumOfSquare[] = null;
>     long timestamp = 0;
>
>     @Override
>     public void reduce(Iterable<Tuple2<Integer, Point>> in,
>             Collector<Tuple5<Integer, Point, Point, Long, Long>> out)
>             throws Exception {
>
>         int id = 0;
>         long count = 0;
>         boolean flag = true;
>         for (Tuple2<Integer, Point> i : in) {
>             if (flag) {
>                 timestamp++;
>                 System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp);
>                 sum = new double[i.f1.pt.length];
>                 sumOfSquare = new double[sum.length];
>                 id = i.f0;
>                 for (int j = 0; j < sum.length; j++) {
>                     sum[j] = i.f1.pt[j];
>                     sumOfSquare[j] = i.f1.pt[j] * i.f1.pt[j];
>                 }
>                 flag = false;
>             } else {
>                 int len = i.f1.pt.length;
>                 for (int j = 0; j < len; j++) {
>                     sum[j] += i.f1.pt[j];
>                     sumOfSquare[j] += (i.f1.pt[j] * i.f1.pt[j]);
>                 }
>             }
>             count++;
>         }
>         out.collect(new Tuple5<Integer, Point, Point, Long, Long>(id,
>                 new Point(sum), new Point(sumOfSquare), count, timestamp));
>     }
>
> I want the timestamp to be unique, but even though the code
> "System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp );" executes
> once for each identifier (given by i.f0) by which the data is grouped, and
> the groupReduce function is then called, I still get the following output
> for the above println statement.
>
> uniqueid: 2, t: 1
> uniqueid: 1, t: 1
> uniqueid: 7, t: 2
> uniqueid: 9, t: 3
> uniqueid: 6, t: 2
> uniqueid: 3, t: 1
> uniqueid: 5, t: 2
> uniqueid: 8, t: 3
>
> I don't really get why I am getting this discrepancy; probably I am missing
> some Flink concept. I am relatively new to the Flink platform and any
> help is appreciated. Thanks a lot.
>
> Thanks and Regards
>
>
>
>


Re: Flink first() operator

2016-04-26 Thread Fabian Hueske
Actually, memory should not be a problem, since the full data set would not
be materialized in memory.
Flink has a streaming runtime, so most of the data would be immediately
filtered out.
However, reading the whole file of course causes a lot of unnecessary IO.
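As an illustration, a minimal sketch of first() on a CSV input (the path and
record type are made up):

```
import org.apache.flink.api.scala._

object FirstSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical CSV of (id, value) pairs; the path is an assumption.
    val records = env.readCsvFile[(Int, Double)]("./data/records.csv")

    // first(100) keeps 100 records and discards the rest as they stream
    // through, so the full data set is never materialized in memory
    // (the whole file is still read from disk, though).
    val sample = records.first(100)

    sample.print()
  }
}
```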

2016-04-26 17:09 GMT+02:00 Biplob Biswas :

> Thanks, I was looking into the TextInputFormat you suggested, and would get
> back to it once I start working with huge files. I would assume there's no
> workaround or additional parameter to the readCsvFile function to
> restrict the number of lines read in one go, as reading a big file would be
> a big problem in terms of memory.
>
>
>
>


Regarding Broadcast of datasets in streaming context

2016-04-26 Thread Biplob Biswas
Hi, I have yet another question, this time about maintaining a global list of
centroids.

I am trying to implement the CluStream algorithm, and for that purpose I have
the initial set of centres in a Flink dataset. Now I need to update the set
of centres for every data tuple that comes from the stream. From what I have
read so far in 2 different posts with similar questions, the suggestion for
streaming datasets was to use the co-map operator and retrieve the data in
2 separate map functions.

My idea is to broadcast the dataset to each Flink partition and, whenever a
data tuple is mapped to a partition using a map function, update the
broadcasted dataset.
But as this is currently not possible, I was thinking of broadcasting the
datastream using

"ds.broadcast()"

so that every partition receives the streamed tuple. Then, use a normal
flatmap function for the centres and use the broadcasted tuple to update the
centres and return the updated set of centres.

My question is: would this work? If yes, could someone give an example of the
datastream broadcast function and how to retrieve the broadcasted stream in
a map function?
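To make the idea above concrete, here is a minimal, untested sketch of
broadcasting a DataStream and consuming it in a flatMap; the Centre type and
the update logic are placeholders, not a working CluStream implementation:

```
import org.apache.flink.streaming.api.scala._

object BroadcastStreamSketch {
  // Placeholder centre type; the real CluStream centre class would differ.
  case class Centre(id: Int, coords: Array[Double])

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Toy in-memory source standing in for the real tuple stream.
    val points: DataStream[Array[Double]] =
      env.fromElements(Array(1.0, 2.0), Array(3.0, 4.0))

    // broadcast() sends every incoming tuple to all parallel instances of
    // the downstream operator, which is what the idea above relies on.
    val broadcastPoints = points.broadcast

    // Each parallel flatMap instance sees every point and could update its
    // local copy of the centres here (the real update logic is omitted).
    val updatedCentres = broadcastPoints.flatMap { p =>
      List(Centre(0, p)) // stand-in for the centre-update step
    }

    updatedCentres.print()
    env.execute("broadcast stream sketch")
  }
}
```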





Re: "No more bytes left" at deserialization

2016-04-26 Thread Till Rohrmann
Then let's keep finger crossed that we've found the culprit :-)

On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov 
wrote:

> Thank you Till.
>
> I will try to run with new binaries today. As I have mentioned, the error
> is reproducible only on a full dataset, so coming up with sample input data
> may be problematic (not to mention that the real data can't be shared).
> I'll see if I can replicate it, but could take a bit longer. Thank you very
> much for your effort.
>
> On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann 
> wrote:
>
>> Hi Timur,
>>
>> I’ve got good and not so good news. Let’s start with the not so good
>> news. I couldn’t reproduce your problem but the good news is that I found a
>> bug in the duplication logic of the OptionSerializer. I’ve already
>> committed a patch to the master to fix it.
>>
>> Thus, I wanted to ask you, whether you could try out the latest master
>> and check whether your problem still persists. If that’s the case, could
>> you send me your complete code with sample input data which reproduces your
>> problem?
>>
>> Cheers,
>> Till
>> ​
>>
>> On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Could this be caused by the disabled reference tracking in our Kryo
> serializer? From the stack trace it looks like it's failing when trying to
>>> deserialize the traits that are wrapped in Options.
>>>
>>> On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi  wrote:
>>>
 Hey Timur,

 I'm sorry about this bad experience.

 From what I can tell, there is nothing unusual with your code. It's
 probably an issue with Flink.

 I think we have to wait a little longer to hear what others in the
 community say about this.

 @Aljoscha, Till, Robert: any ideas what might cause this?

 – Ufuk


 On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
  wrote:
 > Still trying to resolve this serialization issue. I was able to hack
 it by
 > 'serializing' `Record` to String and then 'deserializing' it in
 coGroup, but
 > boy its so ugly.
 >
 > So the bug is that it can't deserialize the case class that has the
 > structure (slightly different and more detailed than I stated above):
 > ```
 > case class Record(name: Name, phone: Option[Phone], address:
 > Option[Address])
 >
 > case class Name(givenName: Option[String], middleName: Option[String],
 > familyName: Option[String], generationSuffix: Option[String] = None)
 >
 > trait Address{
 >   val city: String
 >   val state: String
 >   val country: String
 >   val latitude: Double
 >   val longitude: Double
 >   val postalCode: String
 >   val zip4: String
 >   val digest: String
 > }
 >
 >
 > case class PoBox(city: String,
 >  state: String,
 >  country: String,
 >  latitude: Double,
 >  longitude: Double,
 >  postalCode: String,
 >  zip4: String,
 >  digest: String,
 >  poBox: String
 > ) extends Address
 >
 > case class PostalAddress(city: String,
 >  state: String,
 >  country: String,
 >  latitude: Double,
 >  longitude: Double,
 >  postalCode: String,
 >  zip4: String,
 >  digest: String,
 >  preDir: String,
 >  streetName: String,
 >  streetType: String,
 >  postDir: String,
 >  house: String,
 >  aptType: String,
 >  aptNumber: String
 > ) extends Address
 > ```
 >
 > I would expect that serialization is one of Flink cornerstones and
 should be
 > well tested, so there is a high chance of me doing things wrongly,
 but I
 > can't really find anything unusual in my code.
 >
 > Any suggestion what to try is highly welcomed.
 >
 > Thanks,
 > Timur
 >
 >
 > On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <
 timur.fairu...@gmail.com>
 > wrote:
 >>
 >> Hello Robert,
 >>
 >> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an
 issue
 >> with a cluster (that I didn't dig into), when I restarted the
 cluster I was
 >> able to go past it, so now I have the following exception:
 >>
 >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup
 (CoGroup
 >> at
 >>
 

Re: "No more bytes left" at deserialization

2016-04-26 Thread Till Rohrmann
Hi Timur,

I’ve got good and not so good news. Let’s start with the not so good news.
I couldn’t reproduce your problem but the good news is that I found a bug
in the duplication logic of the OptionSerializer. I’ve already committed a
patch to the master to fix it.

Thus, I wanted to ask you, whether you could try out the latest master and
check whether your problem still persists. If that’s the case, could you
send me your complete code with sample input data which reproduces your
problem?

Cheers,
Till
​

On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek 
wrote:

> Could this be caused by the disabled reference tracking in our Kryo
> serializer? From the stack trace it looks like it's failing when trying to
> deserialize the traits that are wrapped in Options.
>
> On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi  wrote:
>
>> Hey Timur,
>>
>> I'm sorry about this bad experience.
>>
>> From what I can tell, there is nothing unusual with your code. It's
>> probably an issue with Flink.
>>
>> I think we have to wait a little longer to hear what others in the
>> community say about this.
>>
>> @Aljoscha, Till, Robert: any ideas what might cause this?
>>
>> – Ufuk
>>
>>
>> On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
>>  wrote:
>> > Still trying to resolve this serialization issue. I was able to hack it
>> by
>> > 'serializing' `Record` to String and then 'deserializing' it in
>> coGroup, but
>> > boy its so ugly.
>> >
>> > So the bug is that it can't deserialize the case class that has the
>> > structure (slightly different and more detailed than I stated above):
>> > ```
>> > case class Record(name: Name, phone: Option[Phone], address:
>> > Option[Address])
>> >
>> > case class Name(givenName: Option[String], middleName: Option[String],
>> > familyName: Option[String], generationSuffix: Option[String] = None)
>> >
>> > trait Address{
>> >   val city: String
>> >   val state: String
>> >   val country: String
>> >   val latitude: Double
>> >   val longitude: Double
>> >   val postalCode: String
>> >   val zip4: String
>> >   val digest: String
>> > }
>> >
>> >
>> > case class PoBox(city: String,
>> >  state: String,
>> >  country: String,
>> >  latitude: Double,
>> >  longitude: Double,
>> >  postalCode: String,
>> >  zip4: String,
>> >  digest: String,
>> >  poBox: String
>> > ) extends Address
>> >
>> > case class PostalAddress(city: String,
>> >  state: String,
>> >  country: String,
>> >  latitude: Double,
>> >  longitude: Double,
>> >  postalCode: String,
>> >  zip4: String,
>> >  digest: String,
>> >  preDir: String,
>> >  streetName: String,
>> >  streetType: String,
>> >  postDir: String,
>> >  house: String,
>> >  aptType: String,
>> >  aptNumber: String
>> > ) extends Address
>> > ```
>> >
>> > I would expect that serialization is one of Flink cornerstones and
>> should be
>> > well tested, so there is a high chance of me doing things wrongly, but I
>> > can't really find anything unusual in my code.
>> >
>> > Any suggestion what to try is highly welcomed.
>> >
>> > Thanks,
>> > Timur
>> >
>> >
>> > On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <
>> timur.fairu...@gmail.com>
>> > wrote:
>> >>
>> >> Hello Robert,
>> >>
>> >> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an
>> issue
>> >> with a cluster (that I didn't dig into), when I restarted the cluster
>> I was
>> >> able to go past it, so now I have the following exception:
>> >>
>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup
>> (CoGroup
>> >> at
>> >>
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>> >> -> Filter (Filter at
>> >>
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))'
>> >> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
>> >> Reading Thread' terminated due to an exception: Serializer consumed
>> more
>> >> bytes than the record had. This indicates broken serialization. If you
>> are
>> >> using custom serialization types (Value or Writable), check their
>> >> serialization methods. If you are using a Kryo-serialized type, check
>> the
>> >> corresponding Kryo serializer.
>> >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>> >> at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >> at 

Return unique counter using groupReduceFunction

2016-04-26 Thread Biplob Biswas
Hi,

I am using a groupReduce function to aggregate the content of the objects,
but at the same time I need to return a unique counter from the function. My
attempts are failing and the identifiers are somehow very random and
getting duplicated.

Following is the part of my code which is supposed to generate a unique
counter and return it with out.collect.


public static class sumReducer implements
        GroupReduceFunction<Tuple2<Integer, Point>,
                            Tuple5<Integer, Point, Point, Long, Long>> {

    double sum[] = null;
    double sumOfSquare[] = null;
    long timestamp = 0;

    @Override
    public void reduce(Iterable<Tuple2<Integer, Point>> in,
            Collector<Tuple5<Integer, Point, Point, Long, Long>> out)
            throws Exception {

        int id = 0;
        long count = 0;
        boolean flag = true;
        for (Tuple2<Integer, Point> i : in) {
            if (flag) {
                timestamp++;
                System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp);
                sum = new double[i.f1.pt.length];
                sumOfSquare = new double[sum.length];
                id = i.f0;
                for (int j = 0; j < sum.length; j++) {
                    sum[j] = i.f1.pt[j];
                    sumOfSquare[j] = i.f1.pt[j] * i.f1.pt[j];
                }
                flag = false;
            } else {
                int len = i.f1.pt.length;
                for (int j = 0; j < len; j++) {
                    sum[j] += i.f1.pt[j];
                    sumOfSquare[j] += (i.f1.pt[j] * i.f1.pt[j]);
                }
            }
            count++;
        }
        out.collect(new Tuple5<Integer, Point, Point, Long, Long>(id,
                new Point(sum), new Point(sumOfSquare), count, timestamp));
    }

I want the timestamp to be unique, but even though the code
"System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp );" executes
once for each identifier (given by i.f0) by which the data is grouped, and
the groupReduce function is then called, I still get the following output
for the above println statement.

uniqueid: 2, t: 1
uniqueid: 1, t: 1
uniqueid: 7, t: 2
uniqueid: 9, t: 3
uniqueid: 6, t: 2
uniqueid: 3, t: 1
uniqueid: 5, t: 2
uniqueid: 8, t: 3

I don't really get why I am getting this discrepancy; probably I am missing
some Flink concept. I am relatively new to the Flink platform and any
help is appreciated. Thanks a lot.

Thanks and Regards





Re: Understanding Sliding Windows

2016-04-26 Thread Dominik Choma
Piyush,

You created a sliding window which is triggered every 10 seconds.
Flink fires this window every 10 seconds, without waiting for the 5-minute
buffer to be filled up.
It seems to me that the first argument is rather the "maximum data buffer
retention" than "the initial threshold".

Dominik

2016-04-26 12:16 GMT+02:00 Piyush Shrivastava :

> Hi all,
> I wanted to know how exactly sliding windows produce results in Flink.
> Suppose I create a sliding window of 5 minutes which is refreshed every
> 10 seconds:
>
> .timeWindow(Time.minutes(5), Time.seconds(10))
>
> So in every 10 seconds we are looking at data from the past 5 minutes. But
> what happens before the initial 5 minutes have passed?
> Suppose we start the computation at 10:00. At 10:05 we will get the result
> for 10:00-10:05. But what are the results which we get in between this?
> i.e. at 10:00:10, 10:00:20 and so on.
> Basically, why does Flink start producing results before the initial
> threshold has passed? What do these results signify?
>
> Thanks and Regards,
> Piyush Shrivastava 
> http://webograffiti.com
>


Re: Job hangs

2016-04-26 Thread Timur Fayruzov
Hello Robert,

I observed progress for 2 hours (meaning numbers changed on the dashboard), and
then I waited for 2 hours more. I'm sure it had to spill at some point, but
I figured 2h is enough time.

Thanks,
Timur
On Apr 26, 2016 1:35 AM, "Robert Metzger"  wrote:

> Hi Timur,
>
> thank you for sharing the source code of your job. That is helpful!
> Its a large pipeline with 7 joins and 2 co-groups. Maybe your job is much
> more IO heavy with the larger input data because all the joins start
> spilling?
> Our monitoring, in particular for batch jobs is really not very advanced..
> If we had some monitoring showing the spill status, we would maybe see that
> the job is still running.
>
> How long did you wait until you declared the job hanging?
>
> Regards,
> Robert
>
>
> On Tue, Apr 26, 2016 at 10:11 AM, Ufuk Celebi  wrote:
>
>> No.
>>
>> If you run on YARN, the YARN logs are the relevant ones for the
>> JobManager and TaskManager. The client log submitting the job should
>> be found in /log.
>>
>> – Ufuk
>>
>> On Tue, Apr 26, 2016 at 10:06 AM, Timur Fayruzov
>>  wrote:
>> > I will do it my tomorrow. Logs don't show anything unusual. Are there
>> any
>> > logs besides what's in flink/log and yarn container logs?
>> >
>> > On Apr 26, 2016 1:03 AM, "Ufuk Celebi"  wrote:
>> >
>> > Hey Timur,
>> >
>> > is it possible to connect to the VMs and get stack traces of the Flink
>> > processes as well?
>> >
>> > We can first have a look at the logs, but the stack traces will be
>> > helpful if we can't figure out what the issue is.
>> >
>> > – Ufuk
>> >
>> > On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann 
>> wrote:
>> >> Could you share the logs with us, Timur? That would be very helpful.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Apr 26, 2016 3:24 AM, "Timur Fayruzov" 
>> >> wrote:
>> >>>
>> >>> Hello,
>> >>>
>> >>> Now I'm at the stage where my job seem to completely hang. Source
>> code is
>> >>> attached (it won't compile but I think gives a very good idea of what
>> >>> happens). Unfortunately I can't provide the datasets. Most of them are
>> >>> about
>> >>> 100-500MM records, I try to process on EMR cluster with 40 tasks 6GB
>> >>> memory
>> >>> for each.
>> >>>
>> >>> It was working for smaller input sizes. Any idea on what I can do
>> >>> differently is appreciated.
>> >>>
>> >>> Thans,
>> >>> Timur
>>
>
>


Re: Initializing global data

2016-04-26 Thread Stefano Baghino
Hi Nirmalya,

I'm not really sure setGlobalJobParameters is what you're looking for. If
the ReferableData is more than some simple configuration (and judging from its
type it looks like it is), maybe you can try to leverage broadcast variables.
You can read more about them here:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#broadcast-variables
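For illustration, a minimal sketch of a broadcast variable used for the kind
of read-only lookup you describe (the data, the names, and the enrichment
logic are placeholders, not your actual ReferableData):

```
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

object BroadcastVariableSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the ReferableData read from the CSV file.
    val referableData = env.fromElements(("key1", "value1"), ("key2", "value2"))

    // The data set to be enriched.
    val input = env.fromElements("key1", "key3")

    val enriched = input
      .map(new RichMapFunction[String, String] {
        private var lookup: Map[String, String] = _

        override def open(parameters: Configuration): Unit = {
          // The broadcast set is available on every parallel instance.
          lookup = getRuntimeContext
            .getBroadcastVariable[(String, String)]("referableData")
            .asScala
            .toMap
        }

        override def map(value: String): String =
          lookup.getOrElse(value, "unknown")
      })
      .withBroadcastSet(referableData, "referableData")

    enriched.print()
  }
}
```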

Regarding your question on CSV, I always find the built-in APIs very handy.
I also expect them to work in a parallel fashion on distributed file
systems out of the box, so I wouldn't re-write them (unless you have very
specific needs, of course).

On Tue, Apr 26, 2016 at 3:25 PM, Nirmalya Sengupta <
sengupta.nirma...@gmail.com> wrote:

> Hello Flinksters,
>
> I need to initialize a piece of global data at the beginning of a Flink
> application. I use this piece as a READ-ONLY for enhancing the streamed
> data that come in at the time of execution. So, this piece of data also has
> to be present in all the execution nodes. For readability, let's name this
> piece as ReferableData.
>
> The ReferableData is essentially a Map[String, UserDefinedClass]. The
> source is a regular CSV file, present in the local file system (no HDFS,for
> the time being). So, I read the CSV file in the usual manner, parse it,
> create instances of UserDefinedClass and create a Map[asStringK,
> asInstanceV] and make this available globally.
>
> What I gather from the documents and discussions in this forum is that one
> of the ways to achieve this is to use
>
> env.getConfig().setGlobalJobParameters(parameters)
>
> Where 'parameters' is the Map (the ReferableData) that I want to create
> from the CSV file.
>
> I have two questions:
>
> a) Is this approach of providing look-up data to all nodes, a good
> practice?
>
> b) Should I read the input CSV file through the
> ExecutionEnvironment.readTextFile or using standard IO routines of Java?
> Does the community have any preference?
>
> -- Nirmalya
>
>
>
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>
>
> Nirmalya Sengupta
> about.me/sengupta.nirmalya
>
> 
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Control Trigger behavior based on external datasource

2016-04-26 Thread Hironori Ogibayashi
Till,

Thank you for your answer.
It's true that there can be a case where the window operator has not received
all the data for the key.
I will go with the second idea.
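For reference, a very rough sketch of what the second approach might look like
as a custom trigger; the Event type, its fields, and the timer handling are
assumptions, not working code from this thread:

```
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Hypothetical event type; `finished` marks the special "stream done" record
// injected by the external program.
case class Event(key: String, value: Long, finished: Boolean)

// Sketch of approach 2: the trigger purges the window when it sees the
// special event, and otherwise fires on processing time (registration of
// the processing-time timers is omitted here).
class FinishAwareTrigger extends Trigger[Event, GlobalWindow] {

  override def onElement(element: Event, timestamp: Long, window: GlobalWindow,
                         ctx: Trigger.TriggerContext): TriggerResult =
    if (element.finished) TriggerResult.FIRE_AND_PURGE else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: GlobalWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  override def onEventTime(time: Long, window: GlobalWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
}
```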

Thanks!

Hironori

2016-04-26 17:46 GMT+09:00 Till Rohrmann :
> Hi Hironori,
>
> I would go with the second approach, because it is not guaranteed that all
> events of a given key have been received by the window operator if the data
> source says that all events for this key have been read. The events might
> still be in flight. Furthermore, it integrates more nicely with Flink's
> streaming model.
>
> Cheers,
> Till
>
> On Tue, Apr 26, 2016 at 10:16 AM, Hironori Ogibayashi 
> wrote:
>>
>> Hello,
>>
>> I am using GlobalWindow and my custom trigger (similar to
>> ContinuousProcessingTimeTrigger).
>> In my trigger I want to control the TriggerResult based on external
>> datasource.
>> That datasource has flags for each key which describes if stream  for that
>> key has been finished (so, can be purged).
>>
>> I am thinking of two approaches, so could you give me some advise about
>> which is better, or are there any other better solutions?
>>
>> 1. Check datasource in onProcessingTime()
>>
>> Query datasource (i.e. Redis) in onProcessingTime() and return FIRE or
>> FIRE_AND_PURGE based on the result.
>> Maybe I will create Jedis or JedisPool instance in the trigger's
>> constructor?
>>
>> 2. External program periodically query datasource and send special
>> event for keys of finished stream.
>>
>> The schema of the event will be the same as normal events in the
>> stream, but has special value in a field. So, the trigger will be able
>> to handle the event in onElement(). I need to filter that event
>> afterward so that it does not affect the computation result.
>>
>> Thanks,
>> Hironori Ogibayashi
>
>


Understanding Sliding Windows

2016-04-26 Thread Piyush Shrivastava
Hi all,
I wanted to know how exactly sliding windows produce results in Flink.
Suppose I create a sliding window of 5 minutes which is refreshed every 10 seconds:
.timeWindow(Time.minutes(5), Time.seconds(10))
So every 10 seconds we are looking at data from the past 5 minutes. But what
happens before the initial 5 minutes have passed? Suppose we start the
computation at 10:00. At 10:05 we will get the result for 10:00-10:05. But what
are the results we get in between, i.e. at 10:00:10, 10:00:20 and so on?
Basically, why does Flink start producing results before the initial threshold
has passed? What do these results signify?
Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com
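
For illustration, a minimal sketch (with a hypothetical socket source and processing-time windows) of one way to drop the early firings that come from windows starting before the job did, if those partial results are not wanted:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object SlidingWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Approximate job start, captured on the client and shipped inside the closure.
    val jobStart = System.currentTimeMillis()

    env.socketTextStream("localhost", 9999)   // hypothetical source
      .map(word => (word, 1))
      .keyBy(_._1)
      .timeWindow(Time.minutes(5), Time.seconds(10))
      .apply { (key: String, window: TimeWindow, values: Iterable[(String, Int)],
                out: Collector[(String, Int)]) =>
        // Windows that started before the job did contain less than 5 minutes
        // of data; only emit windows that are fully covered by the job's runtime.
        if (window.getStart >= jobStart) {
          out.collect((key, values.map(_._2).sum))
        }
      }
      .print()

    env.execute("sliding window sketch")
  }
}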


Re: Submit Flink Jobs to YARN running on AWS

2016-04-26 Thread Robert Metzger
I've started my own EMR cluster and tried to launch a Flink job from my
local machine on it.
I have to admit that configuring the EMR-launched Hadoop for external
access is quite a hassle.

I'm not even able to submit Flink to the YARN cluster because the client
cannot connect to the ResourceManager. I've changed the resource manager
hostname to the public one in the yarn-site.xml on the cluster and
restarted it, but the client still cannot connect.
It seems that the RM address is being overwritten by the Hadoop code?
[image: Inline image 1]

How did you manage to get this working?

In the VM settings, I disabled the "Source/Dest checks", but I don't think
this is related.

Have you considered using Amazon's VPN service? I guess then you would have
"local" access to the cluster.

On YARN, Flink is not using the flink-conf.yaml setting for the
jobmanager's hostname. It's using YARN's "yarn.nodemanager.hostname" from
the yarn-site.xml.
I haven't tried it, but it could work if you set the public hostname of
each NodeManager in the yarn-site.xml.

Also, maybe the product forum / customer support of Amazon can help you
here. Other systems like Spark or Storm have very similar architectures and
will face the same issues. I guess they have some recipes for such
situations.

Regards,
Robert




On Tue, Apr 26, 2016 at 10:47 AM, Robert Metzger 
wrote:

> Hi Abhi,
>
> I'll try to reproduce the issue and come up with a solution.
>
> On Tue, Apr 26, 2016 at 1:13 AM, Bajaj, Abhinav 
> wrote:
>
>> Hi Fabian,
>>
>> Thanks for your reply and the pointers to documentation.
>>
>> In these steps, I think the Flink client is installed on the master node,
>> referring to steps mentioned in Flink docs here
>> 
>> .
>> However, the scenario I have is to run the client on my local machine and
>> submit jobs remotely to the YARN Cluster (running on EMR or independently).
>>
>> Let me describe in more detail here.
>> I am trying to submit a single Flink Job to YARN using the client,
>> running on my dev machine -
>>
>> ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096
>>  ./examples/batch/WordCount.jar
>>
>> In my understanding, YARN (running in AWS) allocates a container for the
>> Jobmanager.
>> Jobmanager discovers the IP and started the Actor system. At this step
>> the IP it uses is the internal IP address, of the EC2 instance.
>>
>> The client, running on my dev machine, is not able to connect to the
>> Jobmanager for reasons explained in my mail below.
>>
>> Is there a way, where I can set Jobmanager to use the hostname and not
>> the IP address?
>>
>> Or any other suggestions?
>>
>> Thanks,
>> Abhi
>>
>>
>> *Abhinav Bajaj*
>>
>> Senior Engineer
>>
>> HERE Predictive Analytics
>>
>> Office:  +12062092767
>>
>> Mobile: +17083299516
>>
>> *HERE Seattle*
>>
>> 701 Pike Street, #2000, Seattle, WA 98101, USA
>>
>> *47° 36' 41" N. 122° 19' 57" W*
>>
>> *HERE Maps*
>>
>>
>>
>>
>> From: Fabian Hueske 
>> Reply-To: "user@flink.apache.org" 
>> Date: Wednesday, March 9, 2016 at 12:51 AM
>> To: "user@flink.apache.org" 
>> Subject: Re: Submit Flink Jobs to YARN running on AWS
>>
>> Hi Abhi,
>>
>> I have used Flink on EMR via YARN a couple of times without problems.
>> I started a Flink YARN session like this:
>>
>> ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
>>
>> This will start five YARN containers (1 JobManager with 1024MB, 4
>> Taskmanagers with 4096MB). See more config options in the documentation [1].
>> In one of the last lines of the std-out output you should find a line
>> that tells you the IP and port of the JobManager.
>>
>> With the IP and port, you can submit a job as follows:
>>
>> ./bin/flink run -m jmIP:jmPort -p 4 jobJarFile.jar 
>>
>> This will send the job to the JobManager specified by IP and port and
>> execute the program with a parallelism of 4. See more config options in the
>> documentation [2].
>>
>> If this does not help, could you share the exact command that you use to
>> start the YARN session and submit the job?
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html
>>
>> 2016-03-08 0:25 GMT+01:00 Bajaj, Abhinav :
>>
>>> Hi,
>>>
>>> I am a newbie to Flink and trying to use it in AWS.
>>> I have created a YARN cluster on AWS EC2 machines.
>>> Trying to submit Flink job to the remote YARN cluster using the Flink
>>> Client running on my local machine.
>>>
>>> The Jobmanager starts successfully on the YARN container but the client
>>> is not able to connect to the Jobmanager.
>>>
>>> Flink Client Logs -
>>>
>>> 13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient
>>> - Deploying cluster, 

Re: Gracefully stop long running streaming job

2016-04-26 Thread Maximilian Michels
I have to warn you that the Storm SpoutWrapper and the TwitterSource
are currently the only stoppable sources. However, we could make more
stoppable, e.g. the KafkaConsumer.
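
For anyone writing a custom source, a rough sketch of what a stoppable source can look like (the counter logic below is made up; only the SourceFunction and StoppableFunction interfaces are real):

import org.apache.flink.api.common.functions.StoppableFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction

// A source that keeps emitting until either cancel() or stop() is called.
// stop() is what `./bin/flink stop <JobID>` eventually invokes, so the job can
// finish gracefully instead of ending up CANCELED or FAILED.
class StoppableCounterSource extends SourceFunction[Long] with StoppableFunction {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var counter = 0L
    while (running) {
      ctx.getCheckpointLock.synchronized {
        ctx.collect(counter)
        counter += 1
      }
      Thread.sleep(100)
    }
  }

  // Graceful shutdown: let the current loop iteration finish, then return from run().
  override def stop(): Unit = {
    running = false
  }

  // Hard cancellation (e.g. `./bin/flink cancel`).
  override def cancel(): Unit = {
    running = false
  }
}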

On Tue, Apr 19, 2016 at 12:38 AM, Robert Schmidtke
 wrote:
> I'm on 0.10.2 which seems to be still lacking this feature. Anyway I'm happy
> to see it'll be in future releases, so I'll get to enjoy it once I upgrade
> :) I'm using a FlinkKafkaConsumer081 for the record.
>
> Anyway, thanks a bunch
> Robert
>
> On Tue, Apr 19, 2016 at 12:14 AM, Matthias J. Sax  wrote:
>>
>> If all your sources implements Stoppable interface, you can STOP a job.
>>
>> ./bin/flink stop JobID
>>
>> STOP is however quite new and it is ongoing work to make available
>> sources stoppable (some are already). Not sure what kind of sources you
>> are using right now.
>>
>> -Matthias
>>
>>
>> On 04/18/2016 10:50 PM, Robert Schmidtke wrote:
>> > Hi everyone,
>> >
>> > I am running a streaming benchmark which involves a potentially
>> > infinitely running Flink Streaming Job. I run it blocking on YARN using
>> > ./bin/flink run ... and then send the command into background,
>> > remembering its PID to kill it later on. While this gets the work done,
>> > the job always ends up in the FAILED state. I imagine it would be the
>> > same if I used ./bin/flink cancel ... to cancel the job? It's not that
>> > pressing but it would be nice to shut down a streaming job properly.
>> >
>> > Thanks
>> >
>> > Robert
>> >
>> > --
>> > My GPG Key ID: 336E2680
>>
>
>
>
> --
> My GPG Key ID: 336E2680


About flink stream table API

2016-04-26 Thread Zhangrucong
Hello:
 I want to learn the Flink streaming Table API. Is the stream SQL the same as
Calcite's?
 In the following link, the Table API examples use DataSet. Where can I
see a detailed introduction to the streaming Table API?
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html

 Thanks in advance!




Re: YARN terminating TaskNode

2016-04-26 Thread Maximilian Michels
Hi Timur,

Indeed, if you use JNI libraries then the memory will be off-heap and
the -Xmx limit will not be respected. Currently, we don't expect users
to use JNI memory allocation. We might want to enforce a stricter
direct memory limit in the future. In that case, you would get an
OutOfMemoryError before YARN could kill the container. Neither is
nice to have :)

You will have to adjust 'yarn.heap-cutoff-ratio' or
'yarn.heap-cutoff-min' (for an absolute memory cutoff) to account for
your JNI memory needs.

Cheers,
Max
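
To make the container breakdown quoted below easier to follow, here is the same arithmetic written out as a tiny helper. Parameter names are illustrative; the real settings are 'taskmanager.memory.fraction' and 'yarn.heap-cutoff-ratio' in flink-conf.yaml:

// Back-of-the-envelope helper mirroring the container memory breakdown quoted below.
object TaskManagerMemorySketch {
  def breakdown(containerMb: Int,
                cutoffRatio: Double = 0.25,
                managedFraction: Double = 0.7,
                offHeap: Boolean = true): Unit = {
    val cutoff = containerMb * cutoffRatio
    val afterCutoff = containerMb - cutoff
    if (offHeap) {
      // With off-heap enabled, only (1 - fraction) of the remainder goes on the heap.
      val heap = afterCutoff * (1 - managedFraction)
      val managedOffHeap = afterCutoff * managedFraction
      println(f"cutoff=$cutoff%.1f MB, heap (-Xmx)=$heap%.1f MB, managed off-heap=$managedOffHeap%.1f MB")
    } else {
      // With off-heap disabled, everything after the cutoff is heap.
      println(f"cutoff=$cutoff%.1f MB, heap (-Xmx)=$afterCutoff%.1f MB")
    }
  }

  def main(args: Array[String]): Unit = {
    // 2500 MB container, off-heap enabled: cutoff 625 MB, heap ~562.5 MB, off-heap ~1312.5 MB
    breakdown(2500, offHeap = true)
    // Same container with off-heap disabled: heap 1875 MB
    breakdown(2500, offHeap = false)
  }
}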


On Mon, Apr 25, 2016 at 8:27 PM, Timur Fayruzov
 wrote:
> Great answer, thank you Max for a very detailed explanation! Illuminating
> how the off-heap parameter affects the memory allocation.
>
> I read this post:
> https://blogs.oracle.com/jrockit/entry/why_is_my_jvm_process_larger_t
>
> and the thing that jumped out at me is the allocation of memory for JNI libs. I
> do use a native library in my application, which is likely the culprit. I
> need to account for its memory footprint when doing my memory calculations.
>
> Thanks,
> Timur
>
>
> On Mon, Apr 25, 2016 at 10:28 AM, Maximilian Michels  wrote:
>>
>> Hi Timur,
>>
>> Shedding some light on the memory calculation:
>>
>> You have a total memory size of 2500 MB for each TaskManager. The
>> default for 'taskmanager.memory.fraction' is 0.7. This is the fraction
>> of the memory used by the memory manager. When you have turned on
>> off-heap memory, this memory is allocated off-heap. As you pointed
>> out, the default Yarn cutoff ratio is 0.25.
>>
>> Memory cutoff for Yarn: 2500 * 0.25 MB = 625 MB
>>
>> Java heap size with off-heap disabled: 2500 MB - 625 MB = 1875 MB
>>
>> Java heap size with off-heap enabled: (2500 MB - 625 MB) * 0.3 = 562.5
>> MB (~570 MB in your case)
>> Off-heap memory size: (2500 MB - 625 MB) * 0.7 = 1312.5 MB
>>
>> The heap memory limits in your log seem to be calculated correctly.
>> Note that we don't set a strict limit for the off-heap memory because
>> the Flink memory manager controls the amount of memory allocated. It
>> will preallocate memory when you have 'taskmanager.memory.preallocate'
>> set to true. Otherwise it will allocate dynamically. Still, you should
>> have about 500 MB memory left with everything allocated. There is some
>> more direct (off-heap) memory allocated for the network stack
>> adjustable with 'taskmanager.network.numberOfBuffers' which is set to
>> 2048 by default and corresponds to 2048 * 32 KB = 64 MB memory. I
>> believe this can grow up to twice of that size. Still, should be
>> enough memory left.
>>
>> Are you running a streaming or batch job? Off-heap memory and memory
>> preallocation are mostly beneficial for batch jobs which use the
>> memory manager a lot for sorting, hashing and caching.
>>
>> For streaming I'd suggest to use Flink's defaults:
>>
>> taskmanager.memory.off-heap: false
>> taskmanager.memory.preallocate: false
>>
>> Raising the cutoff ratio should prevent killing of the TaskManagers.
>> As Robert mentioned, in practice the JVM tends to allocate more than
>> the maximum specified heap size. You can put the following in your
>> flink-conf.yaml:
>>
>> # slightly raise the cut off ratio (might need to be even higher)
>> yarn.heap-cutoff-ratio: 0.3
>>
>> Thanks,
>> Max
>>
>> On Mon, Apr 25, 2016 at 5:52 PM, Timur Fayruzov
>>  wrote:
>> > Hello Maximilian,
>> >
>> > I'm using 1.0.0 compiled with Scala 2.11 and Hadoop 2.7. I'm running
>> > this on
>> > EMR. I didn't see any exceptions in other logs. What are the logs you
>> > are
>> > interested in?
>> >
>> > Thanks,
>> > Timur
>> >
>> > On Mon, Apr 25, 2016 at 3:44 AM, Maximilian Michels 
>> > wrote:
>> >>
>> >> Hi Timur,
>> >>
>> >> Which version of Flink are you using? Could you share the entire logs?
>> >>
>> >> Thanks,
>> >> Max
>> >>
>> >> On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger 
>> >> wrote:
>> >> > Hi Timur,
>> >> >
>> >> > The reason why we only allocate 570mb for the heap is because you are
>> >> > allocating most of the memory as off heap (direct byte buffers).
>> >> >
>> >> > In theory, the memory footprint of the JVM is limited to 570 (heap) +
>> >> > 1900
>> >> > (direct mem) = 2470 MB (which is below 2500). But in practice thje
>> >> > JVM
>> >> > is
>> >> > allocating more memory, causing these killings by YARN.
>> >> >
>> >> > I have to check the code of Flink again, because I would expect the
>> >> > safety
>> >> > boundary to be much larger than 30 mb.
>> >> >
>> >> > Regards,
>> >> > Robert
>> >> >
>> >> >
>> >> > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov
>> >> > 
>> >> > wrote:
>> >> >>
>> >> >> Hello,
>> >> >>
>> >> >> Next issue in a string of things I'm solving is that my application
>> >> >> fails
>> >> >> with the message 'Connection unexpectedly closed by remote task
>> >> >> manager'.
>> >> >>
>> >> >> Yarn log shows the following:
>> >> >>

Re: "No more bytes left" at deserialization

2016-04-26 Thread Aljoscha Krettek
Could this be caused by the disabled reference tracking in our Kryo
serializer? From the stack trace it looks like it's failing when trying to
deserialize the traits that are wrapped in Options.
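
In the meantime, one workaround worth trying (just a sketch, not a confirmed fix) is to register the concrete classes from this thread explicitly with Kryo, so the concrete Address subtypes hidden behind the trait are known up front. This assumes the Record/Name/PoBox/PostalAddress case classes posted in the thread are on the classpath:

import org.apache.flink.api.scala._

object KryoRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Register the concrete types so Kryo does not have to discover them lazily.
    env.getConfig.registerKryoType(classOf[Record])
    env.getConfig.registerKryoType(classOf[Name])
    env.getConfig.registerKryoType(classOf[PoBox])
    env.getConfig.registerKryoType(classOf[PostalAddress])

    // ... build and run the original pipeline as before ...
  }
}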

On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi  wrote:

> Hey Timur,
>
> I'm sorry about this bad experience.
>
> From what I can tell, there is nothing unusual with your code. It's
> probably an issue with Flink.
>
> I think we have to wait a little longer to hear what others in the
> community say about this.
>
> @Aljoscha, Till, Robert: any ideas what might cause this?
>
> – Ufuk
>
>
> On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
>  wrote:
> > Still trying to resolve this serialization issue. I was able to hack it
> by
> > 'serializing' `Record` to String and then 'deserializing' it in coGroup,
> but
> > boy it's so ugly.
> >
> > So the bug is that it can't deserialize the case class that has the
> > structure (slightly different and more detailed than I stated above):
> > ```
> > case class Record(name: Name, phone: Option[Phone], address:
> > Option[Address])
> >
> > case class Name(givenName: Option[String], middleName: Option[String],
> > familyName: Option[String], generationSuffix: Option[String] = None)
> >
> > trait Address{
> >   val city: String
> >   val state: String
> >   val country: String
> >   val latitude: Double
> >   val longitude: Double
> >   val postalCode: String
> >   val zip4: String
> >   val digest: String
> > }
> >
> >
> > case class PoBox(city: String,
> >  state: String,
> >  country: String,
> >  latitude: Double,
> >  longitude: Double,
> >  postalCode: String,
> >  zip4: String,
> >  digest: String,
> >  poBox: String
> > ) extends Address
> >
> > case class PostalAddress(city: String,
> >  state: String,
> >  country: String,
> >  latitude: Double,
> >  longitude: Double,
> >  postalCode: String,
> >  zip4: String,
> >  digest: String,
> >  preDir: String,
> >  streetName: String,
> >  streetType: String,
> >  postDir: String,
> >  house: String,
> >  aptType: String,
> >  aptNumber: String
> > ) extends Address
> > ```
> >
> > I would expect that serialization is one of Flink's cornerstones and
> should be
> > well tested, so there is a high chance of me doing things wrongly, but I
> > can't really find anything unusual in my code.
> >
> > Any suggestion what to try is highly welcomed.
> >
> > Thanks,
> > Timur
> >
> >
> > On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <
> timur.fairu...@gmail.com>
> > wrote:
> >>
> >> Hello Robert,
> >>
> >> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue
> >> with a cluster (that I didn't dig into), when I restarted the cluster I
> was
> >> able to go past it, so now I have the following exception:
> >>
> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup
> (CoGroup
> >> at
> >>
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
> >> -> Filter (Filter at
> >>
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))'
> >> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
> >> Reading Thread' terminated due to an exception: Serializer consumed more
> >> bytes than the record had. This indicates broken serialization. If you
> are
> >> using custom serialization types (Value or Writable), check their
> >> serialization methods. If you are using a Kryo-serialized type, check
> the
> >> corresponding Kryo serializer.
> >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
> >> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >> at java.lang.Thread.run(Thread.java:745)
> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> >> Thread 'SortMerger Reading Thread' terminated due to an exception:
> >> Serializer consumed more bytes than the record had. This indicates
> broken
> >> serialization. If you are using custom serialization types (Value or
> >> Writable), check their serialization methods. If you are using a
> >> Kryo-serialized type, check the corresponding Kryo serializer.
> >> at
> >>
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> >> at
> >>
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> >> at
> >>
> 

Re: Submit Flink Jobs to YARN running on AWS

2016-04-26 Thread Robert Metzger
Hi Abhi,

I'll try to reproduce the issue and come up with a solution.

On Tue, Apr 26, 2016 at 1:13 AM, Bajaj, Abhinav 
wrote:

> Hi Fabian,
>
> Thanks for your reply and the pointers to documentation.
>
> In these steps, I think the Flink client is installed on the master node,
> referring to steps mentioned in Flink docs here
> 
> .
> However, the scenario I have is to run the client on my local machine and
> submit jobs remotely to the YARN Cluster (running on EMR or independently).
>
> Let me describe in more detail here.
> I am trying to submit a single Flink Job to YARN using the client, running
> on my dev machine -
>
> ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096
>  ./examples/batch/WordCount.jar
>
> In my understanding, YARN (running in AWS) allocates a container for the
> Jobmanager.
> Jobmanager discovers the IP and starts the Actor system. At this step the
> IP it uses is the internal IP address of the EC2 instance.
>
> The client, running on my dev machine, is not able to connect to the
> Jobmanager for reasons explained in my mail below.
>
> Is there a way, where I can set Jobmanager to use the hostname and not the
> IP address?
>
> Or any other suggestions?
>
> Thanks,
> Abhi
>
>
> *Abhinav Bajaj*
>
> Senior Engineer
>
> HERE Predictive Analytics
>
> Office:  +12062092767
>
> Mobile: +17083299516
>
> *HERE Seattle*
>
> 701 Pike Street, #2000, Seattle, WA 98101, USA
>
> *47° 36' 41" N. 122° 19' 57" W*
>
> *HERE Maps*
>
>
>
>
> From: Fabian Hueske 
> Reply-To: "user@flink.apache.org" 
> Date: Wednesday, March 9, 2016 at 12:51 AM
> To: "user@flink.apache.org" 
> Subject: Re: Submit Flink Jobs to YARN running on AWS
>
> Hi Abhi,
>
> I have used Flink on EMR via YARN a couple of times without problems.
> I started a Flink YARN session like this:
>
> ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
>
> This will start five YARN containers (1 JobManager with 1024MB, 4
> Taskmanagers with 4096MB). See more config options in the documentation [1].
> In one of the last lines of the std-out output you should find a line that
> tells you the IP and port of the JobManager.
>
> With the IP and port, you can submit a job as follows:
>
> ./bin/flink run -m jmIP:jmPort -p 4 jobJarFile.jar 
>
> This will send the job to the JobManager specified by IP and port and
> execute the program with a parallelism of 4. See more config options in the
> documentation [2].
>
> If this does not help, could you share the exact command that you use to
> start the YARN session and submit the job?
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html
>
> 2016-03-08 0:25 GMT+01:00 Bajaj, Abhinav :
>
>> Hi,
>>
>> I am a newbie to Flink and trying to use it in AWS.
>> I have created a YARN cluster on AWS EC2 machines.
>> Trying to submit Flink job to the remote YARN cluster using the Flink
>> Client running on my local machine.
>>
>> The Jobmanager starts successfully on the YARN container but the client is
>> not able to connect to the Jobmanager.
>>
>> Flink Client Logs -
>>
>> 13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient
>> - Deploying cluster, current state ACCEPTED
>> 13:57:35,951 INFO  org.apache.flink.yarn.FlinkYarnClient
>> - Deploying cluster, current state ACCEPTED
>> 13:57:37,027 INFO  org.apache.flink.yarn.FlinkYarnClient
>> - YARN application has been deployed successfully.
>> 13:57:37,100 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>- Start actor system.
>> 13:57:37,532 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>- Start application client.
>> YARN cluster started
>> JobManager web interface address
>> http://ec2-XX-XX-XX-XX.compute-1.amazonaws.com:8088/proxy/application_1456184947990_0003/
>> Waiting until all TaskManagers have connected
>> 13:57:37,540 INFO  org.apache.flink.yarn.ApplicationClient
>> - Notification about new leader address akka.tcp:
>> //flink@54.35.41.12:41292/user/jobmanager with session ID null.
>> No status updates from the YARN cluster received so far. Waiting ...
>> 13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient
>> - Received address of new leader 
>> akka.tcp://flink@54.35.41.12:41292/user/jobmanager
>> with session ID null.
>> 13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient
>> - Disconnect from JobManager null.
>> 13:57:37,545 INFO  org.apache.flink.yarn.ApplicationClient
>> - Trying to register at JobManager akka.tcp://flink@54.35.41.12
>> :41292/user/jobmanager.
>> No status updates from the YARN cluster received so far. Waiting ...
>>
>> The logs of the Jobmanager contains the following -
>>

Re: Control Trigger behavior based on external datasource

2016-04-26 Thread Till Rohrmann
Hi Hironori,

I would go with the second approach, because it is not guaranteed that all
events of a given key have been received by the window operator if the data
source says that all events for this key have been read. The events might
still be in flight. Furthermore, it integrates more nicely with Flink's
streaming model.

Cheers,
Till

On Tue, Apr 26, 2016 at 10:16 AM, Hironori Ogibayashi 
wrote:

> Hello,
>
> I am using GlobalWindow and my custom trigger (similar to
> ContinuousProcessingTimeTrigger).
> In my trigger I want to control the TriggerResult based on external
> datasource.
> That datasource has flags for each key which describe whether the stream for
> that key has finished (so it can be purged).
>
> I am thinking of two approaches, so could you give me some advice about
> which is better, or are there any other better solutions?
>
> 1. Check datasource in onProcessingTime()
>
> Query datasource (i.e. Redis) in onProcessingTime() and return FIRE or
> FIRE_AND_PURGE based on the result.
> Maybe I will create Jedis or JedisPool instance in the trigger's
> constructor?
>
> 2. External program periodically query datasource and send special
> event for keys of finished stream.
>
> The schema of the event will be the same as normal events in the
> stream, but has special value in a field. So, the trigger will be able
> to handle the event in onElement(). I need to filter that event
> afterward so that it does not affect the computation result.
>
> Thanks,
> Hironori Ogibayashi
>


Re: Job hangs

2016-04-26 Thread Robert Metzger
Hi Timur,

thank you for sharing the source code of your job. That is helpful!
It's a large pipeline with 7 joins and 2 co-groups. Maybe your job is much
more IO heavy with the larger input data because all the joins start
spilling?
Our monitoring, in particular for batch jobs, is really not very advanced.
If we had some monitoring showing the spill status, we would maybe see that
the job is still running.

How long did you wait until you declared the job hanging?

Regards,
Robert


On Tue, Apr 26, 2016 at 10:11 AM, Ufuk Celebi  wrote:

> No.
>
> If you run on YARN, the YARN logs are the relevant ones for the
> JobManager and TaskManager. The client log submitting the job should
> be found in /log.
>
> – Ufuk
>
> On Tue, Apr 26, 2016 at 10:06 AM, Timur Fayruzov
>  wrote:
> > I will do it by tomorrow. Logs don't show anything unusual. Are there any
> > logs besides what's in flink/log and yarn container logs?
> >
> > On Apr 26, 2016 1:03 AM, "Ufuk Celebi"  wrote:
> >
> > Hey Timur,
> >
> > is it possible to connect to the VMs and get stack traces of the Flink
> > processes as well?
> >
> > We can first have a look at the logs, but the stack traces will be
> > helpful if we can't figure out what the issue is.
> >
> > – Ufuk
> >
> > On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann 
> wrote:
> >> Could you share the logs with us, Timur? That would be very helpful.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Apr 26, 2016 3:24 AM, "Timur Fayruzov" 
> >> wrote:
> >>>
> >>> Hello,
> >>>
> >>> Now I'm at the stage where my job seems to hang completely. Source code
> >>> is attached (it won't compile but I think gives a very good idea of what
> >>> happens). Unfortunately I can't provide the datasets. Most of them are
> >>> about 100-500MM records, which I try to process on an EMR cluster with
> >>> 40 tasks and 6GB of memory each.
> >>>
> >>> It was working for smaller input sizes. Any idea on what I can do
> >>> differently is appreciated.
> >>>
> >>> Thanks,
> >>> Timur
>


Control Trigger behavior based on external datasource

2016-04-26 Thread Hironori Ogibayashi
Hello,

I am using GlobalWindow and my custom trigger (similar to
ContinuousProcessingTimeTrigger).
In my trigger I want to control the TriggerResult based on external datasource.
That datasource has flags for each key which describe whether the stream for that
key has finished (so it can be purged).

I am thinking of two approaches, so could you give me some advice about
which is better, or are there any other better solutions?

1. Check datasource in onProcessingTime()

Query datasource (i.e. Redis) in onProcessingTime() and return FIRE or
FIRE_AND_PURGE based on the result.
Maybe I will create Jedis or JedisPool instance in the trigger's constructor?

2. External program periodically query datasource and send special
event for keys of finished stream.

The schema of the event will be the same as normal events in the
stream, but has special value in a field. So, the trigger will be able
to handle the event in onElement(). I need to filter that event
afterward so that it does not affect the computation result.

Thanks,
Hironori Ogibayashi
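
For reference, a rough sketch of how the second approach could look as a custom trigger against the Flink 1.0 Trigger API. The Event case class and the endOfStream marker convention are made up for illustration:

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Hypothetical event type: the external program emits an element with
// endOfStream = true once the datasource marks the key as finished.
case class Event(key: String, value: Long, endOfStream: Boolean = false)

// Fires periodically, similar to ContinuousProcessingTimeTrigger, but fires and
// purges as soon as the special marker event for the key arrives.
class MarkerAwareTrigger(intervalMillis: Long) extends Trigger[Event, GlobalWindow] {

  override def onElement(element: Event, timestamp: Long, window: GlobalWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    if (element.endOfStream) {
      // Emit the final result and drop the window state. The marker itself is in
      // the window contents, so the window function has to filter it out.
      TriggerResult.FIRE_AND_PURGE
    } else {
      ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMillis)
      TriggerResult.CONTINUE
    }
  }

  override def onProcessingTime(time: Long, window: GlobalWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  override def onEventTime(time: Long, window: GlobalWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
}

// Usage sketch: keyedStream.window(GlobalWindows.create()).trigger(new MarkerAwareTrigger(60000))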


Re: Job hangs

2016-04-26 Thread Ufuk Celebi
No.

If you run on YARN, the YARN logs are the relevant ones for the
JobManager and TaskManager. The client log submitting the job should
be found in /log.

– Ufuk

On Tue, Apr 26, 2016 at 10:06 AM, Timur Fayruzov
 wrote:
> I will do it by tomorrow. Logs don't show anything unusual. Are there any
> logs besides what's in flink/log and yarn container logs?
>
> On Apr 26, 2016 1:03 AM, "Ufuk Celebi"  wrote:
>
> Hey Timur,
>
> is it possible to connect to the VMs and get stack traces of the Flink
> processes as well?
>
> We can first have a look at the logs, but the stack traces will be
> helpful if we can't figure out what the issue is.
>
> – Ufuk
>
> On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann  wrote:
>> Could you share the logs with us, Timur? That would be very helpful.
>>
>> Cheers,
>> Till
>>
>> On Apr 26, 2016 3:24 AM, "Timur Fayruzov" 
>> wrote:
>>>
>>> Hello,
>>>
>>> Now I'm at the stage where my job seems to hang completely. Source code is
>>> attached (it won't compile but I think gives a very good idea of what
>>> happens). Unfortunately I can't provide the datasets. Most of them are
>>> about 100-500MM records, which I try to process on an EMR cluster with
>>> 40 tasks and 6GB of memory each.
>>>
>>> It was working for smaller input sizes. Any idea on what I can do
>>> differently is appreciated.
>>>
>>> Thanks,
>>> Timur


Re: "No more bytes left" at deserialization

2016-04-26 Thread Ufuk Celebi
Hey Timur,

I'm sorry about this bad experience.

From what I can tell, there is nothing unusual with your code. It's
probably an issue with Flink.

I think we have to wait a little longer to hear what others in the
community say about this.

@Aljoscha, Till, Robert: any ideas what might cause this?

– Ufuk


On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
 wrote:
> Still trying to resolve this serialization issue. I was able to hack it by
> 'serializing' `Record` to String and then 'deserializing' it in coGroup, but
> boy it's so ugly.
>
> So the bug is that it can't deserialize the case class that has the
> structure (slightly different and more detailed than I stated above):
> ```
> case class Record(name: Name, phone: Option[Phone], address:
> Option[Address])
>
> case class Name(givenName: Option[String], middleName: Option[String],
> familyName: Option[String], generationSuffix: Option[String] = None)
>
> trait Address{
>   val city: String
>   val state: String
>   val country: String
>   val latitude: Double
>   val longitude: Double
>   val postalCode: String
>   val zip4: String
>   val digest: String
> }
>
>
> case class PoBox(city: String,
>  state: String,
>  country: String,
>  latitude: Double,
>  longitude: Double,
>  postalCode: String,
>  zip4: String,
>  digest: String,
>  poBox: String
> ) extends Address
>
> case class PostalAddress(city: String,
>  state: String,
>  country: String,
>  latitude: Double,
>  longitude: Double,
>  postalCode: String,
>  zip4: String,
>  digest: String,
>  preDir: String,
>  streetName: String,
>  streetType: String,
>  postDir: String,
>  house: String,
>  aptType: String,
>  aptNumber: String
> ) extends Address
> ```
>
> I would expect that serialization is one of Flink's cornerstones and should be
> well tested, so there is a high chance of me doing things wrongly, but I
> can't really find anything unusual in my code.
>
> Any suggestion what to try is highly welcomed.
>
> Thanks,
> Timur
>
>
> On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov 
> wrote:
>>
>> Hello Robert,
>>
>> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue
>> with a cluster (that I didn't dig into), when I restarted the cluster I was
>> able to go past it, so now I have the following exception:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
>> at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>> -> Filter (Filter at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))'
>> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
>> Reading Thread' terminated due to an exception: Serializer consumed more
>> bytes than the record had. This indicates broken serialization. If you are
>> using custom serialization types (Value or Writable), check their
>> serialization methods. If you are using a Kryo-serialized type, check the
>> corresponding Kryo serializer.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>> at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> 

Re: Job hangs

2016-04-26 Thread Ufuk Celebi
Hey Timur,

is it possible to connect to the VMs and get stack traces of the Flink
processes as well?

We can first have a look at the logs, but the stack traces will be
helpful if we can't figure out what the issue is.

– Ufuk

On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann  wrote:
> Could you share the logs with us, Timur? That would be very helpful.
>
> Cheers,
> Till
>
> On Apr 26, 2016 3:24 AM, "Timur Fayruzov"  wrote:
>>
>> Hello,
>>
>> Now I'm at the stage where my job seems to hang completely. Source code is
>> attached (it won't compile but I think gives a very good idea of what
>> happens). Unfortunately I can't provide the datasets. Most of them are about
>> 100-500MM records, which I try to process on an EMR cluster with 40 tasks
>> and 6GB of memory each.
>>
>> It was working for smaller input sizes. Any idea on what I can do
>> differently is appreciated.
>>
>> Thanks,
>> Timur


Re: Job hangs

2016-04-26 Thread Till Rohrmann
Could you share the logs with us, Timur? That would be very helpful.

Cheers,
Till
On Apr 26, 2016 3:24 AM, "Timur Fayruzov"  wrote:

> Hello,
>
> Now I'm at the stage where my job seems to hang completely. Source code is
> attached (it won't compile but I think gives a very good idea of what
> happens). Unfortunately I can't provide the datasets. Most of them are
> about 100-500MM records, which I try to process on an EMR cluster with
> 40 tasks and 6GB of memory each.
>
> It was working for smaller input sizes. Any idea on what I can do
> differently is appreciated.
>
> Thanks,
> Timur
>