collabH commented on code in PR #149:
URL: https://github.com/apache/bahir-flink/pull/149#discussion_r895434557


##########
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java:
##########
@@ -42,39 +43,42 @@
 
 /**
  * Input format for reading the contents of a Kudu table (defined by the 
provided {@link KuduTableInfo}) in both batch
- * and stream programs. Rows of the Kudu table are mapped to {@link Row} 
instances that can converted to other data
+ * and stream programs. Rows of the Kudu table are mapped to {@link T} 
instances that can converted to other data
  * types by the user later if necessary.
  *
- * <p> For programmatic access to the schema of the input rows users can use 
the {@link org.apache.flink.connectors.kudu.table.KuduCatalog}
+ * <p> For programmatic access to the schema of the input rows users can use 
the {@link KuduCatalog}
  * or overwrite the column order manually by providing a list of projected 
column names.
  */
 @PublicEvolving
-public class KuduRowInputFormat extends RichInputFormat<Row, KuduInputSplit> {
+public abstract class AbstractKuduInputFormat<T> extends RichInputFormat<T, 
KuduInputSplit> implements ResultTypeQueryable<T> {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final KuduReaderConfig readerConfig;
     private final KuduTableInfo tableInfo;
-
-    private List<KuduFilterInfo> tableFilters;
-    private List<String> tableProjections;
-
+    private final List<KuduFilterInfo> tableFilters;
+    private final List<String> tableProjections;
+    private final RowResultConvertor<T> rowResultConvertor;
     private boolean endReached;
+    private transient KuduReader<T> kuduReader;
+    private transient KuduReaderIterator<T> resultIterator;
 
-    private transient KuduReader kuduReader;
-    private transient KuduReaderIterator resultIterator;
-
-    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo 
tableInfo) {
-        this(readerConfig, tableInfo, new ArrayList<>(), null);
+    public AbstractKuduInputFormat(KuduReaderConfig readerConfig, 
RowResultConvertor<T> rowResultConvertor,
+                                   KuduTableInfo tableInfo) {
+        this(readerConfig, rowResultConvertor, tableInfo, new ArrayList<>(), 
null);
     }
 
-    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo 
tableInfo, List<String> tableProjections) {
-        this(readerConfig, tableInfo, new ArrayList<>(), tableProjections);
+    public AbstractKuduInputFormat(KuduReaderConfig readerConfig, 
RowResultConvertor<T> rowResultConvertor,
+                                   KuduTableInfo tableInfo, List<String> 
tableProjections) {
+        this(readerConfig, rowResultConvertor, tableInfo, new ArrayList<>(), 
tableProjections);
     }
 
-    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo 
tableInfo, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
+    public AbstractKuduInputFormat(KuduReaderConfig readerConfig, 
RowResultConvertor<T> rowResultConvertor,
+                                   KuduTableInfo tableInfo, 
List<KuduFilterInfo> tableFilters,
+                                   List<String> tableProjections) {
 
         this.readerConfig = checkNotNull(readerConfig, "readerConfig could not 
be null");
+        this.rowResultConvertor = checkNotNull(rowResultConvertor, 
"readerConfig could not be null");

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to