mengw15 commented on code in PR #4024:
URL: https://github.com/apache/texera/pull/4024#discussion_r2509916930
##########
common/workflow-core/src/main/scala/org/apache/amber/core/tuple/Tuple.scala:
##########
@@ -112,6 +112,29 @@ case class Tuple @JsonCreator() (
object Tuple {
+ /** Build a Tuple from (name -> value) pairs, coercing values to the schema types. */
+ def of(schema: Schema, values: (String, Any)*): Tuple = {
+ val nameToValue: Map[String, Any] = values.toMap
+ val coercedFields: Array[Any] =
+ schema.getAttributes.map { attribute =>
+ val rawValue: Any = nameToValue.getOrElse(attribute.getName, null)
+ AttributeTypeUtils.parseField(rawValue, attribute.getType, force = true)
+ }.toArray
+ Tuple(schema, coercedFields)
+ }
+
+ /** Build a Tuple without coercion.
+ * Uses the builder’s runtime type checks; values must already match the schema’s field classes.
+ * Missing attributes (or unknown attribute names) will cause an error.
+ */
+ def ofStrict(schema: Schema, values: (String, Any)*): Tuple =
Review Comment:
Do we need this function? It is not used anywhere.
##########
common/workflow-core/src/main/scala/org/apache/amber/core/tuple/Schema.scala:
##########
@@ -191,6 +191,10 @@ case class Schema @JsonCreator() (
object Schema {
+ /** Build a Schema with (name, type) pairs, in order, rejecting duplicates. */
+ def of(attrs: (String, AttributeType)*): Schema =
Review Comment:
I checked, and this change moves the two functions TupleOf and SchemaOf from the
common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
file to here.
What is the reason for this change?
Also, should we split it into a separate PR, since it only relates to the test file?
##########
common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala:
##########
@@ -387,6 +387,136 @@ object AttributeTypeUtils extends Serializable {
}
}
+ /** Three-way compare for the given attribute type.
Review Comment:
I checked, and this change basically moves the functions from the
common/workflow-operator/src/main/scala/org/apache/amber/operator/aggregate/AggregationOperation.scala
file to this file.
But you also refactored the code. What is the reason for the refactoring, or can we
use the original code here?
##########
common/workflow-core/src/main/scala/org/apache/amber/core/tuple/Tuple.scala:
##########
@@ -112,6 +112,29 @@ case class Tuple @JsonCreator() (
object Tuple {
+ /** Build a Tuple from (name -> value) pairs, coercing values to the schema types. */
+ def of(schema: Schema, values: (String, Any)*): Tuple = {
Review Comment:
Same comment as on Schema.of:
"I checked, and this change moves the two functions TupleOf and SchemaOf from the
common/workflow-operator/src/test/scala/org/apache/amber/operator/sort/StableMergeSortOpExecSpec.scala
file to here.
What is the reason for this change?
Also, should we split it into a separate PR, since it only relates to the test file?"
##########
common/workflow-operator/src/main/scala/org/apache/amber/operator/sortPartitions/SortPartitionsOpExec.scala:
##########
@@ -47,18 +47,11 @@ class SortPartitionsOpExec(descString: String) extends OperatorExecutor {
override def onFinish(port: Int): Iterator[TupleLike] = sortTuples()
- private def compareTuples(t1: Tuple, t2: Tuple): Boolean = {
- val attributeType = t1.getSchema.getAttribute(desc.sortAttributeName).getType
- val attributeIndex = t1.getSchema.getIndex(desc.sortAttributeName)
- attributeType match {
- case AttributeType.LONG =>
- t1.getField[Long](attributeIndex) < t2.getField[Long](attributeIndex)
- case AttributeType.INTEGER =>
- t1.getField[Int](attributeIndex) < t2.getField[Int](attributeIndex)
- case AttributeType.DOUBLE =>
- t1.getField[Double](attributeIndex) < t2.getField[Double](attributeIndex)
- case _ =>
- true // unsupported type
- }
- }
+ private def compareTuples(t1: Tuple, t2: Tuple): Boolean =
Review Comment:
Can we remove this function and use AttributeTypeUtils.compare directly at
line 41?
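To illustrate the suggestion, here is a minimal sketch of sorting with a three-way compare inlined at the call site instead of a separate Boolean helper. It is not the PR's code: `CompareSketch`, `AttrType`, and `sortRows` are hypothetical stand-ins, rows are modeled as plain maps rather than Tuples, and the actual signature of `AttributeTypeUtils.compare` may differ.

```scala
object CompareSketch {
  // Hypothetical stand-in for AttributeType; only the three numeric cases
  // from the removed compareTuples are modeled.
  sealed trait AttrType
  case object LongType extends AttrType
  case object IntType extends AttrType
  case object DoubleType extends AttrType

  // Hypothetical three-way compare: negative, zero, or positive result.
  // The real AttributeTypeUtils.compare may take different parameters.
  def compare(a: Any, b: Any, t: AttrType): Int =
    t match {
      case LongType =>
        java.lang.Long.compare(a.asInstanceOf[Long], b.asInstanceOf[Long])
      case IntType =>
        java.lang.Integer.compare(a.asInstanceOf[Int], b.asInstanceOf[Int])
      case DoubleType =>
        java.lang.Double.compare(a.asInstanceOf[Double], b.asInstanceOf[Double])
    }

  // The "less than" predicate is derived inline from the three-way result,
  // so no separate compareTuples-style helper is needed.
  def sortRows(
      rows: Seq[Map[String, Any]],
      attr: String,
      t: AttrType
  ): Seq[Map[String, Any]] =
    rows.sortWith((r1, r2) => compare(r1(attr), r2(attr), t) < 0)
}
```

One difference worth noting: the removed helper returned true for unsupported types, while this sketch only handles the three listed types, so how other types should behave depends on what the shared compare function does.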