From Peeyush Gupta <[email protected]>: Peeyush Gupta has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19472 )
Change subject: [ASTERIXDB-3578][EXT] Error with query on delta table with IN predicate ...................................................................... [ASTERIXDB-3578][EXT] Error with query on delta table with IN predicate - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-65533 Change-Id: I7177a10da9145a8de862991b3e57213138dae9b9 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19472 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Peeyush Gupta <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Jenkins <[email protected]> --- A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.5.adm M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.09.query.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.3.adm A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.07.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.08.query.sqlpp M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.7.adm A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.06.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.9.adm A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.04.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.6.adm A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.1.adm A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.4.adm A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.05.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.8.adm M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp 20 files changed, 322 insertions(+), 30 deletions(-) Approvals: Ali Alsuliman: Looks good to me, approved Peeyush Gupta: Looks good to me, but someone else must approve Jenkins: Verified; Verified Anon. E. 
Moose #1000171: diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp new file mode 100644 index 0000000..8e3b6b1 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds WHERE ds.date = "01-01-2025" AND ds.name IN ["Order 1", "Order 3", "Order 4"] order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp new file mode 100644 index 0000000..71167a5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds WHERE ds.date = "01-01-2025" and ds.hour in [10, 16, 18] order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.04.query.sqlpp new file mode 100644 index 0000000..3bddae5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.04.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds WHERE ds.date = "01-01-2025" or ds.date="01-02-2025" order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.05.query.sqlpp new file mode 100644 index 0000000..8620657 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.05.query.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds + WHERE (ds.date > "01-02-2025" and ds.hour = 10) or (ds.date < "01-02-2025" and ds.hour = 15) + order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.06.query.sqlpp new file mode 100644 index 0000000..7b88e1f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.06.query.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + +SELECT ds.id as id1, ds2.id as id2 FROM DeltalakeDataset as ds, DeltalakeDataset as ds2 +WHERE ds.hour = ds2.hour and ds.date = "01-01-2025" and ds2.date is not null +order by ds.id, ds2.id; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.07.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.07.query.sqlpp new file mode 100644 index 0000000..6478788 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.07.query.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds + WHERE (ds.date > "01-02-2025" and (ds.hour = 10 or ds.hour=16 or ds.hour=12) and ds.id>1) or (ds.date < "01-02-2025" and ds.hour = 15) + order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.08.query.sqlpp new file mode 100644 index 0000000..eaf58cd --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.08.query.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds + WHERE (ds.date >= "01-02-2025" and (ds.hour = 10 or ds.hour=16 or ds.hour=12) and ds.id>1) or + (ds.date < "01-02-2025" and ds.hour = 15) or ds.date =10 + order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.09.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.09.query.sqlpp new file mode 100644 index 0000000..1a9dd7c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.09.query.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds + WHERE ds.date = 10 or ds.date = "01-01-2025" + order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.1.adm new file mode 100644 index 0000000..86d41fe --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.1.adm @@ -0,0 +1,5 @@ +{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 } +{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 } +{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } +{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm index 86d41fe..cf5ecd4 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm @@ -1,5 +1,2 @@ { "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 } -{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 } -{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } -{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } -{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file +{ "id": 2, "name": "Order 3", 
"date": "01-01-2025", "hour": 10 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.3.adm new file mode 100644 index 0000000..00ba142 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.3.adm @@ -0,0 +1,3 @@ +{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 } +{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 } +{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.4.adm new file mode 100644 index 0000000..b02a62d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.4.adm @@ -0,0 +1,9 @@ +{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 } +{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 } +{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } +{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } +{ "id": 5, "name": "Order 21", "date": "01-02-2025", "hour": 12 } +{ "id": 6, "name": "Order 22", "date": "01-02-2025", "hour": 12 } +{ "id": 7, "name": "Order 30", "date": "01-02-2025", "hour": 16 } +{ "id": 8, "name": "Order 31", "date": "01-02-2025", "hour": 16 } \ No newline at end of file diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.5.adm new file mode 100644 index 0000000..6fd91f1 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.5.adm @@ -0,0 +1,2 @@ +{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.6.adm new file mode 100644 index 0000000..5bfd257 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.6.adm @@ -0,0 +1,13 @@ +{ "id1": 0, "id2": 0 } +{ "id1": 0, "id2": 1 } +{ "id1": 0, "id2": 2 } +{ "id1": 1, "id2": 0 } +{ "id1": 1, "id2": 1 } +{ "id1": 1, "id2": 2 } +{ "id1": 2, "id2": 0 } +{ "id1": 2, "id2": 1 } +{ "id1": 2, "id2": 2 } +{ "id1": 3, "id2": 3 } +{ "id1": 3, "id2": 4 } +{ "id1": 4, "id2": 3 } +{ "id1": 4, "id2": 4 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.7.adm new file mode 100644 index 0000000..6fd91f1 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.7.adm @@ -0,0 +1,2 @@ +{ "id": 3, "name": "Order 10", "date": 
"01-01-2025", "hour": 15 } +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.8.adm new file mode 100644 index 0000000..e943ba7 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.8.adm @@ -0,0 +1,6 @@ +{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } +{ "id": 5, "name": "Order 21", "date": "01-02-2025", "hour": 12 } +{ "id": 6, "name": "Order 22", "date": "01-02-2025", "hour": 12 } +{ "id": 7, "name": "Order 30", "date": "01-02-2025", "hour": 16 } +{ "id": 8, "name": "Order 31", "date": "01-02-2025", "hour": 16 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.9.adm new file mode 100644 index 0000000..86d41fe --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.9.adm @@ -0,0 +1,5 @@ +{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 } +{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 } +{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 } +{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 } +{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 } \ No newline at end of file diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java index 3c998a5..4e902b9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java @@ -62,6 +62,7 @@ import io.delta.kernel.data.Row; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelEngineException; import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.expressions.Expression; import io.delta.kernel.expressions.Predicate; @@ -107,7 +108,7 @@ Snapshot snapshot; try { snapshot = table.getLatestSnapshot(engine); - } catch (KernelException e) { + } catch (KernelException | KernelEngineException e) { LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e); throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); } @@ -136,30 +137,36 @@ scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); } scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); - CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine); + List<Row> scanFiles; + try { + scanFiles = getScanFiles(scan, engine); + } catch (UnsupportedOperationException | IllegalStateException e) { + // Delta kernel API failed to apply expression due to type mismatch. + // We need to fall back to skip applying the filter and return all files. 
+ LOGGER.info("Exception encountered while getting delta table files to scan {}", e.getMessage()); + scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); + scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); + scanFiles = getScanFiles(scan, engine); + } + LOGGER.info("Number of delta table parquet data files to scan: {}", scanFiles.size()); + locationConstraints = getPartitions(appCtx); + configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); + distributeFiles(scanFiles, getPartitionConstraint().getLocations().length); + issueWarnings(warnings, warningCollector); + } + private List<Row> getScanFiles(Scan scan, Engine engine) { List<Row> scanFiles = new ArrayList<>(); + CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine); while (iter.hasNext()) { - FilteredColumnarBatch batch = null; - try { - batch = iter.next(); - } catch (UnsupportedOperationException e) { - // Failed to apply expression due to type mismatch. 
We can skip the files where partitioned column - // type is different from the type of value provided in the predicate - LOGGER.info("Unsupported operation {}", e.getMessage()); - continue; - } + FilteredColumnarBatch batch = iter.next(); CloseableIterator<Row> rowIter = batch.getRows(); while (rowIter.hasNext()) { Row row = rowIter.next(); scanFiles.add(row); } } - LOGGER.info("Number of files to scan: {}", scanFiles.size()); - locationConstraints = getPartitions(appCtx); - configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); - distributeFiles(scanFiles, getPartitionConstraint().getLocations().length); - issueWarnings(warnings, warningCollector); + return scanFiles; } private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 82b5dad..fac1199 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -120,6 +120,7 @@ import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelEngineException; import io.delta.kernel.exceptions.KernelException; public class ExternalDataUtils { @@ -540,7 +541,7 @@ io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); try { table.getLatestSnapshot(engine); - } catch (KernelException e) { + } catch (KernelException | KernelEngineException e) { LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e); throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); } diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java index 73ed81e..112dba2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java @@ -35,6 +35,7 @@ import org.apache.asterix.om.constants.AsterixConstantValue; import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -48,8 +49,7 @@ import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.logging.log4j.LogManager; - -import com.microsoft.azure.storage.core.Logger; +import org.apache.logging.log4j.Logger; import io.delta.kernel.expressions.Column; import io.delta.kernel.expressions.Expression; @@ -58,7 +58,7 @@ public class DeltaTableFilterBuilder extends AbstractFilterBuilder { - private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger(); + private static final Logger LOGGER = LogManager.getLogger(); public DeltaTableFilterBuilder(ExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo, JobGenContext context, IVariableTypeEnvironment typeEnv) { @@ -72,7 +72,7 @@ try { deltaTablePredicate = createExpression(filterExpression); } catch (Exception e) { - LOGGER.error("Error creating DeltaTable filter expression ", e); + LOGGER.error("Error creating DeltaTable filter expression, skipping filter pushdown", e); } } if 
(deltaTablePredicate != null && !(deltaTablePredicate instanceof Predicate)) { @@ -138,12 +138,16 @@ private Expression handleFunction(ILogicalExpression expr) throws AlgebricksException { AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr; IFunctionDescriptor fd = resolveFunction(funcExpr); - List<Expression> args = handleArgs(funcExpr); FunctionIdentifier fid = fd.getIdentifier(); + if (funcExpr.getArguments().size() != 2 + && !(fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR))) { + throw new RuntimeException("Predicate should only have 2 arguments: " + funcExpr); + } + List<Expression> args = handleArgs(funcExpr); if (fid.equals(AlgebricksBuiltinFunctions.AND)) { - return new Predicate("AND", args); + return createAndOrPredicate("AND", args, 0); } else if (fid.equals(AlgebricksBuiltinFunctions.OR)) { - return new Predicate("OR", args); + return createAndOrPredicate("OR", args, 0); } else if (fid.equals(AlgebricksBuiltinFunctions.EQ)) { return new Predicate("=", args); } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) { @@ -173,8 +177,40 @@ protected Column createColumnExpression(ILogicalExpression expression) { ARecordType path = filterPaths.get(expression); if (path.getFieldNames().length != 1) { - throw new RuntimeException("Unsupported expression: " + expression); + throw new RuntimeException("Unsupported column expression: " + expression); + } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) { + // The field could be a nested field + List<String> fieldList = new ArrayList<>(); + fieldList = createPathExpression(path, fieldList); + return new Column(fieldList.toArray(new String[0])); + } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) { + return new Column(path.getFieldNames()[0]); + } else { + throw new RuntimeException("Unsupported column expression: " + expression); } - return new Column(path.getFieldNames()[0]); + } + + private List<String> 
createPathExpression(ARecordType path, List<String> fieldList) { + if (path.getFieldNames().length != 1) { + throw new RuntimeException("Error creating column expression"); + } else { + fieldList.add(path.getFieldNames()[0]); + } + if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) { + return createPathExpression((ARecordType) path.getFieldTypes()[0], fieldList); + } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) { + return fieldList; + } else { + throw new RuntimeException("Error creating column expression"); + } + } + + // Converts or(pred1, pred2, pred3) to or(pred1, or(pred2, pred3)) + private Predicate createAndOrPredicate(String function, List<Expression> args, int index) { + if (index == args.size() - 2) { + return new Predicate(function, args.get(index), args.get(index + 1)); + } else { + return new Predicate(function, args.get(index), createAndOrPredicate(function, args, index + 1)); + } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19472 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: ionic Gerrit-Change-Id: I7177a10da9145a8de862991b3e57213138dae9b9 Gerrit-Change-Number: 19472 Gerrit-PatchSet: 3 Gerrit-Owner: Peeyush Gupta <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Peeyush Gupta <[email protected]> Gerrit-MessageType: merged
