Re: Unexpected behavior with Scala App trait.

2016-01-19 Thread Andrea Sella
Hi Chiwan,

I’m not an expert in Scala, but it seems to be a closure-cleaning problem. The Scala
> App trait extends the DelayedInit trait to initialize the object, but the Flink
> serialization stack doesn’t handle this special initialization. (This is just
> my opinion, not verified.)
>

I arrived at the same conclusion about the DelayedInit trait.

>
> To run TFIDFNPE safely, you just need to change tokenize and uniqueWords
> into methods, like the following:
>
> ```
> def tokenize = …
> def uniqueWords = ...
> ```
>
> With this change, I tested that TFIDFNPE works safely on a Flink 0.10.1
> cluster.
>

Yeah, it works and I knew it. My aim was to use tricky (functional)
Scala mechanisms to test how robust and idiom-friendly the Scala APIs are
for Scala users.
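
For readers following the thread, here is a minimal, illustrative sketch of the
change Chiwan suggests. It is not the actual TFIDFNPE code; the tokenizer body
and the input data are placeholders.

```
import org.apache.flink.api.scala._

// Illustrative only: per Chiwan's explanation, a `val` defined in an object
// that extends App is initialized through DelayedInit and may not be set yet
// when Flink serializes the flatMap closure; a `def` avoids relying on field
// initialization altogether.
object TokenizeSketch extends App {
  // problematic form (a val, per the thread):
  // val tokenize: String => Seq[String] = _.toLowerCase.split("\\W+").toSeq

  // suggested form:
  def tokenize(line: String): Seq[String] = line.toLowerCase.split("\\W+").toSeq

  val env = ExecutionEnvironment.getExecutionEnvironment
  env.fromElements("To be or not to be")
    .flatMap(line => tokenize(line))
    .print()
}
```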

>
> About the TFIDF object, you should avoid overriding the main method if the
> object is derived from the App trait. It is also related to the DelayedInit
> mechanism [1].
>

Yeah, the App trait is commented out because I am overriding the main method.
TFIDF is the same as TFIDFNPE, except that TFIDFNPE uses the App trait and does
not override the main method.


Thanks,
Andrea

>
> [1]:
> https://github.com/scala/scala/blob/2.10.x/src/library/scala/App.scala#L31
>
> > On Jan 19, 2016, at 12:08 AM, Andrea Sella 
> wrote:
> >
> > Hi,
> >
> > I was implementing the TF-IDF example of flink-training when I faced a
> > problem with an NPE during the deployment of my Job.
> >
> > Source code: https://github.com/alkagin/flink-tfidf-example
> >
> > I used version 0.10.1 and started in local mode.
> > During the deployment of the TFIDFNPE Job, which extends App, Flink throws a
> > NullPointerException in both flatMap functions.
> > If I include the tokenize function in the closures of the flatMap
> > functions, the Job works fine; see the TFIDFApp example.
> > To avoid this unexpected behavior I have to not use the Scala App trait (see
> > TFIDF), but why?
> >
> >
> > Thanks,
> > Andrea
> >
>
>
> Regards,
> Chiwan Park
>
>


Re: Problem to show logs in task managers

2016-01-19 Thread Ana M. Martinez
Hi Till,

Sorry for the delay; you were right, I was not restarting the yarn cluster…

Many thanks for your help!
Ana

On 11 Jan 2016, at 14:39, Till Rohrmann 
> wrote:


You have to restart the yarn cluster to let your changes take effect. You can 
do that via HADOOP_HOME/sbin/stop-yarn.sh; HADOOP_HOME/sbin/start-yarn.sh.

The commands yarn-session.sh ... and bin/flink run -m yarn-cluster start a new
yarn application within the existing yarn cluster.

Cheers,
Till


On Mon, Jan 11, 2016 at 1:39 PM, Ana M. Martinez 
> wrote:
Hi Till,

Thanks for your help. I have checked both in Yarn’s web interface and through
the command line, and it seems that there are no occupied containers.

Additionally, I have checked the configuration values in the web interface and 
even though I have changed the log.aggregation property in the yarn-site.xml 
file to true, it appears as false and with the following source label:

yarn.log-aggregation-enable = false  (source: java.io.BufferedInputStream@3c407114)


I am not sure if that is relevant. I had assumed that the "./bin/flink run -m 
yarn-cluster" command is starting a yarn session and thus reloading the 
yarn-site file. Is that right? If I am wrong here, then, how can I restart it 
so that the modifications in the yarn-site.xml file are considered? (I have 
also tried with ./bin/yarn-session.sh and then ./bin/flink run without 
success…).

I am not sure if this is related to Flink anymore; should I move my problem to
the YARN community instead?

Thanks,
Ana

On 11 Jan 2016, at 10:37, Till Rohrmann 
> wrote:


Hi Ana,

good to hear that you found the logging statements. You can check in Yarn’s web 
interface whether there are still occupied containers. Alternatively you can go 
to the different machines and run jps which lists you the running Java 
processes. If you see an ApplicationMaster or YarnTaskManagerRunner process, 
then there is still a container running with Flink on this machine. I hope this 
helps you.

Cheers,
Till


On Mon, Jan 11, 2016 at 9:37 AM, Ana M. Martinez 
> wrote:
Hi Till,

Thanks for that! I can see the "Logger in LineSplitter.flatMap" output if I
retrieve the task manager logs manually (under
/var/log/hadoop-yarn/containers/application_X/…). However, that solution is not
ideal when, for instance, I am using 32 machines for my MapReduce operations.

I would like to know why Yarn’s log aggregation is not working. Can you tell me 
how to check if there are some Yarn containers running after the Flink job has 
finished? I have tried:
hadoop job -list
but I cannot see any jobs there, although I am not sure whether that means
there are no containers running...

Thanks,
Ana

On 08 Jan 2016, at 16:24, Till Rohrmann 
> wrote:


You’re right that the log statements of the LineSplitter are in the logs of the 
cluster nodes, because that’s where the LineSplitter code is executed. In 
contrast, you create a TestClass on the client when you submit the program. 
Therefore, you see the logging statement “Logger in TestClass” on the command 
line or in the cli log file.

So I would assume that the problem is Yarn’s log aggregation. Either your 
configuration is not correct or there are still some Yarn containers running 
after the Flink job has finished. Yarn will only show you the logs after all 
containers are terminated. Maybe you could check that. Alternatively, you can 
try to retrieve the taskmanager logs manually by going to the machine where
your yarn container was executed. Then, somewhere under hadoop/logs/userlogs,
you should find the logs.

Cheers,
Till


On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez 
> wrote:
Thanks for the tip Robert! It was a good idea to rule out other possible 
causes, but I am afraid that is not the problem. If we stick to the 
WordCountExample (for simplicity), the Exception is thrown if placed into the 
flatMap function.

I am going to try to re-write my problem and all the settings below:

When I try to aggregate all logs:
 $yarn logs -applicationId application_1452250761414_0005

the following message is retrieved:
16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at 
ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
/var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005 does not
exist.
Log aggregation has not completed or is not enabled.

(I tried the same command a few minutes later and got the same message, so could
it be that log aggregation is not properly enabled?)

I am going to carefully enumerate all the steps I have followed (and settings) 
to see if someone can identify why the Logger messages from CORE nodes (in an 
Amazon cluster) are not shown.

1) Enable 

integration with a scheduler

2016-01-19 Thread serkan . tas
Hi,

I am planning to integrate Flink with our job scheduler product, to execute
jobs on Flink - especially batch-like ones - which may be part of some
other DAG-style job chain.

I need some control abilities like start, stop, suspend, get status...

Where should I start?

-- 
Serkan Tas
Likya Bilgi Teknolojileri
ve İletişim Hiz. Ltd.
www.likyateknoloji.com
Tel : 0 216 471 81 55
Gsm : 0 542 242 00 92
Faks:  0 216 661 14 92

--

This electronic mail and any files transmitted with it are intended for the
private use of the persons named above. If you received this message in
error, forwarding, copying or use of any of the information is strictly
prohibited. Please immediately notify the sender and delete it from your
system. Likya Bilgi Teknolojileri ve İletişim Hiz. Ltd. Şti. does not
accept legal responsibility for the contents of this message.
--


Please consider your environmental responsibility before printing this
e-mail.





Re: JDBCInputFormat GC overhead limit exceeded error

2016-01-19 Thread Stephan Ewen
Hi!

This kind of error (GC overhead exceeded) usually means that the system is
reaching a state where it has very many still living objects and frees
little memory during each collection. As a consequence, it is basically
busy with only garbage collection.

Your job probably has about 500-600 MB of free memory; the rest is, at that
memory size, reserved for JVM overhead and Flink's worker memory.
Now, since your job actually does not keep any objects or rows around, this
should be plenty. I can only suspect that the Oracle JDBC driver is very
memory hungry, thus pushing the system to the limit. (I found this mentioned,
for example, in the resources listed below.)

What you can do:
 For this kind of job, you can simply tell Flink to not reserve as much
memory, by using the option "taskmanager.memory.size=1". If the JDBC driver
has no leak, but is simply super hungry, this should solve it.

Greetings,
Stephan


I also found these resources concerning Oracle JDBC memory:

 -
http://stackoverflow.com/questions/2876895/oracle-t4cpreparedstatement-memory-leaks
(bottom answer)
 - https://community.oracle.com/thread/2220078?tstart=0


On Tue, Jan 19, 2016 at 5:44 PM, Maximilian Bode <
maximilian.b...@tngtech.com> wrote:

> Hi Robert,
>
> I am using 0.10.1.
>
>
> Am 19.01.2016 um 17:42 schrieb Robert Metzger :
>
> Hi Max,
>
> which version of Flink are you using?
>
> On Tue, Jan 19, 2016 at 5:35 PM, Maximilian Bode <
> maximilian.b...@tngtech.com> wrote:
>
>> Hi everyone,
>>
>> I am facing a problem using the JDBCInputFormat which occurred in a
>> larger Flink job. As a minimal example I can reproduce it when just writing
>> data into a csv after having read it from a database, i.e.
>>
>> DataSet existingData = env.createInput(
>> JDBCInputFormat.buildJDBCInputFormat()
>> .setDrivername("oracle.jdbc.driver.OracleDriver")
>> .setUsername(…)
>> .setPassword(…)
>> .setDBUrl(…)
>> .setQuery("select DATA from TABLENAME")
>> .finish(),
>> new TupleTypeInfo<>(Tuple1.class, BasicTypeInfo.STRING_TYPE_INFO));
>> existingData.writeAsCsv(…);
>>
>> where DATA is a column containing strings of length ~25 characters and
>> TABLENAME contains 20 million rows.
>>
>> After starting the job on a YARN cluster (using -tm 3072 and leaving the
>> other memory settings at default values), Flink happily goes along at first
>> but then fails after something like three million records have been sent by
>> the JDBCInputFormat. The Exception reads "The slot in which the task was
>> executed has been released. Probably loss of TaskManager …". The local
>> taskmanager.log in the affected container reads
>> "java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at
>> java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1063)
>>
>> at 
>> org.jboss.netty.channel.socket.nio.NioClientBoss.processConnectTimeout(NioClientBoss.java:119)
>> at
>> org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:83)
>> at
>> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
>> at
>> org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)"
>>
>> Any ideas what is going wrong here?
>>
>> Cheers,
>> Max
>>
>> —
>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>>
>
>


Re: Flink Stream: collect in an array all records within a window

2016-01-19 Thread Matthias J. Sax
What type is your DataStream? It must be DataStream[String] to work with
SimpleStringSchema.

If you have a different type, just implement a customized
SerializationSchema.

-Matthias


On 01/19/2016 11:26 AM, Saiph Kappa wrote:
> When I use SimpleStringSchema I get the error: Type mismatch, expected:
> SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema. I
> think SimpleStringSchema extends SerializationSchema[String], and
> therefore cannot be used as argument of writeToSocket. Can you confirm
> this please?
> 
> s.writeToSocket(host, port.toInt, new SimpleStringSchema())
> 
> 
> Thanks.
> 
> On Tue, Jan 19, 2016 at 10:34 AM, Matthias J. Sax  > wrote:
> 
> There is SimpleStringSchema.
> 
> -Matthias
> 
> On 01/18/2016 11:21 PM, Saiph Kappa wrote:
> > Hi Matthias,
> >
> > Thanks for your response. The method .writeToSocket seems to be what I
> > was looking for. Can you tell me what kind of serialization schema
> > I should use, assuming my socket server receives strings? I have
> > something like this in Scala:
> >
> > val server = new ServerSocket()
> > while (true) {
> >   val s = server.accept()
> >   val in = new BufferedSource(s.getInputStream()).getLines()
> >   println(in.next())
> >   s.close()
> > }
> >
> > Thanks
> >
> >
> >
> > On Mon, Jan 18, 2016 at 8:46 PM, Matthias J. Sax  
> > >> wrote:
> >
> > Hi Saiph,
> >
> > you can use AllWindowFunction via .apply(...) to get an
> .collect method:
> >
> > From:
> >   
>  
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
> >
> > > // applying an AllWindowFunction on non-keyed window stream
> > > allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
> > >     public void apply(Window window,
> > >             Iterable<Tuple2<String, Integer>> values,
> > >             Collector<Integer> out) throws Exception {
> > >         int sum = 0;
> > >         for (Tuple2<String, Integer> t : values) {
> > >             sum += t.f1;
> > >         }
> > >         out.collect(new Integer(sum));
> > >     }
> > > });
> >
> > If you consume all those values via a sink, the sink will run on the
> > cluster. You can use .writeToSocket(...) as a sink:
> >   
>  
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#data-sinks
> >
> > -Matthias
> >
> >
> > On 01/18/2016 06:30 PM, Saiph Kappa wrote:
> > > Hi,
> > >
> > > After performing a windowAll() on a DataStream[String], is
> there any
> > > method to collect and return an array with all Strings
> within a window
> > > (similar to .collect in Spark).
> > >
> > > I basically want to ship all strings in a window to a remote
> server
> > > through a socket, and want to use the same socket connection
> for all
> > > strings that I send. The method .addSink iterates over all
> > > records, but does the provided function run on the Flink client or on
> > > the server?
> > >
> > > Thanks.
> >
> >
> 
> 





Re: Flink Stream: collect in an array all records within a window

2016-01-19 Thread Matthias J. Sax
It should work.

Your error message indicates that your DataStream is of type
[String, Array[Byte]] and not of type [String].

> Type mismatch, expected: SerializationSchema[String, Array[Byte]], actual: 
> SimpleStringSchema

Can you maybe share your code?

-Matthias

On 01/19/2016 01:57 PM, Saiph Kappa wrote:
> It's DataStream[String]. So it seems that SimpleStringSchema cannot be
> used in writeToSocket regardless of the type of the DataStream. Right?
> 
> On Tue, Jan 19, 2016 at 1:32 PM, Matthias J. Sax  > wrote:
> 
> What type is your DataStream? It must be DataStream[String] to work with
> SimpleStringSchema.
> 
> If you have a different type, just implement a customized
> SerializationSchema.
> 
> -Matthias
> 
> 
> On 01/19/2016 11:26 AM, Saiph Kappa wrote:
> > When I use SimpleStringSchema I get the error: Type mismatch, expected:
> > SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema. I
> > think SimpleStringSchema extends SerializationSchema[String], and
> > therefore cannot be used as argument of writeToSocket. Can you confirm
> > this please?
> >
> > s.writeToSocket(host, port.toInt, new SimpleStringSchema())
> >
> >
> > Thanks.
> >
> > On Tue, Jan 19, 2016 at 10:34 AM, Matthias J. Sax  
> > >> wrote:
> >
> > There is SimpleStringSchema.
> >
> > -Matthias
> >
> > On 01/18/2016 11:21 PM, Saiph Kappa wrote:
> > > Hi Matthias,
> > >
> > > Thanks for your response. The method .writeToSocket seems to be 
> what I
> > > was looking for. Can you tell me what kind of serialization schema
> > > should I use assuming my socket server receives strings. I have
> > > something like this in scala:
> > >
> > > |val server =newServerSocket()while(true){val s 
> =server.accept()val
> > > 
> in=newBufferedSource(s.getInputStream()).getLines()println(in.next())s.close()}
> > >
> > > |
> > >
> > > Thanks|
> > > |
> > >
> > >
> > >
> > > On Mon, Jan 18, 2016 at 8:46 PM, Matthias J. Sax 
>   >
> > > 
>  > >
> > > Hi Saiph,
> > >
> > > you can use AllWindowFunction via .apply(...) to get an
> > .collect method:
> > >
> > > From:
> > >
> > 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
> > >
> > > > // applying an AllWindowFunction on non-keyed window
> stream
> > > > allWindowedStream.apply (new
> > > AllWindowFunction, Integer,
> Window>() {
> > > > public void apply (Window window,
> > > > Iterable> values,
> > > > Collector out) throws Exception {
> > > > int sum = 0;
> > > > for (value t: values) {
> > > > sum += t.f1;
> > > > }
> > > > out.collect (new Integer(sum));
> > > > }
> > > > });
> > >
> > > If you consume all those value via an sink, the sink
> will run
> > an the
> > > cluster. You can use .writeToSocket(...) as sink:
> > >
> > 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#data-sinks
> > >
> > > -Matthias
> > >
> > >
> > > On 01/18/2016 06:30 PM, Saiph Kappa wrote:
> > > > Hi,
> > > >
> > > > After performing a windowAll() on a DataStream[String], is
> > there any
> > > > method to collect and return an array with all Strings
> > within a window
> > > > (similar to .collect in Spark).
> > > >
> > > > I basically want to ship all strings in a window to a
> remote
> > server
> > > > through a socket, and want to use the same socket
> connection
> > for all
> > > > strings that I send. The method .addSink iterates over all
> > > records, but
> > > > does the provided function runs on the flink client or on
> > the server?
> > > >
> > > > Thanks.
> > >
> > >
> >
> >
> 
> 





Re: Discarded messages on filter and checkpoint ack

2016-01-19 Thread Stephan Ewen
Hi!

There are no acks for individual messages in Flink. All messages that an
operator receives between two checkpoint barriers fail or succeed together.

That's why dropped messages in filters need no dedicated acks. If the next
checkpoint barrier passes through the whole topology, the set of messages
from before that checkpoint is "acked" as a whole.
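
In code terms, all of this is handled by the checkpointing machinery itself; a
trivial sketch (not from this thread; host, port, and interval are arbitrary):

```
import org.apache.flink.streaming.api.scala._

object FilterAckSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint barriers flow through the topology every 5 seconds; all
    // records between two barriers succeed or fail together, so records
    // dropped by the filter need no individual ack.
    env.enableCheckpointing(5000)

    env.socketTextStream("localhost", 9999)
      .filter(_.nonEmpty) // dropped records are simply not forwarded
      .print()

    env.execute("filter-ack sketch")
  }
}
```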

Greetings,
Stephan


On Tue, Jan 19, 2016 at 1:42 PM, Don Frascuchon 
wrote:

> Hello,
>
> Working with Flink checkpointing, I see that the messages discarded by a filter
> function are not acked for the checkpoint ack.
>
> How can I auto-confirm those discarded messages?
> Is the ack notification triggered by a sink?
>
> Thanks in advance!
>
>


Discarded messages on filter and checkpoint ack

2016-01-19 Thread Don Frascuchon
Hello,

Working with Flink checkpointing, I see that the messages discarded by a filter
function are not acked for the checkpoint ack.

How can I auto-confirm those discarded messages?
Is the ack notification triggered by a sink?

Thanks in advance!


Re: Discarded messages on filter and checkpoint ack

2016-01-19 Thread Don Frascuchon
Yes...

In fact, my filter function was wrong. It's working well now.

Thanks for your response Stephan!

On Tue, Jan 19, 2016 at 14:25, Stephan Ewen ()
wrote:

> Hi!
>
> There are no acks for individual messages in Flink. All messages that an
> operator receives between two checkpoint barriers fail or succeed together.
>
> That's why dropped messages in filters need no dedicated acks. If the next
> checkpoint barrier passes through the whole topology, the set of messages
> from before that checkpoint is "acked" as a whole.
>
> Greetings,
> Stephan
>
>
> On Tue, Jan 19, 2016 at 1:42 PM, Don Frascuchon 
> wrote:
>
>> Hello,
>>
>> Working with flink checkpointing i see the messages discarded by a filter
>> function are no acked for the checkpoint ack.
>>
>> How can i auto confirm those discarded messages?
>> The ack notification is trigged by a sink ?
>>
>> Thanks in advance!
>>
>>
>


Re: Flink Stream: collect in an array all records within a window

2016-01-19 Thread Matthias J. Sax
Seems you are right. It works on the current 1.0-SNAPSHOT version, which
has a different signature...

> def writeToSocket(
>   hostname: String,
>   port: Integer,
>   schema: SerializationSchema[T]): DataStreamSink[T] = {
> javaStream.writeToSocket(hostname, port, schema)
>   }

instead of 0.10.1:

> def writeToSocket(
>   hostname: String,
>   port: Integer,
>   schema: SerializationSchema[T, Array[Byte]]): DataStreamSink[T] = {
> javaStream.writeToSocket(hostname, port, schema)
>   }

I guess you can still implement your own SerializationSchema for 0.10.1
to make it work.
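
For example, something along these lines might work on 0.10.1. This is a rough
sketch, not from the thread itself: the class name is made up, and it assumes
the two-parameter interface exposes a single serialize(element) method.

```
import org.apache.flink.streaming.util.serialization.SerializationSchema

// Hypothetical helper for 0.10.x, where SerializationSchema takes the element
// type and the serialized type as type parameters.
class StringToBytesSchema extends SerializationSchema[String, Array[Byte]] {
  override def serialize(element: String): Array[Byte] =
    element.getBytes("UTF-8")
}

// Usage (illustrative): s.writeToSocket(host, port.toInt, new StringToBytesSchema())
```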


-Matthias


On 01/19/2016 04:27 PM, Saiph Kappa wrote:
> I think this is a bug in the scala API.
> 
> def writeToSocket(hostname : scala.Predef.String, port : java.lang.Integer, 
> schema : org.apache.flink.streaming.util.serialization.SerializationSchema[T, 
> scala.Array[scala.Byte]]) : 
> org.apache.flink.streaming.api.datastream.DataStreamSink[T] = { /* compiled 
> code */ }
> 
> 
> 
> On Tue, Jan 19, 2016 at 2:16 PM, Matthias J. Sax  > wrote:
> 
> It should work.
> 
> Your error message indicates, that your DataStream is of type
> [String,Array[Byte]] and not of type [String].
> 
> > Type mismatch, expected: SerializationSchema[String, Array[Byte]], 
> actual: SimpleStringSchema
> 
> Can you maybe share your code?
> 
> -Matthias
> 
> On 01/19/2016 01:57 PM, Saiph Kappa wrote:
> > It's DataStream[String]. So it seems that SimpleStringSchema cannot be
> > used in writeToSocket regardless of the type of the DataStream. Right?
> >
> > On Tue, Jan 19, 2016 at 1:32 PM, Matthias J. Sax  
> > >> wrote:
> >
> > What type is your DataStream? It must be DataStream[String] to work 
> with
> > SimpleStringSchema.
> >
> > If you have a different type, just implement a customized
> > SerializationSchema.
> >
> > -Matthias
> >
> >
> > On 01/19/2016 11:26 AM, Saiph Kappa wrote:
> > > When I use SimpleStringSchema I get the error: Type mismatch, 
> expected:
> > > SerializationSchema[String, Array[Byte]], actual: 
> SimpleStringSchema. I
> > > think SimpleStringSchema extends SerializationSchema[String], and
> > > therefore cannot be used as argument of writeToSocket. Can you 
> confirm
> > > this please?
> > >
> > > s.writeToSocket(host, port.toInt, new SimpleStringSchema())
> > >
> > >
> > > Thanks.
> > >
> > > On Tue, Jan 19, 2016 at 10:34 AM, Matthias J. Sax 
>   >
> > > 
>  > >
> > > There is SimpleStringSchema.
> > >
> > > -Matthias
> > >
> > > On 01/18/2016 11:21 PM, Saiph Kappa wrote:
> > > > Hi Matthias,
> > > >
> > > > Thanks for your response. The method .writeToSocket
> seems to be what I
> > > > was looking for. Can you tell me what kind of
> serialization schema
> > > > should I use assuming my socket server receives
> strings. I have
> > > > something like this in scala:
> > > >
> > > > |val server =newServerSocket()while(true){val s
> =server.accept()val
> > > >
> 
> in=newBufferedSource(s.getInputStream()).getLines()println(in.next())s.close()}
> > > >
> > > > |
> > > >
> > > > Thanks|
> > > > |
> > > >
> > > >
> > > >
> > > > On Mon, Jan 18, 2016 at 8:46 PM, Matthias J. Sax
>   >  
> > >>
> > > > 
> >
> > 
>  wrote:
> > > >
> > > > Hi Saiph,
> > > >
> > > > you can use AllWindowFunction via .apply(...) to
> get an
> > > .collect method:
> > > >
> > > > From:
> > > >
> > >
> >   
>  
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
> > > >
> > > > > // applying an AllWindowFunction on non-keyed window
> >  

JDBCInputFormat GC overhead limit exceeded error

2016-01-19 Thread Maximilian Bode
Hi everyone,

I am facing a problem using the JDBCInputFormat which occurred in a larger 
Flink job. As a minimal example I can reproduce it when just writing data into 
a csv after having read it from a database, i.e.

DataSet<Tuple1<String>> existingData = env.createInput(
    JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("oracle.jdbc.driver.OracleDriver")
        .setUsername(…)
        .setPassword(…)
        .setDBUrl(…)
        .setQuery("select DATA from TABLENAME")
        .finish(),
    new TupleTypeInfo<>(Tuple1.class, BasicTypeInfo.STRING_TYPE_INFO));
existingData.writeAsCsv(…);

where DATA is a column containing strings of length ~25 characters and 
TABLENAME contains 20 million rows.

After starting the job on a YARN cluster (using -tm 3072 and leaving the other 
memory settings at default values), Flink happily goes along at first but then 
fails after something like three million records have been sent by the 
JDBCInputFormat. The Exception reads "The slot in which the task was executed 
has been released. Probably loss of TaskManager …". The local taskmanager.log 
in the affected container reads
"java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1063)
at 
org.jboss.netty.channel.socket.nio.NioClientBoss.processConnectTimeout(NioClientBoss.java:119)
at 
org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:83)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at 
org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)"

Any ideas what is going wrong here?

Cheers,
Max

— 
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



Re: Flink Stream: How to ship results through socket server

2016-01-19 Thread Matthias J. Sax
Your "SocketWriter-Thread" code will run on your client. All code in
"main" runs on the client.

execute() itself runs on the client, too. Of course, it triggers the job
submission to the cluster. In this step, the assembled job from the
previous calls is translated into the JobGraph which is submitted to the
JobManager for execution.

You should start your SocketWriter-Thread manually on the cluster, i.e.,
if you use "localhost" in "env.socketTextStream", it must run on the
TaskManager machine that executes this SocketStream-source task.

I guess it would be better not to use "localhost", but start your
SocketWriter-Thread on a dedicated machine in the cluster, and connect
your SocketStream-source to this machine via its host name.
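
A quick sketch of that last suggestion ("stream-gateway" is a hypothetical host
name for the dedicated machine running the external socket-writer process, 9999
an arbitrary port, and StockPrice mirrors the case class from the quoted program):

```
import org.apache.flink.streaming.api.scala._

object SocketSourceSketch {
  case class StockPrice(symbol: String, price: Double) // mirrors the quoted example

  def main(args: Array[String]): Unit = {
    // createRemoteEnvironment(...) as in the quoted program would work here too
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The source task connects to the dedicated machine from a TaskManager,
    // not from the client that submits the job.
    val socketStockStream = env
      .socketTextStream("stream-gateway", 9999)
      .map { line =>
        val split = line.split(",")
        StockPrice(split(0), split(1).toDouble)
      }

    socketStockStream.print()
    env.execute("socket source sketch")
  }
}
```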

-Matthias



On 01/19/2016 03:57 PM, Saiph Kappa wrote:
> Hi,
> 
> This is a simple example that I found using Flink Stream. I changed it
> so the Flink client can be executed on a remote cluster, and so that it
> can open a socket server to ship its results to any other consumer
> machine. It seems to me that the socket server is not being opened in the
> remote cluster, but rather on my local machine (which I'm using to
> launch the app). How can I achieve that? I want to be able to ship
> results directly from the remote cluster, through a socket server
> that clients can use as a tap.
> 
> Sorry about indentation:
> 
> def main(args: Array[String]) {
>
>   val env = StreamExecutionEnvironment.createRemoteEnvironment("myhostname",
>     DefaultFlinkMasterPort, "myapp-assembly-0.1-SNAPSHOT.jar")
>
>   // Read from a socket stream and map it to StockPrice objects
>   val socketStockStream = env.socketTextStream("localhost", ).map(x => {
>     val split = x.split(",")
>     StockPrice(split(0), split(1).toDouble)
>   })
>
>   // Generate other stock streams
>   val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
>   val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
>   val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
>   val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
>
>   // Merge all stock streams together
>   val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
>   stockStream.print()
>
>   // WHERE IS THE FOLLOWING CODE RUN?
>   var out: PrintWriter = null
>   new Thread {
>     override def run(): Unit = {
>       val serverSocket = new ServerSocket(12345)
>       while (true) {
>         val socket = serverSocket.accept()
>         val hostname = socket.getInetAddress.getHostName.split('.').head
>         println(s"Got a new connection from $hostname")
>         out = new PrintWriter(socket.getOutputStream)
>       }
>     }
>   }.start()
>
>   stockStream.addSink(record => {
>     if (out != null) {
>       out.write(record)
>       out.flush()
>     }
>   })
>
>   env.execute("Stock stream")
> }
> 
> Thanks.





Re: JDBCInputFormat GC overhead limit exceeded error

2016-01-19 Thread Maximilian Bode
Hi Robert,

I am using 0.10.1.

> Am 19.01.2016 um 17:42 schrieb Robert Metzger :
> 
> Hi Max,
> 
> which version of Flink are you using?
> 
> On Tue, Jan 19, 2016 at 5:35 PM, Maximilian Bode  > wrote:
> Hi everyone,
> 
> I am facing a problem using the JDBCInputFormat which occurred in a larger 
> Flink job. As a minimal example I can reproduce it when just writing data 
> into a csv after having read it from a database, i.e.
> 
> DataSet existingData = env.createInput(
>   JDBCInputFormat.buildJDBCInputFormat()
>   .setDrivername("oracle.jdbc.driver.OracleDriver")
>   .setUsername(…)
>   .setPassword(…)
>   .setDBUrl(…)
>   .setQuery("select DATA from TABLENAME")
>   .finish(),
>   new TupleTypeInfo<>(Tuple1.class, BasicTypeInfo.STRING_TYPE_INFO));
> existingData.writeAsCsv(…);
> 
> where DATA is a column containing strings of length ~25 characters and 
> TABLENAME contains 20 million rows.
> 
> After starting the job on a YARN cluster (using -tm 3072 and leaving the 
> other memory settings at default values), Flink happily goes along at first 
> but then fails after something like three million records have been sent by 
> the JDBCInputFormat. The Exception reads "The slot in which the task was 
> executed has been released. Probably loss of TaskManager …". The local 
> taskmanager.log in the affected container reads
> "java.lang.OutOfMemoryError: GC overhead limit exceeded
> at 
> java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1063)
> at 
> org.jboss.netty.channel.socket.nio.NioClientBoss.processConnectTimeout(NioClientBoss.java:119)
> at 
> org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:83)
> at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
> at 
> org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)"
> 
> Any ideas what is going wrong here?
> 
> Cheers,
> Max
> 
> —
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com 
> 
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
> 





DataSet in Streaming application under Flink

2016-01-19 Thread Sylvain Hotte

Hi,
I want to know if it is possible to load a small dataset in a streaming
application under Flink.


Here's an example:
I have a data stream A and a data set B.
I need to compare every tuple of A to the tuples of B.
Since B is small, it would be loaded on all nodes and be persistent (not
reloaded at every computation).


I am doing a Master's on real-time geospatial operators in Big Data, and I am
looking at different strategies to spatially distribute the stream based on
application and operation characteristics.

One of them involves comparing a dataset and a datastream.

Regards,

Sylvain Hotte




Re: Flink Stream: How to ship results through socket server

2016-01-19 Thread Saiph Kappa
Thanks for your reply Matthias. So it is not possible to open a socket
server in the JobGraph and have it open during the lifetime of the job,
is that what you are saying? And an external process is required
to open that socket server?

On Tue, Jan 19, 2016 at 5:38 PM, Matthias J. Sax  wrote:

> Your "SocketWriter-Thread" code will run on your client. All code in
> "main" runs on the client.
>
> execute() itself runs on the client, too. Of course, it triggers the job
> submission to the cluster. In this step, the assembled job from the
> previous calls is translated into the JobGraph which is submitted to the
> JobManager for execution.
>
> You should start your SocketWriter-Thread manually on the cluster, ie,
> if you use "localhost" in "env.socketTextStream", it must be the
> TaskManager machine that executes this SocketStream-source task.
>
> I guess, it would be better not to use "localhost", but start your
> SocketWriter-Thread on a dedicated machine in the cluster, and connect
> your SocketStream-source to this machine via its host name.
>
> -Matthias
>
>
>
> On 01/19/2016 03:57 PM, Saiph Kappa wrote:
> > Hi,
> >
> > This is a simple example that I found using Flink Stream. I changed it
> > so the flink client can be executed on a remote cluster, and so that it
> > can open a socket server to ship its results for any other consumer
> > machine. It seems to me that the socket server is not being open in the
> > remote cluster, but rather in my local machine (which I'm using to
> > launch the app). How can I achieve that? I want to be able to ship
> > results directly from the remote cluster, and through a socket server
> > where clients can use as a tap.
> >
> > Sorry about indentation:
> >
> > |def main(args: Array[String]) { |
> >
> > val env =
> > StreamExecutionEnvironment.createRemoteEnvironment("myhostname",
> > DefaultFlinkMasterPort,
> >
> > ||"myapp-assembly-0.1-SNAPSHOT.jar"); | //Read from a socket stream at
> > map it to StockPrice objects val socketStockStream =
> > env.socketTextStream("localhost", ).map(x => { val split =
> > x.split(",") StockPrice(split(0), split(1).toDouble) }) //Generate other
> > stock streams val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
> > val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _) val
> > DJI_Stream = env.addSource(generateStock("DJI")(30) _) val BUX_Stream =
> > env.addSource(generateStock("BUX")(40) _) //Merge all stock streams
> > together val stockStream = socketStockStream.merge(SPX_Stream,
> > FTSE_Stream, DJI_Stream, BUX_Stream) stockStream.print()
> > |
> >
> > // WHERE IS THE FOLLOWING CODE RUN?
> >
> > |var out: PrintWriter = null
> > new Thread {
> > override def run(): Unit = {
> > val serverSocket = new ServerSocket(12345)
> > while (true) {
> > val socket = serverSocket.accept()
> > val hostname = socket.getInetAddress.getHostName.split('.').head
> > println(s"Got a new connection from $hostname")
> > out = new PrintWriter(socket.getOutputStream)
> > }
> > }
> > }.start()
> >
> > |||stockStream|.addSink(record => {
> > if(out != null) {
> > out.write(record)
> > out.flush()
> > }
> > })
> >
> > env.execute("Stock stream") }|
> >
> > Thanks.
>
>


Re: JDBCInputFormat GC overhead limit exceeded error

2016-01-19 Thread Robert Metzger
Hi Max,

which version of Flink are you using?

On Tue, Jan 19, 2016 at 5:35 PM, Maximilian Bode <
maximilian.b...@tngtech.com> wrote:

> Hi everyone,
>
> I am facing a problem using the JDBCInputFormat which occurred in a larger
> Flink job. As a minimal example I can reproduce it when just writing data
> into a csv after having read it from a database, i.e.
>
> DataSet existingData = env.createInput(
> JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setUsername(…)
> .setPassword(…)
> .setDBUrl(…)
> .setQuery("select DATA from TABLENAME")
> .finish(),
> new TupleTypeInfo<>(Tuple1.class, BasicTypeInfo.STRING_TYPE_INFO));
> existingData.writeAsCsv(…);
>
> where DATA is a column containing strings of length ~25 characters and
> TABLENAME contains 20 million rows.
>
> After starting the job on a YARN cluster (using -tm 3072 and leaving the
> other memory settings at default values), Flink happily goes along at first
> but then fails after something like three million records have been sent by
> the JDBCInputFormat. The Exception reads "The slot in which the task was
> executed has been released. Probably loss of TaskManager …". The local
> taskmanager.log in the affected container reads
> "java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
> java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1063)
>
> at 
> org.jboss.netty.channel.socket.nio.NioClientBoss.processConnectTimeout(NioClientBoss.java:119)
> at
> org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:83)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
> at
> org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)"
>
> Any ideas what is going wrong here?
>
> Cheers,
> Max
>
> —
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>


Re: DataSet in Streaming application under Flink

2016-01-19 Thread Till Rohrmann
Hi Sylvain,

what you could do for example is to load a static data set, e.g. from HDFS,
in the open method of your comparator and cache it there. The open method
is called for each task once when it is created. The comparator could then
be a RichMapFunction implementation. By making the field storing the small
data set static, you can even share the data among all tasks which run on
the same TaskManager.
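
A rough sketch of that approach (the path, record types, and lookup logic are
placeholders, and the HDFS read is replaced by a plain file read for brevity):

```
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.io.Source

// "Static" holder shared by all tasks running in the same TaskManager JVM.
object SmallSetCache {
  @volatile var smallSet: Set[String] = _
}

class CompareWithSmallSet extends RichMapFunction[String, (String, Boolean)] {
  override def open(parameters: Configuration): Unit = {
    // Loaded once per task; the null check avoids reloading in the same JVM.
    if (SmallSetCache.smallSet == null) {
      SmallSetCache.smallSet =
        Source.fromFile("/path/to/small-dataset.txt").getLines().toSet
    }
  }

  override def map(value: String): (String, Boolean) =
    (value, SmallSetCache.smallSet.contains(value))
}
```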

Cheers,
Till

On Tue, Jan 19, 2016 at 5:53 PM, Sylvain Hotte 
wrote:

> Hi,
> I want to know if it is possible to load a small dataset in a stream
> application under flink.
>
> Here's an example:
> I have a data stream A and a Data Set B
> I need to compare all A tuple to tuple of B.
> Since B is small, it would be loaded on all node and be persistent (not
> reloaded at every computation)
>
> I am doing a Master on realtime geospatial  operator in Big Data and I
> looking at different strategy to spatially distribute the stream base on
> application and operation characteristic.
> One of them involve comparing dataset & datastream.
>
> Regards,
>
> Sylvain Hotte
>
>
>