[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15563131#comment-15563131
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2330


> TableInputFormat fails when reused on next split
> 
>
> Key: FLINK-4311
> URL: https://issues.apache.org/jira/browse/FLINK-4311
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Critical
> Fix For: 1.1.3
>
>
> We have written a batch job that reads data from HBase by means of the 
> TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
>   at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
>   at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
>   at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
>   at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
>   at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
>   at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
>   at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
>   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
>   at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>   at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
>   at org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
>   at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
>   ... 10 more
> {quote}
> As you can see the ThreadPoolExecutor was terminated at this point.
> We tracked it down to the following:
> # the configure method opens the table
> # the open method obtains the result scanner
> # the close method closes the table.
> If a second split arrives on the same instance then the open method will fail 
> because the table has already been closed.
> We also found that this error varies with the versions of HBase that are 
> used. I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
>   at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
>   at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
>   ... 37 more
> {quote}
> I found that the [documentation of the InputFormat 
> interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html]
> clearly states:
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance 
> can be opened again after it was closed. That is due to the fact that the 
> input format is used for potentially multiple splits. After a split is done, 
> the format's close function is invoked and, if another split is available, 
> the open function is invoked afterwards for the next split.{quote}
> It appears that this specific InputFormat has not been checked against this 
> constraint.
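The lifecycle problem described above (resource acquired once in configure, released in every close, so the second open on the same instance fails) can be reproduced without Flink or HBase. The following is a minimal sketch under stated assumptions: `FakeTable`, `Format`, `BrokenFormat`, `ReopenableFormat` and `readAll` are hypothetical names standing in for the HBase table handle, the Flink `InputFormat` methods and the `DataSourceTask` driver. It is not the Flink or HBase API, and not necessarily how the merged fix in PR 2330 is structured.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, dependency-free model of the configure/open/close lifecycle from FLINK-4311. */
public class SplitReuseDemo {

    /** Stand-in for an HBase table handle (NOT the HBase API): unusable once closed. */
    static final class FakeTable {
        private boolean closed = false;

        List<String> scan(int split) {
            if (closed) {
                throw new IllegalStateException("table already closed");
            }
            List<String> rows = new ArrayList<>();
            rows.add("row-" + split);
            return rows;
        }

        void close() {
            closed = true;
        }
    }

    /** Hypothetical mirror of the three lifecycle methods discussed in the issue. */
    interface Format {
        void configure();
        List<String> open(int split);
        void close();
    }

    /** Buggy pattern: table created once in configure(), but closed after every split. */
    static final class BrokenFormat implements Format {
        private FakeTable table;

        public void configure() { table = new FakeTable(); }
        public List<String> open(int split) { return table.scan(split); }
        public void close() { table.close(); }
    }

    /** Re-openable pattern: create the table lazily in open(), release and forget it in close(). */
    static final class ReopenableFormat implements Format {
        private FakeTable table;

        public void configure() {
            // Only read configuration here; do not acquire per-split resources.
        }

        public List<String> open(int split) {
            if (table == null) {
                table = new FakeTable();
            }
            return table.scan(split);
        }

        public void close() {
            if (table != null) {
                table.close();
                table = null; // the next open() creates a fresh table
            }
        }
    }

    /** Drives one instance as the InputFormat contract describes: configure once, then open/close per split. */
    static List<String> readAll(Format format, int splits) {
        List<String> out = new ArrayList<>();
        format.configure();
        for (int split = 0; split < splits; split++) {
            out.addAll(format.open(split));
            format.close();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll(new ReopenableFormat(), 3)); // [row-0, row-1, row-2]
        try {
            readAll(new BrokenFormat(), 3);
        } catch (IllegalStateException e) {
            System.out.println("BrokenFormat failed: " + e.getMessage());
        }
    }
}
```

In this model `BrokenFormat` fails on the second split for the same reason as the reported stack traces: the shared resource was already released by the first `close()`. Tying acquisition to `open()` and release to `close()` is one way to satisfy the re-open requirement; an alternative is keeping the connection open across splits and closing only the per-split scanner.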

[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15562203#comment-15562203
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2330
  
Merging


[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15561846#comment-15561846
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2330
  
I disabled tests for the hadoop1 profile. 
Will build the PR one more time and merge if everything passes.


[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15561703#comment-15561703
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2330
  
I'll propose dropping the hadoop1 builds on the dev ML.


[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560672#comment-15560672
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2330
  
I noticed that building this PR for hadoop1 (`mvn clean install -Dhadoop.profile=1`) fails:

> The following artifacts could not be resolved: org.apache.hadoop:hadoop-hdfs:jar:tests:1.2.1, org.apache.hbase:hbase-hadoop2-compat:jar:tests:0.98.11-hadoop1: Could not find artifact org.apache.hadoop:hadoop-hdfs:jar:tests:1.2.1 in central

I'm not a Maven guru. Is it possible to disable compiling and executing the 
tests for the hadoop1 profile?


[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-10-04 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15545404#comment-15545404
 ] 

Fabian Hueske commented on FLINK-4311:
--

Merged for Flink 1.1 as 98b399d4b4ddc9ab5d01e40dcb9ab0889f0d1067

[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15544916#comment-15544916
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2330
  
Hi @nielsbasjes, I just posted to the dev mailing list and proposed updating 
the HBase dependency to 1.2.3 (as 
[FLINK-2765](https://issues.apache.org/jira/browse/FLINK-2765) suggests). We 
should have a decision by the end of the week, and then I will merge this PR 
into the master branch.

In the meantime, I will merge the fixed TableInputFormat changes into the 
Flink 1.1 branch and revert all breaking changes (pom.xml, RichInputFormat, 
hbase-site.xml, tests, ...). 
We do want these changes for Flink 1.2.0.

Thanks, Fabian


> TableInputFormat fails when reused on next split
> 
>
> Key: FLINK-4311
> URL: https://issues.apache.org/jira/browse/FLINK-4311
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Critical
>
> We have written a batch job that uses data from HBase by means of using the 
> TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
>   at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
>   at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
>   at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
>   at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
>   at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
>   at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
>   at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
>   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
>   at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>   at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
>   at org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
>   at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
>   ... 10 more
> {quote}
> As you can see, the ThreadPoolExecutor had already been terminated at this point.
> We tracked this down to the following:
> # the configure method opens the table,
> # the open method obtains the result scanner,
> # the close method closes the table.
> If a second split arrives on the same instance, the open method fails 
> because the table has already been closed.
> We also found that the exact error varies with the HBase version 
> used. I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
>   at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
>   at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
>   ... 37 more
> {quote}
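> I found that the [documentation of the InputFormat interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html]
> clearly states:
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance 
> can be opened again after it was closed. That is due to the fact that the 
> input format is used for potentially multiple splits. After a split is done, 
> the format's close function is invoked and, if another split is available, 
> the open function is invoked afterwards for the next split.{quote}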
> 
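To make the failure mode concrete, here is a minimal plain-Java sketch of the lifecycle described in the report. `FakeTable`, `BuggyFormat` and `runAllSplits` are hypothetical stand-ins, not HBase or Flink classes. The driver configures one instance once and then runs open/read/close for every split assigned to it; because `close()` tears down the shared table, the second `open()` fails, much like the `RejectedExecutionException` in the trace above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins only: none of these are real HBase or Flink classes.
class ReuseBugDemo {

    // Mimics a table handle that becomes unusable once closed.
    static class FakeTable {
        private final List<String> rows;
        private boolean closed = false;

        FakeTable(List<String> rows) { this.rows = rows; }

        // Returns the rows in [startKey, endKey); fails if the table is closed.
        Iterator<String> getScanner(String startKey, String endKey) {
            if (closed) {
                throw new IllegalStateException("table already closed");
            }
            List<String> slice = new ArrayList<>();
            for (String r : rows) {
                if (r.compareTo(startKey) >= 0 && r.compareTo(endKey) < 0) {
                    slice.add(r);
                }
            }
            return slice.iterator();
        }

        void close() { closed = true; }
    }

    // Mirrors the reported lifecycle: configure() opens the table,
    // open() obtains a scanner, close() closes the table.
    static class BuggyFormat {
        private final List<String> data;
        private FakeTable table;
        private Iterator<String> scanner;

        BuggyFormat(List<String> data) { this.data = data; }

        void configure() { table = new FakeTable(data); }

        void open(String[] split) { scanner = table.getScanner(split[0], split[1]); }

        boolean reachedEnd() { return !scanner.hasNext(); }

        String nextRecord() { return scanner.next(); }

        void close() { table.close(); } // BUG: shared resource closed after every split
    }

    // Simulates the runtime: one instance, configured once, then
    // open/read/close for each split it is handed.
    static List<String> runAllSplits(BuggyFormat format, String[][] splits) {
        List<String> out = new ArrayList<>();
        format.configure();
        for (String[] split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                out.add(format.nextRecord());
            }
            format.close();
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("000", "111", "333", "444");
        String[][] splits = {{"0", "3"}, {"3", "6"}};
        try {
            runAllSplits(new BuggyFormat(rows), splits);
            System.out.println("no failure");
        } catch (IllegalStateException e) {
            System.out.println("second split failed: " + e.getMessage());
        }
    }
}
```

With a single split the format works fine, which is why the bug only shows up when one instance is handed more than one split.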

[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15512604#comment-15512604
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2330
  
Hi @nielsbasjes, thanks for fixing and cleaning up the `TableInputFormat`. 
This PR is good to merge.



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509694#comment-15509694
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79816123
  
--- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
@@ -67,18 +66,23 @@
protected abstract T mapResultToTuple(Result r);
 
/**
-* creates a {@link Scan} object and a {@link HTable} connection
+* Creates a {@link Scan} object and opens the {@link HTable} connection.
+* These are opened here because they are needed in the createInputSplits
+* which is called before the openInputFormat method.
+* So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
 *
-* @param parameters
+* @param parameters The configuration that is to be used
 * @see Configuration
 */
@Override
public void configure(Configuration parameters) {
-   this.table = createTable();
-   this.scan = getScanner();
+   table = createTable();
+   scan = getScanner();
--- End diff --

Done. However, because `configure()` has no way to fail gracefully, I also 
added those checks to the other methods.
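A sketch of the lifecycle this diff moves towards, again using hypothetical stand-in classes rather than the real HBase/Flink types. The table is opened once in `configure()`, each `open()`/`close()` pair only creates and drops a per-split scanner, and the table is closed exactly once in `closeInputFormat()`. Because `configure()` cannot throw a checked exception, a failure there is only recorded, and `open()` checks for it; that is one plausible reading of the extra checks mentioned above, not the PR's actual code. Split creation and `openInputFormat()` are left out for brevity.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins only: none of these are real HBase or Flink classes.
class FixedLifecycleDemo {

    // Table handle that records how often it was closed.
    static class FakeTable {
        private final List<String> rows;
        boolean closed = false;
        int closeCount = 0;

        FakeTable(List<String> rows) { this.rows = rows; }

        // Returns the rows in [startKey, endKey); fails if the table is closed.
        Iterator<String> scan(String startKey, String endKey) {
            if (closed) throw new IllegalStateException("table already closed");
            List<String> slice = new ArrayList<>();
            for (String r : rows) {
                if (r.compareTo(startKey) >= 0 && r.compareTo(endKey) < 0) slice.add(r);
            }
            return slice.iterator();
        }

        void close() { closed = true; closeCount++; }
    }

    static class FixedFormat {
        private final List<String> data; // null simulates an unreachable table
        FakeTable table;
        private Iterator<String> scanner;

        FixedFormat(List<String> data) { this.data = data; }

        // configure() cannot throw a checked exception, so a failure to open
        // the table is only recorded by leaving the field null ...
        void configure() { table = (data == null) ? null : new FakeTable(data); }

        // ... and the methods that need the table check for it explicitly.
        private void ensureTable() throws IOException {
            if (table == null) throw new IOException("table could not be opened in configure()");
        }

        void open(String[] split) throws IOException {
            ensureTable();
            scanner = table.scan(split[0], split[1]); // per-split resource only
        }

        boolean reachedEnd() { return !scanner.hasNext(); }

        String nextRecord() { return scanner.next(); }

        void close() { scanner = null; } // the shared table stays open

        void closeInputFormat() { if (table != null) table.close(); } // once, at the very end
    }

    static List<String> runAllSplits(FixedFormat format, String[][] splits) throws IOException {
        List<String> out = new ArrayList<>();
        format.configure();
        for (String[] split : splits) {
            format.open(split);
            while (!format.reachedEnd()) out.add(format.nextRecord());
            format.close();
        }
        format.closeInputFormat();
        return out;
    }

    public static void main(String[] args) throws IOException {
        FixedFormat f = new FixedFormat(Arrays.asList("000", "111", "333", "444"));
        System.out.println(runAllSplits(f, new String[][] {{"0", "3"}, {"3", "6"}}));
    }
}
```

The design choice is simply to tie each resource to the matching lifecycle pair: per-split state to `open()`/`close()`, per-instance state to `configure()`/`closeInputFormat()`.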



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509696#comment-15509696
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79816145
  
--- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
+    private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
+    private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+    private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+    // These are the row ids AND also the values we will put in the test table
+    private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
+
+    @Before
+    public void createTestTable() throws IOException {
+        TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+        byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
+        createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+        HTable table = openTable(tableName);
+
+        for (String rowId : ROW_IDS) {
+            byte[] rowIdBytes = rowId.getBytes();
+            Put p = new Put(rowIdBytes);
+            // Use the rowId as the value to facilitate the testing better
+            p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
+            table.put(p);
+        }
+
+        table.close();
+    }
+
+    class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
+        @Override
+        protected Scan getScanner() {
+            return new Scan();
+        }
+
+        @Override
+        protected String getTableName() {
+            return TEST_TABLE_NAME;
+        }
+
+        @Override
+        protected Tuple1<String> mapResultToTuple(Result r) {
+            return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+        }
+    }
+
+    @Test
+    public void testTableInputFormat() {
+        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+        environment.setParallelism(1);
+
+        DataSet<String> resultDataSet =
+            environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
+                @Override
+                public String map(Tuple1<String> value) throws Exception {
+                    return value.f0;
+                }
+            });
+
+        List<String> resultSet = new ArrayList<>();
+        resultDataSet.output(new

[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509695#comment-15509695
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79816134
  
--- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java ---
@@ -0,0 +1,112 @@
+public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
--- End diff --

Done
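The test in this PR creates the table with split keys `"0"`, `"3"`, `"6"` and `"9"` and runs with parallelism 1, so a single `TableInputFormat` instance has to work through several splits in a row, which is exactly the reuse path that used to fail. The plain-Java sketch below (hypothetical helper `assignToRegions`, no HBase involved) shows how the ten row keys spread over the five regions those split keys produce, assuming the usual HBase layout of half-open key ranges `[start, end)` with an unbounded first and last region. Whether the empty first region also yields an input split depends on the format's `createInputSplits()`, so treat the exact split count as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class RegionAssignmentDemo {

    // Returns, for each region, the row keys it holds. With n split keys there
    // are n + 1 regions: (-inf, k0), [k0, k1), ..., [k(n-1), +inf).
    static List<List<String>> assignToRegions(String[] splitKeys, String[] rowKeys) {
        List<List<String>> regions = new ArrayList<>();
        for (int i = 0; i <= splitKeys.length; i++) {
            regions.add(new ArrayList<>());
        }
        for (String row : rowKeys) {
            int region = 0;
            // Step past every split key that is <= row. String order equals
            // unsigned byte order for these ASCII keys.
            while (region < splitKeys.length && row.compareTo(splitKeys[region]) >= 0) {
                region++;
            }
            regions.get(region).add(row);
        }
        return regions;
    }

    public static void main(String[] args) {
        String[] splitKeys = {"0", "3", "6", "9"};
        String[] rowIds = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
        List<List<String>> regions = assignToRegions(splitKeys, rowIds);
        for (int i = 0; i < regions.size(); i++) {
            System.out.println("region " + i + ": " + regions.get(i));
        }
    }
}
```

Four non-empty regions means the single parallel instance is opened and closed at least four times, so any resource torn down in `close()` is guaranteed to be missed on the next `open()`.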



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509367#comment-15509367
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79791290
  
--- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java ---
@@ -0,0 +1,112 @@
+public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
--- End diff --

Please rename to `TableInputFormatITCase`.



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509366#comment-15509366
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79791053
  
--- Diff: flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
@@ -67,18 +66,23 @@
protected abstract T mapResultToTuple(Result r);
--- End diff --

Can we remove the confusing comment `"abstract methods allow for multiple 
table and scanners in the same job"` and add JavaDocs to all abstract methods 
that describe what is expected from their implementation?

A `TableInputFormat` instance should only scan a single table. In case more 
tables need to be read, each could be read with a separate `TableInputFormat` 
instance and the output of those could be unioned if needed.
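A plain-Java sketch of that design, under stated assumptions: `SingleTableReader` is a hypothetical stand-in for a `TableInputFormat` subclass, its abstract methods carry the kind of JavaDoc requested above, and an in-memory map replaces HBase. Each reader is bound to exactly one table; reading several tables means creating several readers and combining their outputs. In a Flink job that combination would be a `DataSet#union`, which, unlike the `Stream.concat` used here, should not be relied on to preserve order.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OneTablePerFormatDemo {

    /** Hypothetical base class: every instance reads exactly one table. */
    abstract static class SingleTableReader<T> {

        /**
         * Returns the name of the single table this instance reads.
         *
         * @return the table name; must be the same on every call
         */
        protected abstract String getTableName();

        /**
         * Converts one raw row of the table into an output record.
         *
         * @param row the raw row as returned by the store
         * @return the converted record
         */
        protected abstract T mapRow(String row);

        // Reads all rows of this reader's table from an in-memory stand-in store.
        Stream<T> read(Map<String, List<String>> store) {
            return store.get(getTableName()).stream().map(this::mapRow);
        }
    }

    // One reader per table; each record is tagged with its table name.
    static SingleTableReader<String> readerFor(String table) {
        return new SingleTableReader<String>() {
            @Override
            protected String getTableName() {
                return table;
            }

            @Override
            protected String mapRow(String row) {
                return table + ":" + row;
            }
        };
    }

    // Reads each table with its own reader and concatenates the outputs,
    // playing the role a union of the per-table data sets would play in a job.
    static List<String> unionOf(Map<String, List<String>> store, String... tables) {
        Stream<String> combined = Stream.empty();
        for (String table : tables) {
            combined = Stream.concat(combined, readerFor(table).read(store));
        }
        return combined.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, List<String>> store = new HashMap<>();
        store.put("orders", Arrays.asList("o1", "o2"));
        store.put("customers", Arrays.asList("c1"));
        System.out.println(unionOf(store, "orders", "customers"));
        // prints [orders:o1, orders:o2, customers:c1]
    }
}
```

Keeping one table per instance means the format never has to juggle several connections or scanners, which keeps the open/close contract simple.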



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509369#comment-15509369
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79791990
  
--- Diff: flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml ---
@@ -1,43 +0,0 @@
-
-
-
-
--- End diff --

I see, thanks.
I think so far this file has been used as a template. I'm not sure how 
valuable it is to keep.


> TableInputFormat fails when reused on next split
> 
>
> Key: FLINK-4311
> URL: https://issues.apache.org/jira/browse/FLINK-4311
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Critical
>
> We have written a batch job that reads data from HBase using the 
> TableInputFormat.
> We have found that this class sometimes fails with the following exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
>   at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
>   at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
>   at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
>   at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
>   at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
>   at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
>   at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
>   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
>   at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>   at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
>   at org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
>   at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
>   ... 10 more
> {quote}
> As you can see, the ThreadPoolExecutor had already been terminated at this point.
> We tracked it down to the fact that 
> # the configure method opens the table
> # the open method obtains the result scanner
> # the close method closes the table.
> If a second split arrives on the same instance, the open method fails 
> because the table has already been closed.
> We also found that the exact error varies with the HBase version in 
> use. I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
>   at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
>   at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
>   ... 37 more
> {quote}
> I found that the [documentation of the InputFormat interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html] clearly states
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance 
> can be opened again after it was closed. That is due to the fact that the input 

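The lifecycle problem described in the report can be reproduced without an HBase cluster. The following is a minimal, hypothetical simulation: `FakeTable`, `BuggyFormat`, `FixedFormat` and `ReuseDemo` are illustrative stand-ins, not Flink or HBase classes, and an `IllegalStateException` plays the part of the `RejectedExecutionException` raised by the closed `HTable`. Two splits are fed to a single instance; the second `open()` fails only when `close()` also closes the table.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for HTable: refuses to hand out scanners once closed,
// much like the terminated thread pool in the reported stack trace.
class FakeTable {
    private boolean closed;

    String getScanner(String split) {
        if (closed) {
            throw new IllegalStateException("table already closed");
        }
        return "scanner[" + split + "]";
    }

    void close() {
        closed = true;
    }
}

// Lifecycle as described in the report: configure() opens the table,
// open() obtains a scanner, close() closes the table.
class BuggyFormat {
    private FakeTable table;

    void configure() { table = new FakeTable(); }
    String open(String split) { return table.getScanner(split); }
    void close() { table.close(); }
}

// Assumed fixed lifecycle: close() only drops the per-split scanner and the
// table is released once, in closeInputFormat(), after the last split.
class FixedFormat {
    private FakeTable table;
    private String scanner;

    void configure() { table = new FakeTable(); }
    String open(String split) { scanner = table.getScanner(split); return scanner; }
    void close() { scanner = null; }
    void closeInputFormat() { table.close(); }
}

class ReuseDemo {
    static final String[] SPLITS = {"split-0", "split-1"};

    // Feeds both splits to ONE instance and records what each open() produced.
    static List<String> runBuggy() {
        List<String> log = new ArrayList<>();
        BuggyFormat format = new BuggyFormat();
        format.configure();
        for (String split : SPLITS) {
            try {
                log.add(format.open(split));
                format.close();
            } catch (IllegalStateException e) {
                log.add("failed: " + e.getMessage());
            }
        }
        return log;
    }

    static List<String> runFixed() {
        List<String> log = new ArrayList<>();
        FixedFormat format = new FixedFormat();
        format.configure();
        for (String split : SPLITS) {
            log.add(format.open(split));
            format.close();
        }
        format.closeInputFormat();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runBuggy()); // [scanner[split-0], failed: table already closed]
        System.out.println(runFixed()); // [scanner[split-0], scanner[split-1]]
    }
}
```

The fixed variant assumes the table is released in a separate once-per-task hook, which is the role `closeInputFormat()` plays in the patch under review.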
[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509368#comment-15509368
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79791682
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormatITCase.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestTableInputFormatITCase extends HBaseTestingClusterAutostarter {
+    private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
+    private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+    private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+    // These are the row ids AND also the values we will put in the test table
+    private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
+
+    @Before
+    public void createTestTable() throws IOException {
+        TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+        byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
+        createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+        HTable table = openTable(tableName);
+
+        for (String rowId : ROW_IDS) {
+            byte[] rowIdBytes = rowId.getBytes();
+            Put p = new Put(rowIdBytes);
+            // Use the rowId as the value to facilitate the testing better
+            p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
+            table.put(p);
+        }
+
+        table.close();
+    }
+
+    class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
+        @Override
+        protected Scan getScanner() {
+            return new Scan();
+        }
+
+        @Override
+        protected String getTableName() {
+            return TEST_TABLE_NAME;
+        }
+
+        @Override
+        protected Tuple1<String> mapResultToTuple(Result r) {
+            return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+        }
+    }
+
+    @Test
+    public void testTableInputFormat() {
+        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+        environment.setParallelism(1);
+
+        DataSet<String> resultDataSet =
+            environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
+                @Override
+                public String map(Tuple1<String> value) throws Exception {
+                    return value.f0;
+                }
+            });
+
+        List<String> resultSet = new ArrayList<>();
+        resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));

[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509370#comment-15509370
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79791171
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 ---
@@ -67,18 +66,23 @@
protected abstract T mapResultToTuple(Result r);
 
/**
-* creates a {@link Scan} object and a {@link HTable} connection
+* Creates a {@link Scan} object and opens the {@link HTable} connection.
+* These are opened here because they are needed in the createInputSplits
+* which is called before the openInputFormat method.
+* So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
 *
-* @param parameters
+* @param parameters The configuration that is to be used
 * @see Configuration
 */
@Override
public void configure(Configuration parameters) {
-   this.table = createTable();
-   this.scan = getScanner();
+   table = createTable();
+   scan = getScanner();
--- End diff --

Can you add a check that `table` and `scan` are properly initialized, i.e., 
`!= null`?
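A minimal sketch of the requested check, assuming `configure()` should fail fast with an unchecked exception (Flink's `configure(Configuration)` signature declares no checked exceptions). The suppliers stand in for `createTable()` and `getScanner()`, `Object` stands in for `HTable` and `Scan`, and the class names and exception messages are made up for illustration.

```java
import java.util.function.Supplier;

// Hypothetical sketch: configure() verifies that the subclass hooks actually
// produced a table and a scan instead of failing later with an NPE in open().
class CheckedConfigure {
    private final Supplier<Object> createTable; // stands in for createTable()
    private final Supplier<Object> getScanner;  // stands in for getScanner()
    private Object table;
    private Object scan;

    CheckedConfigure(Supplier<Object> createTable, Supplier<Object> getScanner) {
        this.createTable = createTable;
        this.getScanner = getScanner;
    }

    // Unchecked exceptions, because configure(Configuration) cannot throw checked ones.
    void configure() {
        table = createTable.get();
        if (table == null) {
            throw new RuntimeException("Unable to create the table connection (createTable() returned null)");
        }
        scan = getScanner.get();
        if (scan == null) {
            throw new RuntimeException("Unable to create a Scan (getScanner() returned null)");
        }
    }
}

class CheckedConfigureDemo {
    // Returns "ok" or the message of the exception thrown by configure().
    static String configureWith(boolean tableOk, boolean scanOk) {
        CheckedConfigure format = new CheckedConfigure(
            () -> tableOk ? new Object() : null,
            () -> scanOk ? new Object() : null);
        try {
            format.configure();
            return "ok";
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(configureWith(true, true));
        System.out.println(configureWith(false, true));
        System.out.println(configureWith(true, false));
    }
}
```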


[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504563#comment-15504563
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79469513
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 ---
@@ -131,37 +153,27 @@ public T nextRecord(T reuse) throws IOException {
}
 
@Override
-   public void open(TableInputSplit split) throws IOException {
-   if (split == null){
-   throw new IOException("Input split is null!");
-   }
-   if (table == null){
-   throw new IOException("No HTable provided!");
-   }
-   if (scan == null){
-   throw new IOException("No Scan instance provided");
+   public void close() throws IOException {
+   LOG.info("Closing split (scanned {} rows)", scannedRows);
+   this.lastRow = null;
+   try {
+   if(resultScanner !=null) {
--- End diff --

insert space between `!=` and `null`


[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504565#comment-15504565
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79472506
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormat.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestTableInputFormat extends HBaseTestingClusterAutostarter {
+   private static final String TEST_TABLE_NAME = 
"TableInputFormatTestTable";
+   private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+   private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+   // These are the row ids AND also the values we will put in the test 
table
+   private static final String[] ROW_IDS = {"000", "111", "222", "333", 
"444", "555", "666", "777", "888", "999"};
+
+   @Before
+   public void createTestTable() throws IOException {
+   TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+   byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), 
"6".getBytes(), "9".getBytes()};
+   createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+   HTable table = openTable(tableName);
+
+   for (String rowId : ROW_IDS) {
+   byte[] rowIdBytes = rowId.getBytes();
+   Put p = new Put(rowIdBytes);
+   // Use the rowId as the value to facilitate the testing 
better
+   p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, 
rowIdBytes);
+   table.put(p);
+   }
+
+   table.close();
+   }
+
+   class InputFormatForTestTable extends TableInputFormat {
+   @Override
+   protected Scan getScanner() {
+   return new Scan();
+   }
+
+   @Override
+   protected String getTableName() {
+   return TEST_TABLE_NAME;
+   }
+
+   @Override
+   protected Tuple1 mapResultToTuple(Result r) {
+   return new Tuple1<>(new 
String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+   }
+   }
+
+   @Test
+   public void testTableInputFormat() {
--- End diff --

Can we make this test a bit more lightweight and not execute a Flink program?
Instead, we could test the interface methods of the InputFormat, such as:
- createInputSplits
- configure
- open
- nextRecord
- close

etc.

If you split the test into several methods, please make sure that HBase is 
only initialized once with `@BeforeClass`.
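One way to read this suggestion, sketched under assumptions: drive the lifecycle methods directly on a single instance, in the order configure, createInputSplits, then open / reachedEnd / nextRecord / close for each split. `InMemoryRangeFormat` and `LifecycleCheck` are hypothetical; the format keeps rows in a `TreeMap` instead of an HBase table and reuses the method names listed above, but it is not Flink's `InputFormat` interface. The row ids and split keys are copied from the test in the diff. In the real test the table setup would live in a `@BeforeClass` method, as requested.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for an HBase-backed input format.
class InMemoryRangeFormat {
    static final class Split {
        final String start; // inclusive; "" means start of table
        final String stop;  // exclusive; "" means end of table
        Split(String start, String stop) { this.start = start; this.stop = stop; }
    }

    private final NavigableMap<String, String> rows;
    private final String[] splitKeys;
    private List<String> current;
    private int pos;

    InMemoryRangeFormat(NavigableMap<String, String> rows, String... splitKeys) {
        this.rows = rows;
        this.splitKeys = splitKeys;
    }

    void configure() {
        // A real format would open the table connection here.
    }

    // One split per region, cut at the split keys.
    List<Split> createInputSplits() {
        List<Split> splits = new ArrayList<>();
        String start = "";
        for (String key : splitKeys) {
            splits.add(new Split(start, key));
            start = key;
        }
        splits.add(new Split(start, ""));
        return splits;
    }

    void open(Split split) {
        NavigableMap<String, String> range = split.stop.isEmpty()
            ? rows.tailMap(split.start, true)
            : rows.subMap(split.start, true, split.stop, false);
        current = new ArrayList<>(range.values());
        pos = 0;
    }

    boolean reachedEnd() { return pos >= current.size(); }

    String nextRecord() { return current.get(pos++); }

    void close() { current = null; }
}

class LifecycleCheck {
    static final String[] ROW_IDS =
        {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};

    // Reads every split with ONE format instance: the reuse that FLINK-4311 broke.
    static List<String> readAllSplits() {
        NavigableMap<String, String> rows = new TreeMap<>();
        for (String id : ROW_IDS) {
            rows.put(id, id); // row id doubles as the value
        }
        InMemoryRangeFormat format = new InMemoryRangeFormat(rows, "0", "3", "6", "9");
        format.configure();
        List<String> result = new ArrayList<>();
        for (InMemoryRangeFormat.Split split : format.createInputSplits()) {
            format.open(split);
            while (!format.reachedEnd()) {
                result.add(format.nextRecord());
            }
            format.close();
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> result = readAllSplits();
        if (!result.equals(Arrays.asList(ROW_IDS))) {
            throw new AssertionError("unexpected rows: " + result);
        }
        System.out.println("read " + result.size() + " rows");
    }
}
```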


[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504568#comment-15504568
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79477173
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties ---
@@ -15,9 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-log4j.rootLogger=${hadoop.root.logger}
-hadoop.root.logger=INFO,console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+log4j.rootLogger=DEBUG, stdout, file
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.threshold=INFO
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n
+## file appender
--- End diff --

Can you remove the file appender configuration? It creates a 3.5 MB file 
and makes the test heavier. We are suffering from long build times and try to 
keep new tests as lightweight as possible. Thanks!


[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504562#comment-15504562
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79469573
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 ---
@@ -131,37 +153,27 @@ public T nextRecord(T reuse) throws IOException {
}
 
@Override
-   public void open(TableInputSplit split) throws IOException {
-   if (split == null){
-   throw new IOException("Input split is null!");
-   }
-   if (table == null){
-   throw new IOException("No HTable provided!");
-   }
-   if (scan == null){
-   throw new IOException("No Scan instance provided");
+   public void close() throws IOException {
+   LOG.info("Closing split (scanned {} rows)", scannedRows);
+   this.lastRow = null;
+   try {
+   if(resultScanner !=null) {
+   this.resultScanner.close();
+   }
+   } finally {
+   this.resultScanner = null;
}
-
-   logSplitInfo("opening", split);
-   scan.setStartRow(split.getStartRow());
-   lastRow = split.getEndRow();
-   scan.setStopRow(lastRow);
-
-   this.rs = table.getScanner(scan);
-   this.endReached = false;
-   this.scannedRows = 0;
}
 
@Override
-   public void close() throws IOException {
-   if(rs!=null){
-   this.rs.close();
-   }
-   if(table!=null){
-   this.table.close();
+   public void closeInputFormat() throws IOException {
+   try {
+   if(table!=null) {
--- End diff --

insert spaces
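For context on the reviewed hunk, a hypothetical sketch of the two-level cleanup it introduces: `close()` runs after every split and releases only the scanner, while `closeInputFormat()` runs once and releases the table. Plain `java.io.Closeable`s stand in for `ResultScanner` and `HTable`, and the class names are illustrative. The demo checks that the `finally` block clears the scanner field even when closing it throws, and that the table survives until `closeInputFormat()`.

```java
import java.io.Closeable;
import java.io.IOException;

class TwoLevelClose {
    Closeable table;         // lives for the whole task
    Closeable resultScanner; // lives for one split

    // Per split: release only the scanner. The finally block clears the field
    // even if close() throws, so the next split never sees a stale scanner.
    void close() throws IOException {
        try {
            if (resultScanner != null) {
                resultScanner.close();
            }
        } finally {
            resultScanner = null;
        }
    }

    // Once, after the last split: release the table.
    void closeInputFormat() throws IOException {
        try {
            if (table != null) {
                table.close();
            }
        } finally {
            table = null;
        }
    }
}

class TwoLevelCloseDemo {
    static String run() {
        TwoLevelClose format = new TwoLevelClose();
        boolean[] tableClosed = {false};
        format.table = () -> { tableClosed[0] = true; };
        format.resultScanner = () -> { throw new IOException("scanner close failed"); };

        String outcome;
        try {
            format.close();
            outcome = "no error";
        } catch (IOException e) {
            outcome = e.getMessage();
        }
        boolean scannerCleared = format.resultScanner == null;
        boolean tableStillOpen = format.table != null && !tableClosed[0];

        try {
            format.closeInputFormat();
        } catch (IOException e) {
            return "unexpected: " + e.getMessage();
        }
        boolean tableReleased = format.table == null && tableClosed[0];
        return outcome + ", scannerCleared=" + scannerCleared
            + ", tableStillOpen=" + tableStillOpen + ", tableReleased=" + tableReleased;
    }

    public static void main(String[] args) {
        // scanner close failed, scannerCleared=true, tableStillOpen=true, tableReleased=true
        System.out.println(run());
    }
}
```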


[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504569#comment-15504569
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79469241
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 ---
@@ -93,32 +102,45 @@ private HTable createTable() {
}
 
@Override
+   public void open(TableInputSplit split) throws IOException {
+   if (split == null) {
+   throw new IOException("Input split is null!");
+   }
+
+   logSplitInfo("opening", split);
+   scan.setStartRow(split.getStartRow());
+   lastRow = split.getEndRow();
+   scan.setStopRow(lastRow);
+
+   resultScanner = table.getScanner(scan);
+   endReached = false;
+   scannedRows = 0;
+   }
+
+   @Override
public boolean reachedEnd() throws IOException {
return this.endReached;
}
 
@Override
public T nextRecord(T reuse) throws IOException {
-   if (this.rs == null){
+   if (this.resultScanner == null){
throw new IOException("No table result scanner provided!");
}
try{
--- End diff --

insert space
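The `open()` in this hunk resets `endReached` and `scannedRows` for every split. A hypothetical sketch of why that matters when one instance is reused: `PerSplitReader` uses an `Iterator` in place of the HBase `ResultScanner`, a split is just a list of row ids, and the `resetOnOpen` flag exists only to simulate forgetting the reset. Without the reset, `endReached` stays `true` from the first split and the second split is silently skipped.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class PerSplitReader {
    private final boolean resetOnOpen; // false simulates forgetting the reset
    private Iterator<String> resultScanner;
    private boolean endReached;
    private long scannedRows;

    PerSplitReader(boolean resetOnOpen) {
        this.resetOnOpen = resetOnOpen;
    }

    void open(List<String> split) throws IOException {
        if (split == null) {
            throw new IOException("Input split is null!");
        }
        resultScanner = split.iterator();
        if (resetOnOpen) {
            endReached = false;
            scannedRows = 0;
        }
    }

    boolean reachedEnd() {
        return endReached;
    }

    String nextRecord() throws IOException {
        if (resultScanner == null) {
            throw new IOException("No table result scanner provided!");
        }
        if (!resultScanner.hasNext()) {
            endReached = true;
            return null; // the caller skips null records
        }
        scannedRows++;
        return resultScanner.next();
    }

    long scannedRows() {
        return scannedRows; // the diff logs this count when closing a split
    }

    void close() {
        resultScanner = null;
    }
}

class PerSplitDemo {
    static final List<List<String>> SPLITS =
        Arrays.asList(Arrays.asList("000", "111"), Arrays.asList("333"));

    static List<String> readAll(boolean resetOnOpen) {
        PerSplitReader reader = new PerSplitReader(resetOnOpen);
        List<String> out = new ArrayList<>();
        try {
            for (List<String> split : SPLITS) {
                reader.open(split);
                while (!reader.reachedEnd()) {
                    String record = reader.nextRecord();
                    if (record != null) {
                        out.add(record);
                    }
                }
                reader.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll(true));  // [000, 111, 333]
        System.out.println(readAll(false)); // [000, 111]: second split skipped
    }
}
```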


> TableInputFormat fails when reused on next split
> 
>
> Key: FLINK-4311
> URL: https://issues.apache.org/jira/browse/FLINK-4311
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Critical
>
> We have written a batch job that uses data from HBase by means of using the 
> TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: 
> Task 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
>  rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.(ClientScanner.java:155)
>   at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
>   at 
> org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
>   at 
> org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
>  rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
>   at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>   at 
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
>   at 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
>   ... 10 more
> {quote}
> As you can see the ThreadPoolExecutor was terminated at this point.
> We tracked it down to the fact that 
> # the configure method opens the table
> # the open method obtains the result scanner
> # the close method closes the table.
> If a second split arrives on the same instance then 

[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504567#comment-15504567
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79470876
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormat.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestTableInputFormat extends HBaseTestingClusterAutostarter {
--- End diff --

Long running integration tests have to follow the `*ITCase` naming pattern. 
This will cause them to be executed in Maven's verify phase.



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504570#comment-15504570
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79469197
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 ---
@@ -93,32 +102,45 @@ private HTable createTable() {
}
 
@Override
+   public void open(TableInputSplit split) throws IOException {
+   if (split == null) {
+   throw new IOException("Input split is null!");
+   }
+
+   logSplitInfo("opening", split);
+   scan.setStartRow(split.getStartRow());
+   lastRow = split.getEndRow();
+   scan.setStopRow(lastRow);
+
+   resultScanner = table.getScanner(scan);
+   endReached = false;
+   scannedRows = 0;
+   }
+
+   @Override
public boolean reachedEnd() throws IOException {
return this.endReached;
}
 
@Override
public T nextRecord(T reuse) throws IOException {
-   if (this.rs == null){
+   if (this.resultScanner == null){
--- End diff --

add a space before `{`
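A minimal sketch of why the per-split reset in `open(TableInputSplit)` matters, using plain Java stand-ins rather than the HBase client. `Split`, `SplitReader` and the `TreeMap` "table" are hypothetical, and the start-inclusive/stop-exclusive range only mimics how an HBase `Scan` treats its start and stop rows. Because `open` recreates the scanner and resets `endReached` and `scannedRows`, one instance can read several splits in a row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class PerSplitOpenSketch {

    /** Hypothetical stand-in for TableInputSplit: rows in [startRow, endRow). */
    static final class Split {
        final String startRow;
        final String endRow;

        Split(String startRow, String endRow) {
            this.startRow = startRow;
            this.endRow = endRow;
        }
    }

    /** Hypothetical reader mirroring the per-split state reset in open(TableInputSplit). */
    static final class SplitReader {
        private final NavigableMap<String, String> table; // stands in for the HBase table
        private Iterator<String> resultScanner;           // stands in for the ResultScanner
        private boolean endReached;
        private long scannedRows;

        SplitReader(NavigableMap<String, String> table) {
            this.table = table;
        }

        void open(Split split) {
            if (split == null) {
                throw new IllegalArgumentException("Input split is null!");
            }
            // Start row inclusive, stop row exclusive, as with an HBase Scan.
            resultScanner = table.subMap(split.startRow, true, split.endRow, false)
                    .values().iterator();
            // Reset per-split state so a reused instance starts clean.
            endReached = false;
            scannedRows = 0;
        }

        boolean reachedEnd() {
            return endReached;
        }

        /** Returns the next value, or null once the split is exhausted. */
        String nextRecord() {
            if (resultScanner == null) {
                throw new IllegalStateException("No table result scanner provided!");
            }
            if (resultScanner.hasNext()) {
                scannedRows++;
                return resultScanner.next();
            }
            endReached = true;
            return null;
        }

        long scannedRows() {
            return scannedRows;
        }

        void close() {
            resultScanner = null; // release only the per-split scanner, not the table
        }
    }

    /** Reads several splits with one reader instance, skipping the trailing null. */
    static List<String> readAll(SplitReader reader, List<Split> splits) {
        List<String> out = new ArrayList<>();
        for (Split split : splits) {
            reader.open(split);
            while (!reader.reachedEnd()) {
                String value = reader.nextRecord();
                if (value != null) {
                    out.add(value);
                }
            }
            reader.close();
        }
        return out;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("a", "1");
        table.put("b", "2");
        table.put("c", "3");
        table.put("d", "4");
        SplitReader reader = new SplitReader(table);
        List<String> rows = readAll(reader, Arrays.asList(new Split("a", "c"), new Split("c", "e")));
        System.out.println(rows); // [1, 2, 3, 4]
    }
}
```

If `open` did not reset `endReached`, the second split would start with the flag still `true` from the first one and return no rows, which is the kind of state leakage the new `open` avoids.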



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504566#comment-15504566
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79477333
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml ---
@@ -1,43 +0,0 @@
-
-
-
-
--- End diff --

I'm not so familiar with HBase. Is the config no longer required for HBase 
1.1.2 or why did you remove it?


> TableInputFormat fails when reused on next split
> 
>
> Key: FLINK-4311
> URL: https://issues.apache.org/jira/browse/FLINK-4311
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Critical
>
> We have written a batch job that uses data from HBase by means of using the 
> TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: 
> Task 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
>  rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
>   at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
>   at 
> org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
>   at 
> org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
>  rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
>   at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>   at 
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
>   at 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
>   ... 10 more
> {quote}
> As you can see the ThreadPoolExecutor was terminated at this point.
> We tracked it down to the fact that 
> # the configure method opens the table
> # the open method obtains the result scanner
> # the close method closes the table.
> If a second split arrives on the same instance then the open method will fail 
> because the table has already been closed.
> We also found that this error varies with the versions of HBase that are 
> used. I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
>   ... 37 more
> {quote}
> I found that in the [documentation of the InputFormat 
> interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html]
>  it clearly states
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance 
> can be opened again after it was closed. That is due to the fact that the 
> input format 

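The failure sequence in the description (table opened in `configure`, scanner obtained in `open`, table closed in `close`) can be reproduced without HBase. The sketch below is a simplified mock, not Flink's API: `FakeTable`, `SketchFormat` and `run` are hypothetical names, and an `IllegalStateException` stands in for the `RejectedExecutionException` / `hconnection ... closed` errors. `FixedFormat` follows the placement described in the updated `configure` javadoc in this pull request: the table is opened in `configure` and closed in `closeInputFormat`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical stand-in for the HBase table/connection: unusable once closed. */
    static final class FakeTable implements AutoCloseable {
        private boolean closed;

        List<String> scan(String split) {
            if (closed) {
                // Plays the role of the RejectedExecutionException / "hconnection ... closed" errors.
                throw new IllegalStateException("table already closed");
            }
            return Arrays.asList(split + "-row");
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Shared parts of both variants: the table is opened once per instance. */
    abstract static class SketchFormat {
        protected FakeTable table;
        protected List<String> current;

        void configure() {
            table = new FakeTable();
        }

        void open(String split) {
            current = table.scan(split);
        }

        abstract void close();

        abstract void closeInputFormat();
    }

    /** Placement described in the issue: close() also closes the shared table. */
    static final class BuggyFormat extends SketchFormat {
        @Override
        void close() {
            current = null;
            table.close();
        }

        @Override
        void closeInputFormat() {
        }
    }

    /** Fixed placement: close() drops per-split state, closeInputFormat() closes the table. */
    static final class FixedFormat extends SketchFormat {
        @Override
        void close() {
            current = null;
        }

        @Override
        void closeInputFormat() {
            table.close();
        }
    }

    /** Drives one instance over several splits (openInputFormat omitted for brevity). */
    static List<String> run(SketchFormat format, String... splits) {
        List<String> out = new ArrayList<>();
        format.configure();
        for (String split : splits) {
            format.open(split);
            out.addAll(format.current);
            format.close();
        }
        format.closeInputFormat();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new FixedFormat(), "s1", "s2")); // [s1-row, s2-row]
        try {
            run(new BuggyFormat(), "s1", "s2");
        } catch (IllegalStateException e) {
            System.out.println("BuggyFormat failed on the second split: " + e.getMessage());
        }
    }
}
```

The design point is that only per-split resources belong in `open`/`close`; anything shared across splits has to stay alive until `closeInputFormat`, which the task calls after it has processed all of its splits.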
[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504564#comment-15504564
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r79468879
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 ---
@@ -67,15 +67,24 @@
protected abstract T mapResultToTuple(Result r);
 
/**
-* creates a {@link Scan} object and a {@link HTable} connection
-*
-* @param parameters
+* Creates a {@link Scan} object and opens the {@link HTable} connection.
+* These are opened here because they are needed in the createInputSplits
+* which is called before the openInputFormat method.
+* So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
+* @param parameters The configuration that is to be used
 * @see Configuration
 */
@Override
public void configure(Configuration parameters) {
-   this.table = createTable();
-   this.scan = getScanner();
+   table = createTable();
+   scan = getScanner();
+   }
+
+   /**
+* Do nothing.
+*/
+   @Override
+   public void openInputFormat() throws IOException {
--- End diff --

No need to override this method.
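The reviewer's point is that the base class already supplies an empty `openInputFormat()`, so an override whose body does nothing only adds noise. A small sketch of that pattern, assuming a hypothetical `BaseFormat` that is only loosely modeled on Flink's `RichInputFormat` (it is not the real class): `TableFormat` overrides `open`, `close` and `closeInputFormat`, and simply inherits the no-op `openInputFormat`.

```java
import java.io.IOException;

public class DefaultHooksSketch {

    /** Hypothetical base class with no-op instance-level hooks. */
    abstract static class BaseFormat {
        void openInputFormat() throws IOException {
            // Default does nothing; subclasses override only when they need it.
        }

        void closeInputFormat() throws IOException {
            // Default does nothing.
        }

        abstract void open(String split) throws IOException;

        abstract void close() throws IOException;
    }

    /** Subclass that relies on the inherited openInputFormat() instead of overriding it. */
    static final class TableFormat extends BaseFormat {
        final StringBuilder log = new StringBuilder();

        @Override
        void open(String split) {
            log.append("open(").append(split).append(") ");
        }

        @Override
        void close() {
            log.append("close ");
        }

        @Override
        void closeInputFormat() {
            log.append("closeInputFormat");
        }
    }

    /** Calls the hooks in lifecycle order and returns the recorded calls. */
    static String drive(TableFormat format, String... splits) throws IOException {
        format.openInputFormat(); // resolves to the inherited no-op
        for (String split : splits) {
            format.open(split);
            format.close();
        }
        format.closeInputFormat();
        return format.log.toString();
    }

    public static void main(String[] args) throws IOException {
        // Prints: open(r1) close open(r2) close closeInputFormat
        System.out.println(drive(new TableFormat(), "r1", "r2"));
    }
}
```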



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15445391#comment-15445391
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2330
  
Current version has a problem building the shaded jars.
It runs into an infinite loop while creating the dependency-reduced-pom.xml, as 
described here: 

**Shade Plugin gets stuck in infinite loop building dependency reduced 
POM** https://issues.apache.org/jira/browse/MSHADE-148
Although all my versions are newer than the fix described there, I still see 
the problem.




[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439102#comment-15439102
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2330
  
I managed to resolve the problems with running these unit tests. 
These problems were caused by version conflicts in Guava.
Now an HBaseMiniCluster is started, a table with multiple regions is created, 
and the TableInputFormat is used to extract the rows again. 
By setting the parallelism to 1, the same TableInputFormat instance is used for 
multiple regions (the scenario this all started with), and it now succeeds.

Please review.



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436952#comment-15436952
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2330
  
I made a few serious attempts to create a unit test that starts the 
HBaseMiniCluster ... and failed.


> TableInputFormat fails when reused on next split
> 
>
> Key: FLINK-4311
> URL: https://issues.apache.org/jira/browse/FLINK-4311
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Critical
>
> We have written a batch job that uses data from HBase by means of using the 
> TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: 
> Task 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
>  rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
>   at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
>   at 
> org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
>   at 
> org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
>  rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
>   at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>   at 
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
>   at 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
>   ... 10 more
> {quote}
> As you can see the ThreadPoolExecutor was terminated at this point.
> We tracked it down to the fact that 
> # the configure method opens the table
> # the open method obtains the result scanner
> # the close method closes the table.
> If a second split arrives on the same instance then the open method will fail 
> because the table has already been closed.
> We also found that this error varies with the versions of HBase that are 
> used. I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
>   ... 37 more
> {quote}
> I found that the [documentation of the InputFormat 
> interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html]
>  clearly states
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance 
> can be opened again after it was closed. That is due to the fact that the 
> input format is used for potentially multiple splits. After a split is done, 
> the format's close function is invoked and, if another split is available, 
> the open function is invoked afterwards for the next split.{quote}

[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415291#comment-15415291
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2330
  
I will add a unit test for this.
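The first stack trace in the issue ends in `ThreadPoolExecutor$AbortPolicy.rejectedExecution`. This means work was submitted to an executor that had already been shut down. That symptom can be reproduced with the JDK alone, which may be a cheaper starting point for a test than a full HBase setup. What follows is a minimal sketch. The class name `RejectedAfterShutdown` is hypothetical, and nothing here models HBase internals.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class RejectedAfterShutdown {
    // Returns true when a task handed to an already shut-down pool is rejected.
    static boolean submitAfterShutdownIsRejected() {
        // newFixedThreadPool is backed by a ThreadPoolExecutor with the default AbortPolicy.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> { });   // accepted: the pool is still running
        pool.shutdown();           // plays the role of the closed table/connection
        try {
            pool.execute(() -> { });
            return false;          // not expected: a shut-down pool accepts no new work
        } catch (RejectedExecutionException e) {
            return true;           // same exception type as in the reported trace
        }
    }
}
```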



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15413108#comment-15413108
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2330
  
Question: Is this change OK as it is, or is there anything else I need to change 
before it can be committed?



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15407443#comment-15407443
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2330
  
I had another look at the "multiple tables" question. The name of the table 
comes from the getTableName method that is to be implemented by the subclass. I 
consider it to be extremely unlikely that multiple calls to that method in a 
single instance will yield different table names.
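A guard against differing names may still be wanted. One option is to key the open table on the value `getTableName()` returned, and to reconnect only when that value changes. What follows is a sketch under that assumption. `NamedTableFormat`, `FixedNameFormat`, `connectionCount()` and the table name `events` are hypothetical. The list of names merely stands in for real HBase connections.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: subclasses supply the table name, as with getTableName()
// in TableInputFormat; the connection handling here is only an illustration.
abstract class NamedTableFormat {
    private String openedTableName;
    private final List<String> connections = new ArrayList<>();

    protected abstract String getTableName();

    // Reuses the open table across splits; reconnects only if the name changed.
    void open(String split) {
        String name = getTableName();
        if (!name.equals(openedTableName)) {
            connections.add(name); // stand-in for opening a table with this name
            openedTableName = name;
        }
        // ... a scanner for `split` would be created on the open table here ...
    }

    int connectionCount() {
        return connections.size();
    }
}

// The common case described above: one format instance, one table name.
class FixedNameFormat extends NamedTableFormat {
    @Override
    protected String getTableName() {
        return "events";
    }
}
```

With a fixed name, three splits on one instance open the table once. A subclass that alternated names would reconnect on every split, which is the multiple-tables case raised in the question.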



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15407410#comment-15407410
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r73482199
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -328,7 +328,11 @@ public void run() {
synchronized (checkpointLock) {
LOG.info("Reader terminated, and exiting...");
 
-   this.format.closeInputFormat();
+   try {
+   this.format.closeInputFormat();
+   } catch (IOException e) {
+   // Ignoring
--- End diff --

Done
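Assume the enclosing `run()` in the hunk header is a `Runnable`/`Thread` method. It then cannot declare a checked exception. Once `closeInputFormat()` declares `IOException`, the call therefore has to be wrapped in a try/catch. What follows is a sketch of that constraint with hypothetical types (`FormatLike`, `ReaderThread`). Unlike the diff, which ignores the exception, the sketch keeps it so the failure is not lost silently.

```java
import java.io.IOException;

// Hypothetical format whose cleanup method declares IOException,
// like closeInputFormat() after the change in the diff.
interface FormatLike {
    void closeInputFormat() throws IOException;
}

class ReaderThread implements Runnable {
    private final FormatLike format;
    private volatile IOException closeFailure; // recorded instead of silently dropped

    ReaderThread(FormatLike format) {
        this.format = format;
    }

    @Override
    public void run() {
        // Runnable.run() cannot declare a checked exception, so it is handled here.
        try {
            format.closeInputFormat();
        } catch (IOException e) {
            closeFailure = e;
        }
    }

    IOException getCloseFailure() {
        return closeFailure;
    }
}
```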



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15407394#comment-15407394
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r73480826
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 ---
@@ -237,7 +244,7 @@ private void logSplitInfo(String action, TableInputSplit split) {
 *End key of the region
 * @return true, if this region needs to be included as part of the input (default).
 */
-   private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+   protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
--- End diff --

According to the documentation, this function is intended to be overridden in a 
subclass. You cannot override a static method (let alone a private one).

http://stackoverflow.com/questions/2223386/why-doesnt-java-allow-overriding-of-static-methods
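The following is a small illustration of the point about static methods. It uses hypothetical classes (`BaseFormat`, `CustomFormat`) rather than `TableInputFormat` itself. A call made from the base class binds to the base class's static method at compile time, so a subclass can only hide it. A `protected` instance method is dispatched at run time and can be overridden.

```java
// Hypothetical classes showing why a static method cannot serve as an extension point.
class BaseFormat {
    // Static: bound at compile time to BaseFormat; a subclass can only hide it.
    static String staticFilter() {
        return "base";
    }

    // Instance: dispatched at run time, so a subclass override takes effect.
    protected String instanceFilter() {
        return "base";
    }

    String describe() {
        return staticFilter() + "/" + instanceFilter();
    }
}

class CustomFormat extends BaseFormat {
    // Hides BaseFormat.staticFilter(); calls inside BaseFormat never reach it.
    static String staticFilter() {
        return "custom";
    }

    @Override
    protected String instanceFilter() {
        return "custom";
    }
}
```

`new CustomFormat().describe()` returns `"base/custom"`. The hiding static method is ignored and the overriding instance method is used. A `private` method would not be visible to the subclass at all, so a same-named method there would not override it either.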



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406293#comment-15406293
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2330
  
I don't know, and it seems the InputFormat itself doesn't know either. If 
we go by the previous implementation, then yes, there will only be one table. 
However, based on the comment on line 64, `// abstract methods allow for 
multiple table and scanners in the same job`, we have to conclude that there can 
be different tables.

I'd be curious what @twalthr thinks about this.



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406284#comment-15406284
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r73382033
  
--- Diff: 
flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 ---
@@ -237,7 +244,7 @@ private void logSplitInfo(String action, TableInputSplit split) {
 *End key of the region
 * @return true, if this region needs to be included as part of the input (default).
 */
-   private static boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+   protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
--- End diff --

why are you changing this?
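Background for the question: a `private static` method can only be hidden, never overridden, whereas a `protected` instance method can be overridden by a subclass, so this change turns `includeRegionInSplit` into an extension hook. The diff itself does not say why it was made. The sketch below only illustrates the language-level difference, and `RegionFilteringFormat`, `SkipFirstRegionFormat` and `countIncluded` are hypothetical names, not Flink classes.

```java
import java.util.List;

// Hypothetical stand-in for an input format that decides, per region,
// whether that region becomes an input split.
class RegionFilteringFormat {

    // Protected instance method: subclasses may override it.
    // A private static method could only be hidden, never overridden.
    protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
        return true; // default: include every region
    }

    // Counts how many regions would become splits.
    // Each region is represented as {startKey, endKey}.
    int countIncluded(List<byte[][]> regions) {
        int count = 0;
        for (byte[][] region : regions) {
            if (includeRegionInSplit(region[0], region[1])) {
                count++;
            }
        }
        return count;
    }
}

// Hypothetical subclass that skips the first region (the one with an empty start key).
class SkipFirstRegionFormat extends RegionFilteringFormat {
    @Override
    protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
        return startKey.length > 0;
    }
}
```

Because the split-creation loop calls the hook through dynamic dispatch, the subclass changes which regions are included without touching the loop itself.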



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406281#comment-15406281
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2330#discussion_r73381826
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -328,7 +328,11 @@ public void run() {
synchronized (checkpointLock) {
LOG.info("Reader terminated, and exiting...");
 
-   this.format.closeInputFormat();
+   try {
+   this.format.closeInputFormat();
+   } catch (IOException e) {
+   // Ignoring
--- End diff --

it would be good to log the exception
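A minimal sketch of what the reviewer suggests: catch the `IOException` from `closeInputFormat()` during shutdown, but log it instead of discarding it. It uses `java.util.logging` only to stay self-contained (Flink's own operators log through SLF4J), and `ClosableFormat`, `ReaderShutdown` and `closeQuietlyButLog` are hypothetical names.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical minimal format whose closeInputFormat() may fail.
interface ClosableFormat {
    void closeInputFormat() throws IOException;
}

class ReaderShutdown {
    private static final Logger LOG = Logger.getLogger(ReaderShutdown.class.getName());

    // Closes the format without letting the exception escape the shutdown path;
    // a failure is logged with its stack trace instead of being silently ignored.
    // Returns true if closing succeeded.
    static boolean closeQuietlyButLog(ClosableFormat format) {
        try {
            format.closeInputFormat();
            return true;
        } catch (IOException e) {
            LOG.log(Level.WARNING, "Failed to close the input format; continuing shutdown.", e);
            return false;
        }
    }
}
```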



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406043#comment-15406043
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2330
  
Note that this version still assumes that a single instance will only ever see splits from one and the same table. Is that a safe assumption?
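If the single-table assumption is kept, it can at least be made explicit: check in `open()` that every split belongs to the table the instance was configured for, and fail fast otherwise. This is a sketch with hypothetical types (`TableSplit`, `SingleTableFormat`), not Flink's `TableInputSplit`; whether to fail or to reconnect to the other table is a design choice this part of the thread does not settle.

```java
import java.util.Arrays;

// Hypothetical split carrying the name of the table it belongs to.
class TableSplit {
    final byte[] tableName;

    TableSplit(byte[] tableName) {
        this.tableName = tableName;
    }
}

class SingleTableFormat {
    private final byte[] configuredTable;

    SingleTableFormat(byte[] configuredTable) {
        this.configuredTable = configuredTable;
    }

    // Called once per split: rejects a split from another table instead of
    // reading it through a handle that was opened for a different table.
    void open(TableSplit split) {
        if (!Arrays.equals(configuredTable, split.tableName)) {
            throw new IllegalStateException(
                "Split belongs to table '" + new String(split.tableName)
                + "' but this instance reads '" + new String(configuredTable) + "'");
        }
        // ... obtain a scanner for the split here ...
    }
}
```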



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405932#comment-15405932
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2330
  
I would say yes, since `open()` and `close()` can also throw an 
`IOException`.
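Declaring the checked exception on the hooks lets implementations propagate connection failures directly instead of wrapping them. The sketch below mirrors the proposed signatures on a hypothetical base class `LifecycleFormat` (not Flink's `RichInputFormat` itself); the diff to `ContinuousFileReaderOperator` in this pull request, which now catches an `IOException` around `closeInputFormat()`, is consistent with that change.

```java
import java.io.IOException;

// Hypothetical base class mirroring the proposed hook signatures:
// both hooks may throw IOException, like open() and close() already can.
abstract class LifecycleFormat {
    public void openInputFormat() throws IOException {
        // no-op by default; subclasses acquire shared resources here
    }

    public void closeInputFormat() throws IOException {
        // no-op by default; subclasses release shared resources here
    }
}

// Hypothetical subclass whose setup can fail with a checked exception.
class FailingSetupFormat extends LifecycleFormat {
    @Override
    public void openInputFormat() throws IOException {
        throw new IOException("could not connect");
    }
}
```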



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405928#comment-15405928
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2330
  
Now I see why I missed these two: they are newer than the 1.0.3 release I was working with.
Is it a good idea to add `throws IOException` to these two methods in RichInputFormat?



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405918#comment-15405918
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2330
  
Yes, that is indeed the right place to do this.
Bummer that this method does not allow throwing checked exceptions.
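Until a hook declares `throws IOException`, a common workaround is to wrap the checked exception in an unchecked one and keep the original as the cause. A sketch with hypothetical names (`NoThrowHook`, `openConnection`); `java.io.UncheckedIOException` needs Java 8 or later.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

class NoThrowHook {
    // Hypothetical setup step that can fail with a checked exception.
    static void openConnection(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("connection refused");
        }
    }

    // A hook whose signature does not allow checked exceptions:
    // the IOException is rethrown as UncheckedIOException, keeping the cause.
    static void openInputFormat(boolean fail) {
        try {
            openConnection(fail);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not open the table", e);
        }
    }
}
```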



[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405912#comment-15405912
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2330
  
maybe you can move the table initialization into `openInputFormat()` 
(called once before all splits) and close it in `closeInputFormat()` (called 
once after all splits).
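A minimal simulation of the two lifecycles, assuming the call order described in this comment: `configure()`, then `openInputFormat()` once, then `open()`/`close()` per split, then `closeInputFormat()` once. `FakeTable` stands in for the HBase table handle and all class names are hypothetical. The first format closes the table in `close()`, as described in the issue, and fails on the second split; the second keeps the table open across splits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an HBase table handle: using it after close() fails.
class FakeTable {
    private boolean closed = false;

    String scan(String split) {
        if (closed) {
            throw new IllegalStateException("table already closed");
        }
        return "rows of " + split;
    }

    void close() {
        closed = true;
    }
}

// Lifecycle from the issue: configure() opens the table, close() closes it.
class PerSplitCloseFormat {
    FakeTable table;
    String current;

    void configure() { table = new FakeTable(); }
    void openInputFormat() { }
    void open(String split) { current = table.scan(split); }
    void close() { table.close(); } // breaks the next open() on this instance
    void closeInputFormat() { }
}

// Suggested lifecycle: the table lives from openInputFormat() to closeInputFormat();
// close() only releases per-split state (here: the current result).
class SharedTableFormat extends PerSplitCloseFormat {
    @Override void configure() { }
    @Override void openInputFormat() { table = new FakeTable(); }
    @Override void close() { current = null; }
    @Override void closeInputFormat() { table.close(); }
}

class Driver {
    // Mimics one task reusing a single format instance for several splits.
    static List<String> run(PerSplitCloseFormat format, List<String> splits) {
        List<String> out = new ArrayList<>();
        format.configure();
        format.openInputFormat();
        for (String split : splits) {
            format.open(split);
            out.add(format.current);
            format.close();
        }
        format.closeInputFormat();
        return out;
    }
}
```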


> TableInputFormat fails when reused on next split
> 
>
> Key: FLINK-4311
> URL: https://issues.apache.org/jira/browse/FLINK-4311
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Critical
>
> We have written a batch job that uses data from HBase by means of using the 
> TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: 
> Task 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
>  rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
>   at 
> org.apache.hadoop.hbase.client.ClientScanner.(ClientScanner.java:155)
>   at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
>   at 
> org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
>   at 
> org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
>  rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>   at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
>   at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>   at 
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
>   at 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
>   ... 10 more
> {quote}
> As you can see the ThreadPoolExecutor was terminated at this point.
> We tracked it down to the fact that 
> # the configure method opens the table
> # the open method obtains the result scanner
> # the closes method closes the table.
> If a second split arrives on the same instance then the open method will fail 
> because the table has already been closed.
> We also found that this error varies with the versions of HBase that are 
> used. I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
>   at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
>   at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
>   ... 37 more
> {quote}
> I found that the [documentation of the InputFormat interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html] clearly states
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance 
> can be opened again after it was closed. That is due to the fact that the 
> input format is used for potentially multiple splits. After a split is done, 
> the format's close function is invoked and, if another split is 

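The first stack trace bottoms out in standard JDK behaviour. A `ThreadPoolExecutor` that has been shut down, and whose state is therefore `Terminated`, rejects every new task. With the default `AbortPolicy`, that rejection surfaces as a `RejectedExecutionException`. As the report concludes, the terminated pool here is the one that went away when the table was closed. The sketch below reproduces the same exception without HBase, by reusing a pool after shutdown. The class and method names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedAfterShutdownDemo {

    /**
     * Submits a task to a pool that has already been shut down and terminated.
     * Returns the rejection message, or null if the task was unexpectedly accepted.
     */
    static String rejectionMessageAfterShutdown() {
        // No handler argument, so the default ThreadPoolExecutor.AbortPolicy applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        pool.execute(() -> { });   // first use: accepted and run
        pool.shutdown();           // comparable to the table/connection being closed
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        try {
            pool.execute(() -> { });   // reuse after shutdown, like a second split
            return null;
        } catch (RejectedExecutionException e) {
            // AbortPolicy appends pool.toString(), e.g. "[Terminated, pool size = 0, ...]"
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectionMessageAfterShutdown());
    }
}
```

The message has the same shape as the one in the report: `Task ... rejected from java.util.concurrent.ThreadPoolExecutor@...[Terminated, ...]`.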
[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405896#comment-15405896
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2330
  
Oh damn,
I just noticed a major issue with this: in order to create the input splits, 
the table needs to be available "before" the call to the 'open' method.
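Two constraints now apply together. First, the table must exist before `open`, because `createInputSplits` needs it. Second, an instance must be reopenable after `close`. One way to satisfy both is to create the table lazily and drop the reference on close, so that the next split reconnects. The sketch below shows that approach using a plain-Java stand-in for the lifecycle (configure, createInputSplits, then open/close per split). `FakeTable`, `connectIfNeeded` and `rows` are hypothetical, and the code does not use the Flink or HBase API. It is one possible design, not the code in PR #2330.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free stand-in for the InputFormat lifecycle
 * (configure -> createInputSplits -> open/close per split). FakeTable plays
 * the role of the HBase table; none of this is Flink or HBase API.
 */
public class ReusableTableFormat {

    /** Hypothetical table handle that refuses to work once closed. */
    static final class FakeTable {
        private boolean closed = false;

        List<String> scan(String split) {
            if (closed) {
                throw new IllegalStateException("table already closed");
            }
            List<String> rows = new ArrayList<>();
            rows.add(split + "-row1");
            rows.add(split + "-row2");
            return rows;
        }

        void close() { closed = true; }
    }

    private FakeTable table;          // created lazily, not in configure()
    private List<String> currentRows; // rows of the split currently open

    /** (Re)creates the table if there is none; safe before open() and after close(). */
    private FakeTable connectIfNeeded() {
        if (table == null) {
            table = new FakeTable();
        }
        return table;
    }

    public void configure() {
        // Read settings only; do not create resources that close() will release.
    }

    public String[] createInputSplits(int count) {
        connectIfNeeded(); // the table is needed here, before any open() call
        String[] splits = new String[count];
        for (int i = 0; i < count; i++) {
            splits[i] = "split" + i;
        }
        return splits;
    }

    public void open(String split) {
        currentRows = connectIfNeeded().scan(split);
    }

    public List<String> rows() {
        return currentRows;
    }

    public void close() {
        if (table != null) {
            table.close();
            table = null; // the next open() reconnects instead of hitting a closed table
        }
    }

    public static void main(String[] args) {
        ReusableTableFormat format = new ReusableTableFormat();
        format.configure();
        for (String split : format.createInputSplits(3)) {
            format.open(split); // the same instance is reused for every split
            System.out.println(format.rows());
            format.close();
        }
    }
}
```

Suppose instead the table were created in `configure()` and closed in `close()`. The second `open()` would then throw, which is the failure described in the issue. Another option is to keep one connection for the whole task and release it only once. This sketch reconnects per split to keep the logic simple.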





[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405893#comment-15405893
 ] 

ASF GitHub Bot commented on FLINK-4311:
---

GitHub user nielsbasjes opened a pull request:

https://github.com/apache/flink/pull/2330

FLINK-4311 Fixed several problems in TableInputFormat

Question: Do you guys want a unit test for this?
In HBase itself I have done this in the past, but it required a large 
chunk of additional software to start and stop an HBase minicluster during the 
unit tests,
i.e. pulling in this thing: 

https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
and then do something like this:

https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestScanRowPrefix.java


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nielsbasjes/flink FLINK-4311

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2330


commit 5c3d53c810f8df6d5544685ef3f1004c46541daf
Author: Niels Basjes 
Date:   2016-08-03T12:54:34Z

[FLINK-4311] TableInputFormat can handle reuse for next input split

commit 8696f5e257c7434d62e662c4c97f4ede2da5411b
Author: Niels Basjes 
Date:   2016-08-03T12:56:01Z

[FLINK-4311] Cannot override a static member function.
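The second commit title refers to a Java language rule. A static method with the same signature in a subclass hides the parent's method; it does not override it. The call is therefore bound at compile time, not dispatched on the runtime object. The commit's diff is not reproduced here, so the classes `Base` and `Derived` and the method `createTable` below are hypothetical and serve only as an illustration.

```java
public class StaticHidingDemo {

    static class Base {
        // Hypothetical factory method, declared static.
        static String createTable() { return "Base table"; }

        // Unqualified call to a static method: bound to Base.createTable() at compile time.
        String open() { return createTable(); }
    }

    static class Derived extends Base {
        // Same signature: this HIDES Base.createTable(), it does not override it.
        // Annotating it with @Override would be a compile error.
        static String createTable() { return "Derived table"; }
    }

    public static void main(String[] args) {
        Base format = new Derived();
        System.out.println(format.open());          // prints "Base table"
        System.out.println(Derived.createTable());  // prints "Derived table"
    }
}
```

If subclasses are meant to customize such a method, the usual remedy is to make it an instance method so that normal dynamic dispatch applies.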



