Maybe the problem is indeed this..the fact that the scan starts from the
last seen row..in this case maybe the first result should be skipped
because it was already read..

On Mon, Nov 25, 2019 at 10:22 AM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> What I can tell is how the HBase input format works..if you look
> at AbstractTableInputFormat [1] this is the nextRecord() function:
>
>         public T nextRecord(T reuse) throws IOException {
>               if (resultScanner == null) {
>                       throw new IOException("No table result scanner 
> provided!");
>               }
>               try {
>                       Result res = resultScanner.next();
>                       if (res != null) {
>                               scannedRows++;
>                               currentRow = res.getRow();
>                               return mapResultToOutType(res);
>                       }
>               } catch (Exception e) {
>                       resultScanner.close();
>                       //workaround for timeout on scan
>                       LOG.warn("Error after scan of " + scannedRows + " rows. 
> Retry with a new scanner...", e);
>                       scan.setStartRow(currentRow);
>                       resultScanner = table.getScanner(scan);
>                       Result res = resultScanner.next();
>                       if (res != null) {
>                               scannedRows++;
>                               currentRow = res.getRow();
>                               return mapResultToOutType(res);
>                       }
>               }
>
>               endReached = true;
>               return null;
>       }
>
> When the resultScanner dies because of a timeout (this happens a lot when
> you have backpressure and the time between 2 consecutive reads exceed the
> scanner timeout), the code creates a new scanner and restart from where it
> was (starRow = currentRow).
> So there should not be any duplicates (in theory), but this could be the
> root of the problem..
>
> Best,
> Flavio
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
>
> On Sat, Nov 23, 2019 at 11:07 PM Mark Davis <moda...@protonmail.com>
> wrote:
>
>> Hello,
>>
>> I am reading Results from an HBase table and process them with Batch API.
>> Everything works fine until I receive a ScannerTimeoutException from HBase.
>> Maybe my transformations get stuck or a GC pause happen - hard to tell.
>> The HBase Client restarts the scan and the processing continues.
>> Except one problem - every time I receive this Exception I observe a
>> duplicate Result processing - the Result which was processed just before
>> ScannerTimeoutException is thrown is processed twice.
>>
>> Is this expected behavior? Should I be prepared to handle it?
>> And how should I handle it? Keeping track of all processed Results is not
>> feasible in my case.
>>
>> Here is a simple job demonstrating an issue (HBase scan and RPC timeouts
>> are set to 60 sec)
>>
>> Thank you!
>>
>> Best regards,
>> Mark
>>
>>
>>   public static void main(String[] args) throws Exception {
>>     ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>     env.setParallelism(1);
>>
>>     env.createInput(new Src())
>>         .map(new Mapper())
>>         .print();
>>   }
>>
>>   private static class Mapper implements MapFunction<Tuple1<String>,
>> String> {
>>
>>     private int cnt = 0;
>>
>>     @Override
>>     public String map(Tuple1<String> value) throws Exception {
>>       if (cnt++ % 2 == 0) {
>>         Thread.sleep(120000);
>>       }
>>       return value.f0;
>>     }
>>
>>   }
>>
>>   private static class Src extends
>> AbstractTableInputFormat<Tuple1<String>> {
>>
>>     @Override
>>     protected Scan getScanner() {
>>       Scan scan = new Scan();
>>       scan.setStartRow(getStartRow());
>>       scan.setStopRow(getEndRow());
>>       scan.setCaching(1);
>>       scan.setCacheBlocks(false);
>>       return scan;
>>     }
>>
>>     @Override
>>     protected String getTableName() {
>>       return getTable();
>>     }
>>
>>     @Override
>>     protected Tuple1<String> mapResultToOutType(Result r) {
>>       return new Tuple1<String>(Bytes.toString(r.getRow()));
>>     }
>>
>>     @Override
>>     public void configure(org.apache.flink.configuration.Configuration
>> parameters) {
>>       scan = getScanner();
>>       try {
>>         table = new HTable(getHadoopConf(), getTableName());
>>       } catch (IOException e) {
>>         e.printStackTrace();
>>       }
>>     }
>>
>>   }
>>
>
>

Reply via email to