> If the call to mapResultToOutType(Result) finished without an error there is no need to restart from the same row.
> The new scanner should start from the next row.
> Is that so or am I missing something?
Yeah, you are right. I've filed the issue https://issues.apache.org/jira/browse/FLINK-14941 to address this bug. Thanks.

On Mon, Nov 25, 2019 at 6:57 PM Mark Davis <moda...@protonmail.com> wrote:

> Hi Flavio,
>
>> When the resultScanner dies because of a timeout (this happens a lot when
>> you have backpressure and the time between 2 consecutive reads exceeds the
>> scanner timeout), the code creates a new scanner and restarts from where it
>> was (startRow = currentRow).
>> So there should not be any duplicates (in theory), but this could be the
>> root of the problem..
>
> Yes, you are right, the nextRecord() exception handling is responsible for
> the duplicate record processing:
>
> org.apache.hadoop.hbase.client.ScannerTimeoutException: 1038878ms passed
> since the last invocation, timeout is currently set to 60000
>     at org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:453)
>     at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:371)
>     at org.apache.flink.addons.hbase.AbstractTableInputFormat.nextRecord(AbstractTableInputFormat.java:130)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.hbase.UnknownScannerException:
> org.apache.hadoop.hbase.UnknownScannerException: Name: 135281, already
> closed?
>     at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2389)
>     at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
>     at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
>     at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
>     at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
>     at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)
>
> But I am not sure that the handling of the HBase exception thrown from
> ClientScanner.next() is correct.
> If the call to mapResultToOutType(Result) finished without an error there
> is no need to restart from the same row.
> The new scanner should start from the next row.
> Is that so or am I missing something?
>
> Best regards,
> Mark
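For anyone following along, the off-by-one the thread describes can be illustrated with a small standalone simulation. This is not Flink or HBase code; all names here (MockScanner, readAll, etc.) are hypothetical, and the "timeout" is injected artificially. It only models the behavior in question: on a scanner timeout, restarting from currentRow re-emits the row that mapResultToOutType(Result) already processed successfully, while restarting from the next row does not.

```java
import java.util.ArrayList;
import java.util.List;

public class ScannerRestartDemo {

    // Stand-in for HBase's ScannerTimeoutException.
    static class TimeoutException extends RuntimeException {}

    // Scans integer "rows" in [start, end); throws once after
    // failAfterReads successful reads (-1 = never fail).
    static class MockScanner {
        private final int end;
        private final int failAfterReads;
        private int pos;
        private int reads = 0;

        MockScanner(int start, int end, int failAfterReads) {
            this.pos = start;
            this.end = end;
            this.failAfterReads = failAfterReads;
        }

        Integer next() {
            if (reads == failAfterReads) {
                throw new TimeoutException(); // simulated scanner timeout
            }
            reads++;
            return pos < end ? pos++ : null; // null = end of scan
        }
    }

    // Reads rows 0..total-1, reopening the scanner on timeout.
    // resumeFromNextRow=false mimics the reported behavior
    // (startRow = currentRow), so the last successfully emitted
    // row is read a second time after the restart.
    static List<Integer> readAll(int total, boolean resumeFromNextRow) {
        List<Integer> out = new ArrayList<>();
        int currentRow = -1; // no row emitted yet
        MockScanner scanner = new MockScanner(0, total, 2); // fail after 2 rows
        while (true) {
            Integer row;
            try {
                row = scanner.next();
            } catch (TimeoutException e) {
                int restart = resumeFromNextRow ? currentRow + 1 : currentRow;
                scanner = new MockScanner(restart, total, -1); // reopen, no more failures
                continue;
            }
            if (row == null) {
                break;
            }
            out.add(row);      // "mapResultToOutType" succeeded for this row
            currentRow = row;  // remember the last successfully emitted row
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll(4, false)); // [0, 1, 1, 2, 3] - row 1 duplicated
        System.out.println(readAll(4, true));  // [0, 1, 2, 3]    - no duplicate
    }
}
```

In the real code path the restart row is an HBase row key rather than an integer, so "next row" means the smallest key strictly greater than currentRow, but the bookkeeping question is the same: only rows that have not yet been emitted successfully should be rescanned.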