Re: how to add columns to row when column has a different encoder?

2018-02-28 Thread David Capwell
Does anyone know a way to do this right now? As best as I can tell, I need a
custom expression to pass to a UDF to do this.

I just finished a protobuf encoder, and it feels like Expression is not meant
to be public (a good amount of things are private[sql]); am I wrong about
this? Am I looking at the right interface for adding such a UDF?
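
Concretely, what I want to end up with is something like the following (a
rough sketch; deserializeInner is a placeholder for the protobuf decoding,
which is the part I would need the custom encoder/expression for):

import org.apache.spark.sql.functions.{col, udf}

// Sketch: a UDF that decodes the serialized bytes into Inner so that
// withColumn can attach it as a struct column.
def deserializeInner(bytes: Array[Byte]): Inner = ???

val toInner = udf((bytes: Array[Byte]) => deserializeInner(bytes))
val merged = ds.withColumn("inner", toInner(col("bytes")))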

Thanks for your help!

On Mon, Feb 26, 2018, 3:50 PM David Capwell  wrote:

> I have a row that looks like the following pojo
>
> case class Wrapper(var id: String, var bytes: Array[Byte])
>
> Those bytes are a serialized pojo that looks like this
>
> case class Inner(var stuff: String, var moreStuff: String)
>
> I currently have encoders for both types, but I don't see how to merge the
> two into a unified row that looks like the following
>
>
> struct<id: string, inner: struct<stuff: string, moreStuff: string>>
>
> If I know how to deserialize the bytes and have an encoder, how could I get
> the above schema?  I was looking at ds.withColumn("inner", ???) but wasn't
> sure how to go from pojo + encoder to a column.  Is there a better way to
> do this?
>
> Thanks for your time reading this email
>


Re: SizeEstimator

2018-02-27 Thread David Capwell
Thanks for the reply, and sorry for my delayed response; I had to go find the
profile data to look up the class again.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

That class extends SizeEstimator and has a field "map" which buffers the
rows.  In my case the buffer was > 1 million rows, so it became costly every
time it was checked.


This can be reproduced: create a random data set of (String, Long), then
group by the string (I believe this is what the code did first; there was a
sort later, but that should have been a different stage).  Make sure the
number of executors is small (for example, only one), otherwise you reduce
the number of buffered rows on each executor.
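
For anyone who wants to try it, here is a rough local-mode sketch of that
kind of job (not the original code; names, sizes, and key cardinality are
made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Sketch: one executor thread and a wide group-by so the in-memory
// aggregation buffer grows into the millions of entries before spilling.
val spark = SparkSession.builder()
  .appName("size-estimator-repro")
  .master("local[1]")
  .getOrCreate()

val df = spark.range(0L, 10000000L)
  .select(
    concat(lit("key-"), (col("id") % 2000000).cast("string")).as("key"),
    col("id").as("value"))

df.groupBy("key").agg(sum("value")).count() // force the shuffle/aggregation

spark.stop()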

On Mon, Feb 26, 2018, 10:04 PM 叶先进  wrote:

> What is the type of the buffer you mentioned?
>
>
> On 27 Feb 2018, at 11:46 AM, David Capwell  wrote:
>
> advancedxy, I don't remember the code as well anymore, but what we hit was
> a very simple schema (string, long). The issue is the buffer had a million
> of these, so estimating the size of the buffer meant recalculating the same
> elements over and over again.  SizeEstimator was on-CPU about 30% of the
> time; bounding the buffer got it to be < 5% (going off memory, so I may be
> off).
>
> The class info (how the fields are laid out on the heap) is cached for
> every class encountered, so the size info for elements of the same class is
> not recalculated.  However, for collection classes (or similar),
> SizeEstimator will scan all the elements in the container (the `next` field
> in LinkedList, for example).
>
> Arrays are a special case: SizeEstimator will sample the array if
> array.length > ARRAY_SIZE_FOR_SAMPLING (400).
>
> The cost is really (assuming memory access is O(1), which is not true)
> O(N × M), where N is the number of rows in the buffer and M is the size of
> the schema.  My case could be solved by not recomputing, which would bring
> the cost to O(M), since the bookkeeping should be constant time.  There was
> logic to delay recalculating based off a change in frequency, but that
> didn't really do much for us; bounding and spilling was the bigger win in
> our case.
>
> On Mon, Feb 26, 2018, 7:24 PM Xin Liu  wrote:
>
>> Thanks David. Another solution is to convert the protobuf object to a byte
>> array. It does speed up SizeEstimator.
>>
>> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell 
>> wrote:
>>
>>> This is used to predict the current memory cost so Spark knows whether to
>>> flush or not. It is very costly for us, so we use a flag marked in the
>>> code as private to lower the cost:
>>>
>>> spark.shuffle.spill.numElementsForceSpillThreshold (on phone, hope no
>>> typo) - how many records before a flush.
>>>
>>> This lowers the cost because it lets us leave data in the young
>>> generation; if we don't bound the buffer, everything gets promoted to the
>>> old generation and GC becomes an issue.  This doesn't solve the fact that
>>> the walk is slow, but it lowers the cost of GC.  We make sure to have
>>> spare memory on the system for the page cache, so spilling to disk is a
>>> memory write 99% of the time for us.  If your host has less free memory,
>>> spilling may become more expensive.
>>>
>>>
>>> If the walk is your bottleneck and not GC, then I would recommend JOL and
>>> educated guessing to better predict memory.
>>>
>>> On Mon, Feb 26, 2018, 4:47 PM Xin Liu  wrote:
>>>
>>>> Hi folks,
>>>>
>>>> We have a situation where shuffled data is protobuf-based, and
>>>> SizeEstimator is taking a lot of time.
>>>>
>>>> We have tried to override SizeEstimator to return a constant value,
>>>> which speeds things up a lot.
>>>>
>>>> My question is: what is the side effect of disabling SizeEstimator? Does
>>>> Spark just do memory reallocation, or are there more severe consequences?
>>>>
>>>> Thanks!
>>>>
>>>
>>
>


Re: SizeEstimator

2018-02-26 Thread David Capwell
advancedxy, I don't remember the code as well anymore, but what we hit was a
very simple schema (string, long). The issue is the buffer had a million of
these, so estimating the size of the buffer meant recalculating the same
elements over and over again.  SizeEstimator was on-CPU about 30% of the
time; bounding the buffer got it to be < 5% (going off memory, so I may be
off).

The cost is really (assuming memory access is O(1), which is not true)
O(N × M), where N is the number of rows in the buffer and M is the size of
the schema.  My case could be solved by not recomputing, which would bring
the cost to O(M), since the bookkeeping should be constant time.  There was
logic to delay recalculating based off a change in frequency, but that didn't
really do much for us; bounding and spilling was the bigger win in our case.

On Mon, Feb 26, 2018, 7:24 PM Xin Liu  wrote:

> Thanks David. Another solution is to convert the protobuf object to a byte
> array. It does speed up SizeEstimator.
>
> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell  wrote:
>
>> This is used to predict the current memory cost so Spark knows whether to
>> flush or not. It is very costly for us, so we use a flag marked in the code
>> as private to lower the cost:
>>
>> spark.shuffle.spill.numElementsForceSpillThreshold (on phone, hope no
>> typo) - how many records before a flush.
>>
>> This lowers the cost because it lets us leave data in the young generation;
>> if we don't bound the buffer, everything gets promoted to the old generation
>> and GC becomes an issue.  This doesn't solve the fact that the walk is slow,
>> but it lowers the cost of GC.  We make sure to have spare memory on the
>> system for the page cache, so spilling to disk is a memory write 99% of the
>> time for us.  If your host has less free memory, spilling may become more
>> expensive.
>>
>>
>> If the walk is your bottleneck and not GC, then I would recommend JOL and
>> educated guessing to better predict memory.
>>
>> On Mon, Feb 26, 2018, 4:47 PM Xin Liu  wrote:
>>
>>> Hi folks,
>>>
>>> We have a situation where shuffled data is protobuf-based, and
>>> SizeEstimator is taking a lot of time.
>>>
>>> We have tried to override SizeEstimator to return a constant value,
>>> which speeds things up a lot.
>>>
>>> My question is: what is the side effect of disabling SizeEstimator? Does
>>> Spark just do memory reallocation, or are there more severe consequences?
>>>
>>> Thanks!
>>>
>>
>


Re: SizeEstimator

2018-02-26 Thread David Capwell
This is used to predict the current memory cost so Spark knows whether to
flush or not. It is very costly for us, so we use a flag marked in the code
as private to lower the cost:

spark.shuffle.spill.numElementsForceSpillThreshold (on phone, hope no typo)
- how many records before a flush.

This lowers the cost because it lets us leave data in the young generation;
if we don't bound the buffer, everything gets promoted to the old generation
and GC becomes an issue.  This doesn't solve the fact that the walk is slow,
but it lowers the cost of GC.  We make sure to have spare memory on the
system for the page cache, so spilling to disk is a memory write 99% of the
time for us.  If your host has less free memory, spilling may become more
expensive.


If the walk is your bottleneck and not GC, then I would recommend JOL and
educated guessing to better predict memory.
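
For what it's worth, this is roughly how we set the bound (a sketch; the
property is an internal, undocumented knob, the threshold value here is made
up, and the name/semantics may change between Spark versions):

import org.apache.spark.sql.SparkSession

// Sketch: bound the number of buffered records before a forced spill,
// trading more (page-cache backed) disk writes for less SizeEstimator work
// and less old-gen promotion.
val spark = SparkSession.builder()
  .appName("bounded-spill")
  .config("spark.shuffle.spill.numElementsForceSpillThreshold", "500000")
  .getOrCreate()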

On Mon, Feb 26, 2018, 4:47 PM Xin Liu  wrote:

> Hi folks,
>
> We have a situation where shuffled data is protobuf-based, and
> SizeEstimator is taking a lot of time.
>
> We have tried to override SizeEstimator to return a constant value, which
> speeds things up a lot.
>
> My question is: what is the side effect of disabling SizeEstimator? Does
> Spark just do memory reallocation, or are there more severe consequences?
>
> Thanks!
>


how to add columns to row when column has a different encoder?

2018-02-26 Thread David Capwell
I have a row that looks like the following pojo

case class Wrapper(var id: String, var bytes: Array[Byte])

Those bytes are a serialized pojo that looks like this

case class Inner(var stuff: String, var moreStuff: String)

I currently have encoders for both types, but I don't see how to merge the
two into a unified row that looks like the following


struct<id: string, inner: struct<stuff: string, moreStuff: string>>

If I know how to deserialize the bytes and have an encoder, how could I get
the above schema?  I was looking at ds.withColumn("inner", ???) but wasn't
sure how to go from pojo + encoder to a column.  Is there a better way to
do this?
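
To make the question concrete, here is a sketch of the shape I am after if I
were to do it with a plain map (Combined and deserializeInner are just
placeholders; I am hoping there is a way to do this through the
encoder/column machinery instead):

import org.apache.spark.sql.{Dataset, Encoders}

case class Combined(id: String, inner: Inner) // placeholder merged shape

// deserializeInner stands in for whatever decodes the serialized bytes.
def deserializeInner(bytes: Array[Byte]): Inner = ???

def merge(ds: Dataset[Wrapper]): Dataset[Combined] =
  ds.map(w => Combined(w.id, deserializeInner(w.bytes)))(Encoders.product[Combined])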

Thanks for your time reading this email


Re: Encoder with empty bytes deserializes with non-empty bytes

2018-02-21 Thread David Capwell
Ok found my issue

case c if c == classOf[ByteString] =>
  StaticInvoke(classOf[Protobufs], ArrayType(ByteType),
"fromByteString", parent :: Nil)

Should be

case c if c == classOf[ByteString] =>
  StaticInvoke(classOf[Protobufs], BinaryType, "fromByteString", parent :: Nil)



This causes the Java code to see a byte[], which uses a different code path
than the one linked.  Since I had used ArrayType(ByteType), I had to wrap the
data in an ArrayData class.



On Wed, Feb 21, 2018 at 9:55 PM, David Capwell  wrote:

> I am trying to create an Encoder for protobuf data and noticed something
> rather weird.  When we have an empty ByteString (not null, just empty) and
> we deserialize, we get back an array of length 8 instead of an empty array.
> I took the generated code and see something weird going on.
>
> UnsafeRowWriter
>
>
> public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
>   final long relativeOffset = currentCursor - startingOffset;
>   final long fieldOffset = getFieldOffset(ordinal);
>   final long offsetAndSize = (relativeOffset << 32) | size;
>
>   Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
> }
>
>
>
> So this takes the size of the array and stores it... but it's not the array
> size, it's how many bytes were added:
>
> rowWriter2.setOffsetAndSize(2, tmpCursor16, holder.cursor - tmpCursor16);
>
>
>
> So since the data is empty the only method that moves the cursor forward is
>
> arrayWriter1.initialize(holder, numElements1, 8);
>
> which does the following
>
> holder.cursor += (headerInBytes + fixedPartInBytes);
>
> in a debugger I see that headerInBytes = 8 and fixedPartInBytes = 0.
>
> Here is the header write
>
>
> Platform.putLong(holder.buffer, startingOffset, numElements);
> for (int i = 8; i < headerInBytes; i += 8) {
>   Platform.putLong(holder.buffer, startingOffset + i, 0L);
> }
>
>
>
>
> OK, so far this makes sense: in order to deserialize you need to know about
> the data, so all good.  Now to look at the deserialize path.
>
>
> UnsafeRow.java
>
> @Override
> public byte[] getBinary(int ordinal) {
>   if (isNullAt(ordinal)) {
>     return null;
>   } else {
>     final long offsetAndSize = getLong(ordinal);
>     final int offset = (int) (offsetAndSize >> 32);
>     final int size = (int) offsetAndSize;
>     final byte[] bytes = new byte[size];
>     Platform.copyMemory(
>       baseObject,
>       baseOffset + offset,
>       bytes,
>       Platform.BYTE_ARRAY_OFFSET,
>       size
>     );
>     return bytes;
>   }
> }
>
>
>
> Since this doesn't account for the header when returning the user bytes, it
> ends up returning header + user data.
>
>
>
> Is this expected? Am I supposed to strip the header and force a mem-copy to
> extract just the user data?  Since the header appears to be dynamic, how
> would I know the header length?
>
> Thanks for your time reading this email.
>
>
> Spark version: spark_2.11-2.2.1
>


Encoder with empty bytes deserializes with non-empty bytes

2018-02-21 Thread David Capwell
I am trying to create an Encoder for protobuf data and noticed something
rather weird.  When we have an empty ByteString (not null, just empty) and we
deserialize, we get back an array of length 8 instead of an empty array.  I
took the generated code and see something weird going on.

UnsafeRowWriter


public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
  final long relativeOffset = currentCursor - startingOffset;
  final long fieldOffset = getFieldOffset(ordinal);
  final long offsetAndSize = (relativeOffset << 32) | size;

  Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
}



So this takes the size of the array and stores it... but it's not the array
size, it's how many bytes were added:

rowWriter2.setOffsetAndSize(2, tmpCursor16, holder.cursor - tmpCursor16);



So since the data is empty the only method that moves the cursor forward is

arrayWriter1.initialize(holder, numElements1, 8);

which does the following

holder.cursor += (headerInBytes + fixedPartInBytes);

in a debugger I see that headerInBytes = 8 and fixedPartInBytes = 0.

Here is the header write


Platform.putLong(holder.buffer, startingOffset, numElements);
for (int i = 8; i < headerInBytes; i += 8) {
  Platform.putLong(holder.buffer, startingOffset + i, 0L);
}




OK, so far this makes sense: in order to deserialize you need to know about
the data, so all good.  Now to look at the deserialize path.


UnsafeRow.java

@Override
public byte[] getBinary(int ordinal) {
  if (isNullAt(ordinal)) {
    return null;
  } else {
    final long offsetAndSize = getLong(ordinal);
    final int offset = (int) (offsetAndSize >> 32);
    final int size = (int) offsetAndSize;
    final byte[] bytes = new byte[size];
    Platform.copyMemory(
      baseObject,
      baseOffset + offset,
      bytes,
      Platform.BYTE_ARRAY_OFFSET,
      size
    );
    return bytes;
  }
}



Since this doesn't account for the header when returning the user bytes, it
ends up returning header + user data.
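
To make the mismatch concrete, here is a tiny standalone sketch of the packing
and unpacking (not Spark code, just the arithmetic from the two methods above;
the offset value is an example, the size of 8 matches what I saw in the
debugger):

// Writer side: size is "bytes written", which for the empty array is the
// 8-byte header, not the number of user bytes.
val relativeOffset = 16L // example offset of the array data within the row
val size = 8L            // headerInBytes (8) + fixedPartInBytes (0)
val offsetAndSize = (relativeOffset << 32) | size

// Reader side, as in UnsafeRow.getBinary: the 8 header bytes come back as data.
val offset = (offsetAndSize >> 32).toInt // 16
val readSize = offsetAndSize.toInt       // 8, i.e. byte[8] for an "empty" ByteString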



Is this expected? Am I supposed to strip the header and force a mem-copy to
extract just the user data?  Since the header appears to be dynamic, how would
I know the header length?

Thanks for your time reading this email.


Spark version: spark_2.11-2.2.1


Dynamic Accumulators in 2.x?

2017-10-11 Thread David Capwell
I wrote a Spark instrumentation tool that instruments RDDs to give more
fine-grained details on what is going on within a Task.  This is working
right now, but it uses volatiles and CAS to pass around this state (which
slows down the task).  We want to lower the overhead, make the main call path
single-threaded, and pass around the result when the task completes; which
sounds like AccumulatorV2.

I started rewriting the instrumented logic to be based off accumulators, but
I am having a hard time getting these to show up in the UI/API (which I am
using to check whether I am wiring things up properly).

So my question is as follows.

When running in the executor and we create an accumulator (one that was not
created from the SparkContext), how would I stitch things together properly
so it shows up alongside accumulators defined from the SparkContext?  If this
is different for different versions that is fine, since we can figure that
out quickly (hopefully) and change the instrumentation.

Approaches taken:

Looked at SparkContext.register and copied the same logic, but at runtime:

this.hasNextTotal = new LongAccumulator();
this.hasNextTotal.metadata_$eq(new AccumulatorMetadata(
    AccumulatorContext.newId(), createName("hasNextTotal"), false));
AccumulatorContext.register(hasNextTotal);


That didn't end up working

Tried getting the context from SparkContext.getActive, but it's not defined
at runtime:


Option<SparkContext> opt = SparkContext$.MODULE$.getActive();
if (opt.isDefined()) {
    SparkContext sc = opt.get();
    hasNextTotal.register(sc, Option.apply("hasNext"), false);
    nextTotal.register(sc, Option.apply("next"), false);
}
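
For reference, the normal driver-side pattern I am trying to approximate at
runtime looks roughly like this (standard AccumulatorV2 usage; the rdd and
names are just placeholders):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

// Standard pattern: create and register on the driver, then let the closure
// capture the accumulator so executor-side add() calls show up in the UI.
def instrument(sc: SparkContext, rdd: RDD[String]): Unit = {
  val hasNextTotal = new LongAccumulator()
  sc.register(hasNextTotal, "hasNextTotal")
  rdd.foreach(_ => hasNextTotal.add(1L))
}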


Any help on this would be appreciated!  I would really rather not reinvent
the wheel if I can piggy-back off Accumulators.

Thanks for your help!

Target spark version: 2.2.0


add jars to spark's runtime

2017-10-11 Thread David Capwell
We want to emit the metrics out of Spark into our own custom store.  To do
this we built our own sink and tried to add it to Spark by passing --jars
path/to/jar and defining the class in the metrics.properties supplied with
the job.  We noticed that Spark kept crashing with the below exception:

17/10/11 09:42:37 ERROR metrics.MetricsSystem: Sink class
com.example.ExternalSink cannot be instantiated
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1707)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: com.example.ExternalSink
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
at
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at
org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
at
org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:366)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:223)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
... 4 more

We then added this jar into the Spark tarball that we use for testing, and
saw that it was able to load just fine and publish.

My question is: how can I add this jar to the Spark runtime rather than the
user runtime?  In production we don't have permission to write to Spark's
jars directory, so that trick to get this working won't work for us.
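
For reference, this is roughly the setup we are using today (the sink name
"external" is arbitrary and the paths are placeholders; the property key
follows the standard metrics.properties sink format):

# metrics.properties shipped alongside the job
*.sink.external.class=com.example.ExternalSink

# submit-time sketch
spark-submit \
  --jars /path/to/external-sink.jar \
  --files /path/to/metrics.properties \
  --conf spark.metrics.conf=metrics.properties \
  ...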

Thanks for your time reading this email.

Tested on spark 2.2.0