Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6959#discussion_r33108664
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java ---
    @@ -70,67 +92,38 @@
       private final boolean enablePerfMetrics;
     
       /**
    -   * @return true if UnsafeFixedWidthAggregationMap supports grouping keys with the given schema,
    -   *         false otherwise.
    -   */
    -  public static boolean supportsGroupKeySchema(StructType schema) {
    -    for (StructField field: schema.fields()) {
    -      if (!UnsafeRow.readableFieldTypes.contains(field.dataType())) {
    -        return false;
    -      }
    -    }
    -    return true;
    -  }
    -
    -  /**
    -   * @return true if UnsafeFixedWidthAggregationMap supports aggregation buffers with the given
    -   *         schema, false otherwise.
    -   */
    -  public static boolean supportsAggregationBufferSchema(StructType schema) {
    -    for (StructField field: schema.fields()) {
    -      if (!UnsafeRow.settableFieldTypes.contains(field.dataType())) {
    -        return false;
    -      }
    -    }
    -    return true;
    -  }
    -
    -  /**
        * Create a new UnsafeFixedWidthAggregationMap.
        *
    -   * @param emptyAggregationBuffer the default value for new keys (a "zero" of the agg. function)
    -   * @param aggregationBufferSchema the schema of the aggregation buffer, used for row conversion.
    -   * @param groupingKeySchema the schema of the grouping key, used for row conversion.
    +   * @param initProjection the default value for new keys (a "zero" of the agg. function)
    +   * @param keyConverter the converter of the grouping key, used for row conversion.
    +   * @param bufferConverter the converter of the aggregation buffer, used for row conversion.
        * @param memoryManager the memory manager used to allocate our Unsafe memory structures.
        * @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
        * @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
        */
       public UnsafeFixedWidthAggregationMap(
    -      InternalRow emptyAggregationBuffer,
    -      StructType aggregationBufferSchema,
    -      StructType groupingKeySchema,
    +      Function1<InternalRow, InternalRow> initProjection,
    +      UnsafeRowConverter keyConverter,
    +      UnsafeRowConverter bufferConverter,
           TaskMemoryManager memoryManager,
           int initialCapacity,
           boolean enablePerfMetrics) {
    -    this.emptyAggregationBuffer =
    -      convertToUnsafeRow(emptyAggregationBuffer, aggregationBufferSchema);
    -    this.aggregationBufferSchema = aggregationBufferSchema;
    -    this.groupingKeyToUnsafeRowConverter = new UnsafeRowConverter(groupingKeySchema);
    -    this.groupingKeySchema = groupingKeySchema;
    -    this.map = new BytesToBytesMap(memoryManager, initialCapacity, enablePerfMetrics);
    +    this.initProjection = initProjection;
    --- End diff --
    
    This is necessary. For example, you need to create a new `OpenHashSet` for every grouping key. Also, this is only called when the initial value contains an object.
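
To illustrate the point about per-key objects, here is a minimal, self-contained sketch. It is not the Spark code: `InitialBufferSketch`, both method names, and the use of `java.util.HashSet` in place of `OpenHashSet` are assumptions made only for illustration. It contrasts reusing one mutable initial value for every grouping key with calling an initializer (playing the role of `initProjection`) once per new key.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

public class InitialBufferSketch {

    // Problematic pattern: one mutable "zero" object is shared by every key,
    // so an update for one grouping key is visible under all of them.
    static Map<String, Set<Integer>> aggregateWithSharedInitial(String[] keys, int[] values) {
        Set<Integer> sharedInitial = new HashSet<>();
        Map<String, Set<Integer>> buffers = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            buffers.putIfAbsent(keys[i], sharedInitial);
            buffers.get(keys[i]).add(values[i]);
        }
        return buffers;
    }

    // Pattern the comment argues for: the initializer runs once for each
    // new grouping key, so every key gets its own set.
    static Map<String, Set<Integer>> aggregateWithFreshInitial(
            String[] keys, int[] values, Supplier<Set<Integer>> init) {
        Map<String, Set<Integer>> buffers = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            buffers.computeIfAbsent(keys[i], k -> init.get()).add(values[i]);
        }
        return buffers;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a"};
        int[] values = {1, 2, 3};
        // Shared set: key "b" also sees the values added for key "a".
        System.out.println(aggregateWithSharedInitial(keys, values).get("b").size()); // prints 3
        // Fresh set per key: key "b" holds only its own value.
        System.out.println(aggregateWithFreshInitial(keys, values, HashSet::new).get("b").size()); // prints 1
    }
}
```

When the initial value contains only primitives, copying a fixed buffer is enough, which matches the note that the initializer is only called when the initial value contains an object.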

