Re: Live Streamed Code Review today at 11am Pacific

2018-06-07 Thread Holden Karau
I'll be doing another one tomorrow morning at 9am pacific focused on Python
+ K8s support & improved JSON support -
https://www.youtube.com/watch?v=Z7ZEkvNwneU &
https://www.twitch.tv/events/xU90q9RGRGSOgp2LoNsf6A :)

On Fri, Mar 9, 2018 at 3:54 PM, Holden Karau  wrote:

> If anyone wants to watch the recording:
> https://www.youtube.com/watch?v=lugG_2QU6YU
>
> I'll do one next week as well - March 16th @ 11am -
> https://www.youtube.com/watch?v=pXzVtEUjrLc
>
> On Fri, Mar 9, 2018 at 9:28 AM, Holden Karau  wrote:
>
>> Hi folks,
>>
>> If you're curious to learn more about how Spark is developed, I’m going
>> to experiment with doing a live code review where folks can watch and see how
>> that part of our process works. I have two volunteers already for having
>> their PRs looked at live, and if you have a Spark PR you're working on that
>> you’d like me to livestream a review of, please ping me.
>>
>> The livestream will be at https://www.youtube.com/watch?v=lugG_2QU6YU.
>>
>> Cheers,
>>
>> Holden :)
>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>



-- 
Twitter: https://twitter.com/holdenkarau


Re: [announce] BeakerX supports Scala+Spark in Jupyter

2018-06-07 Thread Scott Draves
In Jupyter, notebooks have just one kernel at a time so I am not aware of
any conflicts.

On Jun 7, 2018 9:09 PM, "Irving Duran"  wrote:

So would you recommend not to have Toree and BeakerX installed to avoid
conflicts?

Thank you,

Irving Duran

On 06/07/2018 07:55 PM, s...@draves.org wrote:

The %%spark magic comes with BeakerX's Scala kernel, not related to Toree.

On Thu, Jun 7, 2018, 8:51 PM Stephen Boesch  wrote:

> Assuming that the Spark 2.x kernel (e.g. Toree) were chosen for a given
> Jupyter notebook, and there is a Cell 3 that contains some Spark DataFrame
> operations, then:
>
>
>- what is the relationship between the %%spark magic and the Toree kernel?
>- how does the %%spark magic get applied to that other Cell 3?
>
> thanks!
>
> 2018-06-07 16:33 GMT-07:00 s...@draves.org :
>
>> We are pleased to announce release 0.19.0 of BeakerX,
>> a collection of extensions and kernels for Jupyter and Jupyter Lab.
>>
>> BeakerX now features Scala+Spark integration including GUI configuration,
>> status, progress, interrupt, and interactive tables.
>>
>> We are very interested in your feedback about what remains to be done.
>> You can reach us via GitHub and Gitter, as documented in the readme:
>> https://github.com/twosigma/beakerx
>>
>> Thanks, -Scott
>>
>>
>> --
>> BeakerX.com
>> ScottDraves.com 
>> @Scott_Draves 
>>
>>
>


Re: [announce] BeakerX supports Scala+Spark in Jupyter

2018-06-07 Thread Irving Duran
So would you recommend not to have Toree and BeakerX installed to avoid
conflicts?

Thank you,

Irving Duran

On 06/07/2018 07:55 PM, s...@draves.org wrote:
> The %%spark magic comes with BeakerX's Scala kernel, not related to Toree.
>
> On Thu, Jun 7, 2018, 8:51 PM Stephen Boesch wrote:
>
> Assuming that the Spark 2.x kernel (e.g. Toree) were chosen for a
> given Jupyter notebook, and there is a Cell 3 that contains some
> Spark DataFrame operations, then:
>
>   * what is the relationship between the %%spark magic and the
> Toree kernel?
>   * how does the %%spark magic get applied to that other Cell 3?
>
> thanks!
>
> 2018-06-07 16:33 GMT-07:00 s...@draves.org:
>
> We are pleased to announce release 0.19.0 of BeakerX, a collection
> of extensions and kernels
> for Jupyter and Jupyter Lab.
>
> BeakerX now features Scala+Spark integration including GUI
> configuration, status, progress, interrupt, and interactive
> tables.
>
> We are very interested in your feedback about what remains to
> be done.  You can reach us via GitHub and Gitter, as documented in
> the readme: https://github.com/twosigma/beakerx
>
> Thanks, -Scott
>
>
> -- 
> BeakerX.com 
> ScottDraves.com 
> @Scott_Draves 
>
>





how to call database specific function when reading writing thru jdbc

2018-06-07 Thread Kyunam Kim
For example, in SQL Server, when reading, I want to call a built-in function: 
STAsText()

SELECT id, shape.STAsText() FROM SpatialTable

val df = _sparkSession
.read
.jdbc(url, "dbo.SpatialTable", props)
.select("shape.STAsText()")  // No, this doesn't work.
.as("shape")

Also when writing, I want to be able to call a built-in function: 
STGeomFromText like this:
INSERT INTO SpatialTable (GeomCol1) VALUES (geometry::STGeomFromText('POLYGON 
((0 0, 150 0, 150 150, 0 150, 0 0))', 0));

How would I go about doing this?

Thanks,
Kyunam
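
One possible approach, sketched below under the assumption that the standard
Spark JDBC source and SQL Server are in play: for reads, push the built-in
function down by passing a subquery as the JDBC "table"; for writes,
df.write.jdbc only issues plain INSERTs, so the server-side function has to be
applied outside Spark, e.g. via a staging table and a plain JDBC statement.
The names spatial_src, shape_wkt, and SpatialStaging below are hypothetical.

// Read: wrap the function call in a subquery and use it as the JDBC "table".
val pushdown =
  "(SELECT id, shape.STAsText() AS shape_wkt FROM dbo.SpatialTable) AS spatial_src"
val df = _sparkSession.read.jdbc(url, pushdown, props)

// Write: Spark's JDBC writer cannot call geometry::STGeomFromText itself, so one
// workaround is to write the WKT strings to a staging table with Spark and then
// run the conversion server-side over a plain JDBC connection.
df.write.mode("append").jdbc(url, "dbo.SpatialStaging", props)

import java.sql.DriverManager
val conn = DriverManager.getConnection(url, props)
try {
  conn.createStatement().executeUpdate(
    """INSERT INTO SpatialTable (GeomCol1)
      |SELECT geometry::STGeomFromText(shape_wkt, 0) FROM dbo.SpatialStaging""".stripMargin)
} finally {
  conn.close()
}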


Re: If there is timestamp type data in DF, Spark 2.3 toPandas is much slower than spark 2.2.

2018-06-07 Thread Irving Duran
I haven't noticed or seen this behavior.  Have you tested the same dataset
on both versions to confirm this?

Thank you,

Irving Duran

On 06/06/2018 11:22 PM, 李斌松 wrote:
> If there is timestamp type data in DF, Spark 2.3 toPandas is much
> slower than spark 2.2.





Re: [announce] BeakerX supports Scala+Spark in Jupyter

2018-06-07 Thread s...@draves.org
The %%spark magic comes with BeakerX's Scala kernel, not related to Toree.

On Thu, Jun 7, 2018, 8:51 PM Stephen Boesch  wrote:

> Assuming that the Spark 2.x kernel (e.g. Toree) were chosen for a given
> Jupyter notebook, and there is a Cell 3 that contains some Spark DataFrame
> operations, then:
>
>
>- what is the relationship between the %%spark magic and the Toree kernel?
>- how does the %%spark magic get applied to that other Cell 3?
>
> thanks!
>
> 2018-06-07 16:33 GMT-07:00 s...@draves.org :
>
>> We are pleased to announce release 0.19.0 of BeakerX,
>> a collection of extensions and kernels for Jupyter and Jupyter Lab.
>>
>> BeakerX now features Scala+Spark integration including GUI configuration,
>> status, progress, interrupt, and interactive tables.
>>
>> We are very interested in your feedback about what remains to be done.
>> You can reach us via GitHub and Gitter, as documented in the readme:
>> https://github.com/twosigma/beakerx
>>
>> Thanks, -Scott
>>
>>
>> --
>> BeakerX.com
>> ScottDraves.com 
>> @Scott_Draves 
>>
>>
>


Re: [announce] BeakerX supports Scala+Spark in Jupyter

2018-06-07 Thread Stephen Boesch
Assuming that the Spark 2.x kernel (e.g. Toree) were chosen for a given
Jupyter notebook, and there is a Cell 3 that contains some Spark DataFrame
operations, then:


   - what is the relationship between the %%spark magic and the Toree kernel?
   - how does the %%spark magic get applied to that other Cell 3?

thanks!

2018-06-07 16:33 GMT-07:00 s...@draves.org :

> We are pleased to announce release 0.19.0 of BeakerX,
> a collection of extensions and kernels for Jupyter and Jupyter Lab.
>
> BeakerX now features Scala+Spark integration including GUI configuration,
> status, progress, interrupt, and interactive tables.
>
> We are very interested in your feedback about what remains to be done.
> You can reach us via GitHub and Gitter, as documented in the readme:
> https://github.com/twosigma/beakerx
>
> Thanks, -Scott
>
>
> --
> BeakerX.com
> ScottDraves.com 
> @Scott_Draves 
>
>


[announce] BeakerX supports Scala+Spark in Jupyter

2018-06-07 Thread s...@draves.org
We are pleased to announce release 0.19.0 of BeakerX,
a collection of extensions and kernels for Jupyter and Jupyter Lab.

BeakerX now features Scala+Spark integration including GUI configuration,
status, progress, interrupt, and interactive tables.

We are very interested in your feedback about what remains to be done.  You
can reach us via GitHub and Gitter, as documented in the readme:
https://github.com/twosigma/beakerx

Thanks, -Scott


-- 
BeakerX.com
ScottDraves.com 
@Scott_Draves 


Reset the offsets, Kafka 0.10 and Spark

2018-06-07 Thread Guillermo Ortiz Fernández
I'm consuming data from Kafka with createDirectStream and storing the
offsets in Kafka (
https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself
)

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))



My Spark version is 2.0.2 with Kafka 0.10. This solution works well: when I
restart, the Spark process starts from the last offset that Spark consumed.
But sometimes I need to reprocess the whole topic from the beginning.

I have seen that I could reset the offsets with a Kafka script, but that
option is not available in Kafka 0.10...

kafka-consumer-groups --bootstrap-server kafka-host:9092 --group
my-group --reset-offsets --to-earliest --all-topics --execute


Another possibility is to pass a map of offsets to createDirectStream, but how
could I get the first offset for each partition? I have checked the API of the
new consumer and I don't see an obvious method to get these offsets.

Is there any other way? I could start with another groupId as well, but that
doesn't seem like a very clean option for production.
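
One possible approach, sketched below (assuming the Kafka 0.10 new-consumer API
and the streaming-kafka-0-10 ConsumerStrategies in Spark 2.0.x): look up the
earliest offset of each partition with a short-lived KafkaConsumer and pass
those offsets to Subscribe, which accepts an offsets map to begin at on
initial startup. The topics and kafkaParams values are the ones from the
snippet above.

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Look up the earliest offset of every partition with a short-lived consumer.
val probe = new KafkaConsumer[String, String](kafkaParams.asJava)
val partitions = topics.flatMap { t =>
  probe.partitionsFor(t).asScala.map(pi => new TopicPartition(pi.topic, pi.partition))
}
probe.assign(partitions.asJava)
probe.seekToBeginning(partitions.asJava)
val fromOffsets = partitions.map(tp => tp -> probe.position(tp)).toMap
probe.close()

// Hand the offsets to Subscribe so the stream starts from the beginning of the topic.
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))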


Re: Append In-Place to S3

2018-06-07 Thread Benjamin Kim
I tried a different tactic. I still append based on the query below, but I add 
another deduping step afterwards, writing to a staging directory then 
overwriting back. Luckily, the data is small enough for this to happen fast.

Cheers,
Ben
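
A minimal sketch of that extra dedupe step, in Scala (the paths are
hypothetical, and the composite key is assumed to be source, source_id,
target, target_id):

// Dedupe on the composite key, write to a staging location, then overwrite the
// original path (Spark cannot safely overwrite a path it is still reading from).
val deduped = spark.read.parquet(existingDataPath)
  .dropDuplicates(Seq("source", "source_id", "target", "target_id"))

deduped.write.mode("overwrite").option("compression", "gzip").parquet(stagingPath)

spark.read.parquet(stagingPath)
  .write.mode("overwrite").option("compression", "gzip").parquet(existingDataPath)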

> On Jun 3, 2018, at 3:02 PM, Tayler Lawrence Jones wrote:
> 
> Sorry actually my last message is not true for anti join, I was thinking of 
> semi join. 
> 
> -TJ
> 
> On Sun, Jun 3, 2018 at 14:57 Tayler Lawrence Jones wrote:
> A left join with null filter is only the same as a left anti join if the join 
> keys can be guaranteed unique in the existing data. Since hive tables on s3 
> offer no unique guarantees outside of your processing code, I recommend using 
> left anti join over left join + null filter.
> 
> -TJ
> 
> On Sun, Jun 3, 2018 at 14:47 ayan guha wrote:
> I do not use anti join semantics, but you can use a left outer join and then 
> filter out nulls from the right side. Your data may have dups on the columns 
> separately, but it should not have dups on the composite key, i.e. all columns 
> put together.
> 
> On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones wrote:
> The issue is not append vs overwrite - perhaps those responders do not 
> know anti join semantics. Further, overwrite on S3 is a bad pattern due to S3 
> eventual consistency issues. 
> 
> First, your SQL query is wrong, as you don't close the parenthesis of the CTE 
> (the "with" part). In fact, it looks like you don't need that WITH at all, and 
> the query should fail to parse. If it does parse, I would open a bug on the 
> Spark JIRA.
> 
> Can you provide the query that you are using to detect duplication so I can 
> see if your deduplication logic matches the detection query? 
> 
> -TJ
> 
> On Sat, Jun 2, 2018 at 10:22 Aakash Basu wrote:
> As Jay suggested correctly, if you're joining then overwrite otherwise only 
> append as it removes dups.
> 
> I think, in this scenario, just change it to write.mode('overwrite') because 
> you're already reading the old data and your job would be done.
> 
> 
> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim wrote:
> Hi Jay,
> 
> Thanks for your response. Are you saying to append the new data and then 
> remove the duplicates from the whole data set afterwards, overwriting the 
> existing data set with the new, appended data set? I will give that a 
> try. 
> 
> Cheers,
> Ben
> 
> On Fri, Jun 1, 2018 at 11:49 PM Jay wrote:
> Benjamin,
> 
> The append will append the "new" data to the existing data without removing the 
> duplicates. You would need to overwrite the file every time if you need unique 
> values.
> 
> Thanks,
> Jayadeep
> 
> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim wrote:
> I have a situation where I am trying to add only new rows to an existing data 
> set that lives in S3 as gzipped Parquet files, looping and appending for each 
> hour of the day. First, I create a DF from the existing data, then I use a 
> query to create another DF with the data that is new. Here is the code 
> snippet.
> 
> df = spark.read.parquet(existing_data_path)
> df.createOrReplaceTempView(‘existing_data’)
> new_df = spark.read.parquet(new_data_path)
> new_df.createOrReplaceTempView(’new_data’)
> append_df = spark.sql(
> """
> WITH ids AS (
> SELECT DISTINCT
> source,
> source_id,
> target,
> target_id
> FROM new_data i
> LEFT ANTI JOIN existing_data im
> ON i.source = im.source
> AND i.source_id = im.source_id
> AND i.target = im.target
> AND i.target = im.target_id
> """
> )
> append_df.coalesce(1).write.parquet(existing_data_path, mode='append', 
> compression='gzip’)
> 
> I thought this would append new rows and keep the data unique, but I am seeing 
> many duplicates. Can someone help me with this and tell me what I am doing 
> wrong?
> 
> Thanks,
> Ben
> -- 
> Best Regards,
> Ayan Guha



Register UDF during runtime

2018-06-07 Thread 杜斌
Hi,
I'm running into an issue when I try to take a string coming from a web
service as a UDF definition and register it at runtime.

Here is the exception:

java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)

at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at 
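
For context, a minimal sketch of one way to compile and register a UDF from a
source string at runtime, assuming the string is a plain Scala lambda and that
scala-compiler is available on the driver classpath (the UDF string and the
name dynamicUpper below are hypothetical; this uses the Scala ToolBox, not a
Spark API):

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Hypothetical UDF source received from the web service.
val udfSource = "(s: String) => s.toUpperCase"

// Compile the string into a Scala function at runtime with the ToolBox.
val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
val fn = toolbox.eval(toolbox.parse(udfSource)).asInstanceOf[String => String]

// Register it like any other Scala function and use it from SQL.
spark.udf.register("dynamicUpper", fn)
spark.sql("SELECT dynamicUpper('hello')").show()

As a side note, the ClassCastException above (List$SerializationProxy being
assigned to the RDD dependencies field) is often a symptom of a
classloader/serialization mismatch, for example when dynamically compiled
classes are not visible to the executors, so the classloader used for
compilation may matter here; this is an assumption about the cause, not a
confirmed diagnosis.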

Fundamental Question on Spark's distribution

2018-06-07 Thread Aakash Basu
Hi all,

*Query 1)*

I need some serious help! I'm running feature engineering of different types on
a dataset and trying to benchmark it by tweaking different Spark
properties.

I don't know where it is going wrong: a single machine is working
faster than a 3-node cluster, even though most of the operations in the code
are distributed.

The log I collected by running in different ways is -

Remote Spark Benchmarking (4 node cluster, 1 driver, 3 workers) -

Cluster details: 12 GB RAM, 6 cores each.

Medium data -> 100,000 samples (0.1 million rows) [Data placed in Local
File System, same path, same data on all worker nodes]
Runs -
1) Time Taken for the Feature Engineering Pipeline to finish:
482.20375990867615 secs.; --num-executors 3 --executor-cores 5
--executor-memory 4G
2) Time Taken for the Feature Engineering Pipeline to finish:
467.3759717941284 secs.; --num-executors 10 --executor-cores 6
--executor-memory 11G
3) Time Taken for the Feature Engineering Pipeline to finish:
459.885710477829 secs.; --num-executors 3 --executor-cores 6
--executor-memory 8G
4) Time Taken for the Feature Engineering Pipeline to finish:
476.61902809143066 secs.; --num-executors 3 --executor-cores 5
--executor-memory 4G --conf spark.memory.fraction=0.2
5) Time Taken for the Feature Engineering Pipeline to finish:
575.9314386844635 secs.; --num-executors 3 --executor-cores 5
--executor-memory 4G --conf spark.default.parallelism=200

Medium data -> 100,000 samples (0.1 million rows) [Data placed in Local
File System]
1) Time Taken for the Feature Engineering Pipeline to finish:
594.1818737983704 secs.
2) Time Taken for the Feature Engineering Pipeline to finish:
528.6015181541443 secs. (on single driver node [local])
3) Time Taken for the Feature Engineering Pipeline to finish:
323.6546362755467 secs. (on my laptop - 16GB RAM and 8 Cores).

*Query 2)*

Below is the event timeline of the same code, taken from the Spark UI. Can you
provide some insight into why there are two big gaps between the parallel
tasks? Does it mean that no operation is happening during that time? I am
kind of new to Spark UI monitoring; can anyone suggest other aspects that need
to be monitored to optimize further?

[image: Spark UI event timeline]

Thanks,
Aakash.


[ANNOUNCE] Apache Bahir 2.1.2 Released

2018-06-07 Thread Luciano Resende
Apache Bahir provides extensions to multiple distributed analytic
platforms, extending their reach with a diversity of streaming connectors
and SQL data sources.
The Apache Bahir community is pleased to announce the release of Apache
Bahir 2.1.2 which provides the following extensions for Apache Spark 2.1.2:

   - Apache CouchDB/Cloudant SQL data source
   - Apache CouchDB/Cloudant Streaming
   - Akka Streaming
   - Akka Structured Streaming
   - Google Cloud Pub/Sub Streaming connector
   - MQTT Streaming
   - MQTT Structured Streaming
   - Twitter Streaming
   - ZeroMQ Streaming

For more information about Apache Bahir and to download the
latest release go to:

http://bahir.apache.org

For more details on how to use Apache Bahir extensions in your
application please visit our documentation page

   http://bahir.apache.org/docs/spark/2.1.2/documentation/
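
For example, a minimal sketch of reading from the MQTT Structured Streaming
source based on the Bahir documentation (the broker URL, topic, and
Scala/artifact versions below are placeholders to adapt):

// Launch with the Bahir package on the classpath, e.g.:
//   spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.1.2
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("mqtt-demo").getOrCreate()

// Subscribe to an MQTT topic and stream the incoming messages to the console.
val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "sensors/temperature")
  .load("tcp://localhost:1883")

val query = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()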

The Apache Bahir PMC


-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: Strange codegen error for SortMergeJoin in Spark 2.2.1

2018-06-07 Thread Kazuaki Ishizaki
Thank you for reporting a problem.
Would it be possible to create a JIRA entry with a small program that can 
reproduce this problem?

Best Regards,
Kazuaki Ishizaki



From:   Rico Bergmann 
To: "user@spark.apache.org" 
Date:   2018/06/05 19:58
Subject:Strange codegen error for SortMergeJoin in Spark 2.2.1



Hi!
I get a strange error when executing a complex SQL query involving 4
tables that are left-outer-joined:
Caused by: org.codehaus.commons.compiler.CompileException: File
'generated.java', Line 37, Column 18: failed to compile:
org.codehaus.commons.compiler.CompileException: File 'generated.java',
Line 37, Column 18: No applicable constructor/method found for actual
parameters "int"; candidates are:
"org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(
org.apache.spark.memory.TaskMemoryManager, org.apache.spark.storage.BlockManager,
org.apache.spark.serializer.SerializerManager, org.apache.spark.TaskContext,
int, long, int, int)",
"org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(int, int)"

...

/* 037 */ smj_matches = new 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647);


The same query works with Spark 2.2.0.
I checked the Spark source code and saw that a second int parameter was
introduced into the ExternalAppendOnlyUnsafeRowArray constructor in 2.2.1.
But looking at the code-generation part of SortMergeJoinExec:
// A list to hold all matched rows from right side.
val matches = ctx.freshName("matches")
val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName

val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold

ctx.addMutableState(clsName, matches,
  s"$matches = new $clsName($inMemoryThreshold, $spillThreshold);")


it should get 2 parameters, not just one.

Maybe someone has an idea?

Best,
Rico.