RE: Type of TypeVariable could not be determined

2016-03-08 Thread Radu Tudoran
Hi,

The issue is that this problem appears when I want to create a stream source.

StreamExecutionEnvironment.addSource(new MySourceFunction())
…
Where the stream source class is

MySourceFunction<IN> implements SourceFunction<IN>
{
…
}

In such a case I am not sure how I can pass the outer type, nor how I can pass 
it using the “.returns()” method as Timo suggested.
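
A minimal sketch of what I could try, combining Timo's "ResultTypeQueryable" suggestion with 
the idea from the quoted mail below of passing the concrete Class into the UDF. The class and 
field names are only illustrative and the exact Flink 1.0 API details would still need to be 
verified:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySourceFunction<T> implements SourceFunction<T>, ResultTypeQueryable<T> {

    private final TypeInformation<T> producedType;
    private volatile boolean running = true;

    // The caller passes the concrete class in, so the element type survives erasure.
    public MySourceFunction(Class<T> clazz) {
        this.producedType = TypeExtractor.getForClass(clazz);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        // addSource() asks the source for its type instead of trying to extract it
        return producedType;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        while (running) {
            // emit elements via ctx.collect(...)
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

With that, StreamExecutionEnvironment.addSource(new MySourceFunction<TupleEvent2>(TupleEvent2.class)) 
should no longer need to extract the type variable.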



Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN

From: Wang Yangjun [mailto:yangjun.w...@aalto.fi]
Sent: Tuesday, March 08, 2016 7:15 PM
To: user@flink.apache.org
Subject: Re: Type of TypeVariable could not be determined

Hi Radu,

I met this issue also. The reason is that the outTypeInfo cannot be created based on 
a generic type when a transform is applied.

public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, 
TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator)

The solution would be to pass the Class<T> to your UDF and create the TypeInformation 
yourself.

Best,
Jun

From: Radu Tudoran
Reply-To: "user@flink.apache.org"
Date: Tuesday 8 March 2016 at 19:57
To: "user@flink.apache.org"
Subject: Type of TypeVariable could not be determined

Hi,

I am trying to create a custom stream source. I first built it with generics 
and ran into problems with type extraction. I tried to use concrete 
types but ran into the same issue (see errors below). Can anyone suggest a 
solution to this issue?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'IN' in 'class test.MySourceFunction' could not be determined. 
This is most likely a type erasure problem. The type extraction currently 
supports types with generic variables only in cases where all variables in the 
return type can be deduced from the input type(s).
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:498)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1152)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
   ... 1 more


Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'TupleEvent2' in 'class test.MySourceFunctionTuple' could not be 
determined. This is most likely a type erasure problem. The type extraction 
currently supports types with generic variables only in cases where all 
variables in the return type can be deduced from the input type(s).
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:498)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1152)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
   ... 1 more


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division


Re: Type of TypeVariable could not be determined

2016-03-08 Thread Timo Walther

Hi Radu,

the exception can have multiple causes. It would be great if you could 
share some example code. In most cases the problem is the following:


public class MapFunction<WhatEverType> { }

new MapFunction<WhatEverType>();

The type WhatEverType is erased by Java (type erasure). The type must not be 
declared only in the "new" call but in the function's signature (IN, OUT) in 
order to be preserved. Alternatively, you can pass the type manually by 
using the ".returns()" method or let the function implement the 
"ResultTypeQueryable" interface.


Hope that helps.

Regards,
Timo


On 08.03.2016 18:57, Radu Tudoran wrote:


Hi,

I am trying to create a custom stream source. I first built it with 
generics and ran into problems with type extraction. I tried to 
use concrete types but ran into the same issue (see errors below). Can 
anyone suggest a solution to this issue?


Caused by: 
_org.apache.flink.api.common.functions.InvalidTypesException_: Type of 
TypeVariable 'IN' in 'class test.MySourceFunction' could not be 
determined. This is most likely a type erasure problem. The type 
extraction currently supports types with generic variables only in 
cases where all variables in the return type can be deduced from the 
input type(s).


at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(_TypeExtractor.java:498_)


at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(_TypeExtractor.java:380_)


at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(_TypeExtractor.java:346_)


at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(_StreamExecutionEnvironment.java:1152_)


at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(_StreamExecutionEnvironment.java:1107_)


at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(_StreamExecutionEnvironment.java:1089_)


... 1 more

Caused by: 
_org.apache.flink.api.common.functions.InvalidTypesException_: Type of 
TypeVariable 'TupleEvent2' in 'class test.MySourceFunctionTuple' could 
not be determined. This is most likely a type erasure problem. The 
type extraction currently supports types with generic variables only 
in cases where all variables in the return type can be deduced from 
the input type(s).


at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(_TypeExtractor.java:498_)


at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(_TypeExtractor.java:380_)


at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(_TypeExtractor.java:346_)


at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(_StreamExecutionEnvironment.java:1152_)


at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(_StreamExecutionEnvironment.java:1107_)


at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(_StreamExecutionEnvironment.java:1089_)


... 1 more

Dr. Radu Tudoran

Research Engineer - Big Data Expert

IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH

European Research Center

Riesstrasse 25, 80992 München

E-mail: _radu.tudoran@huawei.com_

Mobile: +49 15209084330

Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com 


Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN







Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-08 Thread Maximilian Bode
Hi Aljoscha,

yeah I should have been clearer. I did mean those accumulators but am not 
trusting them in the sense of total number (as you said, they are reset on 
failure). On the other hand, if they do not change for a while it is pretty 
obvious that the job has ingested everything in the queue. But you are right, 
this is kind of heuristic. In combination with the fact that the 
DateTimeBucketer does not create new folders I believe this should be 
sufficient to decide when the job has basically finished, though.

So the setup is the following: The Flink job consists of a 
FlinkKafkaConsumer08, a map containing just an IntCounter accumulator and 
finally a rolling sink writing to HDFS. I start it in a per-job yarn session 
with n=3, s=4. Then I pour 2 million records in the Kafka queue the application 
is reading from. If no job/task managers are killed, the behavior is exactly as 
expected: the output files in HDFS grow with time and I can exactly monitor via 
the accumulator when every record has been ingested from Kafka. After that 
time, I give the job a few seconds and then cancel it via the web interface. 
Then still some time later (to give the job the chance to output the few 
records still hanging around) a wc -l on the output files yields exactly the 
expected 2 million.

On the other hand, if I kill a task manager while the job is in progress, one 
of the 12 output files seems to be missing as described before. A wc -l on only 
the relevant bytes as I mentioned in an earlier mail then leads to a number 
smaller than 2 million.

We are using an FsStateBackend in HDFS with a checkpoint interval of 10s.
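
For completeness, a stripped-down sketch of that part of the setup (the HDFS path is 
made up and the rest of the topology is omitted):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keep checkpoint data in HDFS
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
        // checkpoint every 10 seconds
        env.enableCheckpointing(10000);
        // ... FlinkKafkaConsumer08 -> map with IntCounter accumulator -> RollingSink ...
    }
}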

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> Am 08.03.2016 um 17:46 schrieb Aljoscha Krettek :
> 
> Hi,
> with accumulator you mean the ones you get from 
> RuntimeContext.addAccumulator/getAccumulator? I’m afraid these are not 
> fault-tolerant which means that the count in these probably doesn’t reflect 
> the actual number of elements that were processed. When a job fails and 
> restarts the accumulators should start from scratch. This makes me wonder how 
> yours ever reach the required 2 mio, for it to be considered “done”.
> 
> This keeps getting more mysterious…
> 
> By the way, what are you using as StateBackend and checkpoint interval?
> 
> Cheers,
> Aljoscha
>> On 08 Mar 2016, at 13:38, Maximilian Bode  
>> wrote:
>> 
>> Hi,
>> thanks for the fast answer. Answers inline.
>> 
>>> Am 08.03.2016 um 13:31 schrieb Aljoscha Krettek :
>>> 
>>> Hi,
>>> a missing part file for one of the parallel sinks is not necessarily a 
>>> problem. This can happen if that parallel instance of the sink never 
>>> received data after the job successfully restarted.
>>> 
>>> Missing data, however, is a problem. Maybe I need some more information 
>>> about your setup:
>>> 
>>> - When are you inspecting the part files?
>> Some time after the cluster is shut down
>>> - Do you shutdown the Flink Job before checking? If so, how do you shut it 
>>> down.
>> Via 'cancel' in the Jobmanager Web Interface. Some records seem to be 
>> written only after cancelling the job, right?
>>> - When do you know whether all the data from Kafka was consumed by Flink 
>>> and has passed through the pipeline into HDFS?
>> I have an accumulator in a map right before writing into HDFS. Also, the 
>> RollingSink has a DataTimeBucketer which makes it transparent when no new 
>> data is arriving anymore as the last bucket is from some minutes ago.
>>> 
>>> Cheers,
>>> Aljoscha
 On 08 Mar 2016, at 13:19, Maximilian Bode  
 wrote:
 
 Hi Aljoscha,
 
 oh I see. I was under the impression this file was used internally and the 
 output being completed at the end. Ok, so I extracted the relevant lines 
 using
for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > 
 "$i.final"; done
 which seems to do the trick.
 
 Unfortunately, now some records are missing again. In particular, there 
 are the files
part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding 
 .valid-length files
part-0-1, part-1-1, ..., part-10-0
 in the bucket, where job parallelism=12. So it looks to us as if one of 
 the files was not even created in the second attempt. This behavior seems 
 to be somewhat reproducible, cf. my earlier email where the part-11 
 file disappeared as well.
 
 Thanks again for your help.
 
 Cheers,
 Max
 —
 Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
 Geschäftsführer: 

JDBCInputFormat preparation with Flink 1.1-SNAPSHOT and Scala 2.11

2016-03-08 Thread Prez Cannady
I’m attempting to create a stream using JDBCInputFormat.  Objective is to 
convert each record into a tuple and then serialize for input into a Kafka 
topic.  Here’s what I have so far.

```
val env = StreamExecutionEnvironment.getExecutionEnvironment

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("org.postgresql.Driver")
  .setDBUrl("jdbc:postgresql:test")
  .setQuery("select name from persons")
  .finish()

val stream : DataStream[Tuple1[String]] = env.createInput(...)
```

I think this is essentially what I want to do.  It would be nice if I could 
return tuples of arbitrary length, but reading the code suggests I have to 
commit to a defined arity.  So I have some questions.

1. Is there a better way to read from a database (i.e., defining my own 
`InputFormat` using Slick)?
2. To get the above example working, what should I supply to `createInput`?


Prez Cannady  
p: 617 500 3378  
e: revp...@opencorrelate.org   
GH: https://github.com/opencorrelate   
LI: https://www.linkedin.com/in/revprez   











protobuf messages from Kafka to elasticsearch using flink

2016-03-08 Thread Madhukar Thota
Friends,

Can someone guide me or share an example on how to consume protobuf
messages from Kafka and index them into Elasticsearch using Flink?


Re: [ANNOUNCE] Flink 1.0.0 has been released

2016-03-08 Thread Igor Berman
Congratulations!
Very nice work, very interesting features.

One question regarding CEP: do you think it's feasible to define a pattern
over a window of 1 month or even more?
Is there some deeper explanation of how these partial states are saved?
I mean, events that create a "funnel" might be separated by very long periods
of inactivity/noise.



On 8 March 2016 at 17:17, Kostas Tzoumas  wrote:

> Hi everyone!
>
> As you might have noticed, Apache Flink 1.0.0 has been released and
> announced!
>
> You can read more about the release at the ASF blog and the Flink blog
> -
> https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88
> - http://flink.apache.org/news/2016/03/08/release-1.0.0.html
>
> Don't forget to retweet and spread the news :-)
> - https://twitter.com/TheASF/status/707174116969857024
> - https://twitter.com/ApacheFlink/status/707175973482012672
>
> Check out the changelog and the migration guide, download the release, and
> check out the documentation
> - http://flink.apache.org/blog/release_1.0.0-changelog_known_issues.html
> -
> https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
> - https://cwiki.apache.org/confluence/display/FLINK/Stability+Annotations
> - http://flink.apache.org/downloads.html
> - https://ci.apache.org/projects/flink/flink-docs-release-1.0/
>
> Many congratulations to the Flink community for making this happen!
>
> Best,
> Kostas
>


Re: Type of TypeVariable could not be determined

2016-03-08 Thread Wang Yangjun
Hi Radu,

I met this issue also. The reason is that the outTypeInfo cannot be created based on 
a generic type when a transform is applied.

public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, 
TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator)

The solution would be to pass the Class<T> to your UDF and create the TypeInformation 
yourself.

Best,
Jun

From: Radu Tudoran
Reply-To: "user@flink.apache.org"
Date: Tuesday 8 March 2016 at 19:57
To: "user@flink.apache.org"
Subject: Type of TypeVariable could not be determined

Hi,

I am trying to create a custom stream source. I first built it with generics 
and ran into problems with type extraction. I tried to use concrete 
types but ran into the same issue (see errors below). Can anyone suggest a 
solution to this issue?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'IN' in 'class test.MySourceFunction' could not be determined. 
This is most likely a type erasure problem. The type extraction currently 
supports types with generic variables only in cases where all variables in the 
return type can be deduced from the input type(s).
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:498)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1152)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
   ... 1 more


Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'TupleEvent2' in 'class test.MySourceFunctionTuple' could not be 
determined. This is most likely a type erasure problem. The type extraction 
currently supports types with generic variables only in cases where all 
variables in the return type can be deduced from the input type(s).
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:498)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1152)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
   ... 1 more


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN



Stack overflow from self referencing Avro schema

2016-03-08 Thread David Kim
Hello all,

I'm running into a StackOverflowError using flink 1.0.0. I have an Avro
schema that has a self reference. For example:

item.avsc

{
  "namespace": "...",
  "type": "record",
  "name": "Item",
  "fields": [
    {
      "name": "parent",
      "type": ["null", "Item"]
    }
  ]
}


When running my flink job, I'm running into the follow error:

Exception in thread "Thread-94" java.lang.StackOverflowError
at 
org.apache.flink.api.java.typeutils.TypeExtractor.countTypeInHierarchy(TypeExtractor.java:1105)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1397)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1319)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:609)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1531)
at 
org.apache.flink.api.java.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:53)
at 
org.apache.flink.api.java.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:48)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1394)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1319)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:609)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1531)
at 
org.apache.flink.api.java.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:53)
at 
org.apache.flink.api.java.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:48)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1394)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1319)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:609)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1531)
at 
org.apache.flink.api.java.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:53)


Interestingly if I change the type to an Avro array in the schema, this
error is not thrown.

Thanks!
David


Type of TypeVariable could not be determined

2016-03-08 Thread Radu Tudoran
Hi,

I am trying to create a custom stream source. I first built it with generics 
and ran into problems with type extraction. I tried to use concrete 
types but ran into the same issue (see errors below). Can anyone suggest a 
solution to this issue?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'IN' in 'class test.MySourceFunction' could not be determined. 
This is most likely a type erasure problem. The type extraction currently 
supports types with generic variables only in cases where all variables in the 
return type can be deduced from the input type(s).
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:498)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1152)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
   ... 1 more


Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'TupleEvent2' in 'class test.MySourceFunctionTuple' could not be 
determined. This is most likely a type erasure problem. The type extraction 
currently supports types with generic variables only in cases where all 
variables in the return type can be deduced from the input type(s).
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:498)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380)
   at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1152)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
   ... 1 more


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN



Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-08 Thread Aljoscha Krettek
Hi,
with accumulator you mean the ones you get from 
RuntimeContext.addAccumulator/getAccumulator? I’m afraid these are not 
fault-tolerant which means that the count in these probably doesn’t reflect the 
actual number of elements that were processed. When a job fails and restarts 
the accumulators should start from scratch. This makes me wonder how yours ever 
reach the required 2 mio, for it to be considered “done”.

This keeps getting more mysterious… 

By the way, what are you using as StateBackend and checkpoint interval?

Cheers,
Aljoscha
> On 08 Mar 2016, at 13:38, Maximilian Bode  wrote:
> 
> Hi,
> thanks for the fast answer. Answers inline.
> 
>> Am 08.03.2016 um 13:31 schrieb Aljoscha Krettek :
>> 
>> Hi,
>> a missing part file for one of the parallel sinks is not necessarily a 
>> problem. This can happen if that parallel instance of the sink never 
>> received data after the job successfully restarted.
>> 
>> Missing data, however, is a problem. Maybe I need some more information 
>> about your setup:
>> 
>> - When are you inspecting the part files?
> Some time after the cluster is shut down
>> - Do you shutdown the Flink Job before checking? If so, how do you shut it 
>> down.
> Via 'cancel' in the Jobmanager Web Interface. Some records seem to be written 
> only after cancelling the job, right?
>> - When do you know whether all the data from Kafka was consumed by Flink and 
>> has passed through the pipeline into HDFS?
> I have an accumulator in a map right before writing into HDFS. Also, the 
> RollingSink has a DataTimeBucketer which makes it transparent when no new 
> data is arriving anymore as the last bucket is from some minutes ago.
>> 
>> Cheers,
>> Aljoscha
>>> On 08 Mar 2016, at 13:19, Maximilian Bode  
>>> wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> oh I see. I was under the impression this file was used internally and the 
>>> output being completed at the end. Ok, so I extracted the relevant lines 
>>> using
>>> for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > 
>>> "$i.final"; done
>>> which seems to do the trick.
>>> 
>>> Unfortunately, now some records are missing again. In particular, there are 
>>> the files
>>> part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding 
>>> .valid-length files
>>> part-0-1, part-1-1, ..., part-10-0
>>> in the bucket, where job parallelism=12. So it looks to us as if one of the 
>>> files was not even created in the second attempt. This behavior seems to be 
>>> somewhat reproducible, cf. my earlier email where the part-11 file 
>>> disappeared as well.
>>> 
>>> Thanks again for your help.
>>> 
>>> Cheers,
>>> Max
>>> —
>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>> 
 Am 08.03.2016 um 11:05 schrieb Aljoscha Krettek :
 
 Hi,
 are you taking the “.valid-length” files into account. The problem with 
 doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not 
 possible to truncate files. So the trick we’re using is to write the 
 length up to which a file is valid if we would normally need to truncate 
 it. (If the job fails in the middle of writing the output files have to be 
 truncated to a valid position.) For example, say you have an output file 
 part-8-0. Now, if there exists a file part-8-0.valid-length this file 
 tells you up to which position the file part-8-0 is valid. So you should 
 only read up to this point.
 
 The name of the “.valid-length” suffix can also be configured, by the way, 
 as can all the other stuff.
 
 If this is not the problem then I definitely have to investigate further. 
 I’ll also look into the Hadoop 2.4.1 build problem.
 
 Cheers,
 Aljoscha
> On 08 Mar 2016, at 10:26, Maximilian Bode  
> wrote:
> 
> Hi Aljoscha,
> thanks again for getting back to me. I built from your branch and the 
> exception is not occurring anymore. The RollingSink state can be restored.
> 
> Still, the exactly-once guarantee seems not to be fulfilled, there are 
> always some extra records after killing either a task manager or the job 
> manager. Do you have an idea where this behavior might be coming from? (I 
> guess concrete numbers will not help greatly as there are so many 
> parameters influencing them. Still, in our test scenario, we produce 2 
> million records in a Kafka queue but in the final output files there are 
> on the order of 2.1 million records, so a 5% error. The job is running in 
> a per-job YARN session with n=3, s=4 with a checkpointing interval of 
> 10s.)

Re: Window apply problem

2016-03-08 Thread Aljoscha Krettek
Hi,
there is also PurgingTrigger, which turns any Trigger into a trigger that also 
purges when firing. Use it like this:

.trigger(PurgingTrigger.of(CountTrigger.of(5)))
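
Plugged into the pipeline from the quoted mail below it would look roughly like this 
(LineSplitter and MyWindowFunction are your classes; the generics are only my guess):

DataStream<Hashtable<String, Integer>> counts = transactions
        .flatMap(new LineSplitter())
        .windowAll(GlobalWindows.create())
        .trigger(PurgingTrigger.of(CountTrigger.of(5))) // fires every 5 elements, then purges the window state
        .apply(new MyWindowFunction());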

Cheers,
Aljoscha
> On 08 Mar 2016, at 17:23, Marcela Charfuelan  
> wrote:
> 
> Thanks Jun,
> Very useful, I was confusing the parameters because my input is tuples, which 
> I might not need in the end...
> 
> I have now what I wanted (non-parallel and not so efficient I guess, any 
> suggestion to improve is welcome) and I have to modify the trigger so to 
> FIRE_AND_PURGE when it reaches N, the max number of items per window, 
> otherwise it will count the whole data every time...
> 
> So my example looks like this now:
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataStream transactions = env.fromElements(
>   "1 2 4 3 4",
>   "3 4 5 4 6",
>   "7 3 3 6 1",
>   "1 3 2 4 6" 
> );
> DataStream> counts = transactions
>   .flatMap(new LineSplitter())  // because I am expecting one 
> transaction per line
>   .windowAll(GlobalWindows.create())
>   .trigger(MyCountTrigger.of(5))
>   .apply(new MyWindowFunction());
> 
> counts.print();   
> env.execute("ItemsCount");
> 
> 
> public static class MyWindowFunction implements 
> AllWindowFunction, Hashtable, 
> GlobalWindow> { 
>   public Hashtable itemsMap = new 
> Hashtable();
>   
>   @Override
>   public void apply (GlobalWindow window,
> Iterable> tuples,
> Collector> 
> out) throws Exception {
>for(Tuple2 tuple : tuples){
> if(itemsMap.containsKey(tuple.f0)){
>itemsMap.put(tuple.f0, itemsMap.get(tuple.f0)+1);
> } else {
>itemsMap.put(tuple.f0,1);
> }
> }
> out.collect(itemsMap);
>}
>   }
> 
> Regards,
> Marcela.
> 
> 
> 
> On 08.03.2016 09:34, Wang Yangjun wrote:
>> Hello Marcela,
>> 
>> I am not sure what is the “parameters mismatch” here. From the example you 
>> shown, it seems that you just want do a window word count. Right?
>> 
>> Could you try this code and is it want you want?
>> 
>> Best,
>> Jun
>> -
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setParallelism(1);
>> Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 
>> 1, 3, 2, 4, 6};
>> List list = Arrays.asList(array);
>> DataStream> counts = env.fromCollection(list)
>> .windowAll(GlobalWindows.create())
>> .trigger(CountTrigger.of(5)).apply(new AllWindowFunction> Tuple2, GlobalWindow>() {
>> @Override
>> public void apply(GlobalWindow window, Iterable tuples, 
>> Collector> out) throws Exception {
>> HashMap map = new HashMap<>();
>> for(Integer tuple : tuples){
>> Integer value = 0;
>> if(map.containsKey(tuple)){
>> value = map.get(tuple);
>> }
>> map.put(tuple, value+1);
>> }
>> 
>> for(Map.Entry entry : map.entrySet()) {
>> out.collect(new Tuple2<>(entry.getKey(), 
>> entry.getValue()));
>> }
>> }
>> });
>> 
>> counts.print();
>> 
>> env.execute("Stream WordCount");
>> 
>> 
>> 
>> 
>> 
>> On 08/03/16 02:57, "Marcela Charfuelan"  wrote:
>> 
>>> hello,
>>> 
>>> I want to make a function for counting items (per type) in windows of
>>> size N; For example for N=5 and the stream:
>>> 1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6
>>> 
>>> I would like to generate the tuples:
>>> w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
>>> w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
>>> w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
>>> w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)
>>> 
>>> I am trying to apply my own function with "Window apply", something like:
>>> 
>>> items
>>> .windowAll(GlobalWindows.create())
>>> .trigger(CountTrigger.of(5))
>>> .apply(new MyWindowfunction())
>>> 
>>> but in this case there is a parameters mismatch with apply and
>>> WindowFunction, so I am not sure if it is not possible here. any suggestion?
>>> 
>>> Looking at the streaming java examples, the (commented) apply 

Re: Window apply problem

2016-03-08 Thread Marcela Charfuelan

Thanks Jun,
Very useful, I was confusing the parameters because my input is tuples, 
which I might not need in the end...


I have now what I wanted (non-parallel and not so efficient I guess, any 
suggestion to improve it is welcome) and I have to modify the trigger so that it 
fires FIRE_AND_PURGE when it reaches N, the max number of items per window; 
otherwise it will count all the data every time...


So my example looks like this now:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> transactions = env.fromElements(
        "1 2 4 3 4",
        "3 4 5 4 6",
        "7 3 3 6 1",
        "1 3 2 4 6"
);
DataStream<Hashtable<String, Integer>> counts = transactions
        .flatMap(new LineSplitter())  // because I am expecting one transaction per line
        .windowAll(GlobalWindows.create())
        .trigger(MyCountTrigger.of(5))
        .apply(new MyWindowFunction());

counts.print();
env.execute("ItemsCount");


public static class MyWindowFunction implements
        AllWindowFunction<Tuple2<String, Integer>, Hashtable<String, Integer>, GlobalWindow> {

    public Hashtable<String, Integer> itemsMap = new Hashtable<String, Integer>();

    @Override
    public void apply(GlobalWindow window,
                      Iterable<Tuple2<String, Integer>> tuples,
                      Collector<Hashtable<String, Integer>> out) throws Exception {
        for (Tuple2<String, Integer> tuple : tuples) {
            if (itemsMap.containsKey(tuple.f0)) {
                itemsMap.put(tuple.f0, itemsMap.get(tuple.f0) + 1);
            } else {
                itemsMap.put(tuple.f0, 1);
            }
        }
        out.collect(itemsMap);
    }
}

Regards,
Marcela.



On 08.03.2016 09:34, Wang Yangjun wrote:

Hello Marcela,

I am not sure what the “parameters mismatch” is here. From the example you 
showed, it seems that you just want to do a windowed word count. Right?

Could you try this code and see whether it is what you want?

Best,
Jun
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
List<Integer> list = Arrays.asList(array);
DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(5))
        .apply(new AllWindowFunction<Integer, Tuple2<Integer, Integer>, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow window, Iterable<Integer> tuples,
                              Collector<Tuple2<Integer, Integer>> out) throws Exception {
                HashMap<Integer, Integer> map = new HashMap<>();
                for (Integer tuple : tuples) {
                    Integer value = 0;
                    if (map.containsKey(tuple)) {
                        value = map.get(tuple);
                    }
                    map.put(tuple, value + 1);
                }

                for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
                    out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
                }
            }
        });

counts.print();

env.execute("Stream WordCount");





On 08/03/16 02:57, "Marcela Charfuelan"  wrote:


hello,

I want to make a function for counting items (per type) in windows of
size N; For example for N=5 and the stream:
1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6

I would like to generate the tuples:
w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)

I am trying to apply my own function with "Window apply", something like:

items
.windowAll(GlobalWindows.create())
.trigger(CountTrigger.of(5))
.apply(new MyWindowfunction())

but in this case there is a parameter mismatch between apply and
WindowFunction, so I am not sure whether it is even possible here. Any suggestions?

Looking at the streaming java examples, the (commented) apply example
shown in GroupedProcessingTimeWindowExample()
which is applied to a timeWindow, does not work either:

.keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
.apply(new SummingWindowFunction())

So what I am missing here? any help is appreciated.

Regards,
Marcela.






--
Dr. Marcela Charfuelan, Senior Researcher
TU Berlin, School of Electrical Engineering and Computer Sciences
Database Systems and Information Management (DIMA)
EN7, Einsteinufer 17, D-10587 Berlin
Room: EN 725  Phone: +49 30-314-23556
URL: 

RE: [ANNOUNCE] Flink 1.0.0 has been released

2016-03-08 Thread Radu Tudoran
Hi,

Do you also have a LinkedIn post that I could share - or should I write a 
blog post in which I pick up this announcement?



Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN

-Original Message-
From: Kostas Tzoumas [mailto:ktzou...@apache.org] 
Sent: Tuesday, March 08, 2016 4:17 PM
To: user@flink.apache.org; d...@flink.apache.org; n...@flink.apache.org
Subject: [ANNOUNCE] Flink 1.0.0 has been released

Hi everyone!

As you might have noticed, Apache Flink 1.0.0 has been released and announced!

You can read more about the release at the ASF blog and the Flink blog
-
https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88
- http://flink.apache.org/news/2016/03/08/release-1.0.0.html

Don't forget to retweet and spread the news :-)
- https://twitter.com/TheASF/status/707174116969857024
- https://twitter.com/ApacheFlink/status/707175973482012672

Check out the changelog and the migration guide, download the release, and 
check out the documentation
- http://flink.apache.org/blog/release_1.0.0-changelog_known_issues.html
-
https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
- https://cwiki.apache.org/confluence/display/FLINK/Stability+Annotations
- http://flink.apache.org/downloads.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.0/

Many congratulations to the Flink community for making this happen!

Best,
Kostas


[ANNOUNCE] Flink 1.0.0 has been released

2016-03-08 Thread Kostas Tzoumas
Hi everyone!

As you might have noticed, Apache Flink 1.0.0 has been released and
announced!

You can read more about the release at the ASF blog and the Flink blog
-
https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88
- http://flink.apache.org/news/2016/03/08/release-1.0.0.html

Don't forget to retweet and spread the news :-)
- https://twitter.com/TheASF/status/707174116969857024
- https://twitter.com/ApacheFlink/status/707175973482012672

Check out the changelog and the migration guide, download the release, and
check out the documentation
- http://flink.apache.org/blog/release_1.0.0-changelog_known_issues.html
-
https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
- https://cwiki.apache.org/confluence/display/FLINK/Stability+Annotations
- http://flink.apache.org/downloads.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.0/

Many congratulations to the Flink community for making this happen!

Best,
Kostas


Re: rebalance of streaming job after taskManager restart

2016-03-08 Thread Aljoscha Krettek
Yes, there are plans to make this more streamlined but we are not there yet, 
unfortunately.
> On 08 Mar 2016, at 16:07, Maciek Próchniak  wrote:
> 
> Hi,
> 
> thanks for the quick answer - yes, it does what I want to accomplish,
> but I was hoping for an "easier" solution.
> Are there any plans for a "restart" button/command or something similar? I mean, the 
> whole process of restarting is already in place as I understand it - it's triggered when 
> a task manager dies.
> 
> thanks,
> maciek
> 
> On 08/03/2016 16:03, Aljoscha Krettek wrote:
>> Hi,
>> I think what you can do is make a savepoint of your program, then cancel it 
>> and restart it from the savepoint. This should make Flink redistribute it on 
>> all TaskManagers.
>> 
>> See 
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>> and
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/cli.html#savepoints
>> for documentation about savepoints.
>> 
>> The steps to follow should be:
>>  bin/flink savepoint <jobID>
>> 
>> this will print a savepoint path that you will need later.
>>  bin/flink cancel <jobID>
>> 
>> bin/flink run -s <savepointPath> …
>> 
>> The last command is your usual run command but with the additional “-s” 
>> parameter to continue from a savepoint.
>> 
>> I hope that helps.
>> 
>> Cheers,
>> Aljoscha
>>> On 08 Mar 2016, at 15:48, Maciek Próchniak  wrote:
>>> 
>>> Hi,
>>> 
>>> we have streaming job with paralelism 2 and two task managers. The job is 
>>> occupying one slot on each task manager. When I stop manager2 the job is 
>>> restarted and it runs on manager1 - occupying two of it's slots.
>>> How can I trigger restart (or other similar process) that will cause the 
>>> job to be balanced among task managers?
>>> 
>>> thanks,
>>> maciek
>> 
> 



Re: rebalance of streaming job after taskManager restart

2016-03-08 Thread Maciek Próchniak

Hi,

thanks for the quick answer - yes, it does what I want to accomplish,
but I was hoping for an "easier" solution.
Are there any plans for a "restart" button/command or something similar? I mean, 
the whole process of restarting is already in place as I understand it - it's 
triggered when a task manager dies.


thanks,
maciek

On 08/03/2016 16:03, Aljoscha Krettek wrote:

Hi,
I think what you can do is make a savepoint of your program, then cancel it and 
restart it from the savepoint. This should make Flink redistribute it on all 
TaskManagers.

See 
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
and
https://ci.apache.org/projects/flink/flink-docs-master/apis/cli.html#savepoints
for documentation about savepoints.

The steps to follow should be:
  
bin/flink savepoint <jobID>


this will print a savepoint path that you will need later.

bin/flink cancel <jobID>


bin/flink run -s <savepointPath> …

The last command is your usual run command but with the additional “-s” 
parameter to continue from a savepoint.

I hope that helps.

Cheers,
Aljoscha

On 08 Mar 2016, at 15:48, Maciek Próchniak  wrote:

Hi,

we have streaming job with paralelism 2 and two task managers. The job is 
occupying one slot on each task manager. When I stop manager2 the job is 
restarted and it runs on manager1 - occupying two of it's slots.
How can I trigger restart (or other similar process) that will cause the job to 
be balanced among task managers?

thanks,
maciek






Re: rebalance of streaming job after taskManager restart

2016-03-08 Thread Aljoscha Krettek
Hi,
I think what you can do is make a savepoint of your program, then cancel it and 
restart it from the savepoint. This should make Flink redistribute it on all 
TaskManagers.

See 
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
and
https://ci.apache.org/projects/flink/flink-docs-master/apis/cli.html#savepoints
for documentation about savepoints.

The steps to follow should be:
 
bin/flink savepoint <jobID>

this will print a savepoint path that you will need later.

bin/flink cancel <jobID>

bin/flink run -s <savepointPath> …

The last command is your usual run command but with the additional “-s” 
parameter to continue from a savepoint.

I hope that helps.

Cheers,
Aljoscha
> On 08 Mar 2016, at 15:48, Maciek Próchniak  wrote:
> 
> Hi,
> 
> we have streaming job with paralelism 2 and two task managers. The job is 
> occupying one slot on each task manager. When I stop manager2 the job is 
> restarted and it runs on manager1 - occupying two of it's slots.
> How can I trigger restart (or other similar process) that will cause the job 
> to be balanced among task managers?
> 
> thanks,
> maciek



rebalance of streaming job after taskManager restart

2016-03-08 Thread Maciek Próchniak

Hi,

we have a streaming job with parallelism 2 and two task managers. The job 
occupies one slot on each task manager. When I stop manager2 the job 
is restarted and runs on manager1 - occupying two of its slots.
How can I trigger a restart (or other similar process) that will cause the 
job to be balanced among the task managers?


thanks,
maciek


Re: Webclient script misses building from source

2016-03-08 Thread Andrea Sella
I missed it!

Thank you,
Andrea

2016-03-08 14:27 GMT+01:00 Aljoscha Krettek :

> Hi Andrea,
> in Flink 1.0 there is no more a separate web client. The web client is
> part of the default JobManager dashboard now.
>
> You can also disable the web client part of the JobManager dashboard by
> setting:
>
> jobmanager.web.submit.enable: false
>
> in flink-conf.yaml.
>
> Cheers,
> Aljoscha
> > On 08 Mar 2016, at 14:21, Andrea Sella 
> wrote:
> >
> > Hi,
> >
> > I've built Flink from source but I was not able to find in
> build-target/bin the script start-webclient.sh to launch the WebUI.
> > The script is available just in the binaries or I have to add an
> argument to trigger its generation?
> >
> > Thanks in advance,
> > Andrea
> >
>
>


Re: Webclient script misses building from source

2016-03-08 Thread Aljoscha Krettek
Hi Andrea,
in Flink 1.0 there is no longer a separate web client. The web client is part of 
the default JobManager dashboard now.

You can also disable the web client part of the JobManager dashboard by setting:

jobmanager.web.submit.enable: false

in flink-conf.yaml.

Cheers,
Aljoscha
> On 08 Mar 2016, at 14:21, Andrea Sella  wrote:
> 
> Hi,
> 
> I've built Flink from source but I was not able to find in build-target/bin 
> the script start-webclient.sh to launch the WebUI.
> The script is available just in the binaries or I have to add an argument to 
> trigger its generation?
> 
> Thanks in advance,
> Andrea
> 



Webclient script misses building from source

2016-03-08 Thread Andrea Sella
Hi,

I've built Flink from source but I was not able to find the start-webclient.sh
script in build-target/bin to launch the WebUI.
Is the script only available in the binaries, or do I have to add an argument
to trigger its generation?

Thanks in advance,
Andrea


Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-08 Thread Maximilian Bode
Hi,
thanks for the fast answer. Answers inline.

> Am 08.03.2016 um 13:31 schrieb Aljoscha Krettek :
> 
> Hi,
> a missing part file for one of the parallel sinks is not necessarily a 
> problem. This can happen if that parallel instance of the sink never received 
> data after the job successfully restarted.
> 
> Missing data, however, is a problem. Maybe I need some more information about 
> your setup:
> 
> - When are you inspecting the part files?
Some time after the cluster is shut down
> - Do you shutdown the Flink Job before checking? If so, how do you shut it 
> down.
Via 'cancel' in the Jobmanager Web Interface. Some records seem to be written 
only after cancelling the job, right?
> - When do you know whether all the data from Kafka was consumed by Flink and 
> has passed through the pipeline into HDFS?
I have an accumulator in a map right before writing into HDFS. Also, the 
RollingSink has a DataTimeBucketer which makes it transparent when no new data 
is arriving anymore as the last bucket is from some minutes ago.
> 
> Cheers,
> Aljoscha
>> On 08 Mar 2016, at 13:19, Maximilian Bode  
>> wrote:
>> 
>> Hi Aljoscha,
>> 
>> oh I see. I was under the impression this file was used internally and the 
>> output being completed at the end. Ok, so I extracted the relevant lines 
>> using
>>  for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > 
>> "$i.final"; done
>> which seems to do the trick.
>> 
>> Unfortunately, now some records are missing again. In particular, there are 
>> the files
>>  part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding 
>> .valid-length files
>>  part-0-1, part-1-1, ..., part-10-0
>> in the bucket, where job parallelism=12. So it looks to us as if one of the 
>> files was not even created in the second attempt. This behavior seems to be 
>> somewhat reproducible, cf. my earlier email where the part-11 file 
>> disappeared as well.
>> 
>> Thanks again for your help.
>> 
>> Cheers,
>> Max
>> —
>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>> 
>>> Am 08.03.2016 um 11:05 schrieb Aljoscha Krettek :
>>> 
>>> Hi,
>>> are you taking the “.valid-length” files into account. The problem with 
>>> doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not 
>>> possible to truncate files. So the trick we’re using is to write the length 
>>> up to which a file is valid if we would normally need to truncate it. (If 
>>> the job fails in the middle of writing the output files have to be 
>>> truncated to a valid position.) For example, say you have an output file 
>>> part-8-0. Now, if there exists a file part-8-0.valid-length this file tells 
>>> you up to which position the file part-8-0 is valid. So you should only 
>>> read up to this point.
>>> 
>>> The name of the “.valid-length” suffix can also be configured, by the way, 
>>> as can all the other stuff.
>>> 
>>> If this is not the problem then I definitely have to investigate further. 
>>> I’ll also look into the Hadoop 2.4.1 build problem.
>>> 
>>> Cheers,
>>> Aljoscha
 On 08 Mar 2016, at 10:26, Maximilian Bode  
 wrote:
 
 Hi Aljoscha,
 thanks again for getting back to me. I built from your branch and the 
 exception is not occurring anymore. The RollingSink state can be restored.
 
 Still, the exactly-once guarantee seems not to be fulfilled, there are 
 always some extra records after killing either a task manager or the job 
 manager. Do you have an idea where this behavior might be coming from? (I 
 guess concrete numbers will not help greatly as there are so many 
 parameters influencing them. Still, in our test scenario, we produce 2 
 million records in a Kafka queue but in the final output files there are 
 on the order of 2.1 million records, so a 5% error. The job is running in 
 a per-job YARN session with n=3, s=4 with a checkpointing interval of 10s.)
 
 On another (maybe unrelated) note: when I pulled your branch, the Travis 
 build did not go through for -Dhadoop.version=2.4.1. I have not looked 
 into this further as of now, is this one of the tests known to fail 
 sometimes?
 
 Cheers,
 Max
 
 —
 Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
 Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
 Sitz: Unterföhring * Amtsgericht München * HRB 135082
 
> Am 07.03.2016 um 17:20 schrieb Aljoscha Krettek :
> 
> Hi Maximilian,
> sorry for the delay, we where very busy with the release last 

Re: Flink streaming throughput

2016-03-08 Thread Aljoscha Krettek
Hi,
Another interesting test would be a combination of 3) and 2). I.e. no JSON 
parsing and no sink. This would show what the raw throughput can be before 
being slowed down by writing to Elasticsearch.

Also, .print() is not feasible for production since it just prints every 
element to the stdout log on the TaskManagers, which itself can cause quite a 
slowdown. You could try:

datastream.addSink(new DiscardingSink())

which is a dummy sink that does nothing.

Cheers,
Aljoscha
> On 08 Mar 2016, at 13:31, おぎばやしひろのり  wrote:
> 
> Stephan,
> 
> Sorry for the delay in my response.
> I tried 3 cases you suggested.
> 
> This time, I set parallelism to 1 for simpicity.
> 
> 0) base performance (same as the first e-mail): 1,480msg/sec
> 1) Disable checkpointing : almost same as 0)
> 2) No ES sink. just print() : 1,510msg/sec
> 3) JSON to TSV : 8,000msg/sec
> 
> So, as you can see, the bottleneck was JSON parsing. I also want to
> try eliminating Kafka to see
> if there is room to improve performance. (Currently, I am using
> FlinkKafkaConsumer082 with Kafka 0.9;
> I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad considering
> Flink's scalability and fault tolerance.
> Thank you for your advice.
> 
> Regards,
> Hironori Ogibayashi
> 
> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり :
>> Stephan,
>> 
>> Thank you for your quick response.
>> I will try and post the result later.
>> 
>> Regards,
>> Hironori
>> 
>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen :
>>> Hi!
>>> 
>>> I would try and dig bit by bit into what the bottleneck is:
>>> 
>>> 1) Disable the checkpointing, see what difference that makes
>>> 2) Use a dummy sink (discarding) rather than elastic search, to see if that
>>> is limiting
>>> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
>>> easily dominate the entire pipeline.
>>> 
>>> Greetings,
>>> Stephan
>>> 
>>> 
>>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり  wrote:
 
 Hello,
 
 I started evaluating Flink and tried a simple performance test.
 The result was just about 4000 messages/sec with 300% CPU usage. I
 think this is quite low and am wondering if it is a reasonable result.
 If someone could check it, that would be great.
 
 Here is the detail:
 
 [servers]
 - 3 Kafka broker with 3 partitions
 - 3 Flink TaskManager + 1 JobManager
 - 1 Elasticsearch
 All of them are separate VM with 8vCPU, 8GB memory
 
 [test case]
 The application counts access logs by URI within a 1-minute window and
 sends the result to Elasticsearch. The actual code is below.
 I used the '-p 3' option of the flink run command, so the task was distributed
 to 3 TaskManagers.
 In the test, I sent about 5000 logs/sec to Kafka.
 
 [result]
 - From Elasticsearch records, the total access count for all URI was
 about 260,000/min = 4300/sec. This is the entire throughput.
 - Kafka consumer lag kept growing.
 - The CPU usage of each TaskManager machine was about 13-14%. From the top
 command output, the Flink java process was using 100% (1 CPU fully used).
 
 So I thought the bottleneck here was CPU used by Flink Tasks.
 
 Here is the application code.
 ---
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000)
 ...
val stream = env
  .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new
 SimpleStringSchema(), properties))
  .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String,
 AnyRef]] }
  .map{ x => x.get("uri") match {
case Some(y) => (y.asInstanceOf[String],1)
case None => ("", 1)
  }}
  .keyBy(0)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  .sum(1)
  .map{ x => (System.currentTimeMillis(), x)}
  .addSink(new ElasticsearchSink(config, transports, new
 IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
override def createIndexRequest(element: Tuple2[Long,
 Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
  val json = new HashMap[String, AnyRef]
  json.put("@timestamp", new Timestamp(element._1))
  json.put("uri", element._2._1)
  json.put("count", element._2._2: java.lang.Integer)
  println("SENDING: " + element)
 
 Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
}
  }))
 ---
 
 Regards,
 Hironori Ogibayashi
>>> 
>>> 



Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-08 Thread Aljoscha Krettek
Hi,
a missing part file for one of the parallel sinks is not necessarily a problem. 
This can happen if that parallel instance of the sink never received data after 
the job successfully restarted. 

Missing data, however, is a problem. Maybe I need some more information about 
your setup:

 - When are you inspecting the part files?
 - Do you shut down the Flink job before checking? If so, how do you shut it 
down?
 - How do you know when all the data from Kafka has been consumed by Flink and 
has passed through the pipeline into HDFS?

Cheers,
Aljoscha
> On 08 Mar 2016, at 13:19, Maximilian Bode  wrote:
> 
> Hi Aljoscha,
> 
> oh I see. I was under the impression this file was used internally and that 
> the output would be completed at the end. Ok, so I extracted the relevant lines using
>   for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > 
> "$i.final"; done
> which seems to do the trick.
> 
> Unfortunately, now some records are missing again. In particular, there are 
> the files
>   part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding 
> .valid-length files
>   part-0-1, part-1-1, ..., part-10-0
> in the bucket, where job parallelism=12. So it looks to us as if one of the 
> files was not even created in the second attempt. This behavior seems to be 
> somewhat reproducible, cf. my earlier email where the part-11 file 
> disappeared as well.
> 
> Thanks again for your help.
> 
> Cheers,
>  Max
> — 
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
>> Am 08.03.2016 um 11:05 schrieb Aljoscha Krettek :
>> 
>> Hi,
>> are you taking the “.valid-length” files into account? The problem with 
>> doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible 
>> to truncate files. So the trick we’re using is to write the length up to 
>> which a file is valid if we would normally need to truncate it. (If the job 
>> fails in the middle of writing, the output files have to be truncated to a 
>> valid position.) For example, say you have an output file part-8-0. Now, if 
>> there exists a file part-8-0.valid-length, this file tells you up to which 
>> position the file part-8-0 is valid. So you should only read up to this 
>> point.
>> 
>> The name of the “.valid-length” suffix can also be configured, by the way, 
>> as can all the other stuff.
>> 
>> If this is not the problem then I definitely have to investigate further. 
>> I’ll also look into the Hadoop 2.4.1 build problem.
>> 
>> Cheers,
>> Aljoscha
>>> On 08 Mar 2016, at 10:26, Maximilian Bode  
>>> wrote:
>>> 
>>> Hi Aljoscha,
>>> thanks again for getting back to me. I built from your branch and the 
>>> exception is not occurring anymore. The RollingSink state can be restored.
>>> 
>>> Still, the exactly-once guarantee seems not to be fulfilled, there are 
>>> always some extra records after killing either a task manager or the job 
>>> manager. Do you have an idea where this behavior might be coming from? (I 
>>> guess concrete numbers will not help greatly as there are so many 
>>> parameters influencing them. Still, in our test scenario, we produce 2 
>>> million records in a Kafka queue but in the final output files there are on 
>>> the order of 2.1 million records, so a 5% error. The job is running in a 
>>> per-job YARN session with n=3, s=4 with a checkpointing interval of 10s.)
>>> 
>>> On another (maybe unrelated) note: when I pulled your branch, the Travis 
>>> build did not go through for -Dhadoop.version=2.4.1. I have not looked into 
>>> this further as of now, is this one of the tests known to fail sometimes?
>>> 
>>> Cheers,
>>> Max
>>> 
>>> — 
>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>> 
 Am 07.03.2016 um 17:20 schrieb Aljoscha Krettek :
 
 Hi Maximilian,
 sorry for the delay, we were very busy with the release last week. I had 
 a hunch about the problem but I think I found a fix now. The problem is in 
 snapshot restore. When restoring, the sink tries to clean up any files 
 that were previously in progress. If Flink restores to the same snapshot 
 twice in a row then it will try to clean up the leftover files twice, but 
 they are not there anymore, which causes the exception.
 
 I have a fix in my branch: 
 https://github.com/aljoscha/flink/tree/rolling-sink-fix
 
 Could you maybe try if this solves your problem? Which version of Flink 
 are you using? You would have to build from 

Re: Flink streaming throughput

2016-03-08 Thread おぎばやしひろのり
Stephan,

Sorry for the delay in my response.
I tried the 3 cases you suggested.

This time, I set parallelism to 1 for simplicity.

0) base performance (same as the first e-mail): 1,480msg/sec
1) Disable checkpointing : almost same as 0)
2) No ES sink. just print() : 1,510msg/sec
3) JSON to TSV : 8,000msg/sec

So, as you can see, the bottleneck was JSON parsing. I also want to
try eliminating Kafka to see
if there is room to improve performance. (Currently, I am using
FlinkKafkaConsumer082 with Kafka 0.9;
I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
Anyway, I think 8,000msg/sec with 1 CPU is not so bad considering
Flink's scalability and fault tolerance.
Thank you for your advice.
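
For reference, a sketch of what the "JSON to TSV" variant could look like (an 
assumption for illustration; the actual TSV layout and parsing code are not 
shown in this thread, and the URI column index is a placeholder):

---
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object TsvUriCount {
  // `stream` is the DataStream[String] coming from the Kafka source in the job quoted below.
  def uriCounts(stream: DataStream[String]): DataStream[(String, Int)] =
    stream
      .map { line =>
        val fields = line.split('\t')
        // assumed layout: the URI is the third tab-separated field
        val uri = if (fields.length > 2) fields(2) else ""
        (uri, 1)
      }
      .keyBy(0)
      .timeWindow(Time.of(1, TimeUnit.MINUTES))
      .sum(1)
}
---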

Regards,
Hironori Ogibayashi

2016-02-26 21:46 GMT+09:00 おぎばやしひろのり :
> Stephan,
>
> Thank you for your quick response.
> I will try and post the result later.
>
> Regards,
> Hironori
>
> 2016-02-26 19:45 GMT+09:00 Stephan Ewen :
>> Hi!
>>
>> I would try and dig bit by bit into what the bottleneck is:
>>
>>  1) Disable the checkpointing, see what difference that makes
>>  2) Use a dummy sink (discarding) rather than elastic search, to see if that
>> is limiting
>>  3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
>> easily dominate the entire pipeline.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり  wrote:
>>>
>>> Hello,
>>>
>>> I started evaluating Flink and tried a simple performance test.
>>> The result was just about 4000 messages/sec with 300% CPU usage. I
>>> think this is quite low and am wondering if it is a reasonable result.
>>> If someone could check it, that would be great.
>>>
>>> Here is the detail:
>>>
>>> [servers]
>>> - 3 Kafka broker with 3 partitions
>>> - 3 Flink TaskManager + 1 JobManager
>>> - 1 Elasticsearch
>>> All of them are separate VM with 8vCPU, 8GB memory
>>>
>>> [test case]
>>> The application counts access logs by URI within a 1-minute window and
>>> sends the result to Elasticsearch. The actual code is below.
>>> I used the '-p 3' option of the flink run command, so the task was distributed
>>> to 3 TaskManagers.
>>> In the test, I sent about 5000 logs/sec to Kafka.
>>>
>>> [result]
>>> - From Elasticsearch records, the total access count for all URI was
>>> about 260,000/min = 4300/sec. This is the entire throughput.
>>> - Kafka consumer lag kept growing.
>>> - The CPU usage of each TaskManager machine was about 13-14%. From the top
>>> command output, the Flink java process was using 100% (1 CPU fully used).
>>>
>>> So I thought the bottleneck here was CPU used by Flink Tasks.
>>>
>>> Here is the application code.
>>> ---
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.enableCheckpointing(1000)
>>> ...
>>> val stream = env
>>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new
>>> SimpleStringSchema(), properties))
>>>   .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String,
>>> AnyRef]] }
>>>   .map{ x => x.get("uri") match {
>>> case Some(y) => (y.asInstanceOf[String],1)
>>> case None => ("", 1)
>>>   }}
>>>   .keyBy(0)
>>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>>   .sum(1)
>>>   .map{ x => (System.currentTimeMillis(), x)}
>>>   .addSink(new ElasticsearchSink(config, transports, new
>>> IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
>>> override def createIndexRequest(element: Tuple2[Long,
>>> Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
>>>   val json = new HashMap[String, AnyRef]
>>>   json.put("@timestamp", new Timestamp(element._1))
>>>   json.put("uri", element._2._1)
>>>   json.put("count", element._2._2: java.lang.Integer)
>>>   println("SENDING: " + element)
>>>
>>> Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>>> }
>>>   }))
>>> ---
>>>
>>> Regards,
>>> Hironori Ogibayashi
>>
>>


Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-08 Thread Maximilian Bode
Hi Aljoscha,

oh I see. I was under the impression this file was used internally and that the 
output would be completed at the end. Ok, so I extracted the relevant lines using
for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > 
"$i.final"; done
which seems to do the trick.

Unfortunately, now some records are missing again. In particular, there are the 
files
part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding 
.valid-length files
part-0-1, part-1-1, ..., part-10-0
in the bucket, where job parallelism=12. So it looks to us as if one of the 
files was not even created in the second attempt. This behavior seems to be 
somewhat reproducible, cf. my earlier email where the part-11 file 
disappeared as well.

Thanks again for your help.

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> Am 08.03.2016 um 11:05 schrieb Aljoscha Krettek :
> 
> Hi,
> are you taking the “.valid-length” files into account? The problem with doing 
> “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to 
> truncate files. So the trick we’re using is to write the length up to which a 
> file is valid if we would normally need to truncate it. (If the job fails in 
> the middle of writing, the output files have to be truncated to a valid 
> position.) For example, say you have an output file part-8-0. Now, if there 
> exists a file part-8-0.valid-length, this file tells you up to which position 
> the file part-8-0 is valid. So you should only read up to this point.
> 
> The name of the “.valid-length” suffix can also be configured, by the way, as 
> can all the other stuff.
> 
> If this is not the problem then I definitely have to investigate further. 
> I’ll also look into the Hadoop 2.4.1 build problem.
> 
> Cheers,
> Aljoscha
>> On 08 Mar 2016, at 10:26, Maximilian Bode  
>> wrote:
>> 
>> Hi Aljoscha,
>> thanks again for getting back to me. I built from your branch and the 
>> exception is not occurring anymore. The RollingSink state can be restored.
>> 
>> Still, the exactly-once guarantee seems not to be fulfilled, there are 
>> always some extra records after killing either a task manager or the job 
>> manager. Do you have an idea where this behavior might be coming from? (I 
>> guess concrete numbers will not help greatly as there are so many parameters 
>> influencing them. Still, in our test scenario, we produce 2 million records 
>> in a Kafka queue but in the final output files there are on the order of 2.1 
>> million records, so a 5% error. The job is running in a per-job YARN session 
>> with n=3, s=4 with a checkpointing interval of 10s.)
>> 
>> On another (maybe unrelated) note: when I pulled your branch, the Travis 
>> build did not go through for -Dhadoop.version=2.4.1. I have not looked into 
>> this further as of now, is this one of the tests known to fail sometimes?
>> 
>> Cheers,
>> Max
>> 
>> —
>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>> 
>>> Am 07.03.2016 um 17:20 schrieb Aljoscha Krettek :
>>> 
>>> Hi Maximilian,
>>> sorry for the delay, we were very busy with the release last week. I had a 
>>> hunch about the problem but I think I found a fix now. The problem is in 
>>> snapshot restore. When restoring, the sink tries to clean up any files that 
>>> were previously in progress. If Flink restores to the same snapshot twice 
>>> in a row then it will try to clean up the leftover files twice, but they are 
>>> not there anymore, which causes the exception.
>>> 
>>> I have a fix in my branch: 
>>> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>> 
>>> Could you maybe try if this solves your problem? Which version of Flink are 
>>> you using? You would have to build from source to try it out. Alternatively 
>>> I could build it and put it onto a maven snapshot repository for you to try 
>>> it out.
>>> 
>>> Cheers,
>>> Aljoscha
 On 03 Mar 2016, at 14:50, Aljoscha Krettek  wrote:
 
 Hi,
 did you check whether there are any files at your specified HDFS output 
 location? If yes, which files are there?
 
 Cheers,
 Aljoscha
> On 03 Mar 2016, at 14:29, Maximilian Bode  
> wrote:
> 
> Just for the sake of completeness: this also happens when killing a task 
> manager and is therefore probably unrelated to job manager HA.
> 
>> Am 03.03.2016 um 14:17 schrieb Maximilian Bode 
>> :
>> 

Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-08 Thread Aljoscha Krettek
Hi,
are you taking the “.valid-length” files into account? The problem with doing 
“exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to 
truncate files. So the trick we’re using is to write the length up to which a 
file is valid if we would normally need to truncate it. (If the job fails in 
the middle of writing, the output files have to be truncated to a valid 
position.) For example, say you have an output file part-8-0. Now, if there 
exists a file part-8-0.valid-length, this file tells you up to which position 
the file part-8-0 is valid. So you should only read up to this point.
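
For illustration, a minimal sketch of a programmatic reader that honors these 
markers, mirroring the shell one-liner used elsewhere in this thread (the 
default "_" prefix and ".valid-length" suffix are assumed, and the marker 
payload is parsed leniently since its exact encoding is not spelled out here):

---
import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.{FileSystem, Path}

object ValidLengthReader {
  // Returns only the valid prefix of a RollingSink part file, using the companion
  // "_<part-file>.valid-length" marker (default prefix/suffix assumed). If no
  // marker exists, the whole file is considered valid.
  def readValidBytes(fs: FileSystem, partFile: Path): Array[Byte] = {
    val marker = new Path(partFile.getParent, "_" + partFile.getName + ".valid-length")

    val validLength: Long =
      if (fs.exists(marker)) {
        // Keep only the digit characters of the marker's payload; this mirrors the
        // `strings` trick from the shell one-liner and tolerates extra header bytes.
        val raw = new Array[Byte](fs.getFileStatus(marker).getLen.toInt)
        val in = fs.open(marker)
        try in.readFully(0L, raw) finally in.close()
        new String(raw, StandardCharsets.UTF_8).filter(_.isDigit).toLong
      } else {
        fs.getFileStatus(partFile).getLen
      }

    // Assumes the valid prefix fits in memory; fine for a spot check, not for huge files.
    val buf = new Array[Byte](validLength.toInt)
    val in = fs.open(partFile)
    try in.readFully(0L, buf) finally in.close()
    buf
  }
}
---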

The name of the “.valid-length” suffix can also be configured, by the way, as 
can all the other stuff.

If this is not the problem then I definitely have to investigate further. I’ll 
also look into the Hadoop 2.4.1 build problem.

Cheers,
Aljoscha
> On 08 Mar 2016, at 10:26, Maximilian Bode  wrote:
> 
> Hi Aljoscha,
> thanks again for getting back to me. I built from your branch and the 
> exception is not occurring anymore. The RollingSink state can be restored.
> 
> Still, the exactly-once guarantee seems not to be fulfilled, there are always 
> some extra records after killing either a task manager or the job manager. Do 
> you have an idea where this behavior might be coming from? (I guess concrete 
> numbers will not help greatly as there are so many parameters influencing 
> them. Still, in our test scenario, we produce 2 million records in a Kafka 
> queue but in the final output files there are on the order of 2.1 million 
> records, so a 5% error. The job is running in a per-job YARN session with 
> n=3, s=4 with a checkpointing interval of 10s.)
> 
> On another (maybe unrelated) note: when I pulled your branch, the Travis 
> build did not go through for -Dhadoop.version=2.4.1. I have not looked into 
> this further as of now, is this one of the tests known to fail sometimes?
> 
> Cheers,
>  Max
> 
> — 
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
>> Am 07.03.2016 um 17:20 schrieb Aljoscha Krettek :
>> 
>> Hi Maximilian,
>> sorry for the delay, we were very busy with the release last week. I had a 
>> hunch about the problem but I think I found a fix now. The problem is in 
>> snapshot restore. When restoring, the sink tries to clean up any files that 
>> were previously in progress. If Flink restores to the same snapshot twice 
>> in a row then it will try to clean up the leftover files twice, but they are 
>> not there anymore, which causes the exception.
>> 
>> I have a fix in my branch: 
>> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>> 
>> Could you maybe try if this solves your problem? Which version of Flink are 
>> you using? You would have to build from source to try it out. Alternatively 
>> I could build it and put it onto a maven snapshot repository for you to try 
>> it out.
>> 
>> Cheers,
>> Aljoscha
>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek  wrote:
>>> 
>>> Hi,
>>> did you check whether there are any files at your specified HDFS output 
>>> location? If yes, which files are there?
>>> 
>>> Cheers,
>>> Aljoscha
 On 03 Mar 2016, at 14:29, Maximilian Bode  
 wrote:
 
 Just for the sake of completeness: this also happens when killing a task 
 manager and is therefore probably unrelated to job manager HA.
 
> Am 03.03.2016 um 14:17 schrieb Maximilian Bode 
> :
> 
> Hi everyone,
> 
> unfortunately, I am running into another problem trying to establish 
> exactly once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
> 
> When using
> 
> RollingSink> sink = new 
> RollingSink>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
> sink.setBucketer(new NonRollingBucketer());
> output.addSink(sink);
> 
> and then killing the job manager, the new job manager is unable to 
> restore the old state throwing
> ---
> java.lang.Exception: Could not restore checkpointed state to operators 
> and functions
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.Exception: Failed to restore state to function: 
> In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 
> was neither moved to pending nor is still in progress.
>   at 
> 

Re: Window apply problem

2016-03-08 Thread Wang Yangjun
Hello Marcela,

I am not sure what the “parameters mismatch” is here. From the example you 
showed, it seems that you just want to do a windowed word count. Right? 

Could you try this code and see whether it is what you want?

Best,
Jun
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
List<Integer> list = Arrays.asList(array);
DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
    .windowAll(GlobalWindows.create())
    .trigger(CountTrigger.of(5))
    .apply(new AllWindowFunction<Integer, Tuple2<Integer, Integer>, GlobalWindow>() {
        @Override
        public void apply(GlobalWindow window, Iterable<Integer> tuples,
                          Collector<Tuple2<Integer, Integer>> out) throws Exception {
            // count the occurrences of each element currently in the window
            HashMap<Integer, Integer> map = new HashMap<>();
            for (Integer tuple : tuples) {
                Integer value = 0;
                if (map.containsKey(tuple)) {
                    value = map.get(tuple);
                }
                map.put(tuple, value + 1);
            }

            for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
                out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
            }
        }
    });

counts.print();

env.execute("Stream WordCount");





On 08/03/16 02:57, "Marcela Charfuelan"  wrote:

>hello,
>
>I want to make a function for counting items (per type) in windows of 
>size N; for example, for N=5 and the stream:
>1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6
>
>I would like to generate the tuples:
>w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
>w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
>w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
>w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)
>
>I am trying to apply my own function with "Window apply", something like:
>
>items
>.windowAll(GlobalWindows.create())
>.trigger(CountTrigger.of(5))
>.apply(new MyWindowfunction())
>
>but in this case there is a parameters mismatch between apply and 
>WindowFunction, so I am not sure whether it is possible here. Any suggestions?
>
>Looking at the streaming java examples, the (commented) apply example 
>shown in GroupedProcessingTimeWindowExample()
>which is applied to a timeWindow, does not work either:
>
>.keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
>.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
>.apply(new SummingWindowFunction())
>
>So what am I missing here? Any help is appreciated.
>
>Regards,
>Marcela.
>
>
>