Hi Josh,

I think I found the root cause of this issue (please see my comment in https://issues.apache.org/jira/browse/FLINK-12399). For now, you can try overriding the explainSource() method so that Calcite knows the table source returned by applyPredicate() is different from the one it was called on.
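The following is a minimal, Flink-free sketch of that suggestion. It assumes, following Rong's explanation, that the planner decides whether two table sources are the same by comparing their explainSource() descriptions. Under that assumption, the copy returned after predicates are applied has to describe itself differently from the original. `DescribedSource`, `withFilters` and the string-valued filters are hypothetical stand-ins, not Flink's API. In the real CustomerTableSource, the equivalent would be an `@Override public String explainSource()` that appends the accepted `AppRequestFilter`s, assuming they have a readable `toString()`.

```java
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical, Flink-free stand-in for CustomerTableSource. It models only the
 * part that matters here: a description that changes once filters are applied.
 */
final class DescribedSource {
    private final String tableName;
    private final List<String> fieldNames;
    private final List<String> filters; // null means no predicates have been applied yet

    DescribedSource(String tableName, List<String> fieldNames, List<String> filters) {
        this.tableName = Objects.requireNonNull(tableName);
        this.fieldNames = Objects.requireNonNull(fieldNames);
        this.filters = filters;
    }

    /** Returns a new instance carrying the accepted filters, as applyPredicate() does. */
    DescribedSource withFilters(List<String> acceptedFilters) {
        return new DescribedSource(tableName, fieldNames, acceptedFilters);
    }

    /** Same contract as in the thread: true once predicates have been applied. */
    boolean isFilterPushedDown() {
        return filters != null;
    }

    /**
     * Plays the role of an explainSource() override. The accepted filters are part of
     * the description, so a filtered copy no longer describes itself like the original.
     */
    String explainSource() {
        String base = tableName + "[fields=" + String.join(", ", fieldNames) + "]";
        return isFilterPushedDown()
                ? base + "[filters=" + String.join(" AND ", filters) + "]"
                : base;
    }
}
```

Two unfiltered instances still produce identical descriptions. Only the filtered copy gets a distinct one, which is the distinction the planner would need to keep it.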
Let me know if this works for you :-)

Thanks,
Rong

On Fri, May 3, 2019 at 1:03 PM Josh Bradt <josh.br...@klaviyo.com> wrote:

> Hi Fabian,
>
> Thanks for taking a look. I've filed this ticket:
> https://issues.apache.org/jira/browse/FLINK-12399
>
> Thanks,
>
> Josh
>
> On Fri, May 3, 2019 at 3:41 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Josh,
>>
>> The code looks good to me.
>> This seems to be a bug then.
>> It's strange that it works for ORC.
>>
>> Would you mind opening a Jira ticket and maybe providing a simple,
>> reproducible code example?
>>
>> Thank you,
>> Fabian
>>
>> On Thu, May 2, 2019 at 6:23 PM Josh Bradt <josh.br...@klaviyo.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks for your reply. My custom table source does not implement
>>> ProjectableTableSource. I believe that isFilterPushedDown is implemented
>>> correctly since it's nearly identical to what's written in the
>>> OrcTableSource. I pasted a slightly simplified version of the
>>> implementation below. If you wouldn't mind reading over it, is there
>>> anything obviously wrong?
>>>
>>> public final class CustomerTableSource implements
>>>         BatchTableSource<Customer>, FilterableTableSource<Customer> {
>>>
>>>     // Iterator that gets data from a REST API as POJO instances
>>>     private final AppResourceIterator<Customer> resourceIterator;
>>>     private final String tableName;
>>>     private final Class<Customer> modelClass;
>>>     private final AppRequestFilter[] filters;
>>>
>>>     public CustomerTableSource(
>>>             AppResourceIterator<Customer> resourceIterator,
>>>             String tableName,
>>>             Class<Customer> modelClass) {
>>>
>>>         this(resourceIterator, tableName, modelClass, null);
>>>     }
>>>
>>>     protected CustomerTableSource(
>>>             AppResourceIterator<Customer> resourceIterator,
>>>             String tableName,
>>>             Class<Customer> modelClass,
>>>             AppRequestFilter[] filters) {
>>>
>>>         this.resourceIterator = resourceIterator;
>>>         this.tableName = tableName;
>>>         this.modelClass = modelClass;
>>>         this.filters = filters;
>>>     }
>>>
>>>     @Override
>>>     public TableSource<Customer> applyPredicate(List<Expression> predicates) {
>>>         List<Expression> acceptedPredicates = new ArrayList<>();
>>>         List<AppRequestFilter> acceptedFilters = new ArrayList<>();
>>>
>>>         for (final Expression predicate : predicates) {
>>>             buildFilterForPredicate(predicate).ifPresent(filter -> {
>>>                 acceptedFilters.add(filter);
>>>                 acceptedPredicates.add(predicate);
>>>             });
>>>         }
>>>
>>>         predicates.removeAll(acceptedPredicates);
>>>
>>>         return new CustomerTableSource(
>>>                 resourceIterator.withFilters(acceptedFilters),
>>>                 tableName,
>>>                 modelClass,
>>>                 acceptedFilters.toArray(new AppRequestFilter[0])
>>>         );
>>>     }
>>>
>>>     public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
>>>         // Code for translating an Expression into an AppRequestFilter
>>>         // Returns Optional.empty() for predicates we don't want to / can't apply
>>>     }
>>>
>>>     @Override
>>>     public boolean isFilterPushedDown() {
>>>         return filters != null;
>>>     }
>>>
>>>     @Override
>>>     public DataSet<Customer> getDataSet(ExecutionEnvironment execEnv) {
>>>         return execEnv.fromCollection(resourceIterator, modelClass);
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<Customer> getReturnType() {
>>>         return TypeInformation.of(modelClass);
>>>     }
>>>
>>>     @Override
>>>     public TableSchema getTableSchema() {
>>>         return TableSchema.fromTypeInfo(getReturnType());
>>>     }
>>> }
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>> On Thu, May 2, 2019 at 3:42 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> Does your TableSource also implement ProjectableTableSource?
>>>> If yes, you need to make sure that the filter information is also
>>>> forwarded if ProjectableTableSource.projectFields() is called after
>>>> FilterableTableSource.applyPredicate().
>>>> Also make sure to correctly implement
>>>> FilterableTableSource.isFilterPushedDown().
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>> On Tue, Apr 30, 2019 at 10:29 PM Josh Bradt <josh.br...@klaviyo.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to implement filter push-down on a custom BatchTableSource
>>>>> that retrieves data from a REST API and returns it as POJO instances. I've
>>>>> implemented FilterableTableSource as described in the docs, returning a
>>>>> new instance of my table source containing the predicates that I've removed
>>>>> from the list of predicates passed into applyPredicate. However, when
>>>>> getDataSet is eventually called, it's called on the instance of the table
>>>>> source that was originally registered with the table environment, which
>>>>> does not have any filters in it. I've stepped through the code in a
>>>>> debugger, and applyPredicate is definitely being called, and it's
>>>>> definitely returning new instances of my table source, but they don't seem
>>>>> to be used.
>>>>>
>>>>> I also played with the OrcTableSource, which is the only example of a
>>>>> push-down filter implementation I could find, and it doesn't behave this
>>>>> way. When I set a breakpoint in getDataSet in that case, it's being called
>>>>> on one of the new instances of the table source that contains the accepted
>>>>> filters.
>>>>>
>>>>> Are there any other requirements for implementing push-down filters
>>>>> that aren't listed in the docs? Or does anyone have any tips for this?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Josh
>>>>>
>>>>> --
>>>>> Josh Bradt
>>>>> Software Engineer
>>>>> 225 Franklin St, Boston, MA 02110
>>>>> klaviyo.com <https://www.klaviyo.com>
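Fabian's earlier advice about combining ProjectableTableSource with FilterableTableSource comes down to this: every push-down returns a new immutable instance that carries the other push-down's state forward. The following is a minimal, Flink-free sketch. `PushDownSource`, `applyFilters`, `projectFields` and the string-valued filters are hypothetical stand-ins for the real applyPredicate()/projectFields() pair, not Flink's API. Each copy method keeps whatever the other one already set, so a projection applied after the predicates does not silently drop them. Both pieces of state also appear in the description, in line with Rong's explainSource() suggestion.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, Flink-free sketch of a source supporting both filter and
 * projection push-down. Instances are immutable; each push-down returns a copy.
 */
final class PushDownSource {
    private final List<String> filters;  // null until predicates have been applied
    private final int[] selectedFields;  // null means "all fields"

    PushDownSource(List<String> filters, int[] selectedFields) {
        this.filters = filters;
        this.selectedFields = selectedFields == null ? null : selectedFields.clone();
    }

    /** Counterpart of applyPredicate(): keeps any projection already pushed down. */
    PushDownSource applyFilters(List<String> acceptedFilters) {
        return new PushDownSource(acceptedFilters, selectedFields);
    }

    /** Counterpart of projectFields(): keeps any filters already pushed down. */
    PushDownSource projectFields(int[] fields) {
        return new PushDownSource(filters, fields);
    }

    boolean isFilterPushedDown() {
        return filters != null;
    }

    List<String> filters() {
        return filters;
    }

    /** Description covering both push-downs, so copies differing in either are distinct. */
    String explainSource() {
        return "PushDownSource[filters=" + filters
                + ", fields=" + Arrays.toString(selectedFields) + "]";
    }
}
```

If `projectFields` built its copy with `null` filters instead, the predicates accepted earlier would be lost, and `isFilterPushedDown()` would wrongly report `false`.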