Re: Flink not reading from Kafka

2017-02-17 Thread Mohit Anchlia
Interestingly enough, the same job runs OK on Linux but not on Windows.

On Fri, Feb 17, 2017 at 4:54 PM, Mohit Anchlia 
wrote:

> I have this code trying to read from a topic; however, the Flink process
> comes up and waits forever even though there is data in the topic. Not sure
> why. Has anyone else seen this problem?
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment
>     .createLocalEnvironment();
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", ":9092");
> properties.setProperty("group.id", "test1");
> properties.setProperty("auto.offset.reset", "earliest");
>
> FlatMapFunction> flatMapper = //something
>
> DataStream stream = env
>     .addSource(new FlinkKafkaConsumer010<>("test", new
>         SimpleStringSchema(), properties));
>
> stream.map(s -> Integer.valueOf(s)).flatMap(flatMapper).returns(
>     new TypeHint>() {
>     }).print();
>
> JobExecutionResult res = env.execute();
>
>
>
> 02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink:
> Unnamed(4/4) switched to RUNNING
>
>
> 02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink:
> Unnamed(1/4) switched to RUNNING
>
>
> 02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink:
> Unnamed(2/4) switched to RUNNING
>
>
>


Flink not reading from Kafka

2017-02-17 Thread Mohit Anchlia
I have this code trying to read from a topic; however, the Flink process
comes up and waits forever even though there is data in the topic. Not sure
why. Has anyone else seen this problem?

StreamExecutionEnvironment env = StreamExecutionEnvironment
    .createLocalEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", ":9092");
properties.setProperty("group.id", "test1");
properties.setProperty("auto.offset.reset", "earliest");

FlatMapFunction> flatMapper = //something

DataStream stream = env
    .addSource(new FlinkKafkaConsumer010<>("test", new
        SimpleStringSchema(), properties));

stream.map(s -> Integer.valueOf(s)).flatMap(flatMapper).returns(
    new TypeHint>() {
    }).print();

JobExecutionResult res = env.execute();



02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink:
Unnamed(4/4) switched to RUNNING


02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink:
Unnamed(1/4) switched to RUNNING


02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink:
Unnamed(2/4) switched to RUNNING
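
One detail worth double-checking in the snippet above is the "bootstrap.servers" value ":9092", which contains no host. Below is a minimal Scala sketch of the same consumer wiring with an explicit broker address, written against the 1.2-era Kafka 0.10 connector; the host, topic, group id and job name are illustrative, not taken from the original setup:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaReadCheck {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092") // explicit host:port
    properties.setProperty("group.id", "test1")
    properties.setProperty("auto.offset.reset", "earliest")

    // Print every record as it arrives, to confirm the consumer actually connects.
    env
      .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
      .print()

    env.execute("kafka-read-check")
  }
}

If nothing is printed with a configuration like this, the broker address, the topic name, or a connector/broker version mismatch is usually the first place to look.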


Re: Aggregation problem.

2017-02-17 Thread Fabian Hueske
Hi,

this looks like a bug to me.
Can you open a JIRA and maybe add a small test case to reproduce the issue?

Thank you,
Fabian

2017-02-18 1:06 GMT+01:00 Kürşat Kurt :

> Hi;
>
>
>
> I have a Dataset like this:
>
>
>
> (0,Auto,0.4,1,5.8317538999854194E-5)
>
> (0,Computer,0.2,1,4.8828125E-5)
>
> (0,Sports,0.4,2,1.7495261699956258E-4)
>
> (1,Auto,0.4,1,1.7495261699956258E-4)
>
> (1,Computer,0.2,1,4.8828125E-5)
>
> (1,Sports,0.4,1,5.8317538999854194E-5)
>
>
>
> This code, ds.groupBy(0).max(4).print(), prints:
>
>
>
> (0,Sports,0.4,1,1.7495261699956258E-4)
>
> (1,Sports,0.4,1,1.7495261699956258E-4)
>
>
>
> ...but I am expecting
>
>
>
> (0,Sports,0.4,2,1.7495261699956258E-4)
>
> (1,Auto,0.4,1,1.7495261699956258E-4)
>
>
>
> What is wrong with this code?
>


Aggregation problem.

2017-02-17 Thread Kürşat Kurt
Hi;

 

I have a Dataset like this:

 

(0,Auto,0.4,1,5.8317538999854194E-5)

(0,Computer,0.2,1,4.8828125E-5)

(0,Sports,0.4,2,1.7495261699956258E-4)

(1,Auto,0.4,1,1.7495261699956258E-4)

(1,Computer,0.2,1,4.8828125E-5)

(1,Sports,0.4,1,5.8317538999854194E-5)

 

This code, ds.groupBy(0).max(4).print(), prints:

 

(0,Sports,0.4,1,1.7495261699956258E-4)

(1,Sports,0.4,1,1.7495261699956258E-4)

 

...but I am expecting

 

(0,Sports,0.4,2,1.7495261699956258E-4)

(1,Auto,0.4,1,1.7495261699956258E-4)

 

What is wrong with this code?
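
Independent of whether the max(4) behaviour above turns out to be a bug, here is a hedged Scala sketch using maxBy(4) instead: maxBy keeps every field of the element holding the per-group maximum, which matches the expected output listed above. The sketch assumes the data is read as 5-field tuples:

import org.apache.flink.api.scala._

object MaxByExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(
      (0, "Auto",     0.4, 1, 5.8317538999854194E-5),
      (0, "Computer", 0.2, 1, 4.8828125E-5),
      (0, "Sports",   0.4, 2, 1.7495261699956258E-4),
      (1, "Auto",     0.4, 1, 1.7495261699956258E-4),
      (1, "Computer", 0.2, 1, 4.8828125E-5),
      (1, "Sports",   0.4, 1, 5.8317538999854194E-5))

    // maxBy(4) returns, per group, the whole element whose field 4 is maximal,
    // whereas max(4) only guarantees the value of the aggregated field itself.
    ds.groupBy(0).maxBy(4).print()
  }
}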



Re: Performance tuning

2017-02-17 Thread Dmitry Golubets
Hi Daniel,

I've implemented a macro that generates MessagePack serializers in our
codebase.
The resulting code is basically a series of writes/reads, as in hand-written
structured serialization.

E.g. given
case class Data1(str: String, subdata: Data2)
case class Data2(num: Int)

serialization code for Data1 will be like:
packer.packString(str)
packer.packInt(num)

The data structures in our project are quite big (2-4 KB in JSON) and
contain nested classes with many fields.
So custom serialization helps us avoid reflection and reduces the data size
sent over the network.

However, it is worth mentioning that on small case classes Flink's
default serialization works faster.
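
A hedged Scala sketch of what such a hand-written serializer looks like when plugged into Flink. For simplicity it uses Kryo's own Output/Input for the structured writes/reads rather than a MessagePack packer; that substitution and all names here are illustrative, not the macro-generated code described above:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

case class Data2(num: Int)
case class Data1(str: String, subdata: Data2)

// A plain series of field writes/reads: no reflection, no per-field metadata on the wire.
class Data1KryoSerializer extends Serializer[Data1] {
  override def write(kryo: Kryo, output: Output, value: Data1): Unit = {
    output.writeString(value.str)
    output.writeInt(value.subdata.num)
  }
  override def read(kryo: Kryo, input: Input, clazz: Class[Data1]): Data1 =
    Data1(input.readString(), Data2(input.readInt()))
}

object SerializerRegistration {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    // Use this serializer whenever a Data1 falls back to Kryo serialization.
    environment.getConfig.addDefaultKryoSerializer(classOf[Data1], classOf[Data1KryoSerializer])
  }
}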


Best regards,
Dmitry

On Fri, Feb 17, 2017 at 6:01 PM, Daniel Santos 
wrote:

> Hello Dimitry,
>
> Could you please elaborate on your tuning of
> environment.addDefaultKryoSerializer(..).
>
> I'm interested in knowing what you have done there to get a boost of about
> 50%.
>
> Some small or simple example would be very nice.
>
> Thank you very much in advance.
>
> Kind Regards,
>
> Daniel Santos
>
> On 02/17/2017 12:43 PM, Dmitry Golubets wrote:
>
> Hi,
>
> My streaming job unfortunately cannot benefit much from parallelization.
> So I'm looking for things I can tune in Flink to make it process a
> sequential stream faster.
>
> So far in our current engine based on Akka Streams (non distributed ofc)
> we have 20k msg/sec.
> Ported to Flink I'm getting 14k so far.
>
> My observations are the following:
>
>- if I chain operations together they execute all in sequence, so I
>basically sum up the time required to process one data item across all my
>stream operators, not good
>- if I split chains, they execute asynchronously to each other, but
>there is serialization and network overhead
>
> Second approach gives me better results, considering that I have a server
> with more than enough memory and cores to do all side work for
> serialization. But I want to reduce this serialization\data transfer
> overhead to a minimum.
>
> So what I have now:
>
> environment.getConfig.enableObjectReuse() // cos it's Scala we don't need
> unnecessary serialization
> environment.getConfig.disableAutoTypeRegistration() // it works faster
> with it, I'm not sure why
> environment.addDefaultKryoSerializer(..) // custom Message Pack
> serialization for all message types, gives about 50% boost
>
> But that's it, I don't know what else to do.
> I didn't find any interesting network\buffer settings in docs.
>
> Best regards,
> Dmitry
>
>
>


Re: How important is 'registerType'?

2017-02-17 Thread Dmitry Golubets
Hi Till,

It happened during deserialization of a savepoint.

Best regards,
Dmitry

On Fri, Feb 17, 2017 at 2:48 PM, Till Rohrmann  wrote:

> Hi Dmitry,
>
> I'm curious to know when exactly you observed the IllegalStateException. Did
> it happen after resuming from a savepoint or did it already happen during
> the first run of the program? If the latter is the case, then this might
> indicate a bug where we don’t use the correct ExecutionConfig to
> instantiate the serializers.
>
> Concerning the addDefaultKryoSerializer method, this basically registers a
> serializer for a specific type, but it does not register the type with Kryo.
> Thus, it should still be beneficial to call registerType for the type for
> which you’ve registered a default serializer. But you can also call
> registerTypeWithKryoSerializer which does both for you.
>
> Cheers,
> Till
> ​
>
> On Fri, Feb 17, 2017 at 12:38 PM, Dmitry Golubets 
> wrote:
>
>> Hi,
>>
>> I was using ```cs.knownDirectSubclasses``` recursively to find and
>> register subclasses, which may have resulted in an ordering mess.
>> Later I changed that to
>> ```cs.knownDirectSubclasses.toList.sortBy(_.fullName)```,
>> which should have fixed the order.
>> But either it didn't, or there was another problem; I was getting the
>> error anyway.
>> Interestingly, it happened only on a KeyedStream after a window; without
>> a window it was fine.
>> I didn't change anything else in the job.
>>
>> However, I removed the ```registerType``` calls completely, because I didn't
>> notice any performance difference.
>> Do you know if ```registerType``` has any effect at all if I use it
>> together with ```addDefaultKryoSerializer``` for that type?
>>
>>
>> Best regards,
>> Dmitry
>>
>> On Thu, Feb 16, 2017 at 10:40 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> are you changing anything on your job between performing the savepoint
>>> and restoring the savepoint? Flink upgrade, Job upgrade, changing Kryo
>>> version, changing order in which you register Kryo serialisers?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On Fri, 10 Feb 2017 at 18:26 Dmitry Golubets 
>>> wrote:
>>>
 The docs say that it may improve performance.

 How true is it, when custom serializers are provided?
 There is also a 'disableAutoTypeRegistration' method in the config class,
 implying Flink registers types automatically.

 So, given that I have a hierarchy:
 trait A
 class B extends A
 class C extends A

 and I do addDefaultKryoSerializer(classOf[A], classOf[ASerializer])

 should I care about registering B and C with the 'registerType' method?

 It is worth mentioning that when I registered my message class
 hierarchies, I got:
 java.lang.IllegalStateException: Could not initialize keyed state
 backend.
 java.io.StreamCorruptedException: invalid type code: 00
 on restoring from savepoint

 After some debugging I found that 'registerType' was the cause.
 It might be possible that my code called registerType in a different
 order.
 Could it be a problem?

 Best regards,
 Dmitry

>>>
>>
>


blob store defaults to /tmp and files get deleted

2017-02-17 Thread Shannon Carey
A few of my jobs recently failed and showed this exception:


org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-5f023409-6af5-4de6-8ed0-e80a2eb9633e/cache/blob_d9a9fb884f3b436030afcf7b8e1bce678acceaf2'
 (invalid JAR: zip file is empty)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:208)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)


As you can see, Flink is storing things underneath /tmp, which is the 
(undocumented) default for the blob store. As you may know, on Linux, there's 
typically a program such as tmpwatch which is run periodically to clear out 
data from /tmp.


Flink also uses /tmp as the default for jobmanager.web.tmpdir (and 
jobmanager.web.upload.dir in 1.2).


Therefore, assuming that this is indeed the cause of the job failure/the 
exception, it seems highly advisable that when you run a Flink cluster you 
configure blob.storage.directory and jobmanager.web.tmpdir to a specific folder 
that is not beneath /tmp. I don't know if there is any material about setting 
up a production cluster, but this would definitely seem to be a necessary 
configuration to specify if you want to avoid problems. Enabling High 
Availability mode should also be on that list, I think.
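
For example (the paths below are purely illustrative), the relevant flink-conf.yaml entries would look something like:

blob.storage.directory: /var/lib/flink/blob-store
jobmanager.web.tmpdir: /var/lib/flink/web-tmp
jobmanager.web.upload.dir: /var/lib/flink/web-uploads   # exists in 1.2, as noted above

with the chosen directories created on every node and writable by the user running Flink.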


-Shannon


Re: Performance tuning

2017-02-17 Thread Daniel Santos

Hello Dimitry,

Could you please elaborate on your tuning of
environment.addDefaultKryoSerializer(..).


I'm interested in knowing what you have done there to get a boost of about
50%.


Some small or simple example would be very nice.

Thank you very much in advance.

Kind Regards,

Daniel Santos


On 02/17/2017 12:43 PM, Dmitry Golubets wrote:

Hi,

My streaming job unfortunately cannot benefit much from parallelization.
So I'm looking for things I can tune in Flink to make it process a
sequential stream faster.


So far in our current engine based on Akka Streams (non distributed 
ofc) we have 20k msg/sec.

Ported to Flink I'm getting 14k so far.

My observations are the following:

  * if I chain operations together they execute all in sequence, so I
basically sum up the time required to process one data item across
all my stream operators, not good
  * if I split chains, they execute asynchronously to each other, but
there is serialization and network overhead

Second approach gives me better results, considering that I have a 
server with more than enough memory and cores to do all side work for 
serialization. But I want to reduce this serialization\data transfer 
overhead to a minimum.


So what I have now:

environment.getConfig.enableObjectReuse() // cos it's Scala we don't 
need unnecessary serialization
environment.getConfig.disableAutoTypeRegistration() // it works faster 
with it, I'm not sure why
environment.addDefaultKryoSerializer(..) // custom Message Pack 
serialization for all message types, gives about 50% boost


But that's it, I don't know what else to do.
I didn't find any interesting network\buffer settings in docs.

Best regards,
Dmitry




Re: Performance tuning

2017-02-17 Thread Shannon Carey
One network setting is mentioned here:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#controlling-latency
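
The setting described there is the stream buffer timeout. A minimal Scala sketch of how it is set (the value is illustrative): larger timeouts, or -1 to flush only when buffers are full, generally favour throughput, while small values favour latency.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// -1 flushes only when buffers are full; 0 flushes after every record; values are in milliseconds.
env.setBufferTimeout(100)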


From: Dmitry Golubets
Date: Friday, February 17, 2017 at 6:43 AM
To: 
Subject: Performance tuning

Hi,

My streaming job unfortunately cannot benefit much from parallelization.
So I'm looking for things I can tune in Flink to make it process a sequential
stream faster.

So far in our current engine based on Akka Streams (non distributed ofc) we 
have 20k msg/sec.
Ported to Flink I'm getting 14k so far.

My observations are the following:

  *   if I chain operations together they execute all in sequence, so I 
basically sum up the time required to process one data item across all my 
stream operators, not good
  *   if I split chains, they execute asynchronously to each other, but there 
is serialization and network overhead

Second approach gives me better results, considering that I have a server with 
more than enough memory and cores to do all side work for serialization. But I 
want to reduce this serialization\data transfer overhead to a minimum.

So what I have now:

environment.getConfig.enableObjectReuse() // cos it's Scala we don't need 
unnecessary serialization
environment.getConfig.disableAutoTypeRegistration() // it works faster with it, 
I'm not sure why
environment.addDefaultKryoSerializer(..) // custom Message Pack serialization 
for all message types, gives about 50% boost

But that's it, I don't know what else to do.
I didn't find any interesting network\buffer settings in docs.

Best regards,
Dmitry


Re: Cartesian product over windows

2017-02-17 Thread Sonex
Hi Till,

when you say parallel windows, what do you mean? Do you mean the use of
timeWindowAll which has all the elements of a window in a single task?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cartesian-product-over-windows-tp11676p11716.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: How important is 'registerType'?

2017-02-17 Thread Till Rohrmann
Hi Dmitry,

I'm curious to know when exactly you observed the IllegalStateException. Did it
happen after resuming from a savepoint or did it already happen during the
first run of the program? If the latter is the case, then this might
indicate a bug where we don’t use the correct ExecutionConfig to
instantiate the serializers.

Concerning the addDefaultKryoSerializer method, this basically registers a
serializer for a specific type, but it does not register the type with Kryo.
Thus, it should still be beneficial to call registerType for the type for
which you’ve registered a default serializer. But you can also call
registerTypeWithKryoSerializer which does both for you.
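
A hedged Scala sketch of the three calls being discussed, using the A/B/C hierarchy from the original question. The serializer body is a placeholder purely for illustration; a real one would write the actual fields of B and C:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

trait A
class B extends A
class C extends A

// Placeholder serializer body, for the sketch only.
class ASerializer extends Serializer[A] {
  override def write(kryo: Kryo, output: Output, value: A): Unit =
    output.writeString(value.getClass.getName)
  override def read(kryo: Kryo, input: Input, clazz: Class[A]): A =
    if (input.readString() == classOf[C].getName) new C else new B
}

object Registration {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Default serializer for A (and, via Kryo's default-serializer matching, its subtypes);
    // this does not register the types themselves.
    env.addDefaultKryoSerializer(classOf[A], classOf[ASerializer])

    // Registering the concrete types is still useful so Kryo can write compact ids
    // instead of full class names.
    env.registerType(classOf[B])
    env.registerType(classOf[C])

    // Or, as suggested above, do both in one call per type:
    // env.registerTypeWithKryoSerializer(classOf[B], classOf[ASerializer])
  }
}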

Cheers,
Till
​

On Fri, Feb 17, 2017 at 12:38 PM, Dmitry Golubets 
wrote:

> Hi,
>
> I was using ```cs.knownDirectSubclasses``` recursively to find and
> register subclasses, which may have resulted in an ordering mess.
> Later I changed that to
> ```cs.knownDirectSubclasses.toList.sortBy(_.fullName)```,
> which should have fixed the order.
> But either it didn't, or there was another problem; I was getting the error
> anyway.
> Interestingly, it happened only on a KeyedStream after a window; without a window
> it was fine.
> I didn't change anything else in the job.
>
> However, I removed the ```registerType``` calls completely, because I didn't
> notice any performance difference.
> Do you know if ```registerType``` has any effect at all if I use it
> together with ```addDefaultKryoSerializer``` for that type?
>
>
> Best regards,
> Dmitry
>
> On Thu, Feb 16, 2017 at 10:40 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> are you changing anything on your job between performing the savepoint
>> and restoring the savepoint? Flink upgrade, Job upgrade, changing Kryo
>> version, changing order in which you register Kryo serialisers?
>>
>> Best,
>> Aljoscha
>>
>> On Fri, 10 Feb 2017 at 18:26 Dmitry Golubets  wrote:
>>
>>> The docs say that it may improve performance.
>>>
>>> How true is it, when custom serializers are provided?
>>> There is also a 'disableAutoTypeRegistration' method in the config class,
>>> implying Flink registers types automatically.
>>>
>>> So, given that I have a hierarchy:
>>> trait A
>>> class B extends A
>>> class C extends A
>>>
>>> and I do addDefaultKryoSerializer(classOf[A], classOf[ASerializer])
>>>
>>> should I care about registering B and C with the 'registerType' method?
>>>
>>> It is worth mentioning that when I registered my message class hierarchies,
>>> I got:
>>> java.lang.IllegalStateException: Could not initialize keyed state
>>> backend.
>>> java.io.StreamCorruptedException: invalid type code: 00
>>> on restoring from savepoint
>>>
>>> After some debugging I found that 'registerType' was the cause.
>>> It might be possible that my code called registerType in a different
>>> order.
>>> Could it be a problem?
>>>
>>> Best regards,
>>> Dmitry
>>>
>>
>


Re: Flink batch processing fault tolerance

2017-02-17 Thread Aljoscha Krettek
@Anton, these are the ideas I was mentioning (in the FLIP) and I'm afraid I have
nothing more to add.

On Fri, 17 Feb 2017 at 06:26 wangzhijiang999 
wrote:

> yes, it is really a critical problem for large batch jobs because
> unexpected failures are a common case.
> And we are already focusing on realizing the ideas mentioned in FLIP-1, and
> hope to contribute this to Flink in the coming months.
>
> Best,
>
> Zhijiang
>
> --
> From: Si-li Liu 
> Sent: Friday, February 17, 2017 (Friday) 11:22
> To: user 
> Subject: Re: Flink batch processing fault tolerance
>
> Hi,
>
> It's the reason why I gave up using Flink for my current project and picked up
> the traditional Hadoop framework again.
>
> 2017-02-17 10:56 GMT+08:00 Renjie Liu :
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
> This FLIP may help.
>
> On Thu, Feb 16, 2017 at 7:34 PM Anton Solovev 
> wrote:
>
> Hi Aljoscha,
>
> Could you share your plans of resolving it?
>
>
>
> Best,
>
> Anton
>
>
>
>
>
> *From:* Aljoscha Krettek [mailto:aljos...@apache.org]
> *Sent:* Thursday, February 16, 2017 2:48 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Flink batch processing fault tolerance
>
>
>
> Hi,
>
> yes, this is indeed true. We had some plans for how to resolve this but
> they never materialised because of the focus on Stream Processing. We might
> unite the two in the future and then you will get fault-tolerant
> batch/stream processing in the same API.
>
>
>
> Best,
>
> Aljoscha
>
>
>
> On Wed, 15 Feb 2017 at 09:28 Renjie Liu  wrote:
>
> Hi, all:
> I'm reading Flink's docs and am curious about the fault tolerance of batch
> processing jobs. It seems that when one task execution fails, the whole job
> will be restarted; is that true? If so, isn't it impractical to deploy large
> Flink batch jobs?
>
> --
>
> Liu, Renjie
>
> Software Engineer, MVAD
> --
> Liu, Renjie
> Software Engineer, MVAD
>
> --
> Best regards
>
> Sili Liu
>
>


Re: Clarification: use of AllWindowedStream.apply() function

2017-02-17 Thread Aljoscha Krettek
Yes, you're correct. :-)

On Thu, 16 Feb 2017 at 14:24 nsengupta  wrote:

> Thanks, Aljoscha for the clarification.
>
> I understand that instead of using a flatMap() in the way I am using, I am
> better off using:
> * a fold (init, fold_func, window_func) first and then
> * map to a different type of my choice, inside the window_func,
> parameterised above
>
> I hope I am correct. If so, you don't need to spend time to comment;
> ☺otherwise, please give a hint.
>
> -- Nirmalya
>
> -
>
> On Thu, Feb 16, 2017 at 4:12 PM, Aljoscha Krettek [via Apache Flink User
> Mailing List archive.] <[hidden email]
> > wrote:
>
> Hi,
> you would indeed use apply(), or better fold(init, fold_func,
> window_func), to map the result of folding your
> window to some other data type. If you will, a WindowFunction allows
> "mapping" the result of your windowing to a different type.
>
> Best,
> Aljoscha
>
> On Wed, 15 Feb 2017 at 06:14 nsengupta <[hidden email]
> > wrote:
>
> I have gone through this  post
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WindowedStream-operation-questions-td6006.html
> >
> , where Aljoscha explains that /mapping/ on WindowedStream is /not/
> allowed.
>
> So, I think I haven't asked the question properly. Here is (hopefully) a
> better and easier version:
>
> 1. I begin with records of type RawMITSIMTuple.
> 2. When I group them using a Window, I get an
> AllWindowedStream[RawMITSIMTuple].
> 3. I /fold/ the tuples obtained in the Window, which gives me a
> DataStream[Vector[RawMITSIMTuple]].
> 4. What I need is a DataStream[PositionReport]. So, I need to flatMap the
> output of the previous step, where I first get hold of each
> RawMITSIMTuple and map it to a PositionReport.
>
> val positionReportStream = this
>   .readRawMITSIMTuplesInjected(envDefault, args(0))
>   .assignAscendingTimestamps(e => e.timeOfReport)
>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
>   .fold(Vector[RawMITSIMTuple]())((collectorBin, rawRecord) => {
>     collectorBin :+ rawRecord
>   })
>   .flatMap(r => r.map(e => this.preparePositionReport(e)))
>
> This gives me what I want, but I feel this is verbose and inefficient. Am I
> thinking correctly? If so, what is a better idiom to use in such cases?
>
> -- Nirmalya
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clarification-use-of-AllWindowedStream-apply-function-tp11627p11630.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>
>
>
>
>
>
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>
> --
> View this message in context: Re: Clarification: use of
> AllWindowedStream.apply() function
> 
> Sent from the Apache Flink User Mailing List archive. mailing list archive
>  at
> Nabble.com.
>
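
Below is a hedged Scala sketch of the combined form discussed above: the per-element "mapping" happens inside the window function passed to fold, instead of in a separate flatMap afterwards. The fold(initialValue, foldFunction, windowFunction) overload is sketched from memory of the 1.2-era Scala API, so the exact parameter shapes may differ between versions; RawMITSIMTuple, PositionReport, readRawMITSIMTuplesInjected and preparePositionReport are the names from the question, and everything else is illustrative:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// As in the question: a stream of raw tuples with ascending event timestamps.
val rawTuples: DataStream[RawMITSIMTuple] =
  this.readRawMITSIMTuplesInjected(envDefault, args(0))
    .assignAscendingTimestamps(e => e.timeOfReport)

val positionReportStream: DataStream[PositionReport] = rawTuples
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(
    Vector[RawMITSIMTuple](),
    // incremental fold, exactly as in the original snippet
    (collectorBin: Vector[RawMITSIMTuple], rawRecord: RawMITSIMTuple) => collectorBin :+ rawRecord,
    // window function: map and emit PositionReports here, so no separate flatMap is needed
    (window: TimeWindow, folded: Iterable[Vector[RawMITSIMTuple]], out: Collector[PositionReport]) =>
      folded.foreach(_.foreach(e => out.collect(this.preparePositionReport(e))))
  )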


Re: Reliable Distributed FS support (HCFS)

2017-02-17 Thread Aljoscha Krettek
Hi,
I think atomic rename is not part of the requirements.

I'll add +Stephan who recently wrote this document in case he has any
additional input.

Cheers,
Aljoscha

On Thu, 16 Feb 2017 at 23:28 Vijay Srinivasaraghavan 
wrote:

> Following up on my question regarding the backing filesystem (HCFS)
> requirements. Appreciate any inputs.
>
> ---
> Regarding the Filesystem abstraction support, we are planning to use a
> distributed file system which complies with Hadoop Compatible File System
> (HCFS) standard in place of standard HDFS.
>
> According to the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html),
> persistence guarantees are listed as one of the main requirements, and to be
> precise this covers both visibility and durability guarantees.
>
> My question is,
>
> 1) Are we expecting the file system to support "Atomic Rename"
> characteristics? I believe the checkpoint mechanism involves renaming
> files; will that have an impact if "atomic rename" is not guaranteed by
> the underlying file system?
>
> 2) How does one certify Flink with HCFS (in place of standard HDFS) in
> terms of the scenarios/use cases that need to be tested? Is there any
> general guidance on this?
> ---
>
> Regards
> Vijay
>
>
> On Wednesday, February 15, 2017 11:28 AM, Vijay Srinivasaraghavan <
> vijikar...@yahoo.com> wrote:
>
>
> Hello,
>
> Regarding the Filesystem abstraction support, we are planning to use a
> distributed file system which complies with Hadoop Compatible File System
> (HCFS) standard in place of standard HDFS.
>
> According to the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html),
> persistence guarantees are listed as one of the main requirements, and to be
> precise this covers both visibility and durability guarantees.
>
> My question is,
>
> 1) Are we expecting the file system to support "Atomic Rename"
> characteristics? I believe the checkpoint mechanism involves renaming
> files; will that have an impact if "atomic rename" is not guaranteed by
> the underlying file system?
>
> 2) How does one certify Flink with HCFS (in place of standard HDFS) in
> terms of the scenarios/use cases that need to be tested? Is there any
> general guidance on this?
>
> Thanks
> Vijay
>
>
>


Re: Can't run flink on yarn on version 1.2.0

2017-02-17 Thread Bruno Aranda
Hi Howard,

We run Flink 1.2 on YARN without issues. Sorry, I don't have a specific
solution, but are you sure you don't have some sort of Flink version mix? In your
logs I can see:

The configuration directory ('/home/software/flink-1.1.4/conf') contains
both LOG4J and Logback configuration files. Please delete or rename one of
them.

Note that it mentions 1.1.4 in the conf dir path instead of 1.2.

Cheers,

Bruno

On Fri, 17 Feb 2017 at 08:50 Howard,Li(vip.com) 
wrote:

> Hi,
>
>  I’m trying to run flink on yarn by using command: bin/flink run
> -m yarn-cluster -yn 2 -ys 4 ./examples/batch/WordCount.jar
>
>  But I got the following error:
>
>
>
> 2017-02-17 15:52:40,746 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for
> the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
>
> 2017-02-17 15:52:40,746 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for
> the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
>
> 2017-02-17 15:52:40,775 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   - Using
> values:
>
> 2017-02-17 15:52:40,775 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   -
> TaskManager count = 2
>
> 2017-02-17 15:52:40,775 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   -
> JobManager memory = 1024
>
> 2017-02-17 15:52:40,775 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   -
> TaskManager memory = 1024
>
> 2017-02-17 15:52:40,796 INFO
> org.apache.hadoop.yarn.client.RMProxy - Connecting
> to ResourceManager at /0.0.0.0:8032
>
> 2017-02-17 15:52:41,680 WARN
> org.apache.flink.yarn.YarnClusterDescriptor   - The
> configuration directory ('/home/software/flink-1.1.4/conf') contains both
> LOG4J and Logback configuration files. Please delete or rename one of them.
>
> 2017-02-17 15:52:41,702 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/software/flink-1.1.4/conf/logback.xml to hdfs://
> 10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/logback.xml
>
> 2017-02-17 15:52:42,025 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/software/flink-1.1.4/lib to hdfs://
> 10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/lib
>
> 2017-02-17 15:52:42,695 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/software/flink-1.1.4/conf/log4j.properties to hdfs://
> 10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/log4j.properties
>
> 2017-02-17 15:52:42,722 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/software/flink-1.1.4/lib/flink-dist_2.10-1.1.4.jar to
> hdfs://
> 10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/flink-dist_2.10-1.1.4.jar
>
> 2017-02-17 15:52:43,346 INFO
> org.apache.flink.yarn.Utils   - Copying
> from /home/software/flink-1.1.4/conf/flink-conf.yaml to hdfs://
> 10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/flink-conf.yaml
>
> 2017-02-17 15:52:43,386 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   - Submitting
> application master application_1487247313588_0017
>
> 2017-02-17 15:52:43,425 INFO
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
> application application_1487247313588_0017
>
> 2017-02-17 15:52:43,425 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   - Waiting for
> the cluster to be allocated
>
> 2017-02-17 15:52:43,427 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   - Deploying
> cluster, current state ACCEPTED
>
> 2017-02-17 15:52:48,471 INFO
> org.apache.flink.yarn.YarnClusterDescriptor   - YARN
> application has been deployed successfully.
>
> Cluster started: Yarn cluster with application id
> application_1487247313588_0017
>
> Using address 10.199.202.162:43809 to connect to JobManager.
>
> JobManager web interface address
> http://vip-rc-ucsww.vclound.com:8088/proxy/application_1487247313588_0017/
>
> Using the parallelism provided by the remote cluster (8). To use another
> parallelism, set it at the ./bin/flink client.
>
> Starting execution of program
>
> 2017-02-17 15:52:49,278 INFO
> org.apache.flink.yarn.YarnClusterClient   - Starting
> program in interactive mode
>
> Executing WordCount example with default input data set.
>
> Use --input to specify file input.
>
> Printing result to stdout. Use --output to specify output path.
>
> 2017-02-17 15:52:49,609 INFO
> org.apache.flink.yarn.YarnClusterClient   - Waiting
> until all TaskManagers have connected
>
> Waiting until all 

Can't run flink on yarn on version 1.2.0

2017-02-17 Thread Howard,Li(vip.com)
Hi,
 I’m trying to run flink on yarn by using command: bin/flink run -m 
yarn-cluster -yn 2 -ys 4 ./examples/batch/WordCount.jar
 But I got the following error:


2017-02-17 15:52:40,746 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

2017-02-17 15:52:40,746 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

2017-02-17 15:52:40,775 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Using values:

2017-02-17 15:52:40,775 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- TaskManager count = 2

2017-02-17 15:52:40,775 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- JobManager memory = 1024

2017-02-17 15:52:40,775 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- TaskManager memory = 1024

2017-02-17 15:52:40,796 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at /0.0.0.0:8032

2017-02-17 15:52:41,680 WARN  org.apache.flink.yarn.YarnClusterDescriptor   
- The configuration directory ('/home/software/flink-1.1.4/conf') 
contains both LOG4J and Logback configuration files. Please delete or rename 
one of them.

2017-02-17 15:52:41,702 INFO  org.apache.flink.yarn.Utils   
- Copying from file:/home/software/flink-1.1.4/conf/logback.xml to 
hdfs://10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/logback.xml

2017-02-17 15:52:42,025 INFO  org.apache.flink.yarn.Utils   
- Copying from file:/home/software/flink-1.1.4/lib to 
hdfs://10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/lib

2017-02-17 15:52:42,695 INFO  org.apache.flink.yarn.Utils   
- Copying from 
file:/home/software/flink-1.1.4/conf/log4j.properties to 
hdfs://10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/log4j.properties

2017-02-17 15:52:42,722 INFO  org.apache.flink.yarn.Utils   
- Copying from 
file:/home/software/flink-1.1.4/lib/flink-dist_2.10-1.1.4.jar to 
hdfs://10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/flink-dist_2.10-1.1.4.jar

2017-02-17 15:52:43,346 INFO  org.apache.flink.yarn.Utils   
- Copying from /home/software/flink-1.1.4/conf/flink-conf.yaml to 
hdfs://10.199.202.161:9000/user/root/.flink/application_1487247313588_0017/flink-conf.yaml

2017-02-17 15:52:43,386 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Submitting application master application_1487247313588_0017

2017-02-17 15:52:43,425 INFO  
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
application application_1487247313588_0017

2017-02-17 15:52:43,425 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Waiting for the cluster to be allocated

2017-02-17 15:52:43,427 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deploying cluster, current state ACCEPTED

2017-02-17 15:52:48,471 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- YARN application has been deployed successfully.

Cluster started: Yarn cluster with application id application_1487247313588_0017

Using address 10.199.202.162:43809 to connect to JobManager.

JobManager web interface address 
http://vip-rc-ucsww.vclound.com:8088/proxy/application_1487247313588_0017/

Using the parallelism provided by the remote cluster (8). To use another 
parallelism, set it at the ./bin/flink client.

Starting execution of program

2017-02-17 15:52:49,278 INFO  org.apache.flink.yarn.YarnClusterClient   
- Starting program in interactive mode

Executing WordCount example with default input data set.

Use --input to specify file input.

Printing result to stdout. Use --output to specify output path.

2017-02-17 15:52:49,609 INFO  org.apache.flink.yarn.YarnClusterClient   
- Waiting until all TaskManagers have connected

Waiting until all TaskManagers have connected

2017-02-17 15:52:49,610 INFO  org.apache.flink.yarn.YarnClusterClient   
- Starting client actor system.





The program finished with the following exception:



org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.

 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)

 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:404)

 at 

CSV sink partitioning and bucketing

2017-02-17 Thread Flavio Pompermaier
Hi to all,
in my use case I'd need to output my Row objects into an output folder as
CSV on HDFS, but creating/overwriting new subfolders based on an attribute
(for example, creating a subfolder for each value of a specified column).
Then, it could be interesting to bucket the data inside those folders by
number of lines, i.e. every file inside those directories cannot contain more
than 1000 lines.

For example, if I have a dataset (of Row) containing people, I need to write
my dataset as CSV into an output folder X partitioned by year (where each
file cannot have more than 1000 rows), like:

X/1990/file1
   /1990/file2
   /1991/file1
etc..

Does something like that exists in Flink?
In principle I could use Hive for this, but at the moment I'd like to avoid
adding another component to our pipeline... Moreover, my feeling is that
very few people are using Flink with Hive... am I wrong?
Any advice on how to proceed?

Best,
Flavio
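
For the streaming API there is a bucketing file sink in the filesystem connector that comes close to the subfolder-per-attribute part. Below is a hedged Scala sketch written from memory of the 1.2-era BucketingSink, so treat class and method names as assumptions; note it rolls files by size in bytes, not by line count, and the element type and field names are illustrative stand-ins for the Row with a year column:

import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.flink.streaming.connectors.fs.bucketing.{Bucketer, BucketingSink}
import org.apache.hadoop.fs.Path

// Illustrative element type standing in for a Row with a year attribute.
case class Person(name: String, year: Int)

// Route each element into a subfolder named after its year attribute.
class YearBucketer extends Bucketer[Person] {
  override def getBucketPath(clock: Clock, basePath: Path, element: Person): Path =
    new Path(basePath, element.year.toString)
}

val sink = new BucketingSink[Person]("hdfs:///output/X") // base folder, illustrative
  .setBucketer(new YearBucketer)
  .setBatchSize(16 * 1024 * 1024) // rolls files by bytes written, not by line count

// Writing proper CSV lines would still need a Writer or an upstream map to String;
// this sketch only shows the per-attribute bucketing part, and it applies to
// DataStream jobs rather than the DataSet API.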