Weird ClassCastException when using generics from Java

2020-05-21 Thread Stephen Coy
Hi there,

This will be a little long so please bear with me. There is a buildable example 
available at https://github.com/sfcoy/sfcoy-spark-cce-test.

Say I have the following three tables:


Machines

Id,MachineType
11,A
12,B
23,B
24,A
25,B

Bolts

MachineType,Description
A,20 x M5
A,30 x M5
B,"2"" x 1/4"""
B,"2"" x 1/2"""
B,"2"" x 3/8"""
A,40 x M6
A,50 x M10
B,"1"" x 1/8"""

Nuts

MachineType,Description
A,M5
A,M6
B,"1/4"""
B,"1/8"""
B,"3/8"""

The objective is to create lists of Machines by Id, with all of their bolts and 
nuts listed on the same line:

11, 20 x M5, 30 x M5, 40 x M6, 50 x M10, M5, M6

The output is further categorised by the first 5 digits of the machine id, 
although that seems immaterial to this problem.
In practice I’m dealing with ~70 million machines with a couple of hundred 
thousand types - therefore Spark!

The code to do this looks like:


final Dataset<Machine> machineRecords = sparkSession
  .read()
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("src/main/data/machines.csv")
  .as(Encoders.bean(Machine.class))
  .persist();

final int workerCount = sparkContext.defaultParallelism();

final JavaPairRDD<String, List<Nut>> nutsByMachine = sparkSession
  .read()
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("src/main/data/nuts.csv")
  .as(Encoders.bean(Nut.class))
  .toJavaRDD()
  .mapToPair(nut -> new Tuple2<>(nut.getMachineType(), nut))
  .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
  .combineByKey(SparkCCETest::createListAndCombine, SparkCCETest::mergeValues, SparkCCETest::mergeCombiners)
  .persist(StorageLevel.MEMORY_AND_DISK());

final JavaPairRDD<String, List<Bolt>> boltsByMachine = sparkSession
  .read()
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("src/main/data/bolts.csv")
  .as(Encoders.bean(Bolt.class))
  .toJavaRDD()
  .mapToPair(bolt -> new Tuple2<>(bolt.getMachineType(), bolt))
  .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
  .combineByKey(SparkCCETest::createListAndCombine, SparkCCETest::mergeValues, SparkCCETest::mergeCombiners)
  .persist(StorageLevel.MEMORY_AND_DISK());

machineRecords
  .toJavaRDD()
  .mapToPair(machine -> new Tuple2<>(machine.getMachineType(), machine))
  .join(nutsByMachine)
  .join(boltsByMachine)
  .map(Tuple2::_2)
  .map(tuples -> new Tuple3<>(tuples._1._1, tuples._1._2, tuples._2))
  .mapToPair(machineWithNutsBolts -> new Tuple2<>(exportFileFor(machineWithNutsBolts._1()), machineWithNutsBolts))
  .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
  .foreachPartition(machineIterator -> {// <- line 77
  ///...
  });


static String exportFileFor(Machine machine) {
  return machine.getId().substring(0, 5);
}

static <T> List<T> createListAndCombine(T v) {
  List<T> c = new ArrayList<>();
  c.add(v);
  return c;
}

static <T> List<T> mergeValues(List<T> c, T v) {
  c.add(v);
  return c;
}

static <T> List<T> mergeCombiners(List<T> c1, List<T> c2) {
  c1.addAll(c2);
  return c1;
}
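
For reference, the bean classes are not shown in the post; inferred from the CSV
headers and the getters used above, they would look roughly like this (a sketch
only, one class per file, not taken from the linked repository):

import java.io.Serializable;

public class Machine implements Serializable {
  private String id;            // "Id" column; kept as String since substring(0, 5) is called on it
  private String machineType;   // "MachineType" column
  public String getId() { return id; }
  public void setId(String id) { this.id = id; }
  public String getMachineType() { return machineType; }
  public void setMachineType(String machineType) { this.machineType = machineType; }
}

public class Nut implements Serializable {
  private String machineType;   // "MachineType" column
  private String description;   // "Description" column
  public String getMachineType() { return machineType; }
  public void setMachineType(String machineType) { this.machineType = machineType; }
  public String getDescription() { return description; }
  public void setDescription(String description) { this.description = description; }
}

// Bolt has the same shape as Nut (machineType, description).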


Running this yields a ClassCastException:

20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_1 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_2 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
20/05/22 14:05:31 WARN BlockManager: Block rdd_47_2 could not be removed as it was not found on disk or in memory
20/05/22 14:05:31 WARN BlockManager: Block rdd_47_1 could not be removed as it was not found on disk or in memory
20/05/22 14:05:31 ERROR Executor: Exception in task 2.0 in stage 9.0 (TID 13)
java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/05/22 14:05:31 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 12)
java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
...
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

…

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1979)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1967)
at ...

Re: Handling user-facing metadata issues on file stream source & sink

2020-05-21 Thread Jungtaek Lim
Worth noting that I've gotten similar questions in my local community as well.
Those reporters didn't hit an edge case; they encountered this critical issue
during the normal running of a streaming query.

On Fri, May 8, 2020 at 4:49 PM Jungtaek Lim 
wrote:

> (bump to expose the discussion to more readers)
>
> On Mon, May 4, 2020 at 5:45 PM Jungtaek Lim 
> wrote:
>
>> Hi devs,
>>
>> I'm seeing more and more Structured Streaming end users encountering the
>> metadata issues on the file stream source and sink. These have been known
>> issues, with long-standing JIRA tickets filed for them, and end users
>> reported them again on the user@ mailing list in April.
>>
>> * Spark Structure Streaming | FileStreamSourceLog not deleting list of
>> input files | Spark -2.4.0 [1]
>> * [Structured Streaming] Checkpoint file compact file grows big [2]
>>
>> I've proposed various improvements in this area (see my PRs [3]) but they
>> have suffered from a lack of interest/reviews. I feel the issue is critical
>> (and under-estimated) because...
>>
>> 1. It's one of the "built-in" data sources maintained by the Spark
>> community. (End users may judge the state of the project/area by the quality
>> of the built-in data sources, because that's what they start with.)
>> 2. It's the only built-in data source which provides "end-to-end
>> exactly-once" in Structured Streaming.
>>
>> I'd hope to see us address such issues so that end users can live with the
>> built-in data source. (It doesn't need to be perfect, but it should at least
>> be reasonable for long-running streaming workloads.) I know there are a
>> couple of alternatives, but I don't think a newcomer would start from there.
>> End users may simply look for alternatives - not an alternative data source,
>> but an alternative stream processing framework.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 1.
>> https://lists.apache.org/thread.html/r0916e2fe8181a58c20ee8a76341aae243c76bbfd8758d8d94f79fe8e%40%3Cuser.spark.apache.org%3E
>> 2.
>> https://lists.apache.org/thread.html/r0916e2fe8181a58c20ee8a76341aae243c76bbfd8758d8d94f79fe8e%40%3Cuser.spark.apache.org%3E
>> 3. https://github.com/apache/spark/pulls/HeartSaVioR
>>
>


Re: [VOTE] Apache Spark 3.0 RC2

2020-05-21 Thread Jungtaek Lim
Looks like new blocker issues have been identified.

* https://issues.apache.org/jira/browse/SPARK-31786
* https://issues.apache.org/jira/browse/SPARK-31761 (not yet marked as a
blocker, but according to the JIRA comments it's a regression as well as a
correctness issue, IMHO)

Let's collect the list of blocker issues so that RC3 won't miss them.

On Thu, May 21, 2020 at 2:12 AM Ryan Blue  wrote:

> Okay, I took a look at the PR and I think it should be okay. The new
> classes are unfortunately public, but are in catalyst which is considered
> private. So this is the approach we discussed.
>
> I'm fine with the commit, other than the fact that it violated ASF norms
>  to commit without waiting
> for a review.
>
> On Wed, May 20, 2020 at 10:00 AM Ryan Blue  wrote:
>
>> Why was https://github.com/apache/spark/pull/28523 merged with a -1? We
>> discussed this months ago and concluded that it was a bad idea to introduce
>> a new v2 API that cannot have reliable behavior across sources.
>>
>> The last time I checked that PR, the approach I discussed with Tathagata
>> was to not add update mode to DSv2. Instead, Tathagata gave a couple of
>> reasonable options to avoid it. Why were those not done?
>>
>> This is the second time this year that a PR with a -1 was merged. Does
>> the Spark community not follow the convention to build consensus before
>> merging changes?
>>
>> On Wed, May 20, 2020 at 12:13 AM Wenchen Fan  wrote:
>>
>>> It seems the priority of SPARK-31706 was incorrectly marked; it's a
>>> blocker now. The fix was merged just a few hours ago.
>>>
>>> This should be a -1 for RC2.
>>>
>>> On Wed, May 20, 2020 at 2:42 PM rickestcode <
>>> matthias.harder...@gmail.com> wrote:
>>>
 +1



 --
 Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

 -
 To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: [DatasourceV2] Default Mode for DataFrameWriter not Dependent on DataSource Version

2020-05-21 Thread Russell Spitzer
Another related backwards-compatibility issue: the check in DataSource.scala at
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L415-L416

will get triggered even when the class is a valid DataSourceV2, but is being
used in a DataSourceV1 context.

For example if I run


val createDDL =
  s"""CREATE TABLE IF NOT EXISTS $registerName
 |USING org.apache.spark.sql.cassandra
 |OPTIONS (
 | $optionString
 |)""".stripMargin

spark.sql(createDDL)


against the default catalog, I get an exception:

org.apache.spark.sql.AnalysisException: org.apache.spark.sql.cassandra is
not a valid Spark SQL Data Source.;

I can understand that perhaps we no longer want DSV2 sources to be able to
create session catalog entries, but at a bare minimum I think we should change
the error message for a V2 source to recommend the DSV2 API.

On Wed, May 20, 2020 at 5:07 PM Russell Spitzer 
wrote:

> I think those are fair concerns; I was mostly just updating tests for RC2
> and adding in "append" everywhere.
>
> Code like
>
> spark.sql(s"SELECT a, b from $ks.test1")
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> "test_insert1", "keyspace" -> ks))
>   .save()
>
> Now fails at runtime, while it would have succeeded before. This is
> probably not a huge issue since the majority of actual usages aren't
> writing to empty tables.
>
> I think my main concern here is that a lot of our old demos and tutorials
> went:
>
> * Make the table outside of Spark
> * Write to the table with Spark
>
> Obviously these can now be done in a single operation in Spark, so that's
> probably the best path forward. The old pathway is pretty awkward; I just
> didn't want it to break if it didn't have to, but I agree that having
> different defaults is definitely not intuitive.
>
> I think the majority of other use cases are "append" anyway, so it's not a
> big pain for non-demo users who are just trying things out.
>
> Thanks for commenting,
> Russ
>
>
> On Wed, May 20, 2020 at 5:00 PM Ryan Blue  wrote:
>
>> The context on this is that it was confusing that the mode changed, which
>> introduced different behaviors for the same user code when moving from v1
>> to v2. Burak pointed this out and I agree that it's weird that if your
>> dependency changes from v1 to v2, your compiled Spark job starts appending
>> instead of erroring out when the table exists.
>>
>> The work-around is to implement a new trait, SupportsCatalogOptions, that
>> allows you to extract a table identifier and catalog name from the options
>> in the DataFrameReader. That way, you can re-route to your catalog so that
>> Spark correctly uses a CreateTableAsSelect statement for ErrorIfExists
>> mode.
>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
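
For illustration only (an editor's sketch, not from the thread): a connector
implementing that trait might look roughly like the following. The class name,
option keys, and fallback catalog name are assumptions.

import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsCatalogOptions;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Abstract so the remaining TableProvider methods (inferSchema/getTable) can be
// omitted from the sketch.
public abstract class ExampleCatalogAwareProvider implements SupportsCatalogOptions {

  @Override
  public Identifier extractIdentifier(CaseInsensitiveStringMap options) {
    // Map the reader/writer options back to a table identifier so Spark can plan
    // a CreateTableAsSelect for ErrorIfExists mode instead of a plain append.
    return Identifier.of(new String[] { options.get("keyspace") }, options.get("table"));
  }

  @Override
  public String extractCatalog(CaseInsensitiveStringMap options) {
    // Route to a catalog registered under spark.sql.catalog.<name>; the default
    // name here is made up.
    return options.getOrDefault("catalog", "example_catalog");
  }
}

Spark can then resolve the identifier against the extracted catalog, which is
how the re-routing Ryan describes takes effect.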
>>
>> On Wed, May 20, 2020 at 2:50 PM Russell Spitzer <
>> russell.spit...@gmail.com> wrote:
>>
>>>
>>> While the ScalaDocs for DataFrameWriter say
>>>
>>> /**
>>>  * Specifies the behavior when data or table already exists. Options include:
>>>  * <ul>
>>>  * <li>`SaveMode.Overwrite`: overwrite the existing data.</li>
>>>  * <li>`SaveMode.Append`: append the data.</li>
>>>  * <li>`SaveMode.Ignore`: ignore the operation (i.e. no-op).</li>
>>>  * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
>>>  * </ul>
>>>  * <p>
>>>  * When writing to data source v1, the default option is `ErrorIfExists`.
>>>  * When writing to data source v2, the default option is `Append`.
>>>  *
>>>  * @since 1.4.0
>>>  */
>>>
>>>
>>> As far as I can tell, using DataFrameWriter with a TableProviding
>>> DataSource V2 will still default to ErrorIfExists, which breaks existing
>>> code since DSV2 cannot support ErrorIfExists mode. I noticed in the history
>>> of DataFrameWriter there were versions which differentiated between DSV2
>>> and DSV1 and set the mode accordingly, but this no longer seems to be the
>>> case. Was this intentional? I feel that if the default were based on the
>>> source, then upgrading code from DSV1 -> DSV2 would be much easier for
>>> users.
>>>
>>> I'm currently testing this on RC2
>>>
>>>
>>> Any thoughts?
>>>
>>> Thanks for your time as usual,
>>> Russ
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


Re: [DISCUSS] "complete" streaming output mode

2020-05-21 Thread Jungtaek Lim
Thanks for the input, Burak!

The reason I started to think complete mode serves a niche case is that the
mode is most probably only helpful for the memory sink, once we address update
mode properly. Kafka has compacted topics, JDBC can upsert, Delta can merge,
and AFAIK Iceberg is discussing row-level updates. Most probably external
storages don't want to require end users to drop everything they've built just
to re-add inputs.

Complete mode is not the only way to deal with these cases (the use cases seem
to consolidate around dashboards). I totally agree that complete mode "helps"
to handle these cases "natively" (it makes our life easier), but that doesn't
mean it's the only way to deal with them. Please refer to my previous reply.

The root issue is that it's very unclear how the streaming output mode affects
end users' arbitrary queries. There are some walls of text in SPARK-31706 [1]
(recorded in SPARK-31724 [2]) so I'll skip reiterating them here, but the point
is that the streaming output mode is only effective for streaming aggregation
(not even for all stateful operations), and the semantics are easily broken by
the operators that follow. (The possibility of the data loss issue I've
explained comes from there.)

Would we like to restrict which operators can be applied after a streaming
aggregation and force the semantics to be kept all the way to the sink (e.g.
keep the result table of a streaming aggregation as a keyed DataFrame, and
prevent such a DataFrame from becoming a non-keyed DataFrame again)? If we
would rather keep treating the streaming output mode as coupled with the sink,
that's also a valid approach and I'm open to it.

p.s. We seem to take the path where we provide the flexibility and restrict
things case by case. We are human, and what we miss brings possible
correctness issues. That's why I had to document the caution about a possible
correctness issue with the global watermark. [3] There's no metric to track
the dropped rows, so even if it happens between operators end users won't
notice anything - they just get wrong output. (This is also what I've proposed
to improve, SPARK-24634 [4].) IMHO it would be better to struggle to be
theoretically correct, even if it means discontinuing support for some
functionality.

1. https://issues.apache.org/jira/browse/SPARK-31706
2. https://issues.apache.org/jira/browse/SPARK-31724
3.
https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md#limitation-of-global-watermark
4. http://issues.apache.org/jira/browse/SPARK-24634

On Thu, May 21, 2020 at 3:32 PM Burak Yavuz  wrote:

> Oh wow. I never thought this would be up for debate. I use complete mode
> VERY frequently for all my dashboarding use cases. Here are some of my
> thoughts:
>
> > 1. It destroys the purpose of the watermark and forces Spark to maintain
> > all state rows, growing incrementally. It only works when the keys are
> > bounded to a limited set.
>
> yes, this is a conscious architectural decision users need to make. There
> are many cases where state is finite and small.
>
> > 2. It has to emit all state rows as output per batch, hence the size of
> > the output also grows incrementally.
>
> Not if you add a filter that filters the results of the aggregation output
> ;) For example, I have aggregations that filter out only the last week of
> data, and I use it in Complete mode without any issues. I don't remember if
> we drop filtered values from the state as well, but I haven't faced a
> memory issue yet (streams have been running since the creation of
> Structured Streaming).
>
> > 3. It has to truncate the target before writing rows, which might not be
> > trivial for external storage if it has to be executed per batch.
>
> This is trivial and super cheap for certain data sources like JDBC, Delta,
> Iceberg. Again, it becomes a question of using the right tools and the
> right architecture to solve your problem. IMHO it's not a problem of the
> execution engine or the mode.
>
> > 4. It enables some operations, like sort on a streaming query, and a
> > couple more things. But these will not work cleanly (state won't keep up)
> > under a reasonably high input rate, and we have to consider how each
> > operation will work for each streaming output mode, so a non-trivial
> > amount of consideration has to go into maintaining the mode.
>
> I mean, I would want all pipelines that I build to work magically without
> me having to put any thought into it, but then I feel most people in this
> email list would be out of jobs. These are typical considerations that you
> need to put into how you architect data pipelines. If someone doesn't put
> thought into the scalability of their system then ¯\_(ツ)_/¯
>
> Let me know what you think!
>
> Best,
> Burak
>
>
> On Wed, May 20, 2020 at 10:29 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Thanks for the voice, Shixiong!
>>
>> Thanks for sharing the use case of complete mode in practice. I agree
>> that's a valid use case where complete mode would help, 

Re: [DISCUSS] "complete" streaming output mode

2020-05-21 Thread Burak Yavuz
Oh wow. I never thought this would be up for debate. I use complete mode
VERY frequently for all my dashboarding use cases. Here are some of my
thoughts:

> 1. It destroys the purpose of the watermark and forces Spark to maintain
> all state rows, growing incrementally. It only works when the keys are
> bounded to a limited set.

yes, this is a conscious architectural decision users need to make. There
are many cases where state is finite and small.

> 2. It has to emit all state rows as output per batch, hence the size of
> the output also grows incrementally.

Not if you add a filter that filters the results of the aggregation output
;) For example, I have aggregations that filter out only the last week of
data, and I use it in Complete mode without any issues. I don't remember if
we drop filtered values from the state as well, but I haven't faced a
memory issue yet (streams have been running since the creation of
Structured Streaming).
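
For illustration (an editor's sketch, not Burak's actual code; the input
dataset, column name, window size, and sink are assumptions), the
aggregate-then-filter pattern described above looks roughly like:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

// events: a streaming Dataset<Row> with a "timestamp" column (assumed).
Dataset<Row> lastWeek = events
  .groupBy(window(col("timestamp"), "1 day"))                     // daily buckets
  .count()
  .filter(col("window.start").geq(date_sub(current_date(), 7)));  // keep only the last week

lastWeek.writeStream()
  .outputMode("complete")        // the full (but filtered, bounded) result table each trigger
  .format("memory")              // sink choice is illustrative
  .queryName("last_week_counts")
  .start();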

> 3. It has to truncate the target before writing rows, which might not be
> trivial for external storage if it has to be executed per batch.

This is trivial and super cheap for certain data sources like JDBC, Delta,
Iceberg. Again, it becomes a question of using the right tools and the
right architecture to solve your problem. IMHO it's not a problem of the
execution engine or the mode.

> 4. It enables some operations, like sort on a streaming query, and a
> couple more things. But these will not work cleanly (state won't keep up)
> under a reasonably high input rate, and we have to consider how each
> operation will work for each streaming output mode, so a non-trivial
> amount of consideration has to go into maintaining the mode.

I mean, I would want all pipelines that I build to work magically without
me having to put any thought into it, but then I feel most people in this
email list would be out of jobs. These are typical considerations that you
need to put into how you architect data pipelines. If someone doesn't put
thought into the scalability of their system then ¯\_(ツ)_/¯

Let me know what you think!

Best,
Burak


On Wed, May 20, 2020 at 10:29 PM Jungtaek Lim 
wrote:

> Thanks for the voice, Shixiong!
>
> Thanks for sharing the use case of complete mode in practice. I agree
> that's a valid use case where complete mode would help, but I'm unsure
> enabling complete mode is the only way to deal with the use case.
>
> 1. Given that it assumes a fairly small cardinality of output, using
> "update" mode together with an external storage that can handle fast reads
> and updates (e.g. Redis) would also work smoothly. The con of this approach
> is that it requires an external storage system to install and maintain (and
> it assumes there's a data source implementation for that storage).
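
For illustration (an editor's sketch of option 1 above, not from the thread;
the upsert itself is left as a placeholder because it is storage-specific):

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// counts: a streaming aggregation result (assumed). With "update" mode only
// the keys changed in each micro-batch reach the sink, so each batch stays small.
VoidFunction2<Dataset<Row>, Long> upsertBatch = (batch, batchId) -> {
  // Upsert the changed rows into the external store here (e.g. Redis or a
  // JDBC table); the actual client call is omitted.
};

counts.writeStream()
  .outputMode("update")
  .foreachBatch(upsertBatch)
  .start();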
>
> 2. I think "queryable state" (interactive queries) is the widely adopted
> technology for addressing such use cases. It doesn't need to be accessed
> within the same driver, and it doesn't even need to assume the set of keys
> is bounded and small enough to fit in driver memory. It probably requires
> major effort to implement, and may need more effort to wrap as a table.
>
> Also worth noting that there's a data loss issue in complete mode if end
> users try to union the result of a streaming aggregation with a
> non-streaming aggregation. While we may be able to restrict the query from
> applying union after a streaming aggregation, we already have lots of rules
> to restrict the problematic cases for arbitrary plans. It seems to be time
> to revisit this theoretically (which is what SPARK-31724 is for).
>
> We may need to provide alternatives for the possible use cases even if we
> decide to drop complete mode. But before that, it's more important to build
> a consensus that complete mode is only used for a few use cases (we need to
> collect these use cases, of course) and that the cost of maintenance
> exceeds the benefit. For sure, I'm open to disagreement.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, May 21, 2020 at 9:45 AM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Jungtaek,
>>
>> I totally agree with you about the issues with complete mode that you
>> raised here. However, not all streaming queries have unbounded state that
>> will quickly grow to a crazy size.
>>
>> Actually, I found complete mode to be pretty useful when the state is
>> bounded and small. For example, a user can build a realtime dashboard based
>> on daily aggregation results (only 365 or 366 keys in one year, so fewer
>> than 40k keys in 100 years) using the memory sink, in the following steps:
>>
>> - Write a streaming query to poll data from Kafka, calculate the
>> aggregation results, and save to the memory sink in the complete mode.
>> - In the same Spark application, start a thrift server with
>> "spark.sql.hive.thriftServer.singleSession=true" to expose the temp table
>> created by the memory sink through JDBC/ODBC.
>> - Connect a BI tool using JDBC/ODBC to query the temp table created by
>> the memory sink.
>> - Use the BI tool to
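
A rough sketch (editor's illustration, not from the thread) of the first two
steps of this recipe; the broker address, topic, and query name below are
placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

SparkSession spark = SparkSession.builder()
  .config("spark.sql.hive.thriftServer.singleSession", "true")
  .getOrCreate();

Dataset<Row> daily = spark.readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // placeholder broker
  .option("subscribe", "events")                     // placeholder topic
  .load()
  .groupBy(window(col("timestamp"), "1 day"))        // bounded key space: one key per day
  .count();

daily.writeStream()
  .outputMode("complete")      // complete mode: the full (small) result table each trigger
  .format("memory")            // memory sink registers a temp view in this application
  .queryName("daily_counts")   // the temp view the BI tool will query over JDBC/ODBC
  .start();

// The thrift server is then started in the same application (typically via
// HiveThriftServer2.startWithContext(...)) so the temp view is reachable over
// JDBC/ODBC.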