aglinxinyuan commented on code in PR #5876: URL: https://github.com/apache/texera/pull/5876#discussion_r3449388919
########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonUDFOpDescV2Spec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "PythonUDFOpDescV2.operatorInfo" should + "advertise the name, Python group, dynamic ports, and a default 1-in/1-out shape" in { + val info = (new PythonUDFOpDescV2).operatorInfo + info.userFriendlyName shouldBe "Python UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP + info.dynamicInputPorts shouldBe true + info.dynamicOutputPorts shouldBe true + info.inputPorts should have length 1 + info.outputPorts should have length 1 + } + + "PythonUDFOpDescV2" should "default code/workers/flags" in { + val d = new PythonUDFOpDescV2 + d.code shouldBe "" + d.workers shouldBe 1 + d.retainInputColumns shouldBe false + d.defaultEnv shouldBe true + } + + "PythonUDFOpDescV2.getPhysicalOp" should + "wire OpExecWithCode(code, \"python\") and carry port identities" in { + val d = new PythonUDFOpDescV2 + d.code = "yield t" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, language) => + language shouldBe "python" + code shouldBe "yield t" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new PythonUDFOpDescV2 + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } Review Comment: Added — getPhysicalOp now has a test asserting a RuntimeException for a blank envName when the default env is disabled. ########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonUDFOpDescV2Spec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "PythonUDFOpDescV2.operatorInfo" should + "advertise the name, Python group, dynamic ports, and a default 1-in/1-out shape" in { + val info = (new PythonUDFOpDescV2).operatorInfo + info.userFriendlyName shouldBe "Python UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP + info.dynamicInputPorts shouldBe true + info.dynamicOutputPorts shouldBe true + info.inputPorts should have length 1 + info.outputPorts should have length 1 + } + + "PythonUDFOpDescV2" should "default code/workers/flags" in { + val d = new PythonUDFOpDescV2 + d.code shouldBe "" + d.workers shouldBe 1 + d.retainInputColumns shouldBe false + d.defaultEnv shouldBe true + } + + "PythonUDFOpDescV2.getPhysicalOp" should + "wire OpExecWithCode(code, \"python\") and carry port identities" in { + val d = new PythonUDFOpDescV2 + d.code = "yield t" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, language) => + language shouldBe "python" + code shouldBe "yield t" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new PythonUDFOpDescV2 + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } + + "PythonUDFOpDescV2 schema propagation" should + "emit only the output columns when input columns are not retained (default)" in { + val d = new PythonUDFOpDescV2 + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + val out = d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) + out shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema().add( + new Attribute("res", AttributeType.INTEGER) + ) + ) + } + + it should "retain input columns plus the output columns when retainInputColumns is true" in { + val d = new PythonUDFOpDescV2 + d.retainInputColumns = true + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + val out = d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) + out shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema() + .add(new Attribute("in", AttributeType.STRING)) + .add(new Attribute("res", AttributeType.INTEGER)) + ) + } Review Comment: Added a regression test asserting getExternalOutputSchemas throws when a retained input column collides with an output column. ########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonUDFOpDescV2Spec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "PythonUDFOpDescV2.operatorInfo" should + "advertise the name, Python group, dynamic ports, and a default 1-in/1-out shape" in { + val info = (new PythonUDFOpDescV2).operatorInfo + info.userFriendlyName shouldBe "Python UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP + info.dynamicInputPorts shouldBe true + info.dynamicOutputPorts shouldBe true + info.inputPorts should have length 1 + info.outputPorts should have length 1 + } + + "PythonUDFOpDescV2" should "default code/workers/flags" in { + val d = new PythonUDFOpDescV2 + d.code shouldBe "" + d.workers shouldBe 1 + d.retainInputColumns shouldBe false + d.defaultEnv shouldBe true + } + + "PythonUDFOpDescV2.getPhysicalOp" should + "wire OpExecWithCode(code, \"python\") and carry port identities" in { + val d = new PythonUDFOpDescV2 + d.code = "yield t" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, language) => + language shouldBe "python" + code shouldBe "yield t" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new PythonUDFOpDescV2 + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } + + "PythonUDFOpDescV2 schema propagation" should + "emit only the output columns when input columns are not retained (default)" in { + val d = new PythonUDFOpDescV2 + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + val out = d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) + out shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema().add( + new Attribute("res", AttributeType.INTEGER) + ) + ) + } + + it should "retain input columns plus the output columns when retainInputColumns is true" in { + val d = new PythonUDFOpDescV2 + d.retainInputColumns = true + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + val out = d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) + out shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema() + .add(new Attribute("in", AttributeType.STRING)) + .add(new Attribute("res", AttributeType.INTEGER)) + ) + } + + "PythonUDFOpDescV2" should "round-trip its config fields through the polymorphic base" in { + val d = new PythonUDFOpDescV2 + d.code = "print(1)" + d.workers = 3 + d.retainInputColumns = true + d.defaultEnv = false + d.envName = "myenv" + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[PythonUDFOpDescV2] + val p = restored.asInstanceOf[PythonUDFOpDescV2] + p.code shouldBe "print(1)" + p.workers shouldBe 3 + p.retainInputColumns shouldBe true + p.defaultEnv shouldBe false + p.envName shouldBe "myenv" + } Review Comment: Done — the round-trip now sets and asserts outputColumns. ########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/source/PythonUDFSourceOpDescV2Spec.scala: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.python.source + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonUDFSourceOpDescV2Spec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "PythonUDFSourceOpDescV2.operatorInfo" should + "advertise the 1-out Python UDF source (no inputs, one output, reconfigurable)" in { + val info = (new PythonUDFSourceOpDescV2).operatorInfo + info.userFriendlyName shouldBe "1-out Python UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP + info.inputPorts shouldBe empty + info.outputPorts should have length 1 + info.supportReconfiguration shouldBe true + } + + "PythonUDFSourceOpDescV2.sourceSchema" should "be empty by default and reflect the configured columns" in { + (new PythonUDFSourceOpDescV2).sourceSchema().getAttributes shouldBe empty + val d = new PythonUDFSourceOpDescV2 + d.columns = List(new Attribute("a", AttributeType.STRING)) + d.sourceSchema() shouldBe Schema().add(new Attribute("a", AttributeType.STRING)) + } + + "PythonUDFSourceOpDescV2.getPhysicalOp" should + "wire OpExecWithCode(code, \"python\") as a source op with one output port" in { + val d = new PythonUDFSourceOpDescV2 + d.code = "yield {'a': 1}" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, language) => + language shouldBe "python" + code shouldBe "yield {'a': 1}" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new PythonUDFSourceOpDescV2 + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } + + "PythonUDFSourceOpDescV2" should "round-trip its config fields through the polymorphic base" in { + val d = new PythonUDFSourceOpDescV2 + d.code = "yield" + d.workers = 2 + d.defaultEnv = false + d.envName = "venv" + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[PythonUDFSourceOpDescV2] + val p = restored.asInstanceOf[PythonUDFSourceOpDescV2] + p.code shouldBe "yield" + p.workers shouldBe 2 + p.defaultEnv shouldBe false + p.envName shouldBe "venv" + } Review Comment: Done — the round-trip now covers the columns field. ########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonTableReducerOpDescSpec.scala: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.tuple.{AttributeType, Schema} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonTableReducerOpDescSpec extends AnyFlatSpec with Matchers { + + private def unit(name: String, expr: String, t: AttributeType): LambdaAttributeUnit = + new LambdaAttributeUnit(name, expr, null, t) + + "PythonTableReducerOpDesc.operatorInfo" should "advertise the name and Python group" in { + val info = (new PythonTableReducerOpDesc).operatorInfo + info.userFriendlyName shouldBe "Python Table Reducer" + info.operatorDescription shouldBe "Reduce Table to Tuple" + info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP + info.inputPorts should have length 1 + info.outputPorts should have length 1 + } + + "PythonTableReducerOpDesc" should "default lambdaAttributeUnits to an empty list" in { + (new PythonTableReducerOpDesc).lambdaAttributeUnits shouldBe empty + } + + "PythonTableReducerOpDesc.getOutputSchemas" should + "fold each lambda unit into an output column keyed by the declared output port" in { + val d = new PythonTableReducerOpDesc + d.lambdaAttributeUnits = List(unit("score", "1 + 1", AttributeType.INTEGER)) + d.getOutputSchemas(Map.empty) shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema().add("score", AttributeType.INTEGER) + ) + } + + it should "reject an empty lambda list" in { + intercept[IllegalArgumentException] { + (new PythonTableReducerOpDesc).getOutputSchemas(Map.empty) + } + } + + "PythonTableReducerOpDesc.generatePythonCode" should "emit the reducer table operator" in { + val d = new PythonTableReducerOpDesc + d.lambdaAttributeUnits = List(unit("score", "1 + 1", AttributeType.INTEGER)) + val code = d.generatePythonCode() + code should include("class ProcessTableOperator(UDFTableOperator)") + code should include("score") + } + + "PythonTableReducerOpDesc" should "round-trip its lambda units through the polymorphic base" in { + val d = new PythonTableReducerOpDesc + d.lambdaAttributeUnits = List(unit("score", "1 + 1", AttributeType.INTEGER)) + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[PythonTableReducerOpDesc] + val r = restored.asInstanceOf[PythonTableReducerOpDesc] + r.lambdaAttributeUnits should have length 1 + r.lambdaAttributeUnits.head.attributeName shouldBe "score" + r.lambdaAttributeUnits.head.attributeType shouldBe AttributeType.INTEGER + } Review Comment: Done — the round-trip now also pins expression and newAttributeName. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
