GitHub user koeninger commented on the issue:

    https://github.com/apache/spark/pull/15102
  
    > I pushed for this code to be copied rather than refactored because I 
think this is the right direction long term. While it is nice to minimize 
inter-project dependencies, that is not really the motivation. While the code 
is very similar now, there are a bunch of things I'd like to start changing:
    
    A bigger concern for me at this point is that the code was copied, and then 
modified in ways that don't seem to have anything to do with the necessities of 
structured streaming (e.g. your "why is this a nested class" comment).
    
    Options from my point of view, from best to worst:
    
    1. Refactor to a common submodule.  Based on how little I had to change to get the common functionality in my branch, I think this is going to initially leave most things untouched.  If things change in the future, they can be refactored / copied as necessary.  I think this minimizes the chance that someone fixes a bug in the DStream cached consumer and forgets to fix it in the SQL cached consumer, or vice versa.
    
    2. Copy without changes, making only the minimal changes necessary at first.  This makes what happened more obvious, and makes it easier to maintain changes across both pieces of code.
    
    3.  Copy and make unnecessary changes (what seems to have been done 
currently).  This seems like a maintenance nightmare for no gain.
    
    
    > 
    > I don't think that all the classes need to be type parameterized. Our SQL interface has its own type system, analyser, and interface to the type system of the JVM (encoders). We should be using that. Operators in SQL are not type parameterized in general.
    > To optimize performance, there are several tricks we might want to play 
eventually (maybe prefetching data during execution, etc).
    
    Kafka consumers already prefetch data; that's the main reason the CachedKafkaConsumer exists.  My thought here is that there isn't much gain to be had from anything more than a thin shim around a Kafka rdd, or at least not for a while.  Kafka's data model doesn't really allow for much in terms of pushdown optimizations (you basically get to query by offset, or maybe time).  About the only idea I've heard that might have promise was Reynold suggesting scheduling straight map jobs as long-running Kafka consumers in a poll loop on the executors, to avoid batching latency.  But that seems to open a whole can of worms in terms of deterministic behavior, and is probably much further down the road.  If we get there, what's the harm in cutting shared dependencies at that point rather than now?
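
    For context, here's roughly the shape of the existing DStream-side rdd API I'd be shimming around (the parameter values below are made up; the point is that the only thing you really get to "push down" to Kafka is an explicit list of offset ranges per topicpartition):

    ```scala
    import java.{util => ju}

    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

    // Illustrative only: reads a fixed slice of one topicpartition as an RDD.
    def exampleRdd(sc: SparkContext) = {
      val kafkaParams = new ju.HashMap[String, Object]()
      kafkaParams.put("bootstrap.servers", "localhost:9092")
      kafkaParams.put("key.deserializer", classOf[ByteArrayDeserializer])
      kafkaParams.put("value.deserializer", classOf[ByteArrayDeserializer])
      kafkaParams.put("group.id", "example-group")

      // topic, partition, inclusive starting offset, exclusive ending offset -
      // this is basically the whole query language Kafka gives you
      val offsetRanges = Array(OffsetRange("topicA", 0, 100L, 200L))

      KafkaUtils.createRDD[Array[Byte], Array[Byte]](
        sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)
    }
    ```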
    
    > These are just ideas, but given that DStreams and Structured Streaming 
have significantly different models and user interfaces, I don't think that we 
want to tie ourselves to the same internals. If we identify utilities that are 
needed by both, then we should pull those out and share them.
    
    At this point, the shared need is basically everything except KafkaUtils' static constructors and the parts of the DirectStream related to the DStream interface.  You still need an rdd, a cache for consumers, offset ranges, a way to configure consumers, a way to configure locality, a consumer running on the driver to get the latest offsets...
    
    > We don't need to handle the general problem of is kafka Offset A from 
Topic 1 before or after kafka Offset B from Topic 2. 
    > 
    > Does x: KafkaOffset == y: KafkaOffset (i.e. is there new data since the 
last time I checked)?
    
    We do need to handle comparing completely different topicpartitions, because it's entirely possible for a job with a single topicpartition A to have that partition deleted or unsubscribed, and a single topicpartition B added, in the space of one batch.  I have talked to companies that are actually doing this kind of thing.  If all we need to do is be able to tell that one SQL offset (that we already knew about) is different from another SQL offset (that we just learned about), then I think it's pretty straightforward - your three cases are:
    
    * error: ahead in some common topicpartitions, and behind in others.
    * equal: same Kafka offsets for the same topicpartitions.
    * not equal: different offsets for the same topicpartitions, and/or different topicpartitions.
    
    That does imply that any ordering of SQL offsets is by when we learn about them in processing time, which sounds suspect, but...
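
    In case it helps, here's a rough illustrative sketch of that three-way comparison (not the code in this PR), treating a source offset as a map of topicpartition to offset:

    ```scala
    import org.apache.kafka.common.TopicPartition

    sealed trait OffsetComparison
    case object Equal extends OffsetComparison
    case object NotEqual extends OffsetComparison
    case object Inconsistent extends OffsetComparison  // the "error" case above

    // `known` is the offset we already had, `latest` is the one we just learned about
    def compare(
        known: Map[TopicPartition, Long],
        latest: Map[TopicPartition, Long]): OffsetComparison = {
      val common = known.keySet intersect latest.keySet
      val someAhead = common.exists(tp => latest(tp) > known(tp))
      val someBehind = common.exists(tp => latest(tp) < known(tp))
      if (someAhead && someBehind) {
        Inconsistent  // ahead in some common topicpartitions, behind in others
      } else if (known == latest) {
        Equal         // same offsets for the same topicpartitions
      } else {
        NotEqual      // different offsets and/or different topicpartitions
      }
    }
    ```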
    
    > The final version of this Source should almost certainly support 
wildcards with topicpartitions that change on the fly. Since it seems this is 
one of the harder problems to solve, as a strawman, I'd propose that we only 
support static lists of topics in this PR and possibly even static partitions. 
I want to get users to kick the tires on structured streaming in general and 
report what's missing so we can all prioritize our engineering effort.
    
    The problem I see with that approach is that doing it right may require changing the structured streaming interface, which gets progressively harder to do the more users and implementations there are relying on it.  I think it's generally best to learn about your items of highest risk first (even if you don't totally solve them in version 1.0).
    
    > I typically open PRs against the PR author's branch when I want to 
collaborate more directly.
    
    Sorry, I meant this less in terms of GitHub mechanics and more in terms of process - are you going to consider pull requests against this branch, what pieces make sense to split off, etc.  For a specific initial question: is your mind made up about the submodule thing, or if I do that refactor will you at least look at it?
    
    > df.withColumn("value", $"value".cast("string"))
    > I'd also like to create expressions that work well with kafka specific 
deserializers, as well as integrations with our json parsing and maybe Avro 
too. The nice thing about this path is it fits with other SQL APIs and should 
play nicely with Dataset Encoders, UDFs, our expression library, etc.
    > 
    > Does that seem reasonable? Is this missing important cases?
    
    Yeah, that seems reasonable, and the built-in types, JSON, and maybe Avro are the ones to hit.  I'd be 100% on board if we had one working example of a specific deserializer that didn't require access to private Spark classes, so people could tell how to do it for whatever serialization scheme they are using.
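
    To sketch the kind of example I mean (purely illustrative, with StringDeserializer standing in for whatever scheme people actually use, and nothing but public APIs involved):

    ```scala
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.udf

    // Wrap a stock Kafka Deserializer in a UDF.  Any Deserializer[T] with a
    // corresponding Spark SQL type could be dropped in the same way.
    def deserializeValue(df: DataFrame): DataFrame = {
      val deser = udf { bytes: Array[Byte] =>
        // creating the deserializer inside the closure keeps the UDF serializable;
        // a tuned version would reuse one instance per task
        new StringDeserializer().deserialize("unused-topic", bytes)
      }
      df.withColumn("value", deser(df("value")))
    }
    ```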
    
    My guess is someone's going to want an implicit API to let them just call .stringKeyValue or whatever on the DataFrame, because all they care about is those two columns, but that's really straightforward for people to do themselves.
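
    e.g. something along these lines (made-up names, not something I'm proposing we ship) is all it takes:

    ```scala
    import org.apache.spark.sql.DataFrame

    object KafkaImplicits {
      // stringKeyValue is a made-up convenience for people who only want the
      // key and value columns cast to strings
      implicit class KafkaDataFrameOps(val df: DataFrame) extends AnyVal {
        def stringKeyValue: DataFrame =
          df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      }
    }

    // usage: import KafkaImplicits._ ; then df.stringKeyValue
    ```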

