> If the call to mapResultToOutType(Result) finished without an error there
is no need to restart from the same row.
> The new scanner should start from the next row.
> Is that so or am I missing something?

Yeah, you are right. I've filed the issue
https://issues.apache.org/jira/browse/FLINK-14941 to address this bug.
Thanks.
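For context, a minimal sketch of the restart logic in question (the helper
names and the `rowAlreadyEmitted` flag are illustrative only, not the actual
FLINK-14941 patch): in HBase's byte-lexicographic row ordering, appending a
trailing 0x00 byte produces the smallest key strictly greater than the current
row, so a rebuilt scanner started there will not re-emit a row that was
already handed downstream.

```java
import java.util.Arrays;

public class ScannerRestartSketch {

    // Appending a 0x00 byte yields the smallest row key strictly greater
    // than `row` under HBase's unsigned byte-wise comparison.
    static byte[] closestRowAfter(byte[] row) {
        // Arrays.copyOf zero-fills the extra trailing byte.
        return Arrays.copyOf(row, row.length + 1);
    }

    // Sketch of the nextRecord() restart decision: if mapResultToOutType()
    // finished without an error for currentRow before the scanner timed out,
    // resume *after* it; otherwise resume *from* it so the row is not lost.
    static byte[] restartRow(byte[] currentRow, boolean rowAlreadyEmitted) {
        return rowAlreadyEmitted ? closestRowAfter(currentRow) : currentRow;
    }

    public static void main(String[] args) {
        byte[] current = {0x01, 0x02};
        System.out.println(Arrays.toString(restartRow(current, true)));  // [1, 2, 0]
        System.out.println(Arrays.toString(restartRow(current, false))); // [1, 2]
    }
}
```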


On Mon, Nov 25, 2019 at 6:57 PM Mark Davis <moda...@protonmail.com> wrote:

> Hi Flavio,
>
>
>> When the resultScanner dies because of a timeout (this happens a lot when
>> you have backpressure and the time between 2 consecutive reads exceeds the
>> scanner timeout), the code creates a new scanner and restarts from where it
>> was (startRow = currentRow).
>> So there should not be any duplicates (in theory), but this could be the
>> root of the problem..
>>
>
> Yes, you are right, the nextRecord() exception handling is responsible for
> the duplicate record processing:
>
> org.apache.hadoop.hbase.client.ScannerTimeoutException: 1038878ms passed
> since the last invocation, timeout is currently set to 60000
> at org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:453)
> at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:371)
> at org.apache.flink.addons.hbase.AbstractTableInputFormat.nextRecord(AbstractTableInputFormat.java:130)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.hbase.UnknownScannerException:
> org.apache.hadoop.hbase.UnknownScannerException: Name: 135281, already
> closed?
> at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2389)
> at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
> at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
> at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
> at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
> at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)
>
> But I am not sure that the handling of the HBase exception thrown from
> ClientScanner.next() is correct.
> If the call to mapResultToOutType(Result) finished without an error there
> is no need to restart from the same row.
> The new scanner should start from the next row.
> Is that so or am I missing something?
>
> Best regards,
>   Mark
>
>
