thebalu commented on a change in pull request #78:
URL: https://github.com/apache/bahir-flink/pull/78#discussion_r411144580



##########
File path: 
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderIterator.java
##########
@@ -51,56 +52,63 @@ public boolean hasNext() throws KuduException {
         }
     }
 
-    public KuduRow next() {
+    public Row next() {
         RowResult row = this.rowIterator.next();
-        return toKuduRow(row);
+        return toFlinkRow(row);
     }
 
     private void nextRows() throws KuduException {
         this.rowIterator = scanner.nextRows();
     }
 
-    private KuduRow toKuduRow(RowResult row) {
+    private Row toFlinkRow(RowResult row) {
         Schema schema = row.getColumnProjection();
 
-        KuduRow values = new KuduRow(schema.getColumnCount());
+        Row values = new Row(schema.getColumnCount());
         schema.getColumns().forEach(column -> {
             String name = column.getName();
             int pos = schema.getColumnIndex(name);
-            if(row.isNull(name)) {
-                values.setField(pos, name, null);
+            if (row.isNull(name)) {
+                values.setField(pos, null);
             } else {
                 Type type = column.getType();
                 switch (type) {
                     case BINARY:
-                        values.setField(pos, name, row.getBinary(name));
+                        values.setField(pos, row.getBinary(name));
                         break;
                     case STRING:
-                        values.setField(pos, name, row.getString(name));
+                        values.setField(pos, row.getString(name));
                         break;
                     case BOOL:
-                        values.setField(pos, name, row.getBoolean(name));
+                        values.setField(pos, row.getBoolean(name));
                         break;
                     case DOUBLE:
-                        values.setField(pos, name, row.getDouble(name));
+                        values.setField(pos, row.getDouble(name));
                         break;
                     case FLOAT:
-                        values.setField(pos, name, row.getFloat(name));
+                        values.setField(pos, row.getFloat(name));
                         break;
                     case INT8:
-                        values.setField(pos, name, row.getByte(name));
+                        values.setField(pos, row.getByte(name));
                         break;
                     case INT16:
-                        values.setField(pos, name, row.getShort(name));
+                        values.setField(pos, row.getShort(name));
                         break;
                     case INT32:
-                        values.setField(pos, name, row.getInt(name));
+                        values.setField(pos, row.getInt(name));
                         break;
                     case INT64:
-                        values.setField(pos, name, row.getLong(name));
+                        values.setField(pos, row.getLong(name));
                         break;
                     case UNIXTIME_MICROS:
-                        values.setField(pos, name, row.getLong(name) / 1000);
+                        try {
+                            values.setField(pos, row.getTimestamp(name));
+                        } catch (Exception e) {

Review comment:
       This try/catch is actually not necessary, especially if this is simplified 
(see the comment above).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to