Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Andy Dang
Hi Takeshi,

Thanks for the answer. My UDAF aggregates data into an array of rows.

Apparently this makes it ineligible for hash-based aggregation, based on the
logic at:
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108

Unfortunately, the list of supported data types is very limited.

It doesn't make sense to me that the data types must be mutable for the UDAF to
use hash-based aggregation, but I could be missing something here :). I can get
hash-based aggregation by rewriting this query against the RDD API, but that
seems counter-intuitive IMO.
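
(For reference, a quick way to check eligibility up front -- an illustrative
sketch against the internal classes linked above, so it may break between Spark
versions:)

import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType rowSchema = new StructType().add("id", DataTypes.IntegerType);

// My UDAF buffers an array of rows: ArrayType is not a mutable UnsafeRow field
// type, so this prints false and Spark falls back to SortAggregate.
StructType arrayBuffer = new StructType().add("rows", DataTypes.createArrayType(rowSchema));
System.out.println(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(arrayBuffer));

// A fixed-width buffer (e.g. a single long) is mutable, so HashAggregate is eligible.
StructType fixedWidthBuffer = new StructType().add("count", DataTypes.LongType);
System.out.println(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(fixedWidthBuffer));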

---
Regards,
Andy

On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> Spark always uses hash-based aggregates if the types of the aggregated data
> are supported there; otherwise, it falls back to sort-based aggregates.
> See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
>
> So, I'm not sure about your query, but it seems the types of the aggregated
> data in your query are not supported by hash-based aggregates.
>
> // maropu
>
>
>
> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <nam...@gmail.com> wrote:
>
>> Hi all,
>>
>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
>> which is very inefficient for certain aggregations:
>>
>> The code is very simple:
>> - I have a UDAF
>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>>
>> The physical plan I got was:
>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
>> +- Exchange SinglePartition
>>+- *HashAggregate(keys=[], functions=[partial_count(1)],
>> output=[count#71L])
>>   +- *Project
>>  +- Generate explode(internal_col#31), false, false,
>> [internal_col#42]
>> +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0,
>> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
>> com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>>+- *Sort [key#0 ASC], false, 0
>>   +- Exchange hashpartitioning(key#0, 200)
>>  +- SortAggregate(key=[key#0],
>> functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2,
>> nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)],
>> output=[key#0,internal_col#37])
>> +- *Sort [key#0 ASC], false, 0
>>+- Scan ExistingRDD[key#0,nested#1,nes
>> tedArray#2,nestedObjectArray#3,value#4L]
>>
>> How can I make Spark use HashAggregate (like the count(*) expression)
>> instead of SortAggregate with my UDAF?
>>
>> Is it intentional? Is there an issue tracking this?
>>
>> ---
>> Regards,
>> Andy
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Andy Dang
Hi all,

It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
which is very inefficient for certain aggregations:

The code is very simple:
- I have a UDAF
- What I want to do is: dataset.groupBy(cols).agg(udaf).count()

The physical plan I got was:
*HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
      +- *Project
         +- Generate explode(internal_col#31), false, false, [internal_col#42]
            +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
               +- *Sort [key#0 ASC], false, 0
                  +- Exchange hashpartitioning(key#0, 200)
                     +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
                        +- *Sort [key#0 ASC], false, 0
                           +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]

How can I make Spark use HashAggregate (like the count(*) expression)
instead of SortAggregate with my UDAF?

Is it intentional? Is there an issue tracking this?
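
(For reference, here is roughly how the query is built -- udaf and dataset
stand in for my actual UDAF instance and input Dataset; explain() shows the
SortAggregate/Exchange/Sort shape in the plan above:)

import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> aggregated = dataset
        .groupBy(col("key"))
        .agg(udaf.apply(col("key"), col("nested"), col("nestedArray"),
                col("nestedObjectArray"), col("value")));
aggregated.explain();   // prints the physical plan with SortAggregate for the UDAF
aggregated.count();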

---
Regards,
Andy


Re: Converting an InternalRow to a Row

2017-01-07 Thread Andy Dang
Ah, I missed that bit of documentation, my bad :). That totally explains
the behavior!

Thanks a lot!
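
(For anyone who finds this thread later: a minimal sketch of the fix Liang-Chi
describes below -- copy the InternalRow returned by toRow() before buffering it,
since the encoder is allowed to reuse the same instance across calls. The field
names match the UDAF snippet quoted further down:)

@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    UserDefineType data = buffer.getAs(0);
    // toRow() may return the same InternalRow instance on every call,
    // so copy it before storing it in the aggregation buffer
    data.add(unboundedEncoder.toRow(input).copy());
    buffer.update(0, data);
}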

---
Regards,
Andy

On Sat, Jan 7, 2017 at 10:11 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> Hi Andy,
>
> Thanks for sharing the code snippet.
>
> I am not sure if you missed something in the snippet, because some function
> signatures do not match, e.g.,
>
> @Override
> public StructType bufferSchema() {
> return new UserDefineType(schema, unboundedEncoder);
> }
>
>
> Maybe you have defined a class UserDefineType which extends StructType.
>
> Anyway, I noticed that in this line:
>
> data.add(unboundedEncoder.toRow(input));
>
> If you read the comment of "toRow", you will find it says:
>
> Note that multiple calls to toRow are allowed to return the same actual
> [[InternalRow]] object.  Thus, the caller should copy the result before
> making another call if required.
>
> I think it is why you get a list of the same entries.
>
> So you may need to change it to:
>
> data.add(unboundedEncoder.toRow(input).copy());
>
>
>
> Andy Dang wrote
> > Hi Liang-Chi,
> >
> > The snippet of code is below. If I bind the encoder early (the schema
> > doesn't change throughout the execution), the final result is a list of
> > the
> > same entries.
> >
> > @RequiredArgsConstructor
> > public class UDAF extends UserDefinedAggregateFunction {
> >
> > // Do not resolve and bind this expression encoder eagerly
> > private final ExpressionEncoder<Row> unboundedEncoder;
> > private final StructType schema;
> >
> > @Override
> > public StructType inputSchema() {
> > return schema;
> > }
> >
> > @Override
> > public StructType bufferSchema() {
> > return new UserDefineType(schema, unboundedEncoder);
> > }
> >
> > @Override
> > public DataType dataType() {
> > return DataTypes.createArrayType(schema);
> > }
> >
> > @Override
> > public void initialize(MutableAggregationBuffer buffer) {
> > buffer.update(0, new InternalRow[0]);
> > }
> >
> > @Override
> > public void update(MutableAggregationBuffer buffer, Row input) {
> > UserDefineType data = buffer.getAs(0);
> >
> > data.add(unboundedEncoder.toRow(input));
> >
> > buffer.update(0, data);
> > }
> >
> > @Override
> > public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
> > // merge
> > buffer1.update(0, data1);
> > }
> >
> > @Override
> > public Object evaluate(Row buffer) {
> > UserDefineType data = buffer.getAs(0);
> >
> >// need to return Row here instead of Internal Row
> > return data.rows();
> > }
> >
> > static ExpressionEncoder<Row> resolveAndBind(ExpressionEncoder<Row> encoder) {
> > val attributes = JavaConversions.asJavaCollection(encoder.schema().toAttributes()).stream().map(Attribute::toAttribute).collect(Collectors.toList());
> > return encoder.resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
> > }
> > }
> >
> > // Wrap around a list of InternalRow
> > class TopKDataType extends UserDefinedType {
> > private final ExpressionEncoder<Row> unboundedEncoder;
> > private final List<InternalRow> data;
> >
> > public Row[] rows() {
> > val encoder = resolveAndBind(this.unboundedEncoder);
> >
> > return data.stream().map(encoder::fromRow).toArray(Row[]::new);
> > }
> > }
> >
> > ---
> > Regards,
> > Andy
> >
> > On Fri, Jan 6, 2017 at 3:48 AM, Liang-Chi Hsieh 
>
> > viirya@
>
> >  wrote:
> >
> >>
> >> Can you show how you use the encoder in your UDAF?
> >>
> >>
> >> Andy Dang wrote
> >> > One more question about the behavior of ExpressionEncoder<Row>.
> >> >
> >> > I have a UDAF that has an ExpressionEncoder<Row> as a member variable.
> >> >
> >> > However, if I call resolveAndBind() eagerly on this encoder, it appears to
> >> > break the UDAF. Basically somehow the deserialize

Re: Converting an InternalRow to a Row

2017-01-06 Thread Andy Dang
Hi Liang-Chi,

The snippet of code is below. If I bind the encoder early (the schema
doesn't change throughout the execution), the final result is a list of the
same entries.

@RequiredArgsConstructor
public class UDAF extends UserDefinedAggregateFunction {

// Do not resolve and bind this expression encoder eagerly
private final ExpressionEncoder<Row> unboundedEncoder;
private final StructType schema;

@Override
public StructType inputSchema() {
return schema;
}

@Override
public StructType bufferSchema() {
return new UserDefineType(schema, unboundedEncoder);
}

@Override
public DataType dataType() {
return DataTypes.createArrayType(schema);
}

@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, new InternalRow[0]);
}

@Override
public void update(MutableAggregationBuffer buffer, Row input) {
UserDefineType data = buffer.getAs(0);

data.add(unboundedEncoder.toRow(input));

buffer.update(0, data);
}

@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
// merge
buffer1.update(0, data1);
}

@Override
public Object evaluate(Row buffer) {
UserDefineType data = buffer.getAs(0);

   // need to return Row here instead of Internal Row
return data.rows();
}

static ExpressionEncoder<Row> resolveAndBind(ExpressionEncoder<Row> encoder) {
val attributes = JavaConversions.asJavaCollection(encoder.schema().toAttributes()).stream().map(Attribute::toAttribute).collect(Collectors.toList());
return encoder.resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
}
}

// Wrap around a list of InternalRow
class TopKDataType extends UserDefinedType {
private final ExpressionEncoder<Row> unboundedEncoder;
private final List<InternalRow> data;

    public Row[] rows() {
val encoder = resolveAndBind(this.unboundedEncoder);

return data.stream().map(encoder::fromRow).toArray(Row[]::new);
}
}

---
Regards,
Andy

On Fri, Jan 6, 2017 at 3:48 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> Can you show how you use the encoder in your UDAF?
>
>
> Andy Dang wrote
> > One more question about the behavior of ExpressionEncoder<Row>.
> >
> > I have a UDAF that has an ExpressionEncoder<Row> as a member variable.
> >
> > However, if I call resolveAndBind() eagerly on this encoder, it appears to
> > break the UDAF. Basically, somehow the deserialized rows are all the same
> > during the merge step. Is this the expected behavior of Encoders?
> >
> > ---
> > Regards,
> > Andy
> >
> > On Thu, Jan 5, 2017 at 10:55 AM, Andy Dang 
>
> > namd88@
>
> >  wrote:
> >
> >> Perfect. The API in Java is a bit clumsy, though.
> >>
> >> What I ended up doing in Java (the val is from lombok, if anyone's
> >> wondering):
> >> val attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream().map(Attribute::toAttribute).collect(Collectors.toList());
> >> val encoder = RowEncoder.apply(schema).resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
> >>
> >>
> >> ---
> >> Regards,
> >> Andy
> >>
> >> On Thu, Jan 5, 2017 at 2:53 AM, Liang-Chi Hsieh 
>
> > viirya@
>
> >  wrote:
> >>
> >>>
> >>> You need to resolve and bind the encoder.
> >>>
> >>> ExpressionEncoder<Row> encoder = RowEncoder.apply(struct).resolveAndBind();
> >>>
> >>>
> >>> Andy Dang wrote
> >>> > Hi all,
> >>> > (cc-ing dev since I've hit some developer API corner)
> >>> >
> >>> > What's the best way to convert an InternalRow to a Row if I've got an
> >>> > InternalRow and the corresponding Schema.
> >>> >
> >>> > Code snippet:
> >>> > @Test
> >>> > public void foo() throws Exception {
> >>> > Row row = RowFactory.create(1);
> >>> > StructType struct = new StructType().add("id",
> >>> > DataTypes.IntegerType);
> >>> > ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
> >>> > InternalRow internalRow = encoder.toRow(row);
> >>> > System.out.println("Internal row size: " +
> >>> > internalRow.numFields());
> >>> >

Re: Converting an InternalRow to a Row

2017-01-05 Thread Andy Dang
Perfect. The API in Java is a bit clumsy, though.

What I ended up doing in Java (the val is from lombok, if anyone's
wondering):
val attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream().map(Attribute::toAttribute).collect(Collectors.toList());
val encoder = RowEncoder.apply(schema).resolveAndBind(ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
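
(And for completeness, a quick round trip with the resolved encoder -- a minimal
sketch reusing the encoder above and assuming a one-column schema such as
new StructType().add("id", DataTypes.IntegerType):)

Row row = RowFactory.create(1);
InternalRow internalRow = encoder.toRow(row);
// fromRow() no longer throws once the encoder is resolved and bound
Row roundTrip = encoder.fromRow(internalRow);
System.out.println("Round trip size: " + roundTrip.size());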


---
Regards,
Andy

On Thu, Jan 5, 2017 at 2:53 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> You need to resolve and bind the encoder.
>
> ExpressionEncoder<Row> encoder = RowEncoder.apply(struct).resolveAndBind();
>
>
> Andy Dang wrote
> > Hi all,
> > (cc-ing dev since I've hit some developer API corner)
> >
> > What's the best way to convert an InternalRow to a Row if I've got an
> > InternalRow and the corresponding Schema.
> >
> > Code snippet:
> > @Test
> > public void foo() throws Exception {
> > Row row = RowFactory.create(1);
> > StructType struct = new StructType().add("id", DataTypes.IntegerType);
> > ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
> > InternalRow internalRow = encoder.toRow(row);
> > System.out.println("Internal row size: " + internalRow.numFields());
> > Row roundTrip = encoder.fromRow(internalRow);
> > System.out.println("Round trip: " + roundTrip.size());
> > }
> >
> > The code fails at the line encoder.fromRow() with the exception:
> >> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
> > expression: getcolumnbyordinal(0, IntegerType)
> >
> > ---
> > Regards,
> > Andy
>
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
>
>


Converting an InternalRow to a Row

2017-01-04 Thread Andy Dang
Hi all,
(cc-ing dev since I've hit some developer API corner)

What's the best way to convert an InternalRow to a Row if I have an
InternalRow and the corresponding schema?

Code snippet:
@Test
public void foo() throws Exception {
Row row = RowFactory.create(1);
StructType struct = new StructType().add("id", DataTypes.IntegerType);
ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
InternalRow internalRow = encoder.toRow(row);
System.out.println("Internal row size: " + internalRow.numFields());
Row roundTrip = encoder.fromRow(internalRow);
System.out.println("Round trip: " + roundTrip.size());
}

The code fails at the line encoder.fromRow() with the exception:
> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
expression: getcolumnbyordinal(0, IntegerType)

---
Regards,
Andy


Re: Best Practice for Spark Job Jar Generation

2016-12-23 Thread Andy Dang
We re-model Spark's dependencies and ours together and put them under the
/jars path. There are other ways to do it, but we want the cluster classpath
to stay as close to the development classpath as possible.

---
Regards,
Andy

On Fri, Dec 23, 2016 at 6:00 PM, Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

> Andy, Thanks for reply.
>
> If we download all the dependencies to a separate location and link them with
> the Spark job jar on the Spark cluster, is that the best way to execute a Spark job?
>
> Thanks.
>
> On Fri, Dec 23, 2016 at 8:34 PM, Andy Dang <nam...@gmail.com> wrote:
>
>> I used to use an uber jar in Spark 1.x because of classpath issues (we
>> couldn't re-model our dependencies based on our code, and thus the cluster's
>> runtime dependencies could be very different from running Spark directly in
>> the IDE). We had to use the userClassPathFirst "hack" to work around this.
>>
>> With Spark 2, it's easier to replace dependencies (say, Guava) than before.
>> We moved away from deploying an uber jar and just pass the libraries as part
>> of the Spark jars directory (we still can't use Guava v19 or later because
>> Spark uses a deprecated method that's no longer available, but that's not a
>> big issue for us).
>>
>> ---
>> Regards,
>> Andy
>>
>> On Fri, Dec 23, 2016 at 6:44 AM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Spark Community,
>>>
>>> For Spark job creation I use sbt-assembly to build an uber ("super") jar and
>>> then submit it with spark-submit.
>>>
>>> Example,
>>>
>>> bin/spark-submit --class hbase.spark.chetan.com.SparkHbaseJob
>>> /home/chetan/hbase-spark/SparkMSAPoc-assembly-1.0.jar
>>>
>>> But other folks have argued for a thin (non-uber) jar instead. Can you please
>>> explain the industry-standard best practice for this?
>>>
>>> Thanks,
>>>
>>> Chetan Khatri.
>>>
>>
>>
>


Re: Best Practice for Spark Job Jar Generation

2016-12-23 Thread Andy Dang
I used to use an uber jar in Spark 1.x because of classpath issues (we
couldn't re-model our dependencies based on our code, and thus the cluster's
runtime dependencies could be very different from running Spark directly in
the IDE). We had to use the userClassPathFirst "hack" to work around this.

With Spark 2, it's easier to replace dependencies (say, Guava) than before.
We moved away from deploying an uber jar and just pass the libraries as part
of the Spark jars directory (we still can't use Guava v19 or later because
Spark uses a deprecated method that's no longer available, but that's not a
big issue for us).
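
(Concretely, it looks something like the following -- an illustrative sketch
with made-up paths and class names. The shared libraries either sit under
$SPARK_HOME/jars on the cluster or get shipped per job with --jars, and the
application jar itself stays thin:)

bin/spark-submit \
  --class com.example.MyJob \
  --jars /opt/libs/guava-18.0.jar,/opt/libs/some-other-dependency.jar \
  /opt/jobs/my-job-thin.jar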

---
Regards,
Andy

On Fri, Dec 23, 2016 at 6:44 AM, Chetan Khatri 
wrote:

> Hello Spark Community,
>
> For Spark job creation I use sbt-assembly to build an uber ("super") jar and
> then submit it with spark-submit.
>
> Example,
>
> bin/spark-submit --class hbase.spark.chetan.com.SparkHbaseJob
> /home/chetan/hbase-spark/SparkMSAPoc-assembly-1.0.jar
>
> But other folks have argued for a thin (non-uber) jar instead. Can you please
> explain the industry-standard best practice for this?
>
> Thanks,
>
> Chetan Khatri.
>


Negative number of active tasks

2016-12-23 Thread Andy Dang
Hi all,

Today I hit a weird bug in Spark 2.0.2 (vanilla Spark): the executor tab
shows a negative number of active tasks.

I have about 25 jobs, each with 20k tasks, so the numbers are not that crazy.

What could possibly be the cause of this bug? This is the first time I've seen
it, and the only special thing I'm doing is saving multiple datasets to HDFS
at the same time from different threads.

Thanks,
Andy
