Re: Performance Issue

2015-09-24 Thread Rico Bergmann
I took a first glance. 

I ran 2 test setups. One with a limited test data generator that outputs around 
200 events per second. In this setting the new implementation keeps up with the 
incoming message rate. 

The other setup had an unlimited generator (producing at the highest possible rate). There 
the same problem as before can be observed. After 2 minutes of runtime the output 
of my program is more than a minute behind ... and the lag keeps increasing over time. But I 
don't know whether this could be a setup problem. I noticed the OS load of my 
test system was around 90%. So it might be more of a setup problem ...

Thanks for your support so far. 

Cheers. Rico. 





> On 24.09.2015 at 09:33, Aljoscha Krettek wrote:
> 
> Hi Rico,
> you should be able to get it with these steps:
> 
> git clone https://github.com/StephanEwen/incubator-flink.git flink
> cd flink
> git checkout -t origin/windows
> 
> This will get you on Stephan's windowing branch. Then you can do a
> 
> mvn clean install -DskipTests
> 
> to build it.
> 
> I will merge his stuff later today, then you should also be able to use it by 
> running the 0.10-SNAPSHOT version.
> 
> Cheers,
> Aljoscha
> 
> 
>> On Thu, 24 Sep 2015 at 09:11 Rico Bergmann  wrote:
>> Hi!
>> 
>> Sounds great. How can I get the source code before it's merged to the master 
>> branch? Unfortunately I only have 2 days left for trying this out ...
>> 
>> Greets. Rico. 
>> 
>> 
>> 
>>> On 24.09.2015 at 00:57, Stephan Ewen wrote:
>>> 
>>> Hi Rico!
>>> 
>>> We have finished the first part of the Window API reworks. You can find the 
>>> code here: https://github.com/apache/flink/pull/1175
>>> 
>>> It should fix the issues and offer vastly improved performance (up to 50x 
>>> faster). For now, it supports time windows, but we will support the other 
>>> cases in the next days.
>>> 
>>> I'll ping you once it is merged, I'd be curious if it fixes your issue. 
>>> Sorry that you ran into this problem...
>>> 
>>> Greetings,
>>> Stephan
>>> 
>>> 
 On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann  
 wrote:
 Hi!
 
 While working with grouping and windowing I encountered a strange 
 behavior. I'm doing:
> dataStream.groupBy(KeySelector).window(Time.of(x, 
> TimeUnit.SECONDS)).mapWindow(toString).flatten()
 
 When I run the program containing this snippet it initially outputs data 
 at a rate around 150 events per sec. (That is roughly the input rate for 
 the program). After about 10-30 minutes the rate drops down below 5 events 
 per sec. This leads to event delivery offsets getting bigger and bigger 
 ... 
 
 Any explanation for this? I know you are reworking the streaming API. But 
 it would be useful to know why this happens ...
 
 Cheers. Rico. 


Re: HBase issue

2015-09-24 Thread Lydia Ickler
I am really trying to get HBase to work...
Is there maybe a tutorial for all the config files?
Best regards,
Lydia


> On 23.09.2015 at 17:40, Maximilian Michels wrote:
> 
> In the issue, it states that it should be sufficient to append the 
> hbase-protocol.jar file to the Hadoop classpath. Flink respects the Hadoop 
> classpath and will append it to its own classpath upon launching a cluster.
> 
> To do that, you need to modify the classpath with one of the commands below. 
> Note that this has to be performed on all cluster nodes.
> 
> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:/path/to/hbase-protocol.jar"
> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:$(hbase mapredcp)"
> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:$(hbase classpath)"
> 
> Alternatively, you can build a fat jar from your project with the missing 
> dependency. Flink will then automatically distribute the jar file upon job 
> submission. Just add this Maven dependency to your fat-jar pom:
> 
> <dependency>
>   <groupId>org.apache.hbase</groupId>
>   <artifactId>hbase-protocol</artifactId>
>   <version>1.1.2</version>
> </dependency>
> 
> Let me know if either of the two approaches works for you. After all, this is a 
> workaround because of an HBase optimization.
> 
> Cheers,
> Max
> 
> 
> 
>> On Wed, Sep 23, 2015 at 11:16 AM, Aljoscha Krettek  
>> wrote:
>> It might be that this is causing the problem: 
>> https://issues.apache.org/jira/browse/HBASE-10304
>> 
>> In your log I see the same exception. Does anyone have an idea what we could do 
>> about this?
>> 
>> 
>>> On Tue, 22 Sep 2015 at 22:40 Lydia Ickler  wrote:
>>> Hi, 
>>> 
>>> I am trying to get the HBaseReadExample to run. I have filled a table with 
>>> the HBaseWriteExample and purposely split it over 3 regions.
>>> Now when I try to read from it, the first split seems to be scanned fine (170 
>>> rows), and after that the ZooKeeper and RPC connections are suddenly 
>>> closed.
>>> 
>>> Does anyone have an idea why this is happening?
>>> 
>>> Best regards,
>>> Lydia 
>>> 
>>> 
>>> 22:28:10,178 DEBUG org.apache.flink.runtime.operators.DataSourceTask - 
>>> Opening input split Locatable Split (2) at [grips5:60020]:  DataSource (at 
>>> createInput(ExecutionEnvironment.java:502) 
>>> (org.apache.flink.HBaseReadExample$1)) (1/1)
>>> 22:28:10,178 INFO  org.apache.flink.addons.hbase.TableInputFormat   
>>>  - opening split [2|[grips5:60020]||-]
>>> 22:28:10,189 DEBUG org.apache.zookeeper.ClientCnxn  
>>>  - Reading reply sessionid:0x24ff6a96ecd000a, packet:: clientPath:null 
>>> serverPath:null finished:false header:: 3,4  replyHeader:: 3,51539607639,0  
>>> request:: '/hbase/meta-region-server,F  response:: 
>>> #0001a726567696f6e7365727665723a363030$
>>> 22:28:10,202 DEBUG org.apache.zookeeper.ClientCnxn  
>>>  - Reading reply sessionid:0x24ff6a96ecd000a, packet:: clientPath:null 
>>> serverPath:null finished:false header:: 4,4  replyHeader:: 4,51539607639,0  
>>> request:: '/hbase/meta-region-server,F  response:: 
>>> #0001a726567696f6e7365727665723a363030$
>>> 22:28:10,211 DEBUG LocalActorRefProvider(akka://flink)  
>>>  - resolve of path sequence [/temp/$b] failed
>>> 22:28:10,233 DEBUG org.apache.hadoop.hbase.util.ByteStringer
>>>  - Failed to classload HBaseZeroCopyByteString: 
>>> java.lang.IllegalAccessError: class 
>>> com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass 
>>> com.google.protobuf.LiteralByteString
>>> 22:28:10,358 DEBUG org.apache.hadoop.ipc.RpcClient  
>>>  - Use SIMPLE authentication for service ClientService, sasl=false
>>> 22:28:10,370 DEBUG org.apache.hadoop.ipc.RpcClient  
>>>  - Connecting to grips1/130.73.20.14:60020
>>> 22:28:10,380 DEBUG org.apache.hadoop.ipc.RpcClient  
>>>  - IPC Client (2145423150) connection to grips1/130.73.20.14:60020 from 
>>> hduser: starting, connections 1
>>> 22:28:10,394 DEBUG org.apache.hadoop.ipc.RpcClient  
>>>  - IPC Client (2145423150) connection to grips1/130.73.20.14:60020 from 
>>> hduser: got response header call_id: 0, totalSize: 469 bytes
>>> 22:28:10,397 DEBUG org.apache.hadoop.ipc.RpcClient  
>>>  - IPC Client (2145423150) connection to grips1/130.73.20.14:60020 from 
>>> hduser: wrote request header call_id: 0 method_name: "Get" request_param: 
>>> true
>>> 22:28:10,413 DEBUG org.apache.zookeeper.ClientCnxn  
>>>  - Reading reply sessionid:0x24ff6a96ecd000a, packet:: clientPath:null 
>>> serverPath:null finished:false header:: 5,4  replyHeader:: 5,51539607639,0  
>>> request:: '/hbase/meta-region-server,F  response:: 
>>> #0001a726567696f6e7365727665723a363030$
>>> 22:28:10,424 DEBUG org.apache.hadoop.ipc.RpcClient  
>>>  - IPC Client (2145423150) connection to 

Re: Performance Issue

2015-09-24 Thread Rico Bergmann
Hi!

Sounds great. How can I get the source code before it's merged to the master 
branch? Unfortunately I only have 2 days left for trying this out ...

Greets. Rico. 



> On 24.09.2015 at 00:57, Stephan Ewen wrote:
> 
> Hi Rico!
> 
> We have finished the first part of the Window API reworks. You can find the 
> code here: https://github.com/apache/flink/pull/1175
> 
> It should fix the issues and offer vastly improved performance (up to 50x 
> faster). For now, it supports time windows, but we will support the other 
> cases in the next days.
> 
> I'll ping you once it is merged, I'd be curious if it fixes your issue. Sorry 
> that you ran into this problem...
> 
> Greetings,
> Stephan
> 
> 
>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann  wrote:
>> Hi!
>> 
>> While working with grouping and windowing I encountered a strange behavior. 
>> I'm doing:
>>> dataStream.groupBy(KeySelector).window(Time.of(x, 
>>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>> 
>> When I run the program containing this snippet it initially outputs data at 
>> a rate around 150 events per sec. (That is roughly the input rate for the 
>> program). After about 10-30 minutes the rate drops down below 5 events per 
>> sec. This leads to event delivery offsets getting bigger and bigger ... 
>> 
> Any explanation for this? I know you are reworking the streaming API. But it 
> would be useful to know why this happens ...
>> 
>> Cheers. Rico. 
> 


Re: Performance Issue

2015-09-24 Thread Aljoscha Krettek
Hi Rico,
you should be able to get it with these steps:

git clone https://github.com/StephanEwen/incubator-flink.git flink
cd flink
git checkout -t origin/windows

This will get you on Stephan's windowing branch. Then you can do a

mvn clean install -DskipTests

to build it.

I will merge his stuff later today, then you should also be able to use it
by running the 0.10-SNAPSHOT version.

Cheers,
Aljoscha


On Thu, 24 Sep 2015 at 09:11 Rico Bergmann  wrote:

> Hi!
>
> Sounds great. How can I get the source code before it's merged to the
> master branch? Unfortunately I only have 2 days left for trying this out ...
>
> Greets. Rico.
>
>
>
> On 24.09.2015 at 00:57, Stephan Ewen wrote:
>
> Hi Rico!
>
> We have finished the first part of the Window API reworks. You can find
> the code here: https://github.com/apache/flink/pull/1175
>
> It should fix the issues and offer vastly improved performance (up to 50x
> faster). For now, it supports time windows, but we will support the other
> cases in the next days.
>
> I'll ping you once it is merged, I'd be curious if it fixes your issue.
> Sorry that you ran into this problem...
>
> Greetings,
> Stephan
>
>
> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann 
> wrote:
>
>> Hi!
>>
>> While working with grouping and windowing I encountered a strange
>> behavior. I'm doing:
>>
>> dataStream.groupBy(KeySelector).window(Time.of(x,
>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>>
>>
>> When I run the program containing this snippet it initially outputs data
>> at a rate around 150 events per sec. (That is roughly the input rate for
>> the program). After about 10-30 minutes the rate drops down below 5 events
>> per sec. This leads to event delivery offsets getting bigger and bigger ...
>>
>> Any explanation for this? I know you are reworking the streaming API. But
> it would be useful to know why this happens ...
>>
>> Cheers. Rico.
>>
>
>


Re: HBase issue

2015-09-24 Thread Lydia Ickler
Hi, I tried that, but unfortunately it still gets stuck at the second split.

Can it be that I have set something wrong in my configuration? In Hadoop? Or in 
Flink?

The strange thing is that the HBaseWriteExample works great!

Best regards,
Lydia


> On 23.09.2015 at 17:40, Maximilian Michels wrote:
> 
> In the issue, it states that it should be sufficient to append the 
> hbase-protocol.jar file to the Hadoop classpath. Flink respects the Hadoop 
> classpath and will append it to its own classpath upon launching a cluster.
> 
> To do that, you need to modify the classpath with one of the commands below. 
> Note that this has to be performed on all cluster nodes.
> 
> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:/path/to/hbase-protocol.jar"
> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:$(hbase mapredcp)"
> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:$(hbase classpath)"
> 
> Alternatively, you can build a fat jar from your project with the missing 
> dependency. Flink will then automatically distribute the jar file upon job 
> submission. Just add this Maven dependency to your fat-jar pom:
> 
> <dependency>
>   <groupId>org.apache.hbase</groupId>
>   <artifactId>hbase-protocol</artifactId>
>   <version>1.1.2</version>
> </dependency>
> 
> Let me know if either of the two approaches works for you. After all, this is a 
> workaround because of an HBase optimization.
> 
> Cheers,
> Max
> 
> 
> 
> On Wed, Sep 23, 2015 at 11:16 AM, Aljoscha Krettek wrote:
> It might be that this is causing the problem: 
> https://issues.apache.org/jira/browse/HBASE-10304
> 
> In your log I see the same exception. Does anyone have an idea what we could do 
> about this?
> 
> 
> On Tue, 22 Sep 2015 at 22:40 Lydia Ickler wrote:
> Hi, 
> 
> I am trying to get the HBaseReadExample to run. I have filled a table with 
> the HBaseWriteExample and purposely split it over 3 regions.
> Now when I try to read from it, the first split seems to be scanned fine (170 
> rows), and after that the ZooKeeper and RPC connections are suddenly 
> closed.
> 
> Does anyone have an idea why this is happening?
> 
> Best regards,
> Lydia 
> 
> 
> 22:28:10,178 DEBUG org.apache.flink.runtime.operators.DataSourceTask  
>- Opening input split Locatable Split (2) at [grips5:60020]:  DataSource 
> (at createInput(ExecutionEnvironment.java:502) 
> (org.apache.flink.HBaseReadExample$1)) (1/1)
> 22:28:10,178 INFO  org.apache.flink.addons.hbase.TableInputFormat 
>- opening split [2|[grips5:60020]||-]
> 22:28:10,189 DEBUG org.apache.zookeeper.ClientCnxn
>- Reading reply sessionid:0x24ff6a96ecd000a, packet:: clientPath:null 
> serverPath:null finished:false header:: 3,4  replyHeader:: 3,51539607639,0  
> request:: '/hbase/meta-region-server,F  response:: 
> #0001a726567696f6e7365727665723a363030$
> 22:28:10,202 DEBUG org.apache.zookeeper.ClientCnxn
>- Reading reply sessionid:0x24ff6a96ecd000a, packet:: clientPath:null 
> serverPath:null finished:false header:: 4,4  replyHeader:: 4,51539607639,0  
> request:: '/hbase/meta-region-server,F  response:: 
> #0001a726567696f6e7365727665723a363030$
> 22:28:10,211 DEBUG LocalActorRefProvider(akka://flink) 
>   - resolve of path sequence [/temp/$b] failed
> 22:28:10,233 DEBUG org.apache.hadoop.hbase.util.ByteStringer  
>- Failed to classload HBaseZeroCopyByteString: 
> java.lang.IllegalAccessError: class 
> com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass 
> com.google.protobuf.LiteralByteString
> 22:28:10,358 DEBUG org.apache.hadoop.ipc.RpcClient
>- Use SIMPLE authentication for service ClientService, sasl=false
> 22:28:10,370 DEBUG org.apache.hadoop.ipc.RpcClient
>- Connecting to grips1/130.73.20.14:60020
> 22:28:10,380 DEBUG org.apache.hadoop.ipc.RpcClient
>- IPC Client (2145423150) connection to 
> grips1/130.73.20.14:60020 from hduser: starting, 
> connections 1
> 22:28:10,394 DEBUG org.apache.hadoop.ipc.RpcClient
>- IPC Client (2145423150) connection to 
> grips1/130.73.20.14:60020 from hduser: got 
> response header call_id: 0, totalSize: 469 bytes
> 22:28:10,397 DEBUG org.apache.hadoop.ipc.RpcClient
>- IPC Client (2145423150) connection to 
> grips1/130.73.20.14:60020 from hduser: wrote 
> request header call_id: 0 method_name: "Get" request_param: true
> 22:28:10,413 DEBUG org.apache.zookeeper.ClientCnxn
>- Reading reply sessionid:0x24ff6a96ecd000a, packet:: 

Re: HBase issue

2015-09-24 Thread Robert Metzger
I'm really sorry that you are facing this issue.
I saw your message on the HBase user mailing list [1]. Maybe you can follow
up with Ted so that he can help you.
There are only a few Flink users on this mailing list using it with HBase. I
actually think that the problem is more on the HBase side than on the Flink
side, so the HBase list can probably help you better.
(I'm not saying we should stop helping you here, I just think that the
chances on the HBase list are much higher ;) )



[1] http://qnalist.com/questions/6164722/flink-hbase

On Thu, Sep 24, 2015 at 12:49 PM, Lydia Ickler 
wrote:

> I am really trying to get HBase to work...
> Is there maybe a tutorial for all the config files?
> Best regards,
> Lydia
>
>
> On 23.09.2015 at 17:40, Maximilian Michels wrote:
>
> In the issue, it states that it should be sufficient to append the
> hbase-protocol.jar file to the Hadoop classpath. Flink respects the Hadoop
> classpath and will append it to its own classpath upon launching a cluster.
>
> To do that, you need to modify the classpath with one of the commands
> below. Note that this has to be performed on all cluster nodes.
>
> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:/path/to/hbase-protocol.jar"
> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:$(hbase mapredcp)"
> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:$(hbase classpath)"
>
> Alternatively, you can build a fat jar from your project with the missing
> dependency. Flink will then automatically distribute the jar file upon job
> submission. Just add this Maven dependency to your fat-jar pom:
>
> <dependency>
>   <groupId>org.apache.hbase</groupId>
>   <artifactId>hbase-protocol</artifactId>
>   <version>1.1.2</version>
> </dependency>
>
> Let me know if either of the two approaches works for you. After all, this is
> a workaround because of an HBase optimization.
>
> Cheers,
> Max
>
>
>
> On Wed, Sep 23, 2015 at 11:16 AM, Aljoscha Krettek 
> wrote:
>
>> It might be that this is causing the problem:
>> https://issues.apache.org/jira/browse/HBASE-10304
>>
>> In your log I see the same exception. Does anyone have an idea what we could
>> do about this?
>>
>>
>> On Tue, 22 Sep 2015 at 22:40 Lydia Ickler 
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to get the HBaseReadExample to run. I have filled a table
>>> with the HBaseWriteExample and purposely split it over 3 regions.
>>> Now when I try to read from it, the first split seems to be scanned fine (170
>>> rows), and after that the ZooKeeper and RPC connections are suddenly
>>> closed.
>>>
>>> Does anyone have an idea why this is happening?
>>>
>>> Best regards,
>>> Lydia
>>>
>>>
>>> 22:28:10,178 DEBUG org.apache.flink.runtime.operators.DataSourceTask
>>> - Opening input split Locatable Split (2) at [grips5:60020]:
>>> DataSource (at createInput(ExecutionEnvironment.java:502)
>>> (org.apache.flink.HBaseReadExample$1)) (1/1)
>>> 22:28:10,178 INFO  org.apache.flink.addons.hbase.TableInputFormat
>>> - opening split [2|[grips5:60020]||-]
>>> 22:28:10,189 DEBUG org.apache.zookeeper.ClientCnxn
>>> - Reading reply sessionid:0x24ff6a96ecd000a, packet::
>>> clientPath:null serverPath:null finished:false header:: 3,4  replyHeader::
>>> 3,51539607639,0  request:: '/hbase/meta-region-server,F  response::
>>> #0001a726567696f6e7365727665723a363030$
>>> 22:28:10,202 DEBUG org.apache.zookeeper.ClientCnxn
>>> - Reading reply sessionid:0x24ff6a96ecd000a, packet::
>>> clientPath:null serverPath:null finished:false header:: 4,4  replyHeader::
>>> 4,51539607639,0  request:: '/hbase/meta-region-server,F  response::
>>> #0001a726567696f6e7365727665723a363030$
>>> 22:28:10,211 DEBUG LocalActorRefProvider(akka://flink)
>>>   - resolve of path sequence [/temp/$b] failed
>>> 22:28:10,233 DEBUG org.apache.hadoop.hbase.util.ByteStringer
>>> - Failed to classload HBaseZeroCopyByteString:
>>> java.lang.IllegalAccessError: class
>>> com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass
>>> com.google.protobuf.LiteralByteString
>>> 22:28:10,358 DEBUG org.apache.hadoop.ipc.RpcClient
>>> - Use SIMPLE authentication for service ClientService, sasl=false
>>> 22:28:10,370 DEBUG org.apache.hadoop.ipc.RpcClient
>>> - Connecting to grips1/130.73.20.14:60020
>>> 22:28:10,380 DEBUG org.apache.hadoop.ipc.RpcClient
>>> - IPC Client (2145423150) connection to grips1/
>>> 130.73.20.14:60020 from hduser: starting, connections 1
>>> 22:28:10,394 DEBUG org.apache.hadoop.ipc.RpcClient
>>> - IPC Client (2145423150) connection to grips1/
>>> 130.73.20.14:60020 from hduser: got response header call_id: 0,
>>> totalSize: 469 bytes
>>> 22:28:10,397 DEBUG org.apache.hadoop.ipc.RpcClient
>>> - IPC Client (2145423150) connection to grips1/
>>> 130.73.20.14:60020 from hduser: wrote request header call_id: 0
>>> method_name: "Get" request_param: true
>>> 22:28:10,413 DEBUG org.apache.zookeeper.ClientCnxn
>>> - Reading reply 

Re: HBase issue

2015-09-24 Thread Flavio Pompermaier
I'm actually the last developer that touched the HBase connector, but I
never faced those problems with the version specified in the extension pom.
From what I can tell looking at your logs, it seems that there is a
classpath problem (Failed to classload HBaseZeroCopyByteString:
java.lang.IllegalAccessError: class
com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass
com.google.protobuf.LiteralByteString). If I were you I'd check whether there
are any jar conflicts resolved badly by Maven and whether all the required
jars are on the classpath.

The only unusual thing that extension does is manage client
timeouts that kill the scanner resources of HBase: Flink jobs are
lazy in data fetching, and HBase can close the client connection if two
consecutive calls to scanner.next() take too long.
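
If you want to rule that out, raising the scanner timeout on the client side
may be worth a try. A minimal sketch, assuming HBase >= 0.96, where the
property is named as below (older versions used hbase.regionserver.lease.period
instead); this is an illustration, not a confirmed fix for the reported issue:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hypothetical tuning snippet: give a slow consumer more time between two
// scanner.next() calls before HBase expires the scanner lease.
Configuration conf = HBaseConfiguration.create();
conf.setLong("hbase.client.scanner.timeout.period", 120000L); // 2 minutes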

Best,
Flavio

On Thu, Sep 24, 2015 at 2:07 PM, Robert Metzger  wrote:

> I'm really sorry that you are facing this issue.
> I saw your message on the HBase user mailing list [1]. Maybe you can
> follow up with Ted so that he can help you.
> There are only a few Flink users on this mailing list using it with HBase.
> I actually think that the problem is more on the HBase side than on the
> Flink side, so the HBase list can probably help you better.
> (I'm not saying we should stop helping you here, I just think that the
> chances on the HBase list are much higher ;) )
>
>
>
> [1] http://qnalist.com/questions/6164722/flink-hbase
>
> On Thu, Sep 24, 2015 at 12:49 PM, Lydia Ickler 
> wrote:
>
>> I am really trying to get HBase to work...
>> Is there maybe a tutorial for all the config files?
>> Best regards,
>> Lydia
>>
>>
>> On 23.09.2015 at 17:40, Maximilian Michels wrote:
>>
>> In the issue, it states that it should be sufficient to append the
>> hbase-protocol.jar file to the Hadoop classpath. Flink respects the Hadoop
>> classpath and will append it to its own classpath upon launching a cluster.
>>
>> To do that, you need to modify the classpath with one of the commands
>> below. Note that this has to be performed on all cluster nodes.
>>
>> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:/path/to/hbase-protocol.jar"
>> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:$(hbase mapredcp)"
>> export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:$(hbase classpath)"
>>
>> Alternatively, you can build a fat jar from your project with the missing
>> dependency. Flink will then automatically distribute the jar file upon job
>> submission. Just add this Maven dependency to your fat-jar pom:
>>
>> <dependency>
>>   <groupId>org.apache.hbase</groupId>
>>   <artifactId>hbase-protocol</artifactId>
>>   <version>1.1.2</version>
>> </dependency>
>>
>> Let me know if either of the two approaches works for you. After all, this is
>> a workaround because of an HBase optimization.
>>
>> Cheers,
>> Max
>>
>>
>>
>> On Wed, Sep 23, 2015 at 11:16 AM, Aljoscha Krettek 
>> wrote:
>>
>>> It might be that this is causing the problem:
>>> https://issues.apache.org/jira/browse/HBASE-10304
>>>
>>> In your log I see the same exception. Does anyone have an idea what we could
>>> do about this?
>>>
>>>
>>> On Tue, 22 Sep 2015 at 22:40 Lydia Ickler 
>>> wrote:
>>>
 Hi,

 I am trying to get the HBaseReadExample to run. I have filled a table
 with the HBaseWriteExample and purposely split it over 3 regions.
 Now when I try to read from it, the first split seems to be scanned fine (170
 rows), and after that the ZooKeeper and RPC connections are suddenly
 closed.

 Does anyone have an idea why this is happening?

 Best regards,
 Lydia


 22:28:10,178 DEBUG org.apache.flink.runtime.operators.DataSourceTask
   - Opening input split Locatable Split (2) at [grips5:60020]:
 DataSource (at createInput(ExecutionEnvironment.java:502)
 (org.apache.flink.HBaseReadExample$1)) (1/1)
 22:28:10,178 INFO  org.apache.flink.addons.hbase.TableInputFormat
   - opening split [2|[grips5:60020]||-]
 22:28:10,189 DEBUG org.apache.zookeeper.ClientCnxn
   - Reading reply sessionid:0x24ff6a96ecd000a, packet::
 clientPath:null serverPath:null finished:false header:: 3,4  replyHeader::
 3,51539607639,0  request:: '/hbase/meta-region-server,F  response::
 #0001a726567696f6e7365727665723a363030$
 22:28:10,202 DEBUG org.apache.zookeeper.ClientCnxn
   - Reading reply sessionid:0x24ff6a96ecd000a, packet::
 clientPath:null serverPath:null finished:false header:: 4,4  replyHeader::
 4,51539607639,0  request:: '/hbase/meta-region-server,F  response::
 #0001a726567696f6e7365727665723a363030$
 22:28:10,211 DEBUG LocalActorRefProvider(akka://flink)
   - resolve of path sequence [/temp/$b] failed
 22:28:10,233 DEBUG org.apache.hadoop.hbase.util.ByteStringer
   - Failed to classload HBaseZeroCopyByteString:
 java.lang.IllegalAccessError: class
 

Re: Performance Issue

2015-09-24 Thread Aljoscha Krettek
Hi Rico,
are you generating the data directly in your Flink program or in some external
queue, such as Kafka?

Cheers,
Aljoscha

On Thu, 24 Sep 2015 at 13:47 Rico Bergmann  wrote:

> And as a side note:
>
> The problem with duplicates also seems to be solved!
>
> Cheers Rico.
>
>
>
> On 24.09.2015 at 12:21, Rico Bergmann wrote:
>
> I took a first glance.
>
> I ran 2 test setups. One with a limited test data generator that outputs
> around 200 events per second. In this setting the new implementation keeps
> up with the incoming message rate.
>
> The other setup had an unlimited generator (producing at the highest
> possible rate). There the same problem as before can be observed. After 2
> minutes of runtime the output of my program is more than a minute behind
> ... and the lag keeps increasing over time. But I don't know whether this
> could be a setup problem. I noticed the OS load of my test system was
> around 90%. So it might be more of a setup problem ...
>
> Thanks for your support so far.
>
> Cheers. Rico.
>
>
>
>
>
> On 24.09.2015 at 09:33, Aljoscha Krettek wrote:
>
> Hi Rico,
> you should be able to get it with these steps:
>
> git clone https://github.com/StephanEwen/incubator-flink.git flink
> cd flink
> git checkout -t origin/windows
>
> This will get you on Stephan's windowing branch. Then you can do a
>
> mvn clean install -DskipTests
>
> to build it.
>
> I will merge his stuff later today, then you should also be able to use it
> by running the 0.10-SNAPSHOT version.
>
> Cheers,
> Aljoscha
>
>
> On Thu, 24 Sep 2015 at 09:11 Rico Bergmann  wrote:
>
>> Hi!
>>
>> Sounds great. How can I get the source code before it's merged to the
>> master branch? Unfortunately I only have 2 days left for trying this out ...
>>
>> Greets. Rico.
>>
>>
>>
>> On 24.09.2015 at 00:57, Stephan Ewen wrote:
>>
>> Hi Rico!
>>
>> We have finished the first part of the Window API reworks. You can find
>> the code here: https://github.com/apache/flink/pull/1175
>>
>> It should fix the issues and offer vastly improved performance (up to 50x
>> faster). For now, it supports time windows, but we will support the other
>> cases in the next days.
>>
>> I'll ping you once it is merged, I'd be curious if it fixes your issue.
>> Sorry that you ran into this problem...
>>
>> Greetings,
>> Stephan
>>
>>
>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann 
>> wrote:
>>
>>> Hi!
>>>
>>> While working with grouping and windowing I encountered a strange
>>> behavior. I'm doing:
>>>
>>> dataStream.groupBy(KeySelector).window(Time.of(x,
>>> TimeUnit.SECONDS)).mapWindow(toString).flatten()
>>>
>>>
>>> When I run the program containing this snippet it initially outputs data
>>> at a rate around 150 events per sec. (That is roughly the input rate for
>>> the program). After about 10-30 minutes the rate drops down below 5 events
>>> per sec. This leads to event delivery offsets getting bigger and bigger ...
>>>
>>> Any explanation for this? I know you are reworking the streaming API.
>>> But it would be useful to know why this happens ...
>>>
>>> Cheers. Rico.
>>>
>>
>>


Re: Performance Issue

2015-09-24 Thread Stephan Ewen
Hi Rico!

When you say that the program falls behind the unlimited generating source,
I assume you have some unbounded buffering channel (like Kafka) between the
generator and the Flink job. Is that correct? Flink itself backpressures to
the sources, but if the source is Kafka, this does of course not affect the
Kafka data producer.

In that case, you probably "underprovisioned" the streaming job for the
data rate. The new windowing should have much better throughput, but it may
not be high enough for the data rate, which means you probably need more
cores.

It may be worth checking other aspects of the program. Depending on what
types you use, serialization can be expensive (especially for types like
JSON).

Also, please make sure you start the system in streaming mode
("start-cluster-streaming.sh" rather than "start-cluster.sh") - that makes
a difference in memory behavior for streaming jobs.

Greetings,
Stephan


On Thu, Sep 24, 2015 at 2:53 PM, Aljoscha Krettek 
wrote:

> Hi Rico,
> are you generating the data directly in your Flink program or in some
> external queue, such as Kafka?
>
> Cheers,
> Aljoscha
>
> On Thu, 24 Sep 2015 at 13:47 Rico Bergmann  wrote:
>
>> And as a side note:
>>
>> The problem with duplicates also seems to be solved!
>>
>> Cheers Rico.
>>
>>
>>
>> On 24.09.2015 at 12:21, Rico Bergmann wrote:
>>
>> I took a first glance.
>>
>> I ran 2 test setups. One with a limited test data generator that outputs
>> around 200 events per second. In this setting the new implementation keeps
>> up with the incoming message rate.
>>
>> The other setup had an unlimited generator (producing at the highest
>> possible rate). There the same problem as before can be observed. After 2
>> minutes of runtime the output of my program is more than a minute behind
>> ... and the lag keeps increasing over time. But I don't know whether this
>> could be a setup problem. I noticed the OS load of my test system was
>> around 90%. So it might be more of a setup problem ...
>>
>> Thanks for your support so far.
>>
>> Cheers. Rico.
>>
>>
>>
>>
>>
>> On 24.09.2015 at 09:33, Aljoscha Krettek wrote:
>>
>> Hi Rico,
>> you should be able to get it with these steps:
>>
>> git clone https://github.com/StephanEwen/incubator-flink.git flink
>> cd flink
>> git checkout -t origin/windows
>>
>> This will get you on Stephan's windowing branch. Then you can do a
>>
>> mvn clean install -DskipTests
>>
>> to build it.
>>
>> I will merge his stuff later today, then you should also be able to use
>> it by running the 0.10-SNAPSHOT version.
>>
>> Cheers,
>> Aljoscha
>>
>>
>> On Thu, 24 Sep 2015 at 09:11 Rico Bergmann  wrote:
>>
>>> Hi!
>>>
>>> Sounds great. How can I get the source code before it's merged to the
>>> master branch? Unfortunately I only have 2 days left for trying this out ...
>>>
>>> Greets. Rico.
>>>
>>>
>>>
>>> On 24.09.2015 at 00:57, Stephan Ewen wrote:
>>>
>>> Hi Rico!
>>>
>>> We have finished the first part of the Window API reworks. You can find
>>> the code here: https://github.com/apache/flink/pull/1175
>>>
>>> It should fix the issues and offer vastly improved performance (up to
>>> 50x faster). For now, it supports time windows, but we will support the
>>> other cases in the next days.
>>>
>>> I'll ping you once it is merged, I'd be curious if it fixes your issue.
>>> Sorry that you ran into this problem...
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann 
>>> wrote:
>>>
 Hi!

 While working with grouping and windowing I encountered a strange
 behavior. I'm doing:

 dataStream.groupBy(KeySelector).window(Time.of(x,
 TimeUnit.SECONDS)).mapWindow(toString).flatten()


 When I run the program containing this snippet it initially outputs
 data at a rate around 150 events per sec. (That is roughly the input rate
 for the program). After about 10-30 minutes the rate drops down below 5
 events per sec. This leads to event delivery offsets getting bigger and
 bigger ...

 Any explanation for this? I know you are reworking the streaming API.
 But it would be useful to know why this happens ...

 Cheers. Rico.

>>>
>>>


Re: Performance Issue

2015-09-24 Thread Rico Bergmann
The test data is generated in a Flink program running in a separate JVM. The 
generated data is then written to a Kafka topic, from which my program reads 
the data ...



> On 24.09.2015 at 14:53, Aljoscha Krettek wrote:
> 
> Hi Rico,
> are you generating the data directly in your Flink program or in some external 
> queue, such as Kafka?
> 
> Cheers,
> Aljoscha
> 
>> On Thu, 24 Sep 2015 at 13:47 Rico Bergmann  wrote:
>> And as a side note:
>> 
>> The problem with duplicates also seems to be solved!
>> 
>> Cheers Rico. 
>> 
>> 
>> 
>>> On 24.09.2015 at 12:21, Rico Bergmann wrote:
>>> 
>>> I took a first glance. 
>>> 
>>> I ran 2 test setups. One with a limited test data generator that outputs 
>>> around 200 events per second. In this setting the new implementation keeps 
>>> up with the incoming message rate. 
>>> 
>>> The other setup had an unlimited generator (producing at the highest 
>>> possible rate). There the same problem as before can be observed. After 2 
>>> minutes of runtime the output of my program is more than a minute behind 
>>> ... and the lag keeps increasing over time. But I don't know whether this 
>>> could be a setup problem. I noticed the OS load of my test system was 
>>> around 90%. So it might be more of a setup problem ...
>>> 
>>> Thanks for your support so far. 
>>> 
>>> Cheers. Rico. 
>>> 
>>> 
>>> 
>>> 
>>> 
 On 24.09.2015 at 09:33, Aljoscha Krettek wrote:
 
 Hi Rico,
 you should be able to get it with these steps:
 
 git clone https://github.com/StephanEwen/incubator-flink.git flink
 cd flink
 git checkout -t origin/windows
 
 This will get you on Stephan's windowing branch. Then you can do a
 
 mvn clean install -DskipTests
 
 to build it.
 
 I will merge his stuff later today, then you should also be able to use it 
 by running the 0.10-SNAPSHOT version.
 
 Cheers,
 Aljoscha
 
 
> On Thu, 24 Sep 2015 at 09:11 Rico Bergmann  wrote:
> Hi!
> 
> Sounds great. How can I get the source code before it's merged to the 
> master branch? Unfortunately I only have 2 days left for trying this out 
> ...
> 
> Greets. Rico. 
> 
> 
> 
>> On 24.09.2015 at 00:57, Stephan Ewen wrote:
>> 
>> Hi Rico!
>> 
>> We have finished the first part of the Window API reworks. You can find 
>> the code here: https://github.com/apache/flink/pull/1175
>> 
>> It should fix the issues and offer vastly improved performance (up to 
>> 50x faster). For now, it supports time windows, but we will support the 
>> other cases in the next days.
>> 
>> I'll ping you once it is merged, I'd be curious if it fixes your issue. 
>> Sorry that you ran into this problem...
>> 
>> Greetings,
>> Stephan
>> 
>> 
>>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann  
>>> wrote:
>>> Hi!
>>> 
>>> While working with grouping and windowing I encountered a strange 
>>> behavior. I'm doing:
 dataStream.groupBy(KeySelector).window(Time.of(x, 
 TimeUnit.SECONDS)).mapWindow(toString).flatten()
>>> 
>>> When I run the program containing this snippet it initially outputs 
>>> data at a rate around 150 events per sec. (That is roughly the input 
>>> rate for the program). After about 10-30 minutes the rate drops down 
>>> below 5 events per sec. This leads to event delivery offsets getting 
>>> bigger and bigger ... 
>>> 
>>> Any explanation for this? I know you are reworking the streaming API. 
>>> But it would be useful to know why this happens ...
>>> 
>>> Cheers. Rico. 


Re: Performance Issue

2015-09-24 Thread Stephan Ewen
Makes sense. The generation process seems to be inherently faster than the
consumption process (Flink program).

Without backpressure, these two will run out of sync, and Kafka does not do
any backpressure (by design).

On Thu, Sep 24, 2015 at 4:51 PM, Rico Bergmann  wrote:

> The test data is generated in a Flink program running in a separate JVM.
> The generated data is then written to a Kafka topic, from which my program
> reads the data ...
>
>
>
> On 24.09.2015 at 14:53, Aljoscha Krettek wrote:
>
> Hi Rico,
> are you generating the data directly in your Flink program or in some
> external queue, such as Kafka?
>
> Cheers,
> Aljoscha
>
> On Thu, 24 Sep 2015 at 13:47 Rico Bergmann  wrote:
>
>> And as a side note:
>>
>> The problem with duplicates also seems to be solved!
>>
>> Cheers Rico.
>>
>>
>>
>> On 24.09.2015 at 12:21, Rico Bergmann wrote:
>>
>> I took a first glance.
>>
>> I ran 2 test setups. One with a limited test data generator that outputs
>> around 200 events per second. In this setting the new implementation keeps
>> up with the incoming message rate.
>>
>> The other setup had an unlimited generator (producing at the highest
>> possible rate). There the same problem as before can be observed. After 2
>> minutes of runtime the output of my program is more than a minute behind
>> ... and the lag keeps increasing over time. But I don't know whether this
>> could be a setup problem. I noticed the OS load of my test system was
>> around 90%. So it might be more of a setup problem ...
>>
>> Thanks for your support so far.
>>
>> Cheers. Rico.
>>
>>
>>
>>
>>
>> On 24.09.2015 at 09:33, Aljoscha Krettek wrote:
>>
>> Hi Rico,
>> you should be able to get it with these steps:
>>
>> git clone https://github.com/StephanEwen/incubator-flink.git flink
>> cd flink
>> git checkout -t origin/windows
>>
>> This will get you on Stephan's windowing branch. Then you can do a
>>
>> mvn clean install -DskipTests
>>
>> to build it.
>>
>> I will merge his stuff later today, then you should also be able to use
>> it by running the 0.10-SNAPSHOT version.
>>
>> Cheers,
>> Aljoscha
>>
>>
>> On Thu, 24 Sep 2015 at 09:11 Rico Bergmann  wrote:
>>
>>> Hi!
>>>
>>> Sounds great. How can I get the source code before it's merged to the
>>> master branch? Unfortunately I only have 2 days left for trying this out ...
>>>
>>> Greets. Rico.
>>>
>>>
>>>
>>> On 24.09.2015 at 00:57, Stephan Ewen wrote:
>>>
>>> Hi Rico!
>>>
>>> We have finished the first part of the Window API reworks. You can find
>>> the code here: https://github.com/apache/flink/pull/1175
>>>
>>> It should fix the issues and offer vastly improved performance (up to
>>> 50x faster). For now, it supports time windows, but we will support the
>>> other cases in the next days.
>>>
>>> I'll ping you once it is merged, I'd be curious if it fixes your issue.
>>> Sorry that you ran into this problem...
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Mon, Sep 7, 2015 at 12:00 PM, Rico Bergmann 
>>> wrote:
>>>
 Hi!

 While working with grouping and windowing I encountered a strange
 behavior. I'm doing:

 dataStream.groupBy(KeySelector).window(Time.of(x,
 TimeUnit.SECONDS)).mapWindow(toString).flatten()


 When I run the program containing this snippet it initially outputs
 data at a rate around 150 events per sec. (That is roughly the input rate
 for the program). After about 10-30 minutes the rate drops down below 5
 events per sec. This leads to event delivery offsets getting bigger and
 bigger ...

 Any explanation for this? I know you are reworking the streaming API.
 But it would be useful to know why this happens ...

 Cheers. Rico.

>>>
>>>


Re: FlinkKafkaConsumer and multiple topics

2015-09-24 Thread Jakob Ericsson
We did some rebalancing of topics in our Kafka cluster today. I had a Flink
job running and it crashed when some of the partitions were moved; other
consumers (non-Flink) continued to work.

Should I configure it differently, or could this be a bug?

09/24/2015 15:34:31 Source: Custom Source(3/4) switched to FAILED
java.lang.Exception: Error while fetching from broker:
Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.lang.Class.newInstance(Class.java:442)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)

at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error while fetching from broker:
Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.lang.Class.newInstance(Class.java:442)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)

at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)


On Fri, Sep 18, 2015 at 2:30 PM, Robert Metzger  wrote:

> Hi,
>
> did you manually add a Kafka dependency into your project? Maybe you are
> overwriting the Kafka version to a lower version?
>
> I'm sorry that our consumer is crashing when it's supposed to read an
> invalid topic ... but in general, that's good behavior ;)
>
> Maybe you can check whether the topic exists from your user code?
> The getPartitionsForTopic() method is actually a public static method that
> you can call.
> If it's throwing an exception, the topic doesn't exist anymore.
>
>
> Robert
>
> On Fri, Sep 18, 2015 at 2:21 PM, Jakob Ericsson 
> wrote:
>
>> Hit another problem. It is probably related to a topic that still exists
>> in ZK but is not used anymore (and therefore has no partitions), or to my
>> starting a listener for a topic that hasn't yet been created. I would like
>> it not to crash.
>>
>> Also, some funny Scala <-> Java
>>
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> kafka.common.ErrorMapping.InvalidTopicCode()S
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:619)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081.<init>(FlinkKafkaConsumer081.java:55)
>>
>> On Fri, Sep 18, 2015 at 11:02 AM, Jakob Ericsson <
>> jakob.erics...@gmail.com> wrote:
>>
>>> That will work. We have some utility classes for exposing the ZK-info.
>>>
>>> On Fri, Sep 18, 2015 at 10:50 AM, Robert Metzger 
>>> wrote:
>>>
 Hi Jakob,

 currently, its not possible to subscribe to multiple topics with one
 FlinkKafkaConsumer.

 So for now, you have to create a FKC for each topic .. so you'll end up
 with 50 sources.

 As soon as Kafka releases the new consumer, it will support subscribing
 to multiple topics (I think even with pattern support) and we can easily
 expose the APIs also to the FlinkKafkaConsumer.
 As you can see here:
 https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
 Kafka has plans to release the new consumer API in October.

Re: FlinkKafkaConsumer and multiple topics

2015-09-24 Thread Robert Metzger
Hi Jakob,

what exactly do you mean by a rebalance of topics? Did the leaders of the
partitions change?
Were topics deleted?

Flink's KafkaConsumer does not try to recover from these exceptions. We
rely on Flink's fault tolerance mechanisms to restart the data consumption
(from the last valid offset).
Have you set setNumberOfExecutionRetries() on the ExecutionConfig?
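
For reference, enabling retries would look roughly like this (a minimal
sketch; the retry count of 3 is an arbitrary example):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Restart the job up to 3 times after a failure; consumption resumes from
// the last valid offset as described above.
env.getConfig().setNumberOfExecutionRetries(3);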


On Thu, Sep 24, 2015 at 9:57 PM, Jakob Ericsson 
wrote:

> We did some rebalancing of topics in our Kafka cluster today. I had a Flink
> job running and it crashed when some of the partitions were moved; other
> consumers (non-Flink) continued to work.
>
> Should I configure it differently, or could this be a bug?
>
> 09/24/2015 15:34:31 Source: Custom Source(3/4) switched to FAILED
> java.lang.Exception: Error while fetching from broker:
> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> at java.lang.Class.newInstance(Class.java:442)
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Error while fetching from broker:
> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> at java.lang.Class.newInstance(Class.java:442)
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)
>
>
> On Fri, Sep 18, 2015 at 2:30 PM, Robert Metzger 
> wrote:
>
>> Hi,
>>
>> did you manually add a Kafka dependency into your project? Maybe you are
>> overwriting the Kafka version to a lower version?
>>
>> I'm sorry that our consumer is crashing when it's supposed to read an
>> invalid topic ... but in general, that's good behavior ;)
>>
>> Maybe you can check whether the topic exists from your user code?
>> The getPartitionsForTopic() method is actually a public static method that
>> you can call.
>> If it's throwing an exception, the topic doesn't exist anymore.
>>
>>
>> Robert
>>
>> On Fri, Sep 18, 2015 at 2:21 PM, Jakob Ericsson wrote:
>>
>>> Hit another problem. It is probably related to a topic that still exists
>>> in ZK but is not used anymore (and therefore has no partitions), or to my
>>> starting a listener for a topic that hasn't yet been created. I would like
>>> it not to crash.
>>>
>>> Also, some funny Scala <-> Java
>>>
>>> Exception in thread "main" java.lang.NoSuchMethodError:
>>> kafka.common.ErrorMapping.InvalidTopicCode()S
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:619)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081.<init>(FlinkKafkaConsumer081.java:55)
>>>
>>> On Fri, Sep 18, 2015 at 11:02 AM, Jakob Ericsson <
>>> jakob.erics...@gmail.com> wrote:
>>>
 That will work. We have some utility classes for exposing the ZK-info.

 On Fri, Sep 18, 2015 at 10:50 AM, Robert Metzger 
 wrote:

Re: FlinkKafkaConsumer and multiple topics

2015-09-24 Thread Jakob Ericsson
What I actually meant was partition reassignment (
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
).
No topics were deleted.
We added a bunch of new servers and needed to reassign some partitions to
spread the load.

No, I haven't set the setNumberOfExecutionRetries().

On Thu, Sep 24, 2015 at 10:06 PM, Robert Metzger 
wrote:

> Hi Jakob,
>
> what exactly do you mean by a rebalance of topics? Did the leaders of the
> partitions change?
> Were topics deleted?
>
> Flink's KafkaConsumer does not try to recover from these exceptions. We
> rely on Flink's fault tolerance mechanisms to restart the data consumption
> (from the last valid offset).
> Have you set setNumberOfExecutionRetries() on the ExecutionConfig?
>
>
> On Thu, Sep 24, 2015 at 9:57 PM, Jakob Ericsson 
> wrote:
>
>> We did some rebalancing of topics in our Kafka cluster today. I had a Flink
>> job running and it crashed when some of the partitions were moved; other
>> consumers (non-Flink) continued to work.
>>
>> Should I configure it differently, or could this be a bug?
>>
>> 09/24/2015 15:34:31 Source: Custom Source(3/4) switched to FAILED
>> java.lang.Exception: Error while fetching from broker:
>> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>> at java.lang.Class.newInstance(Class.java:442)
>> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>> at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>>
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Error while fetching from broker:
>> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>> at java.lang.Class.newInstance(Class.java:442)
>> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>> at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>>
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)
>>
>>
>> On Fri, Sep 18, 2015 at 2:30 PM, Robert Metzger 
>> wrote:
>>
>>> Hi,
>>>
>>> did you manually add a Kafka dependency into your project? Maybe you are
>>> overwriting the Kafka version to a lower version?
>>>
>>> I'm sorry that our consumer is crashing when it's supposed to read an
>>> invalid topic ... but in general, that's good behavior ;)
>>>
>>> Maybe you can check whether the topic exists from your user code?
>>> The getPartitionsForTopic() method is actually a public static method that
>>> you can call.
>>> If it's throwing an exception, the topic doesn't exist anymore.
>>>
>>>
>>> Robert
>>>
>>> On Fri, Sep 18, 2015 at 2:21 PM, Jakob Ericsson <
>>> jakob.erics...@gmail.com> wrote:
>>>
 Hit another problem. It is probably related to a topic that still exists
 in ZK but is not used anymore (and therefore has no partitions), or to my
 starting a listener for a topic that hasn't yet been created. I would like
 it not to crash.

 Also, some funny Scala <-> Java

 Exception in thread "main" java.lang.NoSuchMethodError:
 kafka.common.ErrorMapping.InvalidTopicCode()S
 at
 

Re: /home/user/udfs.jar same question

2015-09-24 Thread Chiwan Park
Hi Deng,

The jarFiles parameter of `createRemoteEnvironment` is the path of your 
custom library jar. If you don't need a custom library, you can omit the 
parameter.
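
For example, a remote environment without a custom jar is then just (a minimal
sketch; host and port taken from your example, and jarFiles is a varargs
parameter that can simply be left out):

ExecutionEnvironment env =
    ExecutionEnvironment.createRemoteEnvironment("flink-master", 6123);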

Regards,
Chiwan Park

> On Sep 25, 2015, at 10:48 AM, Deng Jie  wrote:
> 
> Dear Flink org, I have the same question, like:
> 
> public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment
>         .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");
> 
>     DataSet<String> data = env.readTextFile("hdfs://path/to/file");
> 
>     data
>         .filter(new FilterFunction<String>() {
>             public boolean filter(String value) {
>                 return value.startsWith("http://");
>             }
>         })
>         .writeAsText("hdfs://path/to/result");
> 
>     env.execute();
> }
> 
> How do I write the file (udfs.jar)? Can you give me an example? In addition, can 
> this parameter be made optional?
> 
> -- Best wishes for you





/home/user/udfs.jar same question

2015-09-24 Thread Deng Jie
Dear Flink org, I have the same question, like:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment
        .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");

    DataSet<String> data = env.readTextFile("hdfs://path/to/file");

    data
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("hdfs://path/to/result");

    env.execute();
}

How do I write the file (udfs.jar)? Can you give me an example? In addition, can
this parameter be made optional?

-- Best wishes for you