[ 
https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833107#comment-16833107
 ] 

Rong Rong edited comment on FLINK-12399 at 9/4/19 4:28 PM:
-----------------------------------------------------------

Hi [~josh.bradt]. I think I found the root cause of this issue.

Apparently you have to override the method {{explainSource}} in order to let 
Calcite know that the newly created TableSource with the filter pushed down is 
different from the originally created CustomTableSource (the one on which 
{{applyPredicate}} was never invoked).
I think this might be related to changelog point #4 of 
https://github.com/apache/flink/pull/8324: when I tried upgrading to Calcite 
1.19.0, I also encountered some weird issues where Calcite tries to find the 
correct TableSource from the digest strings.

I will assign this issue to myself and start looking into it. Please let me 
know if adding the override resolves your issue in the meantime.
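
For reference, a minimal sketch of such an override (the exact digest text is 
not prescribed; it only has to change once filters have been applied):

{code:java}
@Override
public String explainSource() {
    // Include the pushed-down filters in the digest so the planner can
    // distinguish the filtered copy from the unfiltered original.
    return "CustomTableSource(filters: " + java.util.Arrays.toString(filters) + ")";
}
{code}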



> FilterableTableSource does not use filters on job run
> -----------------------------------------------------
>
>                 Key: FLINK-12399
>                 URL: https://issues.apache.org/jira/browse/FLINK-12399
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.8.0
>            Reporter: Josh Bradt
>            Assignee: Rong Rong
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flink-filter-bug.tar.gz
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> As discussed [on the mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html],
>  there appears to be a bug where a job that uses a custom 
> FilterableTableSource does not keep the filters that were pushed down into 
> the table source. More specifically, the table source does receive filters 
> via applyPredicate, and a new table source with those filters is returned, 
> but the final job graph appears to use the original table source, which does 
> not contain any filters.
> I attached a minimal example program to this ticket. The custom table source 
> is as follows: 
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> 
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.expressions.Expression;
> import org.apache.flink.table.sources.BatchTableSource;
> import org.apache.flink.table.sources.FilterableTableSource;
> import org.apache.flink.table.sources.TableSource;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> // Model, Filter, and FilterConverter are part of the attached example program.
> public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {
> 
>     private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);
> 
>     private final Filter[] filters;
>     private final FilterConverter converter = new FilterConverter();
> 
>     public CustomTableSource() {
>         this(null);
>     }
> 
>     private CustomTableSource(Filter[] filters) {
>         this.filters = filters;
>     }
> 
>     @Override
>     public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
>         if (filters == null) {
>             LOG.info("==== No filters defined ====");
>         } else {
>             LOG.info("==== Found filters ====");
>             for (Filter filter : filters) {
>                 LOG.info("FILTER: {}", filter);
>             }
>         }
>         return execEnv.fromCollection(allModels());
>     }
> 
>     @Override
>     public TableSource<Model> applyPredicate(List<Expression> predicates) {
>         LOG.info("Applying predicates");
>         List<Filter> acceptedFilters = new ArrayList<>();
>         for (final Expression predicate : predicates) {
>             converter.convert(predicate).ifPresent(acceptedFilters::add);
>         }
>         // Returns a new copy of this source that carries the accepted filters.
>         return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
>     }
> 
>     @Override
>     public boolean isFilterPushedDown() {
>         return filters != null;
>     }
> 
>     @Override
>     public TypeInformation<Model> getReturnType() {
>         return TypeInformation.of(Model.class);
>     }
> 
>     @Override
>     public TableSchema getTableSchema() {
>         return TableSchema.fromTypeInfo(getReturnType());
>     }
> 
>     private List<Model> allModels() {
>         List<Model> models = new ArrayList<>();
>         models.add(new Model(1, 2, 3, 4));
>         models.add(new Model(10, 11, 12, 13));
>         models.add(new Model(20, 21, 22, 23));
>         return models;
>     }
> }
> {code}
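> A driver along these lines exercises the source (a sketch only; the class 
> name, the table name, the field name "a", and the query are hypothetical, 
> using the Flink 1.8 Java batch Table API):
> {code:java}
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.java.BatchTableEnvironment;
> 
> public class FilterBugJob {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
> 
>         // "models" and the field "a" are hypothetical placeholders.
>         tEnv.registerTableSource("models", new CustomTableSource());
>         Table result = tEnv.sqlQuery("SELECT * FROM models WHERE a > 5");
> 
>         // Printing triggers execution; the pushed-down filters should apply here.
>         tEnv.toDataSet(result, Model.class).print();
>     }
> }
> {code}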
>  
> When run, it logs
> {noformat}
> 15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource  - Applying predicates
> 15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource  - Applying predicates
> 15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource  - Applying predicates
> 15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource  - ==== No filters defined ====
> {noformat}
> which appears to indicate that although filters are getting pushed down, the 
> final job does not use them.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
