Xiao-zhen-Liu commented on code in PR #5434:
URL: https://github.com/apache/texera/pull/5434#discussion_r3392576067
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -580,10 +580,8 @@ class RegionExecutionCoordinator(
DocumentFactory.createDocument(stateURI, State.schema)
if (!isRestart) {
val (_, eid, _, _) = decodeURI(resultURI)
Review Comment:
Inherited from the old code, but worth a look now that this defines an event
contract: the eid is parsed back out of the URI string here
(`decodeURI(resultURI)`), even though the only consumer is execution-scoped —
`attachToExecution` already has `executionId` in hand. You could drop `eid`
from the event and the `decodeURI` call entirely, and let the subscriber use
its own execution id. Self-describing events are also a reasonable choice, so
feel free to keep it; flagging since it removes a field and a decode.
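
A minimal sketch of the slimmer contract, using hypothetical stand-in types
rather than the real Texera classes (`ResultUriAvailable`, `ScopedSubscriber`,
and the `Long` execution id are all assumptions for illustration):

```scala
import java.net.URI

// Hypothetical stand-in for the event, with the eid field dropped.
final case class ResultUriAvailable(logicalOpId: String, portId: Int, uri: URI)

// Hypothetical execution-scoped subscriber: it already knows its execution id,
// so nothing has to be decoded back out of the URI string.
final class ScopedSubscriber(executionId: Long) {
  private var rows = Vector.empty[(Long, String, Int, URI)]

  // Callback body: pair the event payload with the subscriber's own id.
  def onResultUri(evt: ResultUriAvailable): Unit =
    rows = rows :+ ((executionId, evt.logicalOpId, evt.portId, evt.uri))

  def persisted: Vector[(Long, String, Int, URI)] = rows
}
```

The trade-off is the one noted above: the event is no longer self-describing,
so a subscriber that is not execution-scoped could not use it as-is.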
##########
amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala:
##########
@@ -353,6 +357,10 @@ class ExecutionResultService(
)
)
+ addSubscription(
+ client.registerCallback[OperatorPortResultUriAvailable](persistOperatorPortResultUri)
Review Comment:
Before this PR the engine wrote the row unconditionally; now
`operator_port_executions` only gets populated if a result service was attached
before `startWorkflow`. That holds today (both the websocket and sync REST
paths attach in `WorkflowExecutionService.executeWorkflow` before starting),
but it is an invariant enforced by call ordering rather than by structure.
Worth a short comment here or on the event definition saying "must be
registered before the workflow starts, otherwise result URIs are silently never
persisted" — the failure mode is invisible (results compute fine, the dashboard
just cannot find them), so the next person touching startup ordering should hit
a warning sign.
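
To make the failure mode concrete, here is a small sketch with a hypothetical
fire-and-forget bus (`TinyBus` is a stand-in I made up, not `AmberClient`; it
only assumes that events emitted with no registered callback are dropped):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a client with fire-and-forget callbacks:
// events emitted while no callback is registered are simply dropped.
final class TinyBus[E] {
  private val callbacks = ArrayBuffer.empty[E => Unit]
  def registerCallback(cb: E => Unit): Unit = { callbacks += cb; () }
  def emit(evt: E): Unit = callbacks.foreach(cb => cb(evt))
}

object RegistrationOrderDemo {
  // Returns what a late subscriber actually gets to persist.
  def run(): List[String] = {
    val bus = new TinyBus[String]
    val persisted = ArrayBuffer.empty[String]
    bus.emit("uri-before-registration") // no subscriber yet: lost, no error
    bus.registerCallback { uri => persisted += uri; () }
    bus.emit("uri-after-registration") // delivered
    persisted.toList
  }
}
```

Nothing throws and nothing is logged for the first event, which is why a
comment at the registration site seems worth the two lines.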
##########
amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala:
##########
@@ -501,4 +509,19 @@ class ExecutionResultService(
}
}
+ /**
+ * Callback body for `OperatorPortResultUriAvailable`: persist the URI to
+ * `operator_port_executions`. Lifted to an instance method so a unit test
+ * can drive it directly without spinning up an `AmberClient`.
+ */
+ private[service] def persistOperatorPortResultUri(
+ evt: OperatorPortResultUriAvailable
+ ): Unit = {
+ WorkflowExecutionsResource.insertOperatorPortResultUri(
Review Comment:
This method only forwards to a static helper on the JAX-RS resource, but
#5430 scoped this work as deleting that helper. Now that
`ExecutionResultService` is its only production caller, the five lines of DAO
code could just live here and
`WorkflowExecutionsResource.insertOperatorPortResultUri` could be deleted as
planned (the resource spec can insert fixture rows with
`OperatorPortExecutionsDao` directly). That would also make the PR match its
own description. If you would rather defer that to the follow-up that moves the
other static DB helpers off the resource, that is fine too — but please update
the description and the issue so the series stays trackable.
##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -184,32 +209,24 @@ object TestUtils {
ControllerConfig.default,
error => {}
)
+ val findResultUri = collectResultUriLookup(client)
Review Comment:
The collect-terminal-results block here is character-for-character identical
to the one in `DataProcessingSpec`. The duplication predates this PR, but since
both copies were rewritten here anyway, extracting a shared helper would halve
the diff and give the rest of the #5424 series one place to update instead of
two. (nit)
##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -159,6 +162,28 @@ object TestUtils {
p
}
+ /**
+ * Subscribe to engine-emitted result-URI events and return a lookup over
+ * the collected URIs keyed by `(logicalOpId, portId)`. Test fixtures use
+ * this to observe URIs directly from the event stream, bypassing the
+ * `operator_port_executions` table (which only gets populated in
+ * production by `ExecutionResultService`'s parallel subscription).
+ */
+ def collectResultUriLookup(
Review Comment:
Before this PR, `DataProcessingSpec` covered the full path: engine inserts
the row, test reads it back through `getResultUriByLogicalPortId`. Now the e2e
tests observe the event directly and the DB insert is only covered by the
isolated callback-body unit test — nothing anywhere verifies that
`attachToExecution` actually wires the event to the insert. Since
`DataProcessingSpec` already stands up the mock DB, one cheap option: have this
helper (or one test case) also call
`WorkflowExecutionsResource.insertOperatorPortResultUri` in the callback and
keep a single DB-backed assertion. That restores end-to-end coverage of the
seam this PR introduces, and answers @kunwp1's question about what happens when
the registration is missing.
##########
amber/src/test/scala/org/apache/texera/web/service/ExecutionResultServiceSpec.scala:
##########
@@ -36,8 +56,110 @@ import org.apache.texera.web.service.ExecutionResultService.{
}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import java.net.URI
+import java.sql.Timestamp
+import scala.jdk.CollectionConverters._
+
+class ExecutionResultServiceSpec
+ extends AnyFlatSpec
+ with Matchers
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with MockTexeraDB {
+
+ private val testWid: Integer = 9000 + scala.util.Random.nextInt(1000)
+ private val testUid: Integer = 9000 + scala.util.Random.nextInt(1000)
+ private var executionsDao: WorkflowExecutionsDao = _
+
+ override protected def beforeAll(): Unit = {
+ initializeDBAndReplaceDSLContext()
+ }
+
+ override protected def afterAll(): Unit = {
+ shutdownDB()
+ }
+
+ override protected def beforeEach(): Unit = {
+ val user = new User
+ user.setUid(testUid)
+ user.setName("execution-result-test-user")
+ user.setEmail(s"[email protected]")
+ user.setPassword("password")
+ new UserDao(getDSLContext.configuration()).insert(user)
+
+ val workflow = new Workflow
+ workflow.setWid(testWid)
+ workflow.setName(s"execution-result-test-$testWid")
+ workflow.setContent("{}")
+ workflow.setDescription("")
+ workflow.setCreationTime(new Timestamp(System.currentTimeMillis()))
+ workflow.setLastModifiedTime(new Timestamp(System.currentTimeMillis()))
+ new WorkflowDao(getDSLContext.configuration()).insert(workflow)
+
+ val version = new WorkflowVersion
+ version.setWid(testWid)
+ version.setContent("{}")
+ version.setCreationTime(new Timestamp(System.currentTimeMillis()))
+ new WorkflowVersionDao(getDSLContext.configuration()).insert(version)
+
+ executionsDao = new WorkflowExecutionsDao(getDSLContext.configuration())
+ }
+
+ override protected def afterEach(): Unit = {
+ val ctx = getDSLContext
+ ctx
+ .deleteFrom(OPERATOR_PORT_EXECUTIONS)
+ .where(
+ OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.in(
+ ctx.select(WORKFLOW_EXECUTIONS.EID).from(WORKFLOW_EXECUTIONS)
+ )
+ )
+ .execute()
+ ctx.deleteFrom(WORKFLOW_EXECUTIONS).execute()
+ import org.apache.texera.dao.jooq.generated.Tables.{USER, WORKFLOW, WORKFLOW_VERSION}
+ ctx.deleteFrom(WORKFLOW_VERSION).where(WORKFLOW_VERSION.WID.eq(testWid)).execute()
+ ctx.deleteFrom(WORKFLOW).where(WORKFLOW.WID.eq(testWid)).execute()
+ ctx.deleteFrom(USER).where(USER.UID.eq(testUid)).execute()
+ }
-class ExecutionResultServiceSpec extends AnyFlatSpec with Matchers {
+ "persistOperatorPortResultUri" should
+ "insert the URI carried by an OperatorPortResultUriAvailable event" in {
+ val execution = new WorkflowExecutions
+ execution.setVid(1)
Review Comment:
To add to Chris's comment: this is not just cosmetic. The version's vid is
auto-generated and the sequence does not reset between tests, so `setVid(1)`
only matches because this test is declared first in the file. Adding any
earlier DB-touching test, or a future test-order change, turns this into an FK
violation. `version.getVid` after the insert (the DAO writes the generated key
back into the pojo, same as `execution.getEid` below) fixes it for good.
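
A sketch of why the hard-coded value is fragile, using hypothetical stubs
(`VersionPojo` and `VersionDaoStub` are illustrations, not the generated jOOQ
classes; the stub only assumes the write-back behavior described above):

```scala
// Hypothetical stand-in for a generated pojo with an auto-generated key.
final class VersionPojo { var vid: Option[Int] = None }

// Hypothetical DAO stub: the key sequence starts wherever earlier inserts
// left it, and insert writes the generated key back into the pojo.
final class VersionDaoStub(firstKey: Int) {
  private var nextKey = firstKey
  def insert(v: VersionPojo): Unit = {
    v.vid = Some(nextKey)
    nextKey += 1
  }
}

object VidDemo {
  // Simulates a run where earlier tests already advanced the sequence to 7.
  def generatedVid(): Int = {
    val dao = new VersionDaoStub(firstKey = 7)
    val version = new VersionPojo
    dao.insert(version)
    version.vid.get // read the key back instead of assuming it is 1
  }
}
```

In the spec this would mean keeping the inserted `version` reachable from the
test and passing `version.getVid` to `execution.setVid`.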
##########
amber/src/test/scala/org/apache/texera/web/service/ExecutionResultServiceSpec.scala:
##########
@@ -36,8 +56,110 @@ import org.apache.texera.web.service.ExecutionResultService.{
}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import java.net.URI
+import java.sql.Timestamp
+import scala.jdk.CollectionConverters._
+
+class ExecutionResultServiceSpec
+ extends AnyFlatSpec
+ with Matchers
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with MockTexeraDB {
+
+ private val testWid: Integer = 9000 + scala.util.Random.nextInt(1000)
Review Comment:
The spec owns its embedded DB so the random wid/uid do not prevent any real
collision — they just make a failure non-reproducible across runs. Fixed values
are simpler and replay the same way every time. (nit)
##########
amber/src/test/scala/org/apache/texera/web/service/ExecutionResultServiceSpec.scala:
##########
@@ -36,8 +56,110 @@ import org.apache.texera.web.service.ExecutionResultService.{
}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import java.net.URI
+import java.sql.Timestamp
+import scala.jdk.CollectionConverters._
+
+class ExecutionResultServiceSpec
+ extends AnyFlatSpec
+ with Matchers
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with MockTexeraDB {
+
+ private val testWid: Integer = 9000 + scala.util.Random.nextInt(1000)
+ private val testUid: Integer = 9000 + scala.util.Random.nextInt(1000)
+ private var executionsDao: WorkflowExecutionsDao = _
+
+ override protected def beforeAll(): Unit = {
+ initializeDBAndReplaceDSLContext()
+ }
+
+ override protected def afterAll(): Unit = {
+ shutdownDB()
+ }
+
+ override protected def beforeEach(): Unit = {
+ val user = new User
+ user.setUid(testUid)
+ user.setName("execution-result-test-user")
+ user.setEmail(s"[email protected]")
+ user.setPassword("password")
+ new UserDao(getDSLContext.configuration()).insert(user)
+
+ val workflow = new Workflow
+ workflow.setWid(testWid)
+ workflow.setName(s"execution-result-test-$testWid")
+ workflow.setContent("{}")
+ workflow.setDescription("")
+ workflow.setCreationTime(new Timestamp(System.currentTimeMillis()))
+ workflow.setLastModifiedTime(new Timestamp(System.currentTimeMillis()))
+ new WorkflowDao(getDSLContext.configuration()).insert(workflow)
+
+ val version = new WorkflowVersion
+ version.setWid(testWid)
+ version.setContent("{}")
+ version.setCreationTime(new Timestamp(System.currentTimeMillis()))
+ new WorkflowVersionDao(getDSLContext.configuration()).insert(version)
+
+ executionsDao = new WorkflowExecutionsDao(getDSLContext.configuration())
+ }
+
+ override protected def afterEach(): Unit = {
Review Comment:
Two small things: the executions/port-executions deletes here are unscoped
(they wipe the whole table) while the user/workflow deletes below are scoped to
the test ids — scoping them consistently would keep this spec safe if it ever
shares a DB. Also the `import ... {USER, WORKFLOW, WORKFLOW_VERSION}`
mid-method reads oddly; it can join the imports at the top of the file. (nit)