Copilot commented on code in PR #4816:
URL: https://github.com/apache/texera/pull/4816#discussion_r3177595183


##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sleep/SleepOpDescSpec.scala:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.sleep
+
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class SleepOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "SleepOpDesc.operatorInfo" should "advertise the user-friendly name and 
Control group" in {
+    val info = (new SleepOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Sleep"
+    info.operatorGroupName shouldBe OperatorGroupConstants.CONTROL_GROUP
+    info.operatorDescription should include("Sleep")
+  }
+
+  it should "expose exactly one input port and one output port" in {
+    val info = (new SleepOpDesc).operatorInfo
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+  }
+
+  "SleepOpDesc.getPhysicalOp" should "produce a non-parallelizable PhysicalOp 
pinned to a single worker" in {
+    // Sleep is non-parallelizable on purpose: tuples must traverse the
+    // sleep path serially so the delay is observable as a back-pressure
+    // signal upstream. The descriptor pins both flags explicitly.
+    val op = new SleepOpDesc
+    op.sleepTime = 5
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.parallelizable shouldBe false
+    physical.suggestedWorkerNum shouldBe Some(1)
+  }
+
+  it should "wire the SleepOpExec class name into the OpExecInitInfo" in {
+    // The descriptor's getPhysicalOp encodes a fully-qualified Exec class
+    // name; pin it so a rename of SleepOpExec breaks this spec deliberately.
+    val op = new SleepOpDesc
+    op.sleepTime = 1
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo.toString should include(
+      "org.apache.texera.amber.operator.sleep.SleepOpExec"
+    )
+  }

Review Comment:
   Using `physical.opExecInitInfo.toString` to assert the executor class name 
