[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-02-26 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15884790#comment-15884790
 ] 

Ofir Manor commented on SPARK-18057:


Sure - [~marmbrus], what do you think? It resolved my Kafka 0.10.1 concern.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-02-26 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15884619#comment-15884619
 ] 

Ofir Manor commented on SPARK-18057:


[~c...@koeninger.org] I think my previous issue with upgrading to Kafka 0.10.1 
client has been resolved in Kafka 0.10.2 - since now newer clients DO support 
older brokers:
{quote}
The Java clients (producer and consumer) have acquired the ability to 
communicate with older brokers. Version 0.10.2 clients can talk to version 
0.10.0 or newer brokers. Note that some features are not available or are 
limited when older brokers are used.
{quote}
See https://kafka.apache.org/documentation/#upgrade_1020_notable
So it seems safe to upgrade Spark's Kafka client to 0.10.2, as it shouldn't 
bump up the minimum required version of the Kafka broker.
What do you think?

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-22 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688246#comment-15688246
 ] 

Ofir Manor commented on SPARK-18475:


Cody, for me you are the main gatekeeper for everything Kafka and the main Kafka expert, so I wanted your perspective, not Michael's (except for the generic "order" guarantee, which I still think does not exist).
I thought that if someone made the effort of building, testing and trying to contribute it, it is an indication that it hurts in the real world, especially since you said it is a repeated request. I guess in many places, getting read access to a potentially huge, shared topic is not the same as having Kafka admin rights, or being the only or main consumer, or being able to easily fix bad past decisions around partitions and keys...
Anyway, it is totally up to you, you'll have to maintain it. I personally have 
no use for this feature.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.






[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-20 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15680780#comment-15680780
 ] 

Ofir Manor commented on SPARK-18475:


I understand, but I disagree with you. I think you are mixing two very different things.
KAFKA does provide an order guarantee within a Kafka partition. That is a basic design attribute of it.
SPARK RDDs, however, do not have any order guarantees. When you create and manipulate RDDs / DataFrames (regardless of their origin, Kafka or other), Spark does not explicitly tell you what the order of elements is. For example, operators on RDDs do not specify whether they keep or break the internal order (and some of them do break it), there are no order-sensitive operators, there is no general-purpose order metadata, etc.
I agree that with the Kafka source, under some circumstances (like non-shuffling operators), the output of RDD manipulation may very well follow the input order (within an input Kafka partition). BUT, I have never seen anywhere that this is an explicit guarantee that users can rely on, rather than an artifact of the current internal implementation.
This is relevant because you want to block a potentially significant performance improvement here (one relying on explicit, non-default configuration), just to maintain a property that Spark does not guarantee to keep.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.






[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-17 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674628#comment-15674628
 ] 

Ofir Manor commented on SPARK-18475:


I was just wondering if it actually works, but it seems you found a way to hack it (I thought you would need a different consumer group per worker to avoid coordination by the broker, but it didn't seem like it).
If it does provide a big perf boost in some cases, and it is not enabled by default, I personally don't have any objections.
[~c...@koeninger.org] - I didn't understand your objection. An RDD / Dataset does not have any inherent order guarantees (same as a SQL result set), and the Kafka metadata per message (including topic, partition, offset) is exposed if someone really cares. If you guys have a smart hack that allows you to divide a specific partition into ranges and have different workers read different ranges of a partition in parallel, and if it does provide a significant perf boost, why not have it as an option?
I don't think it should break correctness, as the boundaries are decided by the driver anyway.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.






[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-17 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674092#comment-15674092
 ] 

Ofir Manor commented on SPARK-18475:


Are you sure this is working? Does it have a visible perf effect?
As far as I know, the maximum parallelism of Kafka is the number of topic-partitions, by design. If your consumer group has more consumers than that, some of them will just be idle. This is because, when reading, each partition is owned by a single consumer (the allocation of partitions to consumers is dynamic, as consumers join and leave).
To quote an older source:
??The first thing to understand is that a topic partition is the unit of 
parallelism in Kafka. On both the producer and the broker side, writes to 
different partitions can be done fully in parallel. So expensive operations 
such as compression can utilize more hardware resources. On the consumer side, 
Kafka always gives a single partition’s data to one consumer thread. Thus, the 
degree of parallelism in the consumer (within a consumer group) is bounded by 
the number of partitions being consumed. Therefore, in general, the more 
partitions there are in a Kafka cluster, the higher the throughput one can 
achieve.??
https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

You could repartition the data in Spark after reading, to increase parallelism 
of Spark's processing.
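
To make that last suggestion concrete, a minimal sketch (assuming a {{spark}} session; broker and topic names are made up for the example):
{noformat}
// Read with one Spark task per Kafka topic-partition (the current behavior),
// then repartition so downstream processing is not bounded by the partition count.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder broker list
  .option("subscribe", "topicFoo")                   // placeholder topic
  .load()
  .repartition(64)
{noformat}
Note that the repartition only helps the processing stages; the read itself is still limited to one consumer per topic-partition.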

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.






[jira] [Commented] (SPARK-18386) Batch mode SQL source for Kafka

2016-11-10 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653821#comment-15653821
 ] 

Ofir Manor commented on SPARK-18386:


BTW [~c...@koeninger.org] - I think that filtering (by timestamp) can be done today "the hard way" if the Kafka broker is 0.10.1.
The user could use the 0.10.1 client to get a list of offsets for his requested timestamp, then submit a job to Spark using explicit offsets to be consumed by Spark's 0.10.0 client (quite ugly, but it should work).
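
A rough sketch of that workaround, assuming a 0.10.1+ client on the classpath (broker, topic and timestamp are made up; {{offsetsForTimes}} is the 0.10.1 consumer API that maps a timestamp to the earliest offset at or after it, per partition):
{noformat}
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "host1:9092")   // placeholder broker list
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val partitions = consumer.partitionsFor("topicFoo").asScala
  .map(p => new TopicPartition(p.topic, p.partition))
val ts: java.lang.Long = 1478736000000L        // the requested timestamp (epoch millis)

// 0.10.1 client API: per partition, the earliest offset whose timestamp is >= ts
val offsets = consumer.offsetsForTimes(partitions.map(tp => tp -> ts).toMap.asJava)
  .asScala.filter { case (_, ot) => ot != null }
consumer.close()

// Format as the explicit-offsets JSON that the Spark source accepts,
// e.g. {"topicFoo": {"0": 1234, "1": 5678}}
val startingOffsetsJson = offsets
  .map { case (tp, ot) => s""""${tp.partition}": ${ot.offset}""" }
  .mkString("""{"topicFoo": {""", ", ", "}}")
{noformat}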

> Batch mode SQL source for Kafka
> ---
>
> Key: SPARK-18386
> URL: https://issues.apache.org/jira/browse/SPARK-18386
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Cody Koeninger
>
> An SQL equivalent to the DStream KafkaUtils.createRDD would be useful for 
> querying over a defined batch of offsets.
> The possibility of Kafka 0.10.1 time indexing (e.g. a batch from timestamp X 
> to timestamp Y) should be taken into account, even if not available in the 
> initial implementation.






[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.1.0

2016-11-10 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653415#comment-15653415
 ] 

Ofir Manor commented on SPARK-18057:


This is great, especially if we follow up by adding reading topics from a timestamp...
Do you suggest replacing the existing 0.10.0 client with the 0.10.1 client, or switching to multiple implementations (similar to how DStreams have 0.8 and 0.10 implementations)?
Due to Kafka's architecture, older clients can talk to newer brokers, but newer clients cannot talk to older brokers (therefore, during upgrades, you always upgrade the brokers first).
So, bumping up Spark's Kafka client version practically means de-supporting Kafka broker 0.10.0 in Structured Streaming, which is why I prefer adding a second implementation...
For context, 0.10.0 was released in May 2016, and 0.10.1 was released this October.

> Update structured streaming kafka from 10.0.1 to 10.1.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Commented] (SPARK-18386) Batch mode SQL source for Kafka

2016-11-10 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653388#comment-15653388
 ] 

Ofir Manor commented on SPARK-18386:


This would be really useful for me right now!
We have many bounded Kafka topics - for example, metrics of the last couple of months, web clicks of the last 7 days, etc. It would be great to be able to just query them with Spark (I'd use "earliest" starting offsets in my case).
There is also an interesting interaction between structured streaming and regular queries, where each streaming batch recomputes the regular queries it depends on. It works with the file sources; I'd like to use that with the Kafka source as well in some cases.
It would also be great if the external API were as close to the current one ({{spark.readStream.format("kafka").option(...)}}) as possible (same options etc.), maybe just with {{spark.read.kafka...}}?
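
A sketch of what that symmetry could look like (purely illustrative - the batch source and the {{endingOffsets}} option shown here did not exist at the time and are assumptions):
{noformat}
// Streaming source (the existing API)
val stream = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "clicks")
  .option("startingOffsets", "earliest")
  .load()

// Hypothetical batch equivalent with the same options
val batch = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "clicks")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")   // assumption: a bounded read also needs an end
  .load()
{noformat}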



> Batch mode SQL source for Kafka
> ---
>
> Key: SPARK-18386
> URL: https://issues.apache.org/jira/browse/SPARK-18386
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Cody Koeninger
>
> An SQL equivalent to the DStream KafkaUtils.createRDD would be useful for 
> querying over a defined batch of offsets.
> The possibility of Kafka 0.10.1 time indexing (e.g. a batch from timestamp X 
> to timestamp Y) should be taken into account, even if not available in the 
> initial implementation.






[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-15 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578699#comment-15578699
 ] 

Ofir Manor commented on SPARK-17812:


I think Michael suggests that if you use {{startingOffsets}} without using {{assign}}, that could work (use the topic-partition list from the startingOffsets), and it would simplify the user's code (no need to specify two similar lists, simpler resume, etc.).
You could keep an explicit {{assign}} for the "rarer?" cases, where someone wants to specify a list of topic-partitions but also earliest / latest / timestamp.
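
For illustration, the suggestion would let the offsets JSON itself define the assignment (a sketch following the option format quoted in the issue description below):
{noformat}
// Instead of repeating the topic-partition list in two options:
//   .option("assign", """{"topicfoo": [0, 1]}""")
//   .option("startingOffsets", """{"topicfoo": {"0": 1234, "1": -2}}""")
// the offsets JSON alone would define which partitions to read:
.option("startingOffsets", """{"topicfoo": {"0": 1234, "1": -2}}""")
{noformat}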

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>Assignee: Cody Koeninger
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions
> currently agreed on plan:
> Mutually exclusive subscription options (only assign is new to this ticket)
> {noformat}
> .option("subscribe","topicFoo,topicBar")
> .option("subscribePattern","topic.*")
> .option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
> {noformat}
> where assign can only be specified that way, no inline offsets
> Single starting position option with three mutually exclusive types of value
> {noformat}
> .option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 
> 1234, "1": -2}, "topicBar":{"0": -1}}""")
> {noformat}
> startingOffsets with json fails if any topicpartition in the assignments 
> doesn't have an offset.






[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573457#comment-15573457
 ] 

Ofir Manor commented on SPARK-17812:


I'm with you - I warned you it is bikeshedding...
I don't have a strong opinion, just a preference, and what you suggested is way better than the committed solution, so I'll get out of the loop.
Whatever [~marmbrus] and you are OK with - either way it would be a big step forward.

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions






[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573341#comment-15573341
 ] 

Ofir Manor commented on SPARK-17812:


Thanks Cody! Great to have a concrete example.
I have some comments, but it's mostly bikeshedding:
1. subscribe vs. subscribePattern --> personally, I would combine them both into "subscribe" - no need to burden the user with the different Kafka API nuances. It can take either a list of discrete topics or a pattern.
2. It would be much clearer if "assign" were called subscribeSomething, so the user would choose one "subscribe..." option and one (or more) "starting..." option.
Not sure I have a good name though - subscribeCustom?
You could even use the regular subscribe for that (and be smarter with the pattern matching) - I think it would just work, and if someone tries to be funny (combining asterisks and partitions) we could just error out.
3. I like startingTime... pretty neat.
We could hypothetically add {{.option("startingMessages", long)}} to support Michael's "just start with a 1000 recent messages"...
4. As I said before, I'd rather have all starting* options be mutually exclusive. Yes, it blocks some edge cases, on purpose, but it makes the API and code way clearer (think about startingMessages interacting with startingOffsets, etc.).
I think it would be easier to later regret this and allow multiple starting* options (opening all sorts of esoteric combinations) than to clean it up later if users find it confusing and unneeded.
Anyway, as long as it is functional I'm good with it, even if it is less aesthetic.
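
Put together, the suggestions above would read roughly like this (a sketch only; {{subscribeCustom}}, {{startingTime}} and {{startingMessages}} are the hypothetical names floated in this comment, not existing options):
{noformat}
// exactly one "subscribe..." option: a list, a pattern, or explicit partitions
.option("subscribe", "topicFoo,topicBar")               // would also accept "topic.*"
.option("subscribeCustom", """{"topicfoo": [0, 1]}""")  // hypothetical rename of "assign"

// plus exactly one mutually exclusive "starting..." option, e.g. one of:
.option("startingOffsets", "earliest")                  // or "latest", or a JSON of offsets
.option("startingTime", "2016-10-01T00:00:00Z")         // hypothetical timestamp variant
.option("startingMessages", 1000L)                      // hypothetical "N recent messages" variant
{noformat}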

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions






[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)

2016-10-13 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573109#comment-15573109
 ] 

Ofir Manor commented on SPARK-17812:


Regarding (1) - of course it is *all* data in the source, as of query start. Just the same as a file system directory or a database table - I'm not sure a disclaimer that the directory or table could have had different data in the past adds anything but confusion...
Anyway, the startingOffset is confusing because it seems you want a different parameter for "assign" --> to explicitly specify starting offsets.
For your use case, I would add:
5. Give me nnn messages (not the last ones). We still do one of the above options (trying to go back nnn messages, somehow split between the topic-partitions involved), but do not provide a more explicit guarantee like "last nnn". Generally, the distribution of messages to partitions doesn't have to be round-robin or uniform; it is strongly based on the key (which could be a state, a URL, etc.).
Anyway, I haven't seen a concrete suggestion on how to specify offsets or a timestamp, so I think that would be the next step in this ticket (I suggested you could condense it all into one option to avoid dependencies between options, but I don't have an elegant "stringly" suggestion).

> More granular control of starting offsets (assign)
> --
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user specified offsets for manually specified topicpartitions






[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-10 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15563273#comment-15563273
 ] 

Ofir Manor commented on SPARK-17812:


If you think it is useful, you should definitely propose something that covers it.
Personally, I can't imagine a case where someone will need that - but you have much more exposure to different users.
I can only say that it is already covered by the strict "assign" (using only the partitions that were explicitly specified) - the user would open a new consumer group, seek to the end, seek specific partitions to their preferred offsets, and pass all the topic-partition offsets explicitly to the "assign" option.

BTW - just off the top of my head - maybe you'll want to add a helper function or two that:
- takes a specific consumer group and returns whatever object / magic string is needed to configure a starting offset from Structured Streaming (sketched below).
- Same as above, but reads from a Structured Streaming checkpoint and returns a new consumer group (created, seeked, committed). That relates to the "DRP" ticket, and could also just be passed to the previous helper function (from a checkpoint to a new streaming job).
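
A rough sketch of the first helper, just to illustrate the idea (it uses the plain Kafka consumer API; the JSON shape follows the startingOffsets format discussed on SPARK-17812, and all names here are illustrative):
{noformat}
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

/** Returns the committed offsets of `groupId` for `topic`, formatted as a
  * startingOffsets-style JSON string, e.g. {"topic": {"0": 42, "1": 17}}. */
def committedOffsetsAsJson(brokers: String, groupId: String, topic: String): String = {
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("group.id", groupId)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    val entries = consumer.partitionsFor(topic).asScala.flatMap { p =>
      val tp = new TopicPartition(p.topic, p.partition)
      // committed() returns null for partitions this group never committed
      Option(consumer.committed(tp)).map(om => s""""${tp.partition}": ${om.offset}""")
    }
    entries.mkString(s"""{"$topic": {""", ", ", "}}")
  } finally consumer.close()
}
{noformat}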



> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets






[jira] [Commented] (SPARK-17815) Report committed offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560906#comment-15560906
 ] 

Ofir Manor commented on SPARK-17815:


I totally understand your concerns, and yes, I hope you will continue to faithfully respond to all Kafka-related messages, and yes, you should insist on fixing what you think is broken, unclear or too complex.
I just don't see how it relates to this ticket, which is explicitly about helping external monitoring tools that might rely on the Kafka consumer group advancing its offsets.
You have some problems with the framework, specifically with its fault tolerance semantics, not with some specific Kafka source implementation detail:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#fault-tolerance-semantics
Since Kafka might be the first or second "real" streaming source, it does make a lot of sense to validate / improve the framework based on this feedback. I think this ticket is the wrong place for that - it will have no visibility. Maybe it should go to the dev list, or to the blocking ticket (which is about the framework), or a new Structured Streaming JIRA (if there is a specific suggestion), or maybe to a SEP/SPIP; or, if no committer is willing to revisit it or discuss it in any channel (as was the case with the Kafka source for many months), you'll eventually have to face that this is how this project is unfortunately managed.
Anyway, I think I have maxed out my contribution to this thread. It should be continued with the right committers, wherever they want, not with me.

> Report committed offsets
> 
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit.  However, 
> this means that external tools are not able to report on how far behind a 
> given streaming job is.  When the user manually gives us a group.id, we 
> should report back to it.






[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560865#comment-15560865
 ] 

Ofir Manor commented on SPARK-17812:


Why? If you choose oldest, latest or by-timestamp, you can combine it with a topic pattern.
If you explicitly provide a list of specific topic-partition-offsets, Spark should respect that.
Choosing topics and choosing a starting-position method are complementary choices.

> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets






[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560820#comment-15560820
 ] 

Ofir Manor commented on SPARK-17812:


I'll have to think about it - I'm not sure it ever makes sense to provide custom offsets, skip some partitions, and expect user-configurable default offsets for the missing ones.
I would actually suggest that if a user provides custom offsets, they would be used as is. For example, if I provide offsets only for partitions 1, 4 and 10 of a topic, then the consumer group would subscribe to only these three partitions.
In other words, a custom assignment should be respected as is.
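
For illustration, using the offsets JSON format discussed in this ticket (topic name and offsets are made up):
{noformat}
.option("startingOffsets", """{"topic1": {"1": 500, "4": 1200, "10": 0}}""")
// => the query would read only partitions 1, 4 and 10, exactly from these offsets
{noformat}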

> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets






[jira] [Commented] (SPARK-17815) Report committed offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560749#comment-15560749
 ] 

Ofir Manor commented on SPARK-17815:


Thanks Cody, this is much clearer.
(BTW - I've been bitten multiple times by HDFS corrupting files, especially 
with truncate() API, but that is a different story)
I think we are mixing two different discussions here.
Structured Streaming provides a framework and an algorithm, and expects all 
sources and sinks to align with that. The Kafka source is just one such example 
(and the Kafka sink discussion is about other limits of the current framework).
You have some concerns and reservations regarding the framework - both due to the partial implementation so far and due to deeper concerns (mostly complexity and its likely effects).
I think the umbrella discussion (Structured Streaming Kafka source) is about conforming to the spec; this specific ticket is about an even smaller detail.
Of course, given that so far there have been no real opportunities for the deeper, architectural discussion (or maybe it is just my perception), it might make sense to use every opportunity to raise and advance the higher-level issues. But I think at least we should be clear whether we are discussing something specific to the Kafka source for Structured Streaming, or things at the framework level.
(Your SIP suggestion in the mailing list - if I understand correctly - is 
exactly about enabling that kind of discussion, right?)
Just my two cents.

> Report committed offsets
> 
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit.  However, 
> this means that external tools are not able to report on how far behind a 
> given streaming job is.  When the user manually gives us a group.id, we 
> should report back to it.






[jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560696#comment-15560696
 ] 

Ofir Manor edited comment on SPARK-17812 at 10/9/16 10:00 PM:
--

Cody, you are absolutely right that the option naming is silly and leads to a dead end. Maybe it could be fixed now, as this code hasn't yet been released.
In general, I see just four useful options for a starting position:
1. Give me all messages - read all messages in the topics.
2. Ignore all current messages - read only new messages from now on.
3. Give me all messages starting from timestamp t - that could be a filter on (1), or, in Kafka 0.10.1+, pushed down to Kafka (in that version, the Kafka topic can also carry either broker-generated timestamps or user-provided event timestamps).
4. Give me all messages from a custom offset - for "advanced" cases.

I can suggest something specific (though it will be a matter of taste, stringy or not), but generally, I think there should be a single way to specify where to start, and it should cover these four alternatives. Having a bunch of mutually exclusive options seems confusing, and giving them the wrong names even more so.

Regarding "last x offsets" - I don't really get it. It seems to assume that Kafka has a single offset space, which is quite alien to Kafka (a topic is a collection of independent, ordered partitions).
For example, take a simple topic with four partitions. What does 1000 offsets back mean?
1. The last 1000 messages per partition? (4000 in total)
2. The last 250 messages per partition? (definitely NOT the last 1000 messages)
3. Read the last 1000 messages per partition, then merge and keep the last 1000 messages by timestamp? (provides somewhat meaningful semantics, but is still a bit nonsensical)
For me, none of these makes sense, because the user is actually saying - I want random stuff and I don't care what it is... It is as if, for a database source, we would start with 1000 random rows, followed by careful work to capture every change without missing any.
I believe "last hour" would make a lot more sense, and if someone really wants some variation of this "last 1000 messages", he could just create a custom offset.
(UPDATE) BTW Cody, I now get why you have been insisting, since May(!), on consuming from Kafka based on timestamp. It is the only option that isn't "start at a random point", but "start at a well-defined logical point".


was (Author: ofirm):
Cody, you are absolutely right that the option naming is silly and leads to a dead end. Maybe it could be fixed now, as this code hasn't yet been released.
In general, I see just four useful options for a starting position:
1. Give me all messages - read all messages in the topics.
2. Ignore all current messages - read only new messages from now on.
3. Give me all messages starting from timestamp t - that could be a filter on (1), or, in Kafka 0.10.1+, pushed down to Kafka (in that version, the Kafka topic can also carry either broker-generated timestamps or user-provided event timestamps).
4. Give me all messages from a custom offset - for "advanced" cases.

I can suggest something specific (though it will be a matter of taste, stringy or not), but generally, I think there should be a single way to specify where to start, and it should cover these four alternatives. Having a bunch of mutually exclusive options seems confusing, and giving them the wrong names even more so.

Regarding "last x offsets" - I don't really get it. It seems to assume that Kafka has a single offset space, which is quite alien to Kafka (a topic is a collection of independent, ordered partitions).
For example, take a simple topic with four partitions. What does 1000 offsets back mean?
1. The last 1000 messages per partition? (4000 in total)
2. The last 250 messages per partition? (definitely NOT the last 1000 messages)
3. Read the last 1000 messages per partition, then merge and keep the last 1000 messages by timestamp? (provides somewhat meaningful semantics, but is still a bit nonsensical)
For me, none of these makes sense, because the user is actually saying - I want random stuff and I don't care what it is... It is as if, for a database source, we would start with 1000 random rows, followed by careful work to capture every change without missing any.
I believe "last hour" would make a lot more sense, and if someone really wants some variation of this "last 1000 messages", he could just create a custom offset.

> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets availab

[jira] [Commented] (SPARK-17812) More granular control of starting offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560696#comment-15560696
 ] 

Ofir Manor commented on SPARK-17812:


Cody, you are absolutely right that the option naming is silly and leads to a dead end. Maybe it could be fixed now, as this code hasn't yet been released.
In general, I see just four useful options for a starting position:
1. Give me all messages - read all messages in the topics.
2. Ignore all current messages - read only new messages from now on.
3. Give me all messages starting from timestamp t - that could be a filter on (1), or, in Kafka 0.10.1+, pushed down to Kafka (in that version, the Kafka topic can also carry either broker-generated timestamps or user-provided event timestamps).
4. Give me all messages from a custom offset - for "advanced" cases.

I can suggest something specific (though it will be a matter of taste, stringy or not), but generally, I think there should be a single way to specify where to start, and it should cover these four alternatives. Having a bunch of mutually exclusive options seems confusing, and giving them the wrong names even more so.

Regarding "last x offsets" - I don't really get it. It seems to assume that Kafka has a single offset space, which is quite alien to Kafka (a topic is a collection of independent, ordered partitions).
For example, take a simple topic with four partitions. What does 1000 offsets back mean?
1. The last 1000 messages per partition? (4000 in total)
2. The last 250 messages per partition? (definitely NOT the last 1000 messages)
3. Read the last 1000 messages per partition, then merge and keep the last 1000 messages by timestamp? (provides somewhat meaningful semantics, but is still a bit nonsensical)
For me, none of these makes sense, because the user is actually saying - I want random stuff and I don't care what it is... It is as if, for a database source, we would start with 1000 random rows, followed by careful work to capture every change without missing any.
I believe "last hour" would make a lot more sense, and if someone really wants some variation of this "last 1000 messages", he could just create a custom offset.
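
A sketch of how a single starting-position option could cover all four alternatives (illustrative only - these value forms are a suggestion, not an existing API):
{noformat}
.option("startingOffsets", "earliest")                        // 1. all messages
.option("startingOffsets", "latest")                          // 2. only new messages
.option("startingOffsets", "timestamp:2016-10-01T00:00:00Z")  // 3. from timestamp t (0.10.1+ pushdown)
.option("startingOffsets", """{"topicFoo": {"0": 1234}}""")   // 4. custom offsets
{noformat}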

> More granular control of starting offsets
> -
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the 
> earliest or latests offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets






[jira] [Commented] (SPARK-17815) Report committed offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560661#comment-15560661
 ] 

Ofir Manor commented on SPARK-17815:


I hope others with a better understanding of Spark internals can comment, but still:
1. "The kafka commit log cant be ignored as merely for metric collection either" - that is exactly how I read this specific ticket (title and description)...
The way I understand it, when starting a Structured Streaming job for the first time, as of current trunk, a new consumer group is generated, with offsets being set based on a Spark source option, not based on Kafka defaults. After a failure, the offsets of the consumer group in Kafka are ignored, on purpose, and are overridden (seek?) by the Structured Streaming infra based on its internal checkpoint. So I don't get your comment about it. If during recovery the offsets are not set correctly, or some corner case / exception is not handled, it is probably a bug in the Structured Streaming Kafka source that should be reported and fixed.
2. Regarding your WAL comment - I am not sure you are accurate. The WAL should be written to HDFS at the beginning of a batch, and a checkpoint at the end of the batch. So I am not sure which corruption scenario you are implying. You are not referring to HDFS bugs, right? Is it just a potential vector for Spark-induced file corruption? I assume that generally, if the checkpoint or the WAL is corrupted, the job would fail, as it can't guarantee exactly-once.

> Report committed offsets
> 
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit.  However, 
> this means that external tools are not able to report on how far behind a 
> given streaming job is.  When the user manually gives us a group.id, we 
> should report back to it.






[jira] [Commented] (SPARK-17815) Report committed offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15559549#comment-15559549
 ] 

Ofir Manor commented on SPARK-17815:


Good, we are on the same page, no argument that SPARK-16963 blocks this issue. 
My only points were about the current ticket - that reporting committed offsets 
should be done by default, not based on a non-default parameter, and that 
setting group.id or a prefix of it is a great suggestion, but currently blocked.

> Report committed offsets
> 
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit.  However, 
> this means that external tools are not able to report on how far behind a 
> given streaming job is.  When the user manually gives us a group.id, we 
> should report back to it.






[jira] [Commented] (SPARK-17815) Report committed offsets

2016-10-09 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15559541#comment-15559541
 ] 

Ofir Manor commented on SPARK-17815:


As far as I understand, there is a clear single point of truth: the Structured Streaming "commit log" - the checkpoint. It holds both the source state (offsets) and the Spark state (aggregations) of successfully finished batches atomically, and it is what is used during recovery to identify the correct beginning offsets in the source.
The structured WAL is a technical, internal implementation detail that stores an intention to process a range of offsets, before they are actually read. Spark uses it during recovery to repeat the same source end boundary for a failed batch.
The data in the downstream store is about Spark output - which [version, spark partition] have landed - not about source state. Of course, it is used during Spark recovery / retry, but not as a basis for choosing offsets in the source (it is used to skip specific output version-partitions that were already written).
As this ticket states, updating the Kafka consumer group offsets in Kafka is only for easier progress monitoring using Kafka-specific tools. So, it should be considered informational, after-the-fact updating just for being nice, as it won't be used for Spark recovery. If a user wants to recover manually, they should rely on the Spark checkpoint offsets.
In other words, updating Kafka offsets after a batch has successfully committed means that the offsets in Kafka represent which messages have been successfully processed and landed in the sink, not which messages have been read.
[~marmbrus] Is my understanding correct?

> Report committed offsets
> 
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit.  However, 
> this means that external tools are not able to report on how far behind a 
> given streaming job is.  When the user manually gives us a group.id, we 
> should report back to it.






[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-10-07 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15554426#comment-15554426
 ] 

Ofir Manor commented on SPARK-15406:


Thanks Michael for the checkpoint location explanation! I didn't understand it after reading the docs. That should solve the "exactly-once" source restart challenge.

Regarding an exactly-once Kafka sink - we have implemented exactly that in our product (not within a Spark job). You are right that when retrying (and only when retrying), some scanning is needed (as I wrote, you need to know which messages of this [version,partition] have landed). A lot of it can be optimized, though. Quick examples:
1. The Foreach sink open() call needs to recover that list (which messages have landed) only when retrying, so it is needed only in the first [version,*] after a failure - that could be passed as a flag to open() (see the sketch after this list).
2. Hopefully the Structured Streaming checkpoint could hold the offset within the sink as part of the committed [version, partition] metadata, so that could significantly cut down the actual scanning (provide a starting point).
3. The actual scanning of the sink could be stopped (at least in the case of a Kafka partition) once we read messages of [version+1], if I understand the Structured Streaming internals well enough (as a Kafka partition provides an order guarantee).
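
A minimal sketch of point 1 (only to illustrate the idea of an idempotent {{ForeachWriter}} that skips already-written messages on retry; {{recoverWrittenMessages}} and {{sendToKafka}} are hypothetical placeholders, not real APIs):
{noformat}
import org.apache.spark.sql.ForeachWriter

// Idempotent writer: on a retry of a (version, partition) it first figures out
// which messages already landed in the sink and skips them in process().
class IdempotentKafkaWriter extends ForeachWriter[String] {
  private var alreadyWritten: Set[String] = Set.empty

  override def open(partitionId: Long, version: Long): Boolean = {
    // Hypothetical recovery lookup - e.g. scan the sink until a message of
    // version + 1 is seen, as per point 3 above.
    alreadyWritten = recoverWrittenMessages(partitionId, version)
    true   // true = process this partition; false would skip it entirely
  }

  override def process(value: String): Unit = {
    if (!alreadyWritten.contains(value)) sendToKafka(value)
  }

  override def close(errorOrNull: Throwable): Unit = ()

  // Placeholders for the sketch - not real APIs.
  private def recoverWrittenMessages(partitionId: Long, version: Long): Set[String] = Set.empty
  private def sendToKafka(value: String): Unit = ()
}
{noformat}
Points 2 and 3 above would mostly serve to shrink what such a recovery lookup has to scan.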

Anyway, the Kafka community has been discussing "exactly-once" as something that will arrive, and will likely deserve a "1.0" release, so maybe that will solve itself. I see Cody has more up-to-date insights on that...

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for the building a Kafka source 
> for Structured Streaming. Here is the design doc for an initial version of 
> the Kafka Source.
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index






[jira] [Commented] (SPARK-17815) Report committed offsets

2016-10-06 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553662#comment-15553662
 ] 

Ofir Manor commented on SPARK-17815:


I think this is a good idea. There is a minor confusion here, though, as setting group.id is explicitly blocked as far as I understand (it is even documented...). So, it might need rephrasing.
1. I think auto-commit should be off, and the driver should manually commit Kafka offsets after it successfully commits a batch to HDFS (when a batch is over), so monitoring will work (see the sketch after this list). I think that should happen unconditionally, unless there is a concrete performance / overhead concern (committing offsets to Kafka too frequently?).
2. Regarding manually setting group.id - that would be great. If there is a concern that users might mess up (reuse the group.id by mistake), at least allow setting a prefix for it (and a way to get the actual group.id).
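
A sketch of what point 1 could look like on the driver after a batch commits (illustration only, using the plain Kafka consumer API; in Kafka the committed offset conventionally points to the next offset to read, hence the +1):
{noformat}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// consumer: a (non-auto-committing) consumer owned by the driver, using the
// user-visible group.id; batchEndOffsets: last offset read per partition in the batch.
def reportBatchOffsets(consumer: KafkaConsumer[_, _],
                       batchEndOffsets: Map[TopicPartition, Long]): Unit = {
  val toCommit = batchEndOffsets.map { case (tp, lastRead) =>
    tp -> new OffsetAndMetadata(lastRead + 1)   // committed offset = next offset to read
  }
  // Purely informational for external monitoring; recovery still uses the Spark checkpoint.
  consumer.commitSync(toCommit.asJava)
}
{noformat}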

> Report committed offsets
> 
>
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit.  However, 
> this means that external tools are not able to report on how far behind a 
> given streaming job is.  When the user manually gives us a group.id, we 
> should report back to it.






[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-10-06 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553613#comment-15553613
 ] 

Ofir Manor commented on SPARK-15406:


I see three somewhat-related issues:
1. More flexibility is generally needed in stating offsets when using a Kafka 
source - that moved to SPARK-17812.
2. Regarding exactly-once, what I'm missing (given a transactional / idempotent 
sink) is the ability to restart a failed structured streaming job (after it 
crashed or was killed) by submitting a new one and asking to "copy" the context 
(offset and internal state) of another job / query.
The programming guide hints that it is possible, but doesn't give an example 
(and I couldn't find a relevant method):
"In case of a failure or intentional shutdown, you can recover the previous 
progress and state of a previous query, and continue where it left off".
If that was possible, it would remove the need to manually monitor / manage 
source offsets for exactly-once.
However, this is not specific to Kafka source - it is relevant for all 
fault-tolerant sources.
3. Another related issue (though out-of-scope of this JIRA) is adding an 
"exactly-once" Kafka sink.
Since in Kafka we can't commit a batch of messages together (like in a Foreach 
sink), the open of [version,partition] can't just return a boolean - it should 
likely return the messages within a [version,partition] that were already 
written so only those will be filtered out (or otherwise filter within the 
partition before process() is called).
That, again, is not Kafka-specific - it will be useful in other non-transactional sinks.



> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for the building a Kafka source 
> for Structured Streaming. Here is the design doc for an initial version of 
> the Kafka Source.
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index






[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-10-06 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553372#comment-15553372
 ] 

Ofir Manor commented on SPARK-15406:


Thanks Michael, I'll check the JIRAs, and will also post some feedback down the line once I try things out.

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for the building a Kafka source 
> for Structured Streaming. Here is the design doc for an initial version of 
> the Kafka Source.
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-10-06 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553161#comment-15553161
 ] 

Ofir Manor commented on SPARK-15406:


Alright, I do hope you all find a way to resolve this and switch to a more 
productive relationship; I highly appreciate all your efforts. That alternative 
ticket is definitely pretty nasty (and recent Kafka versions make log 
compaction more versatile, so it might get used more often).


> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for the building a Kafka source 
> for Structured Streaming. Here is the design doc for an initial version of 
> the Kafka Source.
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-10-06 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553076#comment-15553076
 ] 

Ofir Manor commented on SPARK-15406:


Thanks Cody! I missed the activity in the subtask PR...
It is great to see basic support merged, I'll try it out next week. I hope that 
now that the "basics" are merged, you will be allowed to continue hacking on it - 
personally I'm looking forward to any sort of "assign" support (explicitly 
providing a startingOffset, or providing a pre-seeked consumer group).
Just curious - given that 0.10.1 with KIP-79 should be released in a couple of 
weeks, are you planning to add a separate Kafka 0.10.1 artifact to allow "seek 
to timestamp" (maybe an optional startingTimestamp instead of startingOffset)? 
I think you were passionate about it. A rough sketch of what I mean is below.
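
A sketch of how "seek to timestamp" could be layered on the KIP-79 offsetsForTimes() API (Kafka 0.10.1+), resolving a timestamp to per-partition offsets outside Spark; handing those offsets to the source assumes some future explicit "assign"/startingOffset support, so treat that part as an assumption:
{code}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Sketch: map a timestamp to one offset per partition using offsetsForTimes().
object SeekToTimestamp {
  def offsetsAt(bootstrap: String, topic: String, timestampMs: Long): Map[TopicPartition, Long] = {
    val props = new java.util.Properties()
    props.put("bootstrap.servers", bootstrap)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("group.id", "offset-lookup") // throwaway group, nothing is committed

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      val partitions = consumer.partitionsFor(topic).asScala
        .map(p => new TopicPartition(p.topic, p.partition))
      val query = partitions.map(tp => tp -> java.lang.Long.valueOf(timestampMs)).toMap.asJava
      // null entries mean no message at or after the requested timestamp
      consumer.offsetsForTimes(query).asScala
        .collect { case (tp, oat) if oat != null => tp -> oat.offset() }
        .toMap
    } finally {
      consumer.close()
    }
  }
}
{code}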

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for the building a Kafka source 
> for Structured Streaming. Here is the design doc for an initial version of 
> the Kafka Source.
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-10-05 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548350#comment-15548350
 ] 

Ofir Manor commented on SPARK-15406:


Checking back - is there any progress on the API or code? I'll be happy to test 
once some code is posted for review.


> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for the building a Kafka source 
> for Structured Streaming. Here is the design doc for an initial version of 
> the Kafka Source.
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-09-13 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15488487#comment-15488487
 ] 

Ofir Manor commented on SPARK-15406:


Cody, I think you are right. Now is the right time to spend several days 
iterating over the design. Committing a new, half-baked API that will need to be 
maintained for years is exactly what we all try to avoid.
Of course, v1 doesn't have to implement a lot of features, but it should be 
forward-compatible with the rest of the planned improvements.
(as a side note - I haven't seen a timeline for 2.0.1 being discussed, so I'm 
not sure what the expectations are regarding "v1" delivery)

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> This is the parent JIRA to track all the work for the building a Kafka source 
> for Structured Streaming. Here is the design doc for an initial version of 
> the Kafka Source.
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing
> == Old description =
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-08-31 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453412#comment-15453412
 ] 

Ofir Manor commented on SPARK-15406:


For me, structured streaming is currently all about real window operations 
based on event time (fields in the event), not processing time (already in 2.0, 
with some limitations). In a future release it may also be about some new 
sink-related features (managing exactly-once from Spark to relational databases 
or HDFS, automatically doing upserts to databases).
So, I just want the same Kafka features as before - the value is the new 
processing capabilities; it just happens that my source of real-time events is 
Kafka, not Parquet files (as in 2.0).
I expect a couple of things. First, some basic config control: a pointer to 
Kafka (bootstrap servers), one or more topics, optionally an existing consumer 
group or an offset definition, and optionally a kerberized connection. I also 
expect exactly-once processing from Kafka to Spark (including correct recovery 
after a Spark node failure). A sketch of the kind of pipeline I have in mind is 
below.
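
A minimal sketch of that event-time pipeline on top of a Kafka source, assuming the window/watermark API of later releases (withWatermark did not exist in 2.0); the servers, topic, value format and paths are placeholders:
{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

// Sketch: aggregate by a timestamp carried inside the event, not by arrival time.
object EventTimeWindows {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("event-time-windows").getOrCreate()
    import spark.implicits._

    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "clicks")
      .load()

    // Kafka delivers value as binary; assume it is "<timestamp>,<user>" text
    val events = raw.selectExpr("CAST(value AS STRING) AS line")
      .selectExpr(
        "CAST(split(line, ',')[0] AS TIMESTAMP) AS eventTime",
        "split(line, ',')[1] AS user")

    val counts = events
      .withWatermark("eventTime", "10 minutes")          // tolerate late events
      .groupBy(window($"eventTime", "5 minutes"), $"user")
      .count()

    counts.writeStream
      .outputMode("append")
      .format("parquet")
      .option("path", "/data/windowed-counts")
      .option("checkpointLocation", "/checkpoints/windowed-counts")
      .start()
      .awaitTermination()
  }
}
{code}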

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-08-31 Thread Ofir Manor (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453289#comment-15453289
 ] 

Ofir Manor commented on SPARK-15406:


Cody, why do you think Structured Streaming support for Kafka requires that 
specific feature (time-based indexing)?
Personally, I have a couple of objections:
# Technically, I think starting a stream from a timestamp is really nice, but 
definitely optional. We could start by letting the user choose between latest, 
oldest and a user-provided Kafka offset object (an offset per partition per 
topic), like every other Kafka consumer today, and add timestamp as another 
option when that is released (see the sketch after this comment).
For me, starting from latest or from a user-provided Kafka offset is what I'd 
like to use, though I can see myself wanting to start from a timestamp in some 
cases.
# Non-technically, I think this Kafka source would be very popular, so wishing 
to support only the next (future) release of Kafka is counter-productive, as it 
won't work with most Kafka clusters out there after the release.
Of course, supporting currently deployed Kafka clusters likely means 0.8.2.x 
support, which is the old consumer... so it is additional, duplicate work, but 
I think it is critical.

WDYT?
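
A sketch of the three starting positions mentioned above, using the option name the source eventually shipped with ("startingOffsets"); the exact spelling was still open at the time of this discussion, so treat it as illustrative:
{code}
import org.apache.spark.sql.SparkSession

// Sketch: latest, earliest, or explicit per-partition offsets as the start point.
object StartingPositions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("starting-positions").getOrCreate()

    def kafka(start: String) = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "events")
      .option("startingOffsets", start)
      .load()

    val fromLatest   = kafka("latest")    // only new data
    val fromEarliest = kafka("earliest")  // everything still retained
    // user-provided offsets: one offset per partition per topic;
    // -2 means earliest and -1 means latest for partitions we don't pin
    val fromExplicit = kafka("""{"events":{"0":1234,"1":4567,"2":-2}}""")
  }
}
{code}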

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org