Yicong-Huang commented on code in PR #5707:
URL: https://github.com/apache/texera/pull/5707#discussion_r3411187509
##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala:
##########
@@ -133,6 +133,35 @@ object DocumentFactory {
}
}
+ /**
+ * Create the document at `uri`, unless `reuseExisting` is set and a document
+ * already exists there -- in which case the existing document is kept
+ * untouched. This lets a caller whose output accumulates across re-runs
+ * (e.g. a LoopEnd port whose region re-executes once per loop iteration)
+ * preserve the already-populated document instead of clobbering it, since
+ * `createDocument` overrides any existing document.
+ *
+ * `exists` / `create` default to this object's own `documentExists` /
+ * `createDocument`; they are parameterized only so the create-or-reuse
+ * decision can be unit-tested without an iceberg backend.
+ *
+ * @return true iff a document was (re)created.
+ */
+ def createOrReuseDocument(
+ uri: URI,
+ schema: Schema,
+ reuseExisting: Boolean,
+ exists: URI => Boolean = documentExists,
+ create: (URI, Schema) => Unit = (u, s) => { createDocument(u, s); () }
+ ): Boolean = {
Review Comment:
It is odd to return a Boolean here, since the call site has to branch on it.
How about returning the document itself, whether it was reused or newly
created? Then the call-site logic is the same in both cases.
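
A minimal sketch of the document-returning shape, to make the suggestion concrete. Everything here is a hypothetical stand-in: `Doc`, `Schema`, the in-memory `store`, and `openDocument` are assumptions for illustration, not Texera's real types or `DocumentFactory` API.

```scala
import java.net.URI
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; not Texera's real types.
final case class Schema(fields: List[String])
final class Doc(val uri: URI, val schema: Schema)

object DocumentFactorySketch {
  // In-memory map standing in for the real storage backend (assumption).
  private val store = mutable.Map.empty[URI, Doc]

  def documentExists(uri: URI): Boolean = store.contains(uri)

  // Mirrors the behavior described in the doc comment: overrides any existing document.
  def createDocument(uri: URI, schema: Schema): Doc = {
    val doc = new Doc(uri, schema)
    store.update(uri, doc)
    doc
  }

  // Hypothetical accessor for a document that already exists.
  def openDocument(uri: URI): Doc = store(uri)

  // Returns the document either way, so callers never branch on a Boolean.
  def createOrReuseDocument(uri: URI, schema: Schema, reuseExisting: Boolean): Doc =
    if (reuseExisting && documentExists(uri)) openDocument(uri)
    else createDocument(uri, schema)
}
```

With this shape the call site reduces to a single `val doc = createOrReuseDocument(uri, schema, reuseExisting)` regardless of which path was taken. Whether the real method should also return the `VirtualDocument` handle is a design decision for the PR author.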
##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.metadata
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Guard for the `OutputPort.reusesOutputStorage` flag.
+ *
+ * The flag tells the region scheduler to reuse (append to) a port's storage
+ * across region re-executions instead of recreating it. Only an operator whose
+ * output accumulates across re-executions should set it -- today that is no
+ * operator on `main` (the only one that will, Loop End, is not yet merged).
+ *
+ * This pins the flag off for every registered operator so it can't be turned
+ * on unexpectedly. When the loop operators land, update this to allow Loop
+ * End's output port (and only it).
+ */
+class OutputPortReuseFlagSpec extends AnyFlatSpec with Matchers {
+
+ "No registered operator" should "enable OutputPort.reusesOutputStorage on any of its output ports" in {
+ OperatorMetadataGenerator.operatorTypeMap.keys.foreach { opClass =>
+ opClass.getConstructor().newInstance().operatorInfo.outputPorts.foreach { port =>
+ withClue(s"${opClass.getSimpleName} / output port ${port.id}: ") {
+ port.reusesOutputStorage shouldBe false
+ }
+ }
+ }
Review Comment:
Thanks for adding the test; it is good to have. What I meant, though, was a
runtime check in the scheduler: an assertion there would prevent production
from using this flag.
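
A rough sketch of what such a scheduler-side guard could look like. `PortSketch`, `SchedulerGuardSketch`, `allowedReusePorts`, and `checkReuseFlag` are hypothetical names, and where the check would actually sit in the region scheduler is left open. It uses `require` rather than `assert` because Scala's `Predef.assert` is elidable at compile time, while `require` always runs.

```scala
// Hypothetical stand-in for an output port; not Texera's real OutputPort.
final case class PortSketch(id: String, reusesOutputStorage: Boolean)

object SchedulerGuardSketch {
  // Hypothetical allow-list of "operator/port" keys; empty until Loop End lands.
  val allowedReusePorts: Set[String] = Set.empty

  // Throws IllegalArgumentException if a port not on the allow-list sets the flag.
  def checkReuseFlag(opName: String, port: PortSketch): Unit =
    require(
      !port.reusesOutputStorage || allowedReusePorts.contains(s"$opName/${port.id}"),
      s"$opName / output port ${port.id} sets reusesOutputStorage, which is not enabled yet"
    )
}
```

Calling `checkReuseFlag` just before the scheduler decides between creating and reusing a port's storage would fail fast at runtime, complementing the spec above, which only covers operators registered at test time.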