Github user koeninger commented on the issue:
https://github.com/apache/spark/pull/15102
> I pushed for this code to be copied rather than refactored because I
> think this is the right direction long term. While it is nice to minimize
> inter-project dependencies, that is not really the motivation. While the code
> is very similar now, there are a bunch of things I'd like to start changing:
A bigger concern for me at this point is that the code was copied, and then
modified in ways that don't seem to have anything to do with the necessities of
structured streaming (e.g. your "why is this a nested class" comment).
Options from my point of view, from best to worst:
1. Refactor to a common submodule. Based on how little I had to change to
get the common functionality in my branch, I think this is going to initially
leave most things untouched. If things change in the future, they can be
refactored / copied as necessary. I think this minimizes the chance that
someone fixes a bug in the DStream cached consumer and forgets to fix it in
the SQL cached consumer, or vice versa.
2. Copy without changes, making only the minimal changes necessary at first.
This is going to make what happened more obvious, and make it easier to
maintain changes across both pieces of code.
3. Copy and make unnecessary changes (what seems to have been done
currently). This seems like a maintenance nightmare for no gain.
>
> I don't think that all the classes need to be type parameterized. Our
> interface, SQL, has its own type system, analyser, and interface to the type
> system of the JVM (encoders). We should be using that. Operators in SQL do
> not type parameterize in general.
> To optimize performance, there are several tricks we might want to play
> eventually (maybe prefetching data during execution, etc).
Kafka consumers already prefetch data; that's the main reason the
CachedKafkaConsumer exists. My thought here is that there isn't much gain to
be had from anything more than a thin shim around a Kafka RDD, or at least not
for a while. Kafka's data model doesn't really allow for much in terms of
pushdown optimizations (you basically get to query by offset, or maybe by
time). About the only idea I've heard that might have promise was Reynold
suggesting scheduling straight map jobs as long-running Kafka consumers in a
poll loop on the executors, to avoid batching latency. But that seems to open
a whole can of worms in terms of deterministic behavior, and is probably much
further down the road. If we get there, what's the harm in cutting shared
dependencies at that point rather than now?
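Going back to the caching point for a second, here's a rough sketch of the
idea - not the actual CachedKafkaConsumer code, the names and keying are just
illustrative:

```scala
// Illustrative sketch only: reusing one KafkaConsumer per
// (group id, topicpartition) across batches keeps the consumer's internal
// prefetch buffer warm, which is the main win of caching.
import scala.collection.mutable
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object ConsumerCache {
  private case class Key(groupId: String, tp: TopicPartition)
  private val cache =
    mutable.HashMap.empty[Key, KafkaConsumer[Array[Byte], Array[Byte]]]

  def getOrCreate(
      groupId: String,
      tp: TopicPartition,
      kafkaParams: java.util.Map[String, Object]): KafkaConsumer[Array[Byte], Array[Byte]] =
    synchronized {
      cache.getOrElseUpdate(Key(groupId, tp), {
        val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
        // Each cached consumer is pinned to a single topicpartition.
        c.assign(java.util.Collections.singletonList(tp))
        c
      })
    }
}
```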
> These are just ideas, but given that DStreams and Structured Streaming
> have significantly different models and user interfaces, I don't think that
> we want to tie ourselves to the same internals. If we identify utilities
> that are needed by both, then we should pull those out and share them.
At this point, the shared need is basically everything except KafkaUtils'
static constructors, and the parts of the DirectStream related to the DStream
interface. You still need an RDD, a cache for consumers, offset ranges, a way
to configure consumers, a way to configure locality, a consumer running on the
driver to get latest offsets...
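Just to make that last piece concrete, here's a minimal sketch of a
driver-side consumer looking up latest offsets with the plain Kafka 0.10
consumer API - the bootstrap server, topic, and group id are placeholders:

```scala
// Placeholder config for a driver-side consumer used only to look up offsets.
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "driver-offset-lookup",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams.asJava)
consumer.assign(java.util.Arrays.asList(new TopicPartition("some-topic", 0)))
// Seek to the end of each assigned partition, then read back the positions
// to get the latest available offsets.
consumer.seekToEnd(consumer.assignment())
val latestOffsets: Map[TopicPartition, Long] =
  consumer.assignment().asScala.map(tp => tp -> consumer.position(tp)).toMap
consumer.close()
```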
> We don't need to handle the general problem of is kafka Offset A from
> Topic 1 before or after kafka Offset B from Topic 2.
>
> Does x: KafkaOffset == y: KafkaOffset (i.e. is there new data since the
> last time I checked)?
We do need to handle comparing completely different topicpartitions, because
it's entirely possible to have a job with a single topicpartition A that gets
deleted or unsubscribed, and then a single topicpartition B added, all in the
space of one batch. I have talked to companies that are actually doing this
kind of thing. If all we need to do is be able to tell that one SQL offset
(that we already knew about) is different from another SQL offset (that we
just learned about), then I think it's pretty straightforward; your three
cases (sketched below) are:
* error: ahead in some common topicpartitions, and behind in others.
* equal: same Kafka offsets for the same topicpartitions.
* not equal: different offsets for the same topicpartitions, and/or different
topicpartitions.
That does imply that any ordering of SQL Offsets is by when we learn about
them in processing time, which sounds suspect, but...
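Here's a rough sketch of those three cases, assuming an Offset boils down to a
map of Kafka offsets per topicpartition - the names are hypothetical, not the
actual PR code:

```scala
// Hypothetical representation of a SQL Offset as a Map[TopicPartition, Long].
import org.apache.kafka.common.TopicPartition

sealed trait Comparison
case object Equal extends Comparison
case object NotEqual extends Comparison
case object Error extends Comparison

def compareOffsets(
    known: Map[TopicPartition, Long],
    latest: Map[TopicPartition, Long]): Comparison = {
  val common = known.keySet.intersect(latest.keySet)
  val aheadSomewhere = common.exists(tp => known(tp) > latest(tp))
  val behindSomewhere = common.exists(tp => known(tp) < latest(tp))
  if (aheadSomewhere && behindSomewhere) {
    Error     // ahead in some common topicpartitions, behind in others
  } else if (known == latest) {
    Equal     // same Kafka offsets for the same topicpartitions
  } else {
    NotEqual  // different offsets and/or different topicpartitions
  }
}
```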
> The final version of this Source should almost certainly support
> wildcards with topicpartitions that change on the fly. Since it seems this
> is one of the harder problems to solve, as a strawman, I'd propose that we
> only support static lists of topics in this PR and possibly even static
> partitions. I want to get users to kick the tires on structured streaming in
> general and report what's missing so we can all prioritize our engineering
> effort.
The problem I see with that approach is that doing it right may require
changing the structured streaming interface, which gets progressively harder to
do the more users and more implementations there are relying on it. I think
it's generally best to learn about your items of highest risk first (even if
you don't totally solve them in version 1.0).
> I typically open PRs against the PR author's branch when I want to
> collaborate more directly.
Sorry, I meant this less in terms of GitHub mechanics and more in terms of
process - are you going to consider pull requests against this branch, what
pieces make sense to split off, etc. For a specific initial question, is your
mind made up about the submodule thing, or if I do that refactor will you at
least look at it?
> df.withColumn("value", $"value".cast("string"))
> I'd also like to create expressions that work well with Kafka-specific
> deserializers, as well as integrations with our json parsing and maybe Avro
> too. The nice thing about this path is it fits with other SQL APIs and should
> play nicely with Dataset Encoders, UDFs, our expression library, etc.
>
> Does that seem reasonable? Is this missing important cases?
Yeah, that seems reasonable. Yeah, the built-in types, json, and maybe Avro
are the ones to hit. I'd be 100% on board if we had one working example of a
specific deserializer that didn't require access to private Spark classes, so
people could tell how to do it for whatever serialization scheme they are
using. My guess is someone's going to want an implicit API to let them just
call .stringKeyValue or whatever on the DataFrame, because all they care about
is those two columns, but that's really straightforward for people to do
themselves.
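For instance, something like this - just a sketch of what such a user-side
helper could look like; stringKeyValue and the key/value column names are
assumptions on my part, not anything in this PR:

```scala
// Sketch only: a UDF wrapping a specific Kafka Deserializer, plus an implicit
// helper for the "I just want string key/value columns" case. Assumes the
// source exposes binary "key" and "value" columns.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

object KafkaHelpers {
  // Deserialize raw Kafka bytes with a concrete Deserializer; no private
  // Spark classes involved. (Creating a deserializer per call keeps the
  // sketch short; a real version would reuse one.)
  val deserializeString = udf { bytes: Array[Byte] =>
    new StringDeserializer().deserialize("unused-topic", bytes)
  }

  implicit class KafkaDataFrameOps(df: DataFrame) {
    // Keep only the key/value columns, decoded to strings.
    def stringKeyValue: DataFrame =
      df.select(
        deserializeString(col("key")).as("key"),
        deserializeString(col("value")).as("value"))
  }
}

// Usage: import KafkaHelpers._; val kv = kafkaDf.stringKeyValue
```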