gyfora commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r430908058



##########
File path: 
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +156,167 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, 
fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, 
predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        // try to convert Flink filter expressions to Kudu Filter Info
+        List<KuduFilterInfo> kuduPredicates = new 
ArrayList<>(predicates.size());
+        List<Expression> unsupportedExpressions = new 
ArrayList<>(predicates.size());
+        for (Expression pred : predicates) {
+            KuduFilterInfo kuduPred = toKuduFilterInfo(pred);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo [{}] 
and pushed into " +
+                    "KuduTable [{}].", pred, kuduPred, tableInfo.getName());
+                kuduPredicates.add(kuduPred);
+            } else {
+                unsupportedExpressions.add(pred);
+                LOG.info("Predicate [{}] could not be pushed into 
KuduFilterInfo for KuduTable [{}].",
+                    pred, tableInfo.getName());
+            }
+        }
+        // update list of Flink expressions to unsupported expressions
+        predicates.clear();
+        predicates.addAll(unsupportedExpressions);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, 
kuduPredicates, projectedFields);
+    }
+
+    /**
+     * Converts Flink Expression to KuduFilterInfo.
+     */
+    @Nullable
+    private KuduFilterInfo toKuduFilterInfo(Expression predicate) {

Review comment:
      Could we move all the logic translating from Expression to 
KuduFilterInfo into a utility class? That would leave the source cleaner.

##########
File path: 
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -17,55 +17,106 @@
 
 package org.apache.flink.connectors.kudu.table;
 
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connectors.kudu.batch.KuduRowInputFormat;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.planner.expressions.Attribute;
+import org.apache.flink.table.planner.expressions.BinaryComparison;
+import org.apache.flink.table.planner.expressions.EqualTo;
+import org.apache.flink.table.planner.expressions.GreaterThan;
+import org.apache.flink.table.planner.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.planner.expressions.IsNotNull;
+import org.apache.flink.table.planner.expressions.IsNull;
+import org.apache.flink.table.planner.expressions.LessThan;
+import org.apache.flink.table.planner.expressions.LessThanOrEqual;
+import org.apache.flink.table.planner.expressions.Literal;
+import org.apache.flink.table.planner.expressions.UnaryExpression;
+import org.apache.flink.table.sources.FilterableTableSource;
+import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.sources.LimitableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
-public class KuduTableSource implements StreamTableSource<Row>, 
LimitableTableSource<Row>, ProjectableTableSource<Row> {
+public class KuduTableSource extends InputFormatTableSource<Row> implements
+    LimitableTableSource<Row>, ProjectableTableSource<Row>, 
FilterableTableSource<Row> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KuduTableSource.class);
+
+    @SuppressWarnings("unchecked")
+    private static final Set<BasicTypeInfo> VALID_LITERAL_TYPE = new HashSet() 
{{

Review comment:
      This should probably go into a utility class together with the rest of the filter logic.

##########
File path: 
flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class KuduTableSourceTest extends KuduTestBase {
+    private BatchTableEnvironment tableEnv;
+    private KuduCatalog catalog;
+
+    @BeforeEach
+    public void init() {
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
+        setUpDatabase(tableInfo);
+        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+        tableEnv = 
KuduTableTestUtils.createBatchTableEnvWithBlinkPlannerBatchMode(env);
+        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        tableEnv.registerCatalog("kudu", catalog);
+        tableEnv.useCatalog("kudu");
+    }
+
+    @Test
+    public void testFullScan() throws Exception {
+        Table table = tableEnv.sqlQuery("select * from books order by id");
+        DataSet<Row> dataSet = tableEnv.toDataSet(table, Row.class);
+        List<Row> result = dataSet.collect();
+        // check result
+        assertEquals(5, result.size());
+        assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11",
+            result.get(0).toString());
+        tableEnv.sqlUpdate("DROP TABLE books");
+    }
+
+    @Test
+    public void testScanWithProjectionAndFilter() throws Exception {

Review comment:
      With this test we cannot be sure that the filters were actually pushed 
down and that they work correctly.
   
   We should also add a unit-style test that directly validates the created 
KuduFilterInfos and the KuduTableSource itself,
   e.g. applyPredicate, isFilterPushedDown, and the rest of the filter push-down methods.
   
   The problem here is that we add a lot of different filtering logic that can 
easily break the output of SQL queries if incorrect, so we really want to make 
sure that it is tested.

##########
File path: 
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +156,167 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, 
fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, 
predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        // try to convert Flink filter expressions to Kudu Filter Info
+        List<KuduFilterInfo> kuduPredicates = new 
ArrayList<>(predicates.size());
+        List<Expression> unsupportedExpressions = new 
ArrayList<>(predicates.size());
+        for (Expression pred : predicates) {
+            KuduFilterInfo kuduPred = toKuduFilterInfo(pred);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo [{}] 
and pushed into " +
+                    "KuduTable [{}].", pred, kuduPred, tableInfo.getName());
+                kuduPredicates.add(kuduPred);
+            } else {
+                unsupportedExpressions.add(pred);
+                LOG.info("Predicate [{}] could not be pushed into 
KuduFilterInfo for KuduTable [{}].",
+                    pred, tableInfo.getName());
+            }
+        }
+        // update list of Flink expressions to unsupported expressions
+        predicates.clear();

Review comment:
      The Javadoc of applyPredicate specifies that we should remove only the 
applicable (pushed-down) expressions from the list.
   I know it's not a large difference, but it would probably be better to use an 
iterator and remove each expression as soon as it is pushed down.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to