is brittle: a change to the `toString` format would break the test even if 
behavior is unchanged. Prefer pattern matching on `OpExecWithClassName` and 
asserting the `className` field directly (and, optionally, that `descString` 
is non-empty).



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetcherOpDescSpec.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.fetcher
+
+import org.apache.texera.amber.core.tuple.AttributeType
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class URLFetcherOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  private def configured(decoding: DecodingMethod): URLFetcherOpDesc = {
+    val op = new URLFetcherOpDesc
+    op.url = "https://example.test/data";
+    op.decodingMethod = decoding
+    op
+  }
+
+  "URLFetcherOpDesc.operatorInfo" should "advertise the user-friendly name and 
API group" in {
+    val info = (new URLFetcherOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "URL Fetcher"
+    info.operatorGroupName shouldBe OperatorGroupConstants.API_GROUP
+    info.operatorDescription should include("URL")
+  }
+
+  it should "expose no input ports and one output port (source-shaped)" in {
+    val info = (new URLFetcherOpDesc).operatorInfo
+    info.inputPorts shouldBe empty
+    info.outputPorts should have length 1
+  }
+
+  "URLFetcherOpDesc.sourceSchema" should "produce a single STRING column when 
decoding is UTF-8" in {
+    val op = configured(DecodingMethod.UTF_8)
+    val schema = op.sourceSchema()
+    schema.getAttributes should have length 1
+    schema.getAttributes.head.getName shouldBe "URL content"
+    schema.getAttributes.head.getType shouldBe AttributeType.STRING
+  }
+
+  it should "produce an ANY column for raw-bytes decoding" in {
+    val op = configured(DecodingMethod.RAW_BYTES)
+    val schema = op.sourceSchema()
+    schema.getAttributes should have length 1
+    schema.getAttributes.head.getName shouldBe "URL content"
+    schema.getAttributes.head.getType shouldBe AttributeType.ANY
+  }
+
+  it should "default to ANY when decodingMethod is left unset (current 
behavior)" in {
+    // Pin: `var decodingMethod: DecodingMethod = _` defaults to null.
+    // sourceSchema's branch is `if (decodingMethod == DecodingMethod.UTF_8)
+    // STRING else ANY`, so a null comparison falls through to ANY without
+    // raising. Documenting the current behavior so a future explicit-null
+    // check breaks this spec deliberately.
+    val op = new URLFetcherOpDesc
+    op.url = "https://example.test/data";
+    val schema = op.sourceSchema()
+    schema.getAttributes.head.getType shouldBe AttributeType.ANY
+  }
+
+  "URLFetcherOpDesc.getPhysicalOp" should "wire the URLFetcherOpExec class 
name into the OpExecInitInfo" in {
+    val op = configured(DecodingMethod.UTF_8)
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo.toString should include(
+      "org.apache.texera.amber.operator.source.fetcher.URLFetcherOpExec"
+    )
+  }
+
+  it should "propagate sourceSchema onto the single output port" in {
+    val op = configured(DecodingMethod.UTF_8)
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.outputPorts.size shouldBe op.operatorInfo.outputPorts.size

Review Comment:
   The test name says it verifies schema propagation onto the output port, but 
the assertion only compares port counts. Consider exercising 
`physical.propagateSchema.func(...)` and asserting the returned mapping 
contains the output port id with a schema equal to `op.sourceSchema()` (or 
matching attributes/types).
   



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetcherOpDescSpec.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.fetcher
+
+import org.apache.texera.amber.core.tuple.AttributeType
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class URLFetcherOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  private def configured(decoding: DecodingMethod): URLFetcherOpDesc = {
+    val op = new URLFetcherOpDesc
+    op.url = "https://example.test/data";
+    op.decodingMethod = decoding
+    op
+  }
+
+  "URLFetcherOpDesc.operatorInfo" should "advertise the user-friendly name and 
API group" in {
+    val info = (new URLFetcherOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "URL Fetcher"
+    info.operatorGroupName shouldBe OperatorGroupConstants.API_GROUP
+    info.operatorDescription should include("URL")
+  }
+
+  it should "expose no input ports and one output port (source-shaped)" in {
+    val info = (new URLFetcherOpDesc).operatorInfo
+    info.inputPorts shouldBe empty
+    info.outputPorts should have length 1
+  }
+
+  "URLFetcherOpDesc.sourceSchema" should "produce a single STRING column when 
decoding is UTF-8" in {
+    val op = configured(DecodingMethod.UTF_8)
+    val schema = op.sourceSchema()
+    schema.getAttributes should have length 1
+    schema.getAttributes.head.getName shouldBe "URL content"
+    schema.getAttributes.head.getType shouldBe AttributeType.STRING
+  }
+
+  it should "produce an ANY column for raw-bytes decoding" in {
+    val op = configured(DecodingMethod.RAW_BYTES)
+    val schema = op.sourceSchema()
+    schema.getAttributes should have length 1
+    schema.getAttributes.head.getName shouldBe "URL content"
+    schema.getAttributes.head.getType shouldBe AttributeType.ANY
+  }
+
+  it should "default to ANY when decodingMethod is left unset (current 
behavior)" in {
+    // Pin: `var decodingMethod: DecodingMethod = _` defaults to null.
+    // sourceSchema's branch is `if (decodingMethod == DecodingMethod.UTF_8)
+    // STRING else ANY`, so a null comparison falls through to ANY without
+    // raising. Documenting the current behavior so a future explicit-null
+    // check breaks this spec deliberately.
+    val op = new URLFetcherOpDesc
+    op.url = "https://example.test/data";
+    val schema = op.sourceSchema()

Review Comment:
   This test uses `schema.getAttributes.head` without first asserting that the 
schema has at least one attribute (unlike the other sourceSchema tests). Add a 
length check first, e.g. `schema.getAttributes should have length 1` as in the 
sibling tests, to avoid an unhelpful `NoSuchElementException` if the schema 
shape ever changes.
   



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intersect/IntersectOpDescSpec.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.intersect
+
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{HashPartition, SinglePartition, 
UnknownPartition}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class IntersectOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "IntersectOpDesc.operatorInfo" should "advertise the user-friendly name and 
Set group" in {
+    val info = (new IntersectOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Intersect"
+    info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP
+    info.operatorDescription should include("intersect")
+  }
+
+  it should "expose two input ports (PortIdentity 0 and 1) and one blocking 
output" in {
+    val info = (new IntersectOpDesc).operatorInfo
+    info.inputPorts should have length 2
+    info.inputPorts.map(_.id.id) shouldBe List(0, 1)
+    info.outputPorts should have length 1
+    info.outputPorts.head.blocking shouldBe true
+  }
+
+  "IntersectOpDesc.getPhysicalOp" should "require HashPartition on both input 
ports" in {
+    val op = new IntersectOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.partitionRequirement shouldBe List(
+      Option(HashPartition()),
+      Option(HashPartition())
+    )
+  }
+
+  it should "always derive HashPartition for the output regardless of input 
partitions" in {
+    // The Intersect set semantics require both inputs to be hash-aligned, so
+    // the derived output partition must remain hash even when the upstream
+    // inputs report differing partition kinds.
+    val op = new IntersectOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.derivePartition(List(SinglePartition(), UnknownPartition())) 
shouldBe HashPartition()
+    physical.derivePartition(
+      List(HashPartition(List("a")), HashPartition(List("b")))
+    ) shouldBe HashPartition()
+  }
+
+  it should "wire the IntersectOpExec class name into the OpExecInitInfo" in {
+    val op = new IntersectOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo.toString should include(
+      "org.apache.texera.amber.operator.intersect.IntersectOpExec"
+    )
+  }

Review Comment:
   This assertion relies on `opExecInitInfo.toString` containing the class 
name, which is an implementation detail and can change without functional 
impact. Prefer matching `physical.opExecInitInfo` as 
`OpExecWithClassName(className, _)` and asserting `className` equals the 
expected FQCN.



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetcherOpDescSpec.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.fetcher
+
+import org.apache.texera.amber.core.tuple.AttributeType
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class URLFetcherOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  private def configured(decoding: DecodingMethod): URLFetcherOpDesc = {
+    val op = new URLFetcherOpDesc
+    op.url = "https://example.test/data";
+    op.decodingMethod = decoding
+    op
+  }
+
+  "URLFetcherOpDesc.operatorInfo" should "advertise the user-friendly name and 
API group" in {
+    val info = (new URLFetcherOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "URL Fetcher"
+    info.operatorGroupName shouldBe OperatorGroupConstants.API_GROUP
+    info.operatorDescription should include("URL")
+  }
+
+  it should "expose no input ports and one output port (source-shaped)" in {
+    val info = (new URLFetcherOpDesc).operatorInfo
+    info.inputPorts shouldBe empty
+    info.outputPorts should have length 1
+  }
+
+  "URLFetcherOpDesc.sourceSchema" should "produce a single STRING column when 
decoding is UTF-8" in {
+    val op = configured(DecodingMethod.UTF_8)
+    val schema = op.sourceSchema()
+    schema.getAttributes should have length 1
+    schema.getAttributes.head.getName shouldBe "URL content"
+    schema.getAttributes.head.getType shouldBe AttributeType.STRING
+  }
+
+  it should "produce an ANY column for raw-bytes decoding" in {
+    val op = configured(DecodingMethod.RAW_BYTES)
+    val schema = op.sourceSchema()
+    schema.getAttributes should have length 1
+    schema.getAttributes.head.getName shouldBe "URL content"
+    schema.getAttributes.head.getType shouldBe AttributeType.ANY
+  }
+
+  it should "default to ANY when decodingMethod is left unset (current 
behavior)" in {
+    // Pin: `var decodingMethod: DecodingMethod = _` defaults to null.
+    // sourceSchema's branch is `if (decodingMethod == DecodingMethod.UTF_8)
+    // STRING else ANY`, so a null comparison falls through to ANY without
+    // raising. Documenting the current behavior so a future explicit-null
+    // check breaks this spec deliberately.
+    val op = new URLFetcherOpDesc
+    op.url = "https://example.test/data";
+    val schema = op.sourceSchema()
+    schema.getAttributes.head.getType shouldBe AttributeType.ANY
+  }
+
+  "URLFetcherOpDesc.getPhysicalOp" should "wire the URLFetcherOpExec class 
name into the OpExecInitInfo" in {
+    val op = configured(DecodingMethod.UTF_8)
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo.toString should include(
+      "org.apache.texera.amber.operator.source.fetcher.URLFetcherOpExec"
+    )
+  }

Review Comment:
   Checking the executor class via `physical.opExecInitInfo.toString` is 
brittle. Prefer asserting via pattern match on `OpExecWithClassName` and 
checking the `className` field so the test only fails on actual wiring changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to