Yingyi Bu has submitted this change and it was merged. Change subject: ASTERIXDB-1791, ASTERIXDB-1796: fix failure handling in runtime operators. ......................................................................
ASTERIXDB-1791, ASTERIXDB-1796: fix failure handling in runtime operators. This change includes the following parts: - Fix the implementation of fail() and close() in several runtime operators to avoid file handle leak and job hang; - Add an erase method to RunFileWriter which closes files before deleting them in order to release the holding disk space; - Call RunFileWriter.close() and RunFileReader.close() in "finally" blocks. - Fix RunFileReader to not truncate files to be deleted - it is not the root cause of un-released disk space - open deleted files are the root cause; - Check file handle leaks in LangExecutionUtil.tearDown(). Change-Id: I203168171e6dac16b57d2eda960823e3810e22a3 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1513 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java M asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java 54 files changed, 1,027 insertions(+), 360 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found; Violations found diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java index a3d6102..d04217c 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java @@ -83,47 +83,53 @@ TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory(); IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS); - partitioner.open(); - FrameTupleAccessor fta = new FrameTupleAccessor(rDesc); List<TestFrameWriter> recipients = new ArrayList<>(); - for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) { - recipients.add((TestFrameWriter) writer); - } - partitioner.flush(); - for (TestFrameWriter writer : recipients) { - Assert.assertEquals(writer.nextFrameCount(), 1); - fta.reset(writer.getLastFrame()); - Assert.assertEquals(fta.getTupleCount(), 1); - FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); - Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE, - MessagingFrameTupleAppender.getMessageType(tempBuffer)); - } - message.getBuffer().clear(); - 
message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE); - message.getBuffer().flip(); - partitioner.flush(); - for (TestFrameWriter writer : recipients) { - Assert.assertEquals(writer.nextFrameCount(), 2); - fta.reset(writer.getLastFrame()); - Assert.assertEquals(fta.getTupleCount(), 1); - FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); - Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE, - MessagingFrameTupleAppender.getMessageType(tempBuffer)); - } + try { + partitioner.open(); + FrameTupleAccessor fta = new FrameTupleAccessor(rDesc); + for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) { + recipients.add((TestFrameWriter) writer); + } + partitioner.flush(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 1); + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 1); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } + message.getBuffer().clear(); + message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE); + message.getBuffer().flip(); + partitioner.flush(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 2); + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 1); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } - message.getBuffer().clear(); - message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); - message.getBuffer().flip(); - partitioner.flush(); - for (TestFrameWriter writer : recipients) { - Assert.assertEquals(writer.nextFrameCount(), 3); - fta.reset(writer.getLastFrame()); - 
Assert.assertEquals(fta.getTupleCount(), 1); - FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); - Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE, - MessagingFrameTupleAppender.getMessageType(tempBuffer)); + message.getBuffer().clear(); + message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); + message.getBuffer().flip(); + partitioner.flush(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 3); + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 1); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } + } catch (Throwable t) { + partitioner.fail(); + throw t; + } finally { + partitioner.close(); } - partitioner.close(); for (TestFrameWriter writer : recipients) { Assert.assertEquals(writer.nextFrameCount(), 4); Assert.assertEquals(writer.closeCount(), 1); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java index 4db00c8..0e6be0f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java @@ -19,7 +19,12 @@ package org.apache.asterix.test.runtime; +import java.io.BufferedReader; import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -28,7 +33,9 @@ import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.test.common.TestExecutor; import 
org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.commons.lang.SystemUtils; import org.apache.commons.lang3.StringUtils; +import org.junit.Assert; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -63,6 +70,9 @@ } public static void tearDown() throws Exception { + // Check whether there are leaked open run file handles. + checkRunFileLeaks(); + TestLibrarian.removeLibraryDir(); ExecutionTestUtil.tearDown(cleanupOnStop); ExecutionTestUtil.integrationUtil.removeTestStorageFiles(); @@ -117,4 +127,22 @@ System.err.flush(); } } + + private static void checkRunFileLeaks() throws IOException { + if (SystemUtils.IS_OS_WINDOWS) { + return; + } + // Only run the check on Linux and MacOS. + RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); + String processName = runtimeMXBean.getName(); + String processId = processName.split("@")[0]; + + // Checks whether there are leaked run files from operators. + Process process = Runtime.getRuntime() + .exec(new String[] { "bash", "-c", "lsof -p " + processId + "|grep waf|wc -l" }); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + int runFileCount = Integer.parseInt(reader.readLine().trim()); + Assert.assertTrue(runFileCount == 0); + } + } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql index 1757130..9b2e66d 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql @@ -18,11 +18,11 @@ */ use 
dataverse tpch; - + for $l in dataset('LineItem') //where inject-failure($l.l_shipdate <= '1998-09-02', $l.l_orderkey=5999) /*+ hash*/ -group by $l_returnflag := $l.l_returnflag, $l_linestatus := $l.l_linestatus +group by $l_returnflag := $l.l_returnflag, $l_linestatus := $l.l_linestatus with $l order by $l_returnflag, $l_linestatus return { @@ -32,8 +32,8 @@ "sum_base_price": sum(for $i in $l return $i.l_extendedprice), "sum_disc_price": sum(for $i in $l return $i.l_extendedprice * (1 - $i.l_discount)), "sum_charge": sum(for $i in $l return $i.l_extendedprice * (1 - $i.l_discount) * (1 + $i.l_tax)), - "ave_qty": avg(for $i in $l return $i.l_quantity), + "ave_qty": avg(for $i in $l return $i.l_quantity), "ave_price": avg(for $i in $l return $i.l_extendedprice), "ave_disc": avg(for $i in $l return $i.l_discount), "count_order": count($l) -} +} diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp new file mode 100644 index 0000000..6b277db --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse tpch if exists; +create dataverse tpch; + +use tpch; + + +create type LineItemType as + closed { + l_orderkey : integer, + l_partkey : integer, + l_suppkey : integer, + l_linenumber : integer, + l_quantity : double, + l_extendedprice : double, + l_discount : double, + l_tax : double, + l_returnflag : string, + l_linestatus : string, + l_shipdate : string, + l_commitdate : string, + l_receiptdate : string, + l_shipinstruct : string, + l_shipmode : string, + l_comment : string +} + +create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp new file mode 100644 index 0000000..39205db --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use tpch; + + +load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`), +(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp new file mode 100644 index 0000000..a3b7589 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +USE tpch; + +SET `import-private-functions` "true"; + +SELECT inject_failure(l_orderkey, l_orderkey=5988), SUM(l_quantity) t_sum_quantity +FROM LineItem l +GROUP BY l_orderkey; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp new file mode 100644 index 0000000..6b277db --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +drop dataverse tpch if exists; +create dataverse tpch; + +use tpch; + + +create type LineItemType as + closed { + l_orderkey : integer, + l_partkey : integer, + l_suppkey : integer, + l_linenumber : integer, + l_quantity : double, + l_extendedprice : double, + l_discount : double, + l_tax : double, + l_returnflag : string, + l_linestatus : string, + l_shipdate : string, + l_commitdate : string, + l_receiptdate : string, + l_shipinstruct : string, + l_shipmode : string, + l_comment : string +} + +create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp new file mode 100644 index 0000000..39205db --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use tpch; + + +load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`), +(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp new file mode 100644 index 0000000..c11a0ee --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +USE tpch; + +SET `import-private-functions` "true"; + +SELECT inject_failure(l_orderkey, l_orderkey=5988), SUM(l_quantity) t_sum_quantity +FROM LineItem l +/*+ hash */ +GROUP BY l_orderkey; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp new file mode 100644 index 0000000..6b277db --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +drop dataverse tpch if exists; +create dataverse tpch; + +use tpch; + + +create type LineItemType as + closed { + l_orderkey : integer, + l_partkey : integer, + l_suppkey : integer, + l_linenumber : integer, + l_quantity : double, + l_extendedprice : double, + l_discount : double, + l_tax : double, + l_returnflag : string, + l_linestatus : string, + l_shipdate : string, + l_commitdate : string, + l_receiptdate : string, + l_shipinstruct : string, + l_shipmode : string, + l_comment : string +} + +create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp new file mode 100644 index 0000000..39205db --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use tpch; + + +load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`), +(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp new file mode 100644 index 0000000..0314a6e --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +USE tpch; + +SET `import-private-functions` "true"; + +SELECT inject_failure(l_orderkey, l_orderkey=1024), l_quantity +FROM LineItem l +ORDER BY l_shipdate, l_orderkey; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp index f91df13..5ebec80 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp @@ -20,10 +20,10 @@ drop dataverse tpch if exists; create dataverse tpch; -use test; +use tpch; -create type test.LineItemType as +create type LineItemType as closed { l_orderkey : integer, l_partkey : integer, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp index 5fe734c..0340837 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp @@ -20,5 +20,6 @@ use tpch; -load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted; +load dataset LineItem using localfs 
((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`), +(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp index cac4a08..e96644b 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp @@ -17,33 +17,22 @@ * under the License. */ -use tpch; +USE tpch; +SET `import-private-functions` "true"; -select element {'l_returnflag':l_returnflag,'l_linestatus':l_linestatus,'sum_qty':tpch.coll_sum(( - select element i.l_quantity - from l as i - )),'sum_base_price':tpch.coll_sum(( - select element i.l_extendedprice - from l as i - )),'sum_disc_price':tpch.coll_sum(( - select element (i.l_extendedprice * (1 - i.l_discount)) - from l as i - )),'sum_charge':tpch.coll_sum(( - select element (i.l_extendedprice * (1 - i.l_discount) * (1 + i.l_tax)) - from l as i - )),'ave_qty':tpch.coll_avg(( - select element i.l_quantity - from l as i - )),'ave_price':tpch.coll_avg(( - select element i.l_extendedprice - from l as i - )),'ave_disc':tpch.coll_avg(( - select element i.l_discount - from l as i - )),'count_order':tpch.coll_count(l)} -from LineItem as l -/* +hash */ -group by l.l_returnflag as l_returnflag,l.l_linestatus as l_linestatus -order by l_returnflag,l_linestatus +SELECT l_returnflag, + l_linestatus, + sum(l_quantity) AS sum_qty, + sum(l_extendedprice) AS sum_base_price, + sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + 
l_tax)) AS sum_charge, + avg(l_quantity) AS ave_qty, + avg(l_extendedprice) AS ave_price, + avg(l_discount) AS ave_disc, + count(1) AS count_order +FROM LineItem l +WHERE l.l_shipdate <= '1998-09-02' AND inject_failure(true, l.l_orderkey=5988) +GROUP BY l_returnflag, l_linestatus +ORDER BY l_returnflag, l_linestatus ; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp new file mode 100644 index 0000000..9e64c15 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +drop dataverse tpch if exists; +create dataverse tpch; + +use tpch; + + +create type LineItemType as { + l_orderkey : bigint, + l_partkey : bigint, + l_suppkey : bigint, + l_linenumber : bigint, + l_quantity : bigint, + l_extendedprice : double, + l_discount : double, + l_tax : double, + l_returnflag : string, + l_linestatus : string, + l_shipdate : string, + l_commitdate : string, + l_receiptdate : string, + l_shipinstruct : string, + l_shipmode : string, + l_comment : string +} + +create type OrderType as { + o_orderkey : bigint, + o_custkey : bigint, + o_orderstatus : string, + o_totalprice : double, + o_orderdate : string, + o_orderpriority : string, + o_clerk : string, + o_shippriority : bigint, + o_comment : string +} + +create type CustomerType as { + c_custkey : bigint, + c_name : string, + c_address : string, + c_nationkey : bigint, + c_phone : string, + c_acctbal : double, + c_mktsegment : string, + c_comment : string +} + +create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber; + +create dataset Orders(OrderType) primary key o_orderkey; + +create dataset Customer(CustomerType) primary key c_custkey; + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp new file mode 100644 index 0000000..83ec6c3 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use tpch; + +load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`), +(`delimiter`=`|`)); + +load dataset Orders using localfs ((`path`=`asterix_nc1://data/tpch0.001/orders.tbl`),(`format`=`delimited-text`), +(`delimiter`=`|`)); + +load dataset Customer using localfs ((`path`=`asterix_nc1://data/tpch0.001/customer.tbl`),(`format`=`delimited-text`), +(`delimiter`=`|`)); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp new file mode 100644 index 0000000..35acf08 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +USE tpch; + +SET `import-private-functions` "true"; + +WITH tmp AS +( + SELECT l_orderkey, SUM(l_quantity) t_sum_quantity + FROM LineItem + GROUP BY l_orderkey +) + +SELECT c.c_name, c.c_custkey, inject_failure(o.o_orderkey, o.o_orderkey=5988), + o.o_orderdate, o.o_totalprice, l.l_quantity +FROM LineItem l +JOIN tmp t ON t.l_orderkey = l.l_orderkey +JOIN Orders o ON o.o_orderkey = t.l_orderkey +JOIN Customer c ON c.c_custkey = o.o_custkey +; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 0d8da65..431b215 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -2028,13 +2028,36 @@ </test-case> </test-group> <test-group name="failure"> - <!-- - <test-case FilePath="failure"> - <compilation-unit name="q1_pricing_summary_report_failure"> - <output-dir compare="Text">q1_pricing_summary_report_failure</output-dir> - </compilation-unit> - </test-case> - --> + <test-case FilePath="failure"> + <compilation-unit name="group_by_failure"> + <output-dir compare="Text">group_by_failure</output-dir> + <expected-error>Injected failure in asterix:inject-failure</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="failure"> + <compilation-unit name="group_by_hash_failure"> + <output-dir compare="Text">group_by_hash_failure</output-dir> + <expected-error>Injected failure in asterix:inject-failure</expected-error> + 
</compilation-unit> + </test-case> + <test-case FilePath="failure"> + <compilation-unit name="q01_pricing_summary_report_failure"> + <output-dir compare="Text">q01_pricing_summary_report_failure</output-dir> + <expected-error>Injected failure in asterix:inject-failure</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="failure"> + <compilation-unit name="q18_large_volume_customer_failure"> + <output-dir compare="Text">q18_large_volume_customer_failure</output-dir> + <expected-error>Injected failure in asterix:inject-failure</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="failure"> + <compilation-unit name="order_by_failure"> + <output-dir compare="Text">order_by_failure</output-dir> + <expected-error>Injected failure in asterix:inject-failure</expected-error> + </compilation-unit> + </test-case> </test-group> <!-- <test-group name="flwor"> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java index baf5dba..dd713e6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java @@ -155,6 +155,9 @@ public void close() throws HyracksDataException { try { appender.write(writer, true); + } catch (Exception e) { + writer.fail(); + throw e; } finally { writer.close(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java index cd04515..9982477 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java @@ -129,8 +129,9 @@ } } catch (Throwable th) { LOGGER.log(Level.WARNING, th.getMessage(), th); + } finally { + writer.close(); } - writer.close(); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java index 99fff19..fe2d4ec 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java @@ -74,15 +74,17 @@ throw new RuntimeDataException( ErrorCode.OPERATORS_FEED_INTAKE_OPERATOR_NODE_PUSHABLE_FAIL_AT_INGESTION); } - } catch (Throwable ie) { + } catch (Exception e) { /* - * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job. - * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location. - * The surviving intake partitions must continue to live and receive data from the external source. + * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another + * node involved in the Hyracks job. As the Intake job involves only the intake operator, the exception is + * indicative of a failure at the sibling intake operator location. The surviving intake partitions must + * continue to live and receive data from the external source. 
*/ - throw new HyracksDataException(ie); + writer.fail(); + throw e; } finally { - writer.close(); + writer.close(); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index b2ffd6e..6869523 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -301,10 +301,11 @@ @Override public void close() throws HyracksDataException { try { - cursor.close(); - writer.close(); - } catch (Exception e) { - throw new HyracksDataException(e); + try { + cursor.close(); + } finally { + writer.close(); + } } finally { indexHelper.close(); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java index c3e2681..377ad60 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java @@ -39,8 +39,11 @@ @Override public void open() throws HyracksDataException { - writer.open(); - writer.close(); + try { + writer.open(); + } finally { + writer.close(); + } } }; diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java index d38c5b7..33078ff 100644 --- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java @@ -170,8 +170,14 @@ if (isSink) { return; } - flushIfNotFailed(); - writer.close(); + try { + flushIfNotFailed(); + } catch (Exception e) { + writer.fail(); + throw e; + } finally { + writer.close(); + } appender.reset(frame, true); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java index e94f4b7..2d8eaed 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java @@ -53,6 +53,9 @@ public void close() throws HyracksDataException { try { flushIfNotFailed(); + } catch (Exception e) { + writer.fail(); + throw e; } finally { writer.close(); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index 1123c5e..3ac8c40 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -21,8 +21,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.comm.IFrameWriter; @@ -36,6 +34,8 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; +import com.fasterxml.jackson.databind.node.ObjectNode; + public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java index 60d7eec..aefc99d 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java @@ -179,7 +179,7 @@ @Override public void fail() throws HyracksDataException { if (isOpen) { - writer.fail(); + super.fail(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 8d312d5..b5bd1a9 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -53,7 +53,7 @@ public static final int RESULT_FAILURE_EXCEPTION = 16; public static final int RESULT_FAILURE_NO_EXCEPTION = 17; public static final int INCONSISTENT_RESULT_METADATA = 18; - public static final int CANNOT_TRUNCATE_OR_DELETE_FILE = 19; + public static final int CANNOT_DELETE_FILE = 19; public static final int NOT_A_JOBID = 20; public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21; public static final int DUPLICATE_DISTRIBUTED_JOB = 22; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 7f90c35..f536d3e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -37,7 +37,7 @@ 16 = Failure producing result set %1$s for job %2$s 17 = No exception for failed result set %1$s for job %2$s 18 = Inconsistent metadata for result set %1$s" -19 = Can't truncate or delete the file: %1$s +19 = Cannot delete the file: %1$s 20 = '%1$s' is not a valid job id. 
21 = The distributed job %1$s was not found 22 = The distributed job %1$s already exists diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java index f7aa2e8..952eb75 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java @@ -55,6 +55,8 @@ private boolean partitionRegistered; + private boolean failed = false; + public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId, ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions, DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory) { @@ -97,6 +99,7 @@ @Override public void fail() throws HyracksDataException { try { + failed = true; resultState.closeAndDelete(); resultState.abort(); registerResultPartitionLocation(false); @@ -111,8 +114,13 @@ if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("close(" + partition + ")"); } - registerResultPartitionLocation(true); - resultState.close(); + try { + if (!failed) { + registerResultPartitionLocation(true); + } + } finally { + resultState.close(); + } try { manager.reportPartitionWriteCompletion(jobId, resultSetId, partition); } catch (HyracksException e) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java index be7ed3d..c501b5b 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java @@ -86,6 +86,7 @@ public synchronized void close() { eos.set(true); + closeWriteFileHandle(); notifyAll(); } @@ -93,15 +94,19 @@ // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs // to be taken when there are more requests to these result states. failed.set(true); + closeWriteFileHandle(); + if (fileRef != null) { + fileRef.delete(); + } + } + + private void closeWriteFileHandle() { if (writeFileHandle != null) { try { ioManager.close(writeFileHandle); } catch (IOException e) { // Since file handle could not be closed, just ignore. } - } - if (fileRef != null) { - fileRef.delete(); } } @@ -114,7 +119,6 @@ } size += ioManager.syncWrite(writeFileHandle, size, buffer); - notifyAll(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java index f0e7f0e..33b8980 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java @@ -72,16 +72,11 @@ } public void close() throws IOException { - channel.close(); raf.close(); } public FileReference getFileReference() { return fileRef; - } - - public RandomAccessFile getRandomAccessFile() { - return raf; } public FileChannel getFileChannel() { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java index e5e81ab..e51d2bd 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java @@ -80,10 +80,13 @@ @Override public void close() throws HyracksDataException { closeTime = System.currentTimeMillis(); - ((Task) ctx) - .setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext().getJobId(), - cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep)); - writer.close(); + try { + ((Task) ctx).setPartitionSendProfile( + new PartitionProfile(new PartitionId(ctx.getJobletContext().getJobId(), cd.getConnectorId(), + senderIndex, receiverIndex), openTime, closeTime, mrep)); + } finally { + writer.close(); + } } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java index 365b01c..d9a4c7c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java @@ -62,8 +62,14 @@ if (!open) { throw new HyracksDataException("Closing SerializingDataWriter that has not been opened"); } - tupleAppender.write(frameWriter, true); - frameWriter.close(); + try { + tupleAppender.write(frameWriter, true); + } catch (Exception e) { + frameWriter.fail(); + throw e; + } finally { + 
frameWriter.close(); + } open = false; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java index b69f377..f0bd318 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java @@ -29,7 +29,6 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IFileHandle; import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.control.nc.io.FileHandle; public class RunFileReader implements IFrameReader { private final FileReference file; @@ -49,7 +48,7 @@ @Override public void open() throws HyracksDataException { // Opens RW mode because we need to truncate the given file if required. - handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_WRITE, + handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); readPtr = 0; } @@ -87,12 +86,10 @@ public void close() throws HyracksDataException { if (deleteAfterClose) { try { - // Truncates the file size to zero since OS might be keeping the file for a while. 
- ((FileHandle) handle).getFileChannel().truncate(0); ioManager.close(handle); FileUtils.deleteQuietly(file.getFile()); } catch (IOException e) { - throw HyracksDataException.create(ErrorCode.CANNOT_TRUNCATE_OR_DELETE_FILE, e, file.toString()); + throw HyracksDataException.create(ErrorCode.CANNOT_DELETE_FILE, e, file.toString()); } } else { ioManager.close(handle); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java index 8031422..915c63d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java @@ -27,8 +27,8 @@ import org.apache.hyracks.api.io.IIOManager; public class RunFileWriter implements IFrameWriter { - private final FileReference file; private final IIOManager ioManager; + private FileReference file; private boolean failed; private IFileHandle handle; @@ -69,6 +69,15 @@ } } + public void erase() throws HyracksDataException { + close(); + file.delete(); + + // Make sure we never access the file if it is deleted. 
+ file = null; + handle = null; + } + public FileReference getFileReference() { return file; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java index 7d10802..c049b8d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java @@ -107,10 +107,9 @@ @Override public void close() throws HyracksDataException { if (isFailed && state.getRuns() != null) { - for (int i = 0; i < state.getRuns().length; i++) { - RunFileWriter run = state.getRuns()[i]; + for (RunFileWriter run : state.getRuns()) { if (run != null) { - run.getFileReference().delete(); + run.erase(); } } } else { diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java index 0dbb063..b17215f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java @@ -95,9 +95,14 @@ writer.open(); doPass(table, partitionRuns, numberOfTuples, writer, 1); // level 0 use used at build stage. 
} catch (Exception e) { - generatedRuns.forEach(run -> run.getFileReference().delete()); - writer.fail(); - throw new HyracksDataException(e); + try { + for (RunFileWriter run : generatedRuns) { + run.erase(); + } + } finally { + writer.fail(); + } + throw e; } finally { writer.close(); } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java index e0ef2b3..d29e9ab 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java @@ -75,10 +75,17 @@ } public void flushSpilledPartitions() throws HyracksDataException { - for (int i = 0; i < runWriters.length; ++i) { - if (runWriters[i] != null) { - flushPartitionToRun(i, runWriters[i]); - runWriters[i].close(); + try { + for (int i = 0; i < runWriters.length; ++i) { + if (runWriters[i] != null) { + flushPartitionToRun(i, runWriters[i]); + } + } + } finally { + for (int i = 0; i < runWriters.length; ++i) { + if (runWriters[i] != null) { + runWriters[i].close(); + } } } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java index ad14bad..b622c9c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java @@ -470,8 +470,12 @@ 
@Override public void close() throws HyracksDataException { try { - state.joiner.join(inBuffer.getBuffer(), writer); - state.joiner.closeJoin(writer); + try { + state.joiner.join(inBuffer.getBuffer(), writer); + state.joiner.completeJoin(writer); + } finally { + state.joiner.releaseMemory(); + } ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0) .createPartitioner(); ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1) @@ -508,25 +512,35 @@ if (buildWriter != null) { RunFileReader buildReader = buildWriter.createDeleteOnCloseReader(); - buildReader.open(); - while (buildReader.nextFrame(inBuffer)) { - ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize()); - FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer); - joiner.build(copyBuffer); - inBuffer.reset(); + try { + buildReader.open(); + while (buildReader.nextFrame(inBuffer)) { + ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize()); + FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer); + joiner.build(copyBuffer); + inBuffer.reset(); + } + } finally { + buildReader.close(); } - buildReader.close(); } // probe RunFileReader probeReader = probeWriter.createDeleteOnCloseReader(); - probeReader.open(); - while (probeReader.nextFrame(inBuffer)) { - joiner.join(inBuffer.getBuffer(), writer); - inBuffer.reset(); + try { + probeReader.open(); + try { + while (probeReader.nextFrame(inBuffer)) { + joiner.join(inBuffer.getBuffer(), writer); + inBuffer.reset(); + } + joiner.completeJoin(writer); + } finally { + joiner.releaseMemory(); + } + } finally { + probeReader.close(); } - probeReader.close(); - joiner.closeJoin(writer); } } } finally { diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java index 
8e52838..ec1c3a9 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java @@ -196,8 +196,11 @@ accessorProbe.reset(newAccessorProbe.getBuffer()); } - public void closeJoin(IFrameWriter writer) throws HyracksDataException { + public void completeJoin(IFrameWriter writer) throws HyracksDataException { appender.write(writer, true); + } + + public void releaseMemory() throws HyracksDataException { int nFrames = buffers.size(); // Frames assigned to the data table will be released here. if (bufferManager != null) { @@ -206,7 +209,6 @@ } } buffers.clear(); - if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID " + Thread.currentThread().getId() + "."); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java index a8d3f7e..cbeadd8 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java @@ -275,9 +275,13 @@ @Override public void close() throws HyracksDataException { try { - state.joiner.closeJoin(writer); + state.joiner.completeJoin(writer); } finally { - writer.close(); + try { + state.joiner.releaseMemory(); + } finally { + writer.close(); + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java index 202aac6..16c21df 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java @@ -50,7 +50,6 @@ private final IFrame outBuffer; private final IFrame innerBuffer; private final VariableFrameMemoryManager outerBufferMngr; - private RunFileReader runFileReader; private final RunFileWriter runFileWriter; private final boolean isLeftOuter; private final ArrayTupleBuilder missingTupleBuilder; @@ -103,14 +102,17 @@ public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException { if (outerBufferMngr.insertFrame(outerBuffer) < 0) { - runFileReader = runFileWriter.createReader(); - runFileReader.open(); - while (runFileReader.nextFrame(innerBuffer)) { - for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { - blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); + RunFileReader runFileReader = runFileWriter.createReader(); + try { + runFileReader.open(); + while (runFileReader.nextFrame(innerBuffer)) { + for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { + blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); + } } + } finally { + runFileReader.close(); } - runFileReader.close(); outerBufferMngr.reset(); if (outerBufferMngr.insertFrame(outerBuffer) < 0) { throw new HyracksDataException("The given outer frame of size:" + outerBuffer.capacity() @@ -174,20 +176,25 @@ } } - public void closeJoin(IFrameWriter writer) throws HyracksDataException { - runFileReader = runFileWriter.createDeleteOnCloseReader(); - runFileReader.open(); - while (runFileReader.nextFrame(innerBuffer)) { - for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { - 
blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); + public void completeJoin(IFrameWriter writer) throws HyracksDataException { + RunFileReader runFileReader = runFileWriter.createDeleteOnCloseReader(); + try { + runFileReader.open(); + while (runFileReader.nextFrame(innerBuffer)) { + for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { + blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); + } } + } finally { + runFileReader.close(); } - runFileReader.close(); - outerBufferMngr.reset(); - appender.write(writer, true); } + public void releaseMemory() throws HyracksDataException { + outerBufferMngr.reset(); + } + private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1) throws HyracksDataException { int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java index 09b7544..5d79f75 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java @@ -173,6 +173,7 @@ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { private JoinCacheTaskState state; + boolean failed = false; @Override public void open() throws HyracksDataException { @@ -188,8 +189,24 @@ @Override public void close() throws HyracksDataException { + if (failed) { + try { + state.joiner.closeCache(); + } finally { + writer.close(); + } + return; + } try { - 
state.joiner.closeJoin(writer); + try { + state.joiner.completeJoin(writer); + } finally { + state.joiner.releaseMemory(); + } + } catch (Exception e) { + state.joiner.closeCache(); + writer.fail(); + throw e; } finally { writer.close(); } @@ -197,6 +214,7 @@ @Override public void fail() throws HyracksDataException { + failed = true; writer.fail(); } }; diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java index 17f009e..a5e2f6f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java @@ -240,10 +240,10 @@ /** * In case of failure happens, we need to clear up the generated temporary files. 
*/ - public void clearBuildTempFiles() { + public void clearBuildTempFiles() throws HyracksDataException { for (int i = 0; i < buildRFWriters.length; i++) { if (buildRFWriters[i] != null) { - buildRFWriters[i].getFileReference().delete(); + buildRFWriters[i].erase(); } } } @@ -258,17 +258,22 @@ runFileWriters = probeRFWriters; break; } - - for (int pid = spilledStatus.nextSetBit(0); pid >= 0 - && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) { - if (bufferManager.getNumTuples(pid) > 0) { - bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide)); - bufferManager.clearPartition(pid); + try { + for (int pid = spilledStatus.nextSetBit(0); pid >= 0 + && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) { + if (bufferManager.getNumTuples(pid) > 0) { + bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide)); + bufferManager.clearPartition(pid); + } } - // It doesn't matter whether a spilled partition currently holds a tuple in memory or not. - // The file that holds the corresponding spilled partition needs to be closed. - if (runFileWriters[pid] != null) { - runFileWriters[pid].close(); + } finally { + // Force to close all run file writers. 
+ if (runFileWriters != null) { + for (RunFileWriter runFileWriter : runFileWriters) { + if (runFileWriter != null) { + runFileWriter.close(); + } + } } } } @@ -418,26 +423,28 @@ private boolean loadSpilledPartitionToMem(int pid, RunFileWriter wr) throws HyracksDataException { RunFileReader r = wr.createReader(); - r.open(); - if (reloadBuffer == null) { - reloadBuffer = new VSizeFrame(ctx); - } - while (r.nextFrame(reloadBuffer)) { - accessorBuild.reset(reloadBuffer.getBuffer()); - for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) { - if (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { + try { + r.open(); + if (reloadBuffer == null) { + reloadBuffer = new VSizeFrame(ctx); + } + while (r.nextFrame(reloadBuffer)) { + accessorBuild.reset(reloadBuffer.getBuffer()); + for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) { + if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) { + continue; + } // for some reason (e.g. due to fragmentation) if the inserting failed, // we need to clear the occupied frames bufferManager.clearPartition(pid); - r.close(); return false; } } + // Closes and deletes the run file if it is already loaded into memory. + r.setDeleteAfterClose(true); + } finally { + r.close(); } - - // Closes and deletes the run file if it is already loaded into memory. 
- r.setDeleteAfterClose(true); - r.close(); spilledStatus.set(pid, false); buildRFWriters[pid] = null; return true; @@ -538,10 +545,13 @@ return spilledStatus.nextSetBit(0) < 0; } - public void closeProbe(IFrameWriter writer) throws HyracksDataException { + public void completeProbe(IFrameWriter writer) throws HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level //(which join technique to use) - inMemJoiner.closeJoin(writer); + inMemJoiner.completeJoin(writer); + } + + public void releaseResource() throws HyracksDataException { inMemJoiner.closeTable(); closeAllSpilledPartitions(SIDE.PROBE); bufferManager.close(); @@ -553,10 +563,10 @@ /** * In case of failure happens, we need to clear up the generated temporary files. */ - public void clearProbeTempFiles() { + public void clearProbeTempFiles() throws HyracksDataException { for (int i = 0; i < probeRFWriters.length; i++) { if (probeRFWriters[i] != null) { - probeRFWriters[i].getFileReference().delete(); + probeRFWriters[i].erase(); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index a72c0c6..d5e3568 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@ -385,6 +385,7 @@ private FrameTupleAppender nullResultAppender = null; private FrameTupleAccessor probeTupleAccessor; + private boolean failed = false; @Override public void open() throws HyracksDataException { @@ -406,21 +407,33 @@ @Override public void fail() throws HyracksDataException { - 
state.hybridHJ.clearProbeTempFiles(); + failed = true; writer.fail(); } @Override public void close() throws HyracksDataException { + if (failed) { + try { + // Clear temp files if fail() was called. + state.hybridHJ.clearBuildTempFiles(); + state.hybridHJ.clearProbeTempFiles(); + } finally { + writer.close(); // writer should always be closed. + } + logProbeComplete(); + return; + } try { - state.hybridHJ.closeProbe(writer); - + try { + state.hybridHJ.completeProbe(writer); + } finally { + state.hybridHJ.releaseResource(); + } BitSet partitionStatus = state.hybridHJ.getPartitionStatus(); - rPartbuff.reset(); for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus .nextSetBit(pid + 1)) { - RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid); RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid); @@ -434,10 +447,25 @@ int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid); joinPartitionPair(bReader, pReader, bSize, pSize, 1); } - + } catch (Exception e) { + // Since writer.nextFrame() is called in the above "try" body, we have to call writer.fail() + // to send the failure signal to the downstream when a throwable is thrown. + writer.fail(); + // Clear temp files, as neither this.fail() nor this.close() will be called after close(). + state.hybridHJ.clearBuildTempFiles(); + state.hybridHJ.clearProbeTempFiles(); + // Re-throw whatever is caught. + throw e; } finally { - writer.close(); + try { + logProbeComplete(); + } finally { + writer.close(); + } } + } + + private void logProbeComplete() { if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("OptimizedHybridHashJoin closed its probe phase"); } @@ -542,9 +570,7 @@ boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys && buildKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys; - assert isLeftOuter ? 
!isReversed : true : "LeftOut Join can not reverse roles"; - OptimizedHybridHashJoin rHHj; int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions); rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeKeys, @@ -552,79 +578,107 @@ nonMatchWriterFactories); //checked-confirmed rHHj.setIsReversed(isReversed); - buildSideReader.open(); - rHHj.initBuild(); - rPartbuff.reset(); - while (buildSideReader.nextFrame(rPartbuff)) { - rHHj.build(rPartbuff.getBuffer()); + try { + buildSideReader.open(); + try { + rHHj.initBuild(); + rPartbuff.reset(); + while (buildSideReader.nextFrame(rPartbuff)) { + rHHj.build(rPartbuff.getBuffer()); + } + } finally { + // Makes sure that files are always properly closed. + rHHj.closeBuild(); + } + } finally { + buildSideReader.close(); } - rHHj.closeBuild(); - buildSideReader.close(); - probeSideReader.open(); - rHHj.initProbe(); - rPartbuff.reset(); - while (probeSideReader.nextFrame(rPartbuff)) { - rHHj.probe(rPartbuff.getBuffer(), writer); + try { + probeSideReader.open(); + rPartbuff.reset(); + try { + rHHj.initProbe(); + while (probeSideReader.nextFrame(rPartbuff)) { + rHHj.probe(rPartbuff.getBuffer(), writer); + } + rHHj.completeProbe(writer); + } finally { + rHHj.releaseResource(); + } + } finally { + // Makes sure that files are always properly closed. 
+ probeSideReader.close(); } - rHHj.closeProbe(writer); - probeSideReader.close(); - int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize(); - int maxAfterProbeSize = rHHj.getMaxProbePartitionSize(); - int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize); + try { + int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize(); + int maxAfterProbeSize = rHHj.getMaxProbePartitionSize(); + int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize); - BitSet rPStatus = rHHj.getPartitionStatus(); - if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH " - + "(isLeftOuter || build<probe) - [Level " + level + "]"); - } - for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { - RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); - RunFileReader rprfw = rHHj.getProbeRFReader(rPid); - int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid); - int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid); + BitSet rPStatus = rHHj.getPartitionStatus(); + if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { + //Case 2.1.1 - Keep applying HHJ + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH " + + "(isLeftOuter || build<probe) - [Level " + level + "]"); + } + for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { + RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); + RunFileReader rprfw = rHHj.getProbeRFReader(rPid); + int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid); + int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid); - if (rbrfw == null || rprfw == null) { - if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role. 
- appendNullToProbeTuples(rprfw); + if (rbrfw == null || rprfw == null) { + if (isLeftOuter && rprfw != null) { + // For the outer join, we don't reverse the role. + appendNullToProbeTuples(rprfw); + } + continue; } - continue; - } - if (isReversed) { - joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1); - } else { - joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1); - } - } - - } else { //Case 2.1.2 - Switch to NLJ - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine( - "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe)" - + " - [Level " + level + "]"); - } - for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { - RunFileReader rbrfw = rHHj.getBuildRFReader(rPid); - RunFileReader rprfw = rHHj.getProbeRFReader(rPid); - - if (rbrfw == null || rprfw == null) { - if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role. - appendNullToProbeTuples(rprfw); + if (isReversed) { + joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1); + } else { + joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1); } - continue; } - int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid); - int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid); - // NLJ order is outer + inner, the order is reversed from the other joins - if (isLeftOuter || probeSideInTups < buildSideInTups) { - applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw); //checked-modified - } else { - applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw); //checked-modified + } else { //Case 2.1.2 - Switch to NLJ + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine( + "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH " + + "(isLeftOuter || build<probe) - [Level " + level + "]"); + } + for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) { + RunFileReader rbrfw = 
rHHj.getBuildRFReader(rPid); + RunFileReader rprfw = rHHj.getProbeRFReader(rPid); + + if (rbrfw == null || rprfw == null) { + if (isLeftOuter && rprfw != null) { + // For the outer join, we don't reverse the role. + appendNullToProbeTuples(rprfw); + } + continue; + } + + int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid); + int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid); + // NLJ order is outer + inner, the order is reversed from the other joins + if (isLeftOuter || probeSideInTups < buildSideInTups) { + //checked-modified + applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw); + } else { + //checked-modified + applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw); + } } } + } catch (Exception e) { + // Make sure that temporary run files generated in recursive hybrid hash joins + // are closed and deleted. + rHHj.clearBuildTempFiles(); + rHHj.clearProbeTempFiles(); + throw e; } } @@ -635,17 +689,20 @@ if (probeTupleAccessor == null) { probeTupleAccessor = new FrameTupleAccessor(probeRd); } - probReader.open(); - while (probReader.nextFrame(rPartbuff)) { - probeTupleAccessor.reset(rPartbuff.getBuffer()); - for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) { - FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid, + try { + probReader.open(); + while (probReader.nextFrame(rPartbuff)) { + probeTupleAccessor.reset(rPartbuff.getBuffer()); + for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) { + FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize()); + } } + nullResultAppender.write(writer, true); + } finally { + probReader.close(); } - probReader.close(); - nullResultAppender.write(writer, true); } private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc, @@ -654,9 +711,7 @@ throws 
HyracksDataException { boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys; - assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles"; - IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx, state.memForJoin * ctx.getInitialFrameSize()); ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool); @@ -667,39 +722,52 @@ new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nonMatchWriter, table, predEvaluator, isReversed, bufferManager); - bReader.open(); - rPartbuff.reset(); - while (bReader.nextFrame(rPartbuff)) { - // We need to allocate a copyBuffer, because this buffer gets added to the buffers list - // in the InMemoryHashJoin. - ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize()); - // If a frame cannot be allocated, there may be a chance if we can compact the table, - // one or more frame may be reclaimed. - if (copyBuffer == null) { - if (joiner.compactHashTable() > 0) { - copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize()); - } + try { + bReader.open(); + rPartbuff.reset(); + while (bReader.nextFrame(rPartbuff)) { + // We need to allocate a copyBuffer, because this buffer gets added to the buffers list + // in the InMemoryHashJoin. + ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize()); + // If a frame cannot be allocated, there may be a chance if we can compact the table, + // one or more frame may be reclaimed. if (copyBuffer == null) { - // Still no frame is allocated? At this point, we have no way to get a frame. - throw new HyracksDataException( - "Can't allocate one more frame. Assign more memory to InMemoryHashJoin."); + if (joiner.compactHashTable() > 0) { + copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize()); + } + if (copyBuffer == null) { + // Still no frame is allocated? 
At this point, we have no way to get a frame. + throw new HyracksDataException( + "Can't allocate one more frame. Assign more memory to InMemoryHashJoin."); + } } + FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer); + joiner.build(copyBuffer); + rPartbuff.reset(); } - FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer); - joiner.build(copyBuffer); - rPartbuff.reset(); + } finally { + bReader.close(); } - bReader.close(); - rPartbuff.reset(); - //probe - pReader.open(); - while (pReader.nextFrame(rPartbuff)) { - joiner.join(rPartbuff.getBuffer(), writer); + try { + //probe + pReader.open(); rPartbuff.reset(); + try { + while (pReader.nextFrame(rPartbuff)) { + joiner.join(rPartbuff.getBuffer(), writer); + rPartbuff.reset(); + } + joiner.completeJoin(writer); + } finally { + joiner.releaseMemory(); + } + } finally { + try { + pReader.close(); + } finally { + joiner.closeTable(); + } } - pReader.close(); - joiner.closeJoin(writer); - joiner.closeTable(); } private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize, @@ -716,40 +784,38 @@ nlj.setIsReversed(isReversed); IFrame cacheBuff = new VSizeFrame(ctx); - innerReader.open(); - while (innerReader.nextFrame(cacheBuff)) { - nlj.cache(cacheBuff.getBuffer()); - cacheBuff.reset(); + try { + innerReader.open(); + while (innerReader.nextFrame(cacheBuff)) { + nlj.cache(cacheBuff.getBuffer()); + cacheBuff.reset(); + } + } finally { + try { + nlj.closeCache(); + } finally { + innerReader.close(); + } } - nlj.closeCache(); - - IFrame joinBuff = new VSizeFrame(ctx); - outerReader.open(); - - while (outerReader.nextFrame(joinBuff)) { - nlj.join(joinBuff.getBuffer(), writer); - joinBuff.reset(); + try { + IFrame joinBuff = new VSizeFrame(ctx); + outerReader.open(); + try { + while (outerReader.nextFrame(joinBuff)) { + nlj.join(joinBuff.getBuffer(), writer); + joinBuff.reset(); + } + nlj.completeJoin(writer); + } finally { + nlj.releaseMemory(); + } + } finally { + 
outerReader.close(); } - - nlj.closeJoin(writer); - outerReader.close(); - innerReader.close(); } }; return op; } - } - - public void setSkipInMemHJ(boolean b) { - skipInMemoryHJ = b; - } - - public void setForceNLJ(boolean b) { - forceNLJ = b; - } - - public void setForceRR(boolean b) { - forceRoleReversal = !isLeftOuter && b; } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java index 4253114..e7da174 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java @@ -82,14 +82,12 @@ while (in.nextFrame(frame)) { writer.nextFrame(frame.getBuffer()); } - } catch (Throwable th) { - throw new HyracksDataException(th); } finally { in.close(); } - } catch (Throwable th) { + } catch (Exception e) { writer.fail(); - throw new HyracksDataException(th); + throw e; } finally { writer.close(); if (numConsumers.decrementAndGet() == 0) { diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java index 90b4b6c..b422ef4 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java @@ -80,7 +80,8 @@ final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(outRecordDesc); return new 
AbstractUnaryInputSinkOperatorNodePushable() { - IFrameWriter datasetPartitionWriter; + private IFrameWriter datasetPartitionWriter; + private boolean failed = false; @Override public void open() throws HyracksDataException { @@ -110,15 +111,22 @@ @Override public void fail() throws HyracksDataException { + failed = true; datasetPartitionWriter.fail(); } @Override public void close() throws HyracksDataException { - if (frameOutputStream.getTupleCount() > 0) { - frameOutputStream.flush(datasetPartitionWriter); + try { + if (!failed && frameOutputStream.getTupleCount() > 0) { + frameOutputStream.flush(datasetPartitionWriter); + } + } catch (Exception e) { + datasetPartitionWriter.fail(); + throw e; + } finally { + datasetPartitionWriter.close(); } - datasetPartitionWriter.close(); } }; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java index 6d9d085..f4158ac 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java @@ -84,10 +84,13 @@ finalWriter = prepareSkipMergingFinalResultWriter(writer); finalWriter.open(); if (sorter != null) { - if (sorter.hasRemaining()) { - sorter.flush(finalWriter); + try { + if (sorter.hasRemaining()) { + sorter.flush(finalWriter); + } + } finally { + sorter.close(); } - sorter.close(); } } else { /** recycle sort buffer */ @@ -128,10 +131,15 @@ RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile(); IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter); - mergeResultWriter.open(); - merge(mergeResultWriter, partialRuns); - 
mergeResultWriter.close(); - + try { + mergeResultWriter.open(); + merge(mergeResultWriter, partialRuns); + } catch (Throwable t) { + mergeResultWriter.fail(); + throw t; + } finally { + mergeResultWriter.close(); + } reader = mergeFileWriter.createReader(); } runs.add(reader); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java index cdabcda..ef9e4b6 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java @@ -103,7 +103,7 @@ writer.nextFrame(frame); } catch (Exception e) { writer.fail(); - throw new HyracksDataException(e); + throw e; } finally { writer.close(); } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java index bba18b3..683857f 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java @@ -84,9 +84,9 @@ public void initialize() throws HyracksDataException { try { writer.open(); - } catch (Throwable th) { + } catch (Exception e) { writer.fail(); - throw new HyracksDataException(th); + throw e; } finally { writer.close(); } diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java index f83ab6a..521dff1 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java @@ -227,12 +227,14 @@ ResultValidateWriter writer = new ResultValidateWriter(keyValueMap); - getBuilder().open(); - for (IFrame frame : input) { - getBuilder().nextFrame(frame.getBuffer()); + try { + getBuilder().open(); + for (IFrame frame : input) { + getBuilder().nextFrame(frame.getBuffer()); + } + } finally { + getBuilder().close(); } - getBuilder().close(); - getMerger().setOutputFrameWriter(0, writer, outputRec); getMerger().initialize(); } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java index 673c6fa..cfd4f30 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java @@ -130,13 +130,16 @@ assertTrue(runs.size() > 0); for (GeneratedRunFileReader run : runs) { - run.open(); - int preKey = Integer.MIN_VALUE; - while (run.nextFrame(frame)) { - fta.reset(frame.getBuffer()); - preKey = assertFTADataIsSorted(fta, keyValuePair, preKey); + try { + run.open(); 
+ int preKey = Integer.MIN_VALUE; + while (run.nextFrame(frame)) { + fta.reset(frame.getBuffer()); + preKey = assertFTADataIsSorted(fta, keyValuePair, preKey); + } + } finally { + run.close(); } - run.close(); } assertTrue(keyValuePair.isEmpty()); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1513 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I203168171e6dac16b57d2eda960823e3810e22a3 Gerrit-PatchSet: 14 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Taewoo Kim <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
