aglinxinyuan commented on code in PR #5707:
URL: https://github.com/apache/texera/pull/5707#discussion_r3411235154
##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala:
##########
@@ -133,6 +133,35 @@ object DocumentFactory {
}
}
+ /**
+ * Create the document at `uri`, unless `reuseExisting` is set and a document
+ * already exists there -- in which case the existing document is kept
+ * untouched. This lets a caller whose output accumulates across re-runs
+ * (e.g. a LoopEnd port whose region re-executes once per loop iteration)
+ * preserve the already-populated document instead of clobbering it, since
+ * `createDocument` overwrites any existing document.
+ *
+ * `exists` / `create` default to this object's own `documentExists` /
+ * `createDocument`; they are parameterized only so the create-or-reuse
+ * decision can be unit-tested without an iceberg backend.
+ *
+ * @return true iff a document was (re)created.
+ */
+ def createOrReuseDocument(
+ uri: URI,
+ schema: Schema,
+ reuseExisting: Boolean,
+ exists: URI => Boolean = documentExists,
+ create: (URI, Schema) => Unit = (u, s) => { createDocument(u, s); () }
+ ): Boolean = {
Review Comment:
Done in `5690bd5` — `createOrReuseDocument` now returns the
`VirtualDocument` (opened when reused, created otherwise) instead of a
`Boolean`, so the call site no longer branches on create-vs-reuse.
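The following is a minimal sketch of that revised shape, not the code in `5690bd5`. It makes several assumptions. A hypothetical `Doc` trait stands in for `VirtualDocument`. The schema argument is omitted. It adds a hypothetical `open` function next to the injected `exists` / `create` so the reuse path returns a handle. It shows two things: the caller gets a document back on both paths, so it no longer branches on create-vs-reuse, and the decision can be exercised against an in-memory map instead of an iceberg backend.

```scala
import java.net.URI
import scala.collection.mutable

// Hypothetical stand-in for VirtualDocument: it only collects appended rows.
trait Doc {
  def append(row: String): Unit
  def rows: Seq[String]
}

final class InMemoryDoc extends Doc {
  private val buf = mutable.ArrayBuffer.empty[String]
  def append(row: String): Unit = buf += row
  def rows: Seq[String] = buf.toList
}

object CreateOrReuseSketch {
  // Returns an open document in both cases: the existing one when reuse is
  // requested and something is already stored at `uri`, otherwise a new one.
  def createOrReuseDocument(
      uri: URI,
      reuseExisting: Boolean,
      exists: URI => Boolean,
      open: URI => Doc,
      create: URI => Doc
  ): Doc =
    if (reuseExisting && exists(uri)) open(uri) else create(uri)
}

object CreateOrReuseDemo {
  // Simulates two loop iterations writing to the same port URI with reuse on,
  // then one call with reuse off. Returns the rows visible after each phase.
  def run(): (Seq[String], Seq[String]) = {
    val store = mutable.Map.empty[URI, Doc] // in-memory "backend"
    val uri = new URI("mem:///loop-end/port-0")
    def fresh(u: URI): Doc = { val d = new InMemoryDoc; store(u) = d; d }
    def get(reuse: Boolean): Doc =
      CreateOrReuseSketch.createOrReuseDocument(uri, reuse, store.contains, store.apply, fresh)

    get(reuse = true).append("iter-1") // nothing stored yet: created
    get(reuse = true).append("iter-2") // reused: appends to the same document
    val afterReuse = get(reuse = true).rows
    val afterRecreate = get(reuse = false).rows // reuse off: replaced by an empty one
    (afterReuse, afterRecreate)
  }
}
```

Because `exists`, `open` and `create` are plain functions, a unit test can pass a `mutable.Map` and its methods directly. The production defaults would delegate to the iceberg-backed implementations.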
##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.metadata
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Guard for the `OutputPort.reusesOutputStorage` flag.
+ *
+ * The flag tells the region scheduler to reuse (append to) a port's storage
+ * across region re-executions instead of recreating it. Only an operator whose
+ * output accumulates across re-executions should set it -- today no operator
+ * on `main` does (the only one that will, Loop End, is not yet merged).
+ *
+ * This pins the flag off for every registered operator so it can't be turned
+ * on unexpectedly. When the loop operators land, update this to allow Loop
+ * End's output port (and only it).
+ */
+class OutputPortReuseFlagSpec extends AnyFlatSpec with Matchers {
+
+ "No registered operator" should "enable OutputPort.reusesOutputStorage on any of its output ports" in {
+ OperatorMetadataGenerator.operatorTypeMap.keys.foreach { opClass =>
+ opClass.getConstructor().newInstance().operatorInfo.outputPorts.foreach { port =>
+ withClue(s"${opClass.getSimpleName} / output port ${port.id}: ") {
+ port.reusesOutputStorage shouldBe false
+ }
+ }
+ }
Review Comment:
Added the runtime guard in `5690bd5`: `RegionExecutionCoordinator` now
`require(...)`s that no output port sets `reusesOutputStorage` while
provisioning storage, so it fails loudly if anything enables the flag in
production. Only the loop operators are meant to use the flag, and they are
not on main yet. Kept `OutputPortReuseFlagSpec` too; both will be relaxed or
updated when the loop operators land.
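The following is a minimal sketch of the kind of provisioning-time check described above. It is not the actual `RegionExecutionCoordinator` code. `PortInfo`, `OpInfo` and `ReuseFlagGuard` are hypothetical simplifications of Texera's port and operator metadata. It iterates over every operator's output ports and uses Scala's `require`, which throws an `IllegalArgumentException` prefixed with "requirement failed: " when the condition is false.

```scala
// Hypothetical, simplified metadata; not Texera's actual OutputPort / operator classes.
final case class PortInfo(id: Int, reusesOutputStorage: Boolean = false)
final case class OpInfo(name: String, outputPorts: Seq[PortInfo])

object ReuseFlagGuard {
  // Fail fast while provisioning storage if any output port opts into reuse,
  // since nothing on main is expected to set the flag yet.
  def checkNoReuse(ops: Seq[OpInfo]): Unit =
    for (op <- ops; port <- op.outputPorts)
      require(
        !port.reusesOutputStorage,
        s"${op.name} / output port ${port.id}: reusesOutputStorage must stay false until the loop operators land"
      )
}
```

Failing at provisioning time means a stray flag surfaces as an immediate error rather than as silently appended output. Once the loop operators are merged, the check would need an allow-list for Loop End's output port.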