[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0
[ https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15884790#comment-15884790 ] Ofir Manor commented on SPARK-18057:

Sure - [~marmbrus] - what do you think? It resolved my Kafka 0.10.1 concern.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here,
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0
[ https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15884619#comment-15884619 ] Ofir Manor commented on SPARK-18057:

[~c...@koeninger.org] I think my previous issue with upgrading to the Kafka 0.10.1 client has been resolved in Kafka 0.10.2 - since now newer clients DO support older brokers:
{quote}
The Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2 clients can talk to version 0.10.0 or newer brokers. Note that some features are not available or are limited when older brokers are used.
{quote}
See https://kafka.apache.org/documentation/#upgrade_1020_notable

So it seems safe to upgrade Spark's Kafka client to 0.10.2, as it shouldn't bump up the minimum required version of the Kafka broker. What do you think?
[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source
[ https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688246#comment-15688246 ] Ofir Manor commented on SPARK-18475:

Cody, for me you are the main gatekeeper for everything Kafka and the main Kafka expert, so I wanted your perspective, not Michael's (except the generic "order" guarantee, which I still think does not exist). I thought that if someone did the effort of building, testing and trying to contribute it, it is an indication that it hurts in the real world, especially when you said it is a repeated request. I guess in many places, getting read access to a potentially huge, shared topic is not the same as having Kafka admin rights, or being the only or main consumer, or being able to easily fix bad past decisions around partitions and keys... Anyway, it is totally up to you, you'll have to maintain it. I personally have no use for this feature.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> ---
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.0.2, 2.1.0
> Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we shouldn't be able to increase parallelism further, i.e. have multiple Spark tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" for what it is defined for (being cached) in this use case, but the extra overhead is worth handling data skew and increasing parallelism especially in ETL use cases.
[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source
[ https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15680780#comment-15680780 ] Ofir Manor commented on SPARK-18475:

I understand, but I disagree with you. I think you are mixing two very different things.

KAFKA does provide an order guarantee within a Kafka partition. That is a basic design attribute of it.

SPARK RDD, however, does not have any order guarantees. When you create and manipulate RDDs / Dataframes (regardless of their origin, Kafka or other), Spark does not explicitly tell you what the order of elements is. For example, operators on RDDs do not specify whether they keep or break the internal order (and some of them do break it), there are no order-sensitive operators, there is no general-purpose order metadata, etc.

I agree that with the Kafka source, under some circumstances (like non-shuffling operators), the output of RDD manipulation may very well be in accordance with the input order (within an input Kafka partition). BUT, I never saw anywhere that this is an explicit guarantee that users can rely on, rather than an artifact of the internal, current implementation.

This is relevant because you want to block here a potentially significant performance improvement (one relying on explicit, non-default configuration), just to maintain a property that Spark does not guarantee to keep.
[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source
[ https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674628#comment-15674628 ] Ofir Manor commented on SPARK-18475:

I was just wondering if it actually works, but it seems you found a way to hack it (I thought you would need a different consumer group per worker to avoid coordination by the broker, but it didn't seem like it). If it does provide a big perf boost in some cases, and it is not enabled by default, I personally don't have any objections.

[~c...@koeninger.org] - I didn't understand your objection. An RDD / dataset does not have any inherent order guarantees (same as a SQL result set), and the Kafka metadata per message (including topic, partition, offset) is exposed if someone really cares. If you guys have a smart hack that allows you to divide a specific partition into ranges and have different workers read different ranges of a partition in parallel, and if it does provide a significant perf boost, why not have it as an option? I don't think it should break correctness, as the boundaries are decided anyway by the driver.
[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source
[ https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674092#comment-15674092 ] Ofir Manor commented on SPARK-18475:

Are you sure this is working? Having a visible perf effect? As far as I know, the maximum parallelism of Kafka is the number of topic-partitions, by design. If your consumer group has more consumers than that, some of them will just be idle. This is because when reading, each partition is owned by a single consumer (the allocation of partitions to consumers is dynamic, as consumers join and leave). To quote an older source:
??The first thing to understand is that a topic partition is the unit of parallelism in Kafka. On both the producer and the broker side, writes to different partitions can be done fully in parallel. So expensive operations such as compression can utilize more hardware resources. On the consumer side, Kafka always gives a single partition's data to one consumer thread. Thus, the degree of parallelism in the consumer (within a consumer group) is bounded by the number of partitions being consumed. Therefore, in general, the more partitions there are in a Kafka cluster, the higher the throughput one can achieve.??
https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

You could repartition the data in Spark after reading, to increase the parallelism of Spark's processing.
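The bound quoted above can be illustrated with a toy sketch of partition ownership - one owner per partition, so consumers beyond the partition count receive nothing. This is a simplified, hypothetical assignor, not Kafka's actual rebalancing code:

```python
# Sketch: distribute topic-partitions over the consumers of one group.
# Each partition gets exactly one owner; with more consumers than
# partitions, the extra consumers own nothing and sit idle.
def assign_partitions(partitions, consumers):
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        owner = consumers[i % len(consumers)]
        assignment[owner].append(p)
    return assignment

partitions = [f"topicFoo-{i}" for i in range(4)]   # 4 topic-partitions
consumers = [f"consumer-{i}" for i in range(6)]    # 6 consumers in the group

assignment = assign_partitions(partitions, consumers)
idle = [c for c, owned in assignment.items() if not owned]
# consumer-4 and consumer-5 own no partitions at all
```

With 4 partitions and 6 consumers, two consumers are guaranteed to be idle no matter how the assignment is done, which is exactly why adding Spark tasks beyond the partition count needs a different mechanism (offset-range splitting) rather than a bigger consumer group.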
[jira] [Commented] (SPARK-18386) Batch mode SQL source for Kafka
[ https://issues.apache.org/jira/browse/SPARK-18386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653821#comment-15653821 ] Ofir Manor commented on SPARK-18386:

BTW [~c...@koeninger.org] - I think that filtering (by timestamp) can be done today "the hard way" if the Kafka broker is 0.10.1. The user could use the 0.10.1 client to get a list of offsets for his requested timestamp, then submit a job to Spark using explicit offsets to be used by Spark's 0.10.0 client (quite ugly, but it should work).

> Batch mode SQL source for Kafka
> ---
> Key: SPARK-18386
> URL: https://issues.apache.org/jira/browse/SPARK-18386
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Cody Koeninger
>
> An SQL equivalent to the DStream KafkaUtils.createRDD would be useful for querying over a defined batch of offsets.
> The possibility of Kafka 0.10.1 time indexing (e.g. a batch from timestamp X to timestamp Y) should be taken into account, even if not available in the initial implementation.
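The two-step workaround can be sketched as follows. The timestamp-to-offset lookup (Kafka's offsetsForTimes API) is stubbed out with made-up numbers, since the real call needs a 0.10.1+ client and a live broker; the JSON shape in step 2 matches the startingOffsets option of the Spark Kafka source:

```python
import json

# Step 1 (stubbed): what an offsetsForTimes lookup would return for
# "everything from timestamp T" -- one offset per topic-partition.
# The topic name and offsets here are purely illustrative.
offsets_for_timestamp = {
    ("web_clicks", 0): 1234,
    ("web_clicks", 1): 5678,
}

# Step 2: fold that into the startingOffsets JSON that Spark's 0.10.0
# client accepts, e.g. {"web_clicks": {"0": 1234, "1": 5678}}.
def to_starting_offsets(offsets):
    out = {}
    for (topic, partition), offset in offsets.items():
        out.setdefault(topic, {})[str(partition)] = offset
    return json.dumps(out)

starting_offsets = to_starting_offsets(offsets_for_timestamp)
# then: spark.readStream.format("kafka")
#            .option("startingOffsets", starting_offsets)...
```

Ugly indeed - the user has to run a second Kafka client just to translate a timestamp into this string - but it stays entirely outside Spark, which is the point of the workaround.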
[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.1.0
[ https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653415#comment-15653415 ] Ofir Manor commented on SPARK-18057:

This is great, especially if we follow up by adding reading topics from a timestamp... Do you suggest replacing the existing 0.10.0 client with the 0.10.1 client, or switching to multiple implementations (similar to how dstream has 0.8 and 0.10 implementations)? Due to Kafka's architecture, older clients can talk to newer brokers, but newer clients cannot talk to older brokers (therefore, during upgrades, you always upgrade the brokers first). So, bumping up Spark's Kafka client version practically means de-supporting Kafka broker 0.10.0 in Structured Streaming, which is why I prefer adding a second implementation... For context, 0.10.0 was released in May 2016, 0.10.1 was released this October.
[jira] [Commented] (SPARK-18386) Batch mode SQL source for Kafka
[ https://issues.apache.org/jira/browse/SPARK-18386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653388#comment-15653388 ] Ofir Manor commented on SPARK-18386:

This would be really useful for me right now! We have many bounded Kafka topics, for example, metrics of the last couple of months, web clicks of the last 7 days, etc. It would be great to be able to just query them with Spark (I'd use "earliest" starting offsets in my case). There is also an interesting interaction between structured streaming and regular queries, where each streaming batch recomputes the regular queries it depends on. It works with the file sources; I'd like to use that with the Kafka source as well in some cases. It would also be great if the external API were as close to the current one ({{spark.readStream.format("kafka").option(...)}}) as possible (same options etc), maybe just with {{spark.read.kafka...}}?
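The symmetry being asked for can be sketched as one option set shared by the streaming and the (then hypothetical) batch entry points. The option names below mirror the existing streaming Kafka source; "endingOffsets" is a guessed name for bounding a batch read, and the server list is a placeholder:

```python
# One set of Kafka options, reusable for both streaming and batch reads.
# "endingOffsets" is hypothetical here -- it marks where a bounded batch
# query would stop, something a streaming query does not need.
kafka_options = {
    "kafka.bootstrap.servers": "host1:9092,host2:9092",  # placeholder
    "subscribe": "web_clicks",
    "startingOffsets": "earliest",
}
batch_options = dict(kafka_options, endingOffsets="latest")

# Streaming (the API that exists today):
#   spark.readStream.format("kafka").options(**kafka_options).load()
# Batch (as wished for in this comment):
#   spark.read.format("kafka").options(**batch_options).load()
```

The design point is that a user should only have to learn one vocabulary of options; the batch form adds an upper bound and changes readStream to read, nothing more.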
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578699#comment-15578699 ] Ofir Manor commented on SPARK-17812:

I think Michael suggests that if you use {{startingOffsets}} without using {{assign}}, that could work (use the topic-partition list from the startingOffsets), and it would simplify the user's coding (no need to specify two similar lists, simpler resume, etc.). You could keep an explicit {{assign}} for the "more rare?" cases, if someone wants to specify a list of topic-partitions but also earliest / latest / timestamp.

> More granular control of starting offsets (assign)
> ---
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
> Assignee: Cody Koeninger
>
> Right now you can only run a Streaming Query starting from either the earliest or latest offsets available at the moment the query is started. Sometimes this is a lot of data. It would be nice to be able to do the following:
> - seek to user specified offsets for manually specified topicpartitions
>
> currently agreed on plan:
> Mutually exclusive subscription options (only assign is new to this ticket)
> {noformat}
> .option("subscribe","topicFoo,topicBar")
> .option("subscribePattern","topic.*")
> .option("assign","""{"topicfoo": [0, 1],"topicbar": [0, 1]}""")
> {noformat}
> where assign can only be specified that way, no inline offsets
> Single starting position option with three mutually exclusive types of value
> {noformat}
> .option("startingOffsets", "earliest" | "latest" | """{"topicFoo": {"0": 1234, "1": -2}, "topicBar":{"0": -1}}""")
> {noformat}
> startingOffsets with json fails if any topicpartition in the assignments doesn't have an offset.
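The plan's validation rule - startingOffsets with json fails if any assigned topicpartition doesn't have an offset - can be sketched as a small check over the two JSON payloads from the agreed plan. This is illustrative only, not Spark's actual option parsing:

```python
import json

# The two payloads from the plan: an explicit assignment, and per-partition
# starting offsets (where -1 / -2 are the latest / earliest sentinels).
assign = json.loads('{"topicfoo": [0, 1], "topicbar": [0, 1]}')
starting = json.loads(
    '{"topicfoo": {"0": 1234, "1": -2}, "topicbar": {"0": -1}}')

def missing_offsets(assign, starting):
    """Return the assigned topic-partitions that have no explicit offset."""
    missing = []
    for topic, partitions in assign.items():
        for p in partitions:
            if str(p) not in starting.get(topic, {}):
                missing.append((topic, p))
    return missing

gaps = missing_offsets(assign, starting)
# topicbar partition 1 has no offset, so this combination must be rejected
```

The rule keeps the failure explicit at query-start time instead of silently defaulting the forgotten partition to earliest or latest.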
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573457#comment-15573457 ] Ofir Manor commented on SPARK-17812:

I'm with you - I warned you it is bikeshedding... I don't have a strong opinion, just a preference, and what you suggested is way better than the committed solution, so I'll get out of the loop. Whatever [~marmbrus] and you are OK with - either way it would be a big step forward.
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573341#comment-15573341 ] Ofir Manor commented on SPARK-17812:

Thanks Cody! Great to have a concrete example. I have some comments, but it's mostly bikeshedding:
1. subscribe vs. subscribePattern --> personally, I would combine them both into "subscribe" - no need to burden the user with the different Kafka API nuances. It can get a list of discrete topics or a pattern.
2. It would be much clearer if "assign" was called subscribeSomething, so the user would choose one "subscribe..." option and one (or more) "starting..." option. Not sure I have a good name though - subscribeCustom? You could even use the regular subscribe for that (and be smarter with the pattern matching) - I think it would just work, and if someone tries to be funny (combine an asterisk and partitions) we could just error out.
3. I like startingTime... pretty neat. We could hypothetically add {{.option("startingMessages", long)}} to support Michael's "just start with the 1000 most recent messages"...
4. As I said before, I'd rather have all starting* options be mutually exclusive. Yes, it blocks some edge cases, on purpose, but it makes the API and code way clearer (think about startingMessages interacting with startingOffsets, etc.). I think it would be easier to regret this and allow multiple starting* options in the future (opening all sorts of esoteric combinations) than to clean it up later if users find it confusing and unneeded.
Anyway, as long as it is functional I'm good with it, even if it is less aesthetic.
[jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573109#comment-15573109 ] Ofir Manor commented on SPARK-17812:

Regarding (1) - of course it is *all* data in the source, as of query start. Just the same as a file system directory or a database table - I'm not sure a disclaimer that the directory or table could have had different data in the past adds anything but confusion... Anyway, the startingOffset is confusing because it seems you want a different parameter for "assign" --> to explicitly specify starting offsets.

For your use case, I would add:
5. Give me nnn messages (not the last ones). We still do one of the above options (trying to go back nnn messages, somehow split between the topic-partitions involved), but do not provide a more explicit guarantee like "last nnn". Generally, the distribution of messages to partitions doesn't have to be round-robin or uniform; it is strongly based on the key (which could be a state, a URL, etc.).

Anyway, I haven't seen a concrete suggestion on how to specify offsets or timestamps, so I think that would be the next step in this ticket (I suggested you could condense it all into one option to avoid dependencies between options, but I don't have an elegant "stringly" suggestion).
[jira] [Commented] (SPARK-17812) More granular control of starting offsets
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15563273#comment-15563273 ] Ofir Manor commented on SPARK-17812:

If you think it is useful, you should definitely propose something that covers it. Personally, I can't imagine a case where someone will need that - but you have much more exposure to different users. I can only say that it is already covered by the strict "assign" (using only partitions that were explicitly listed) - the user would open a new consumer group, seek to the end, seek specific partitions to their preferred offsets, and pass all the topic-partition-offsets explicitly to the "assign" option.

BTW - just off the top of my head - maybe you'll want to add a helper function or two that:
- gets a specific consumer group, and returns whatever object / magic string is needed to configure a starting offset for Structured Streaming.
- same as above, but reads from a Structured Streaming checkpoint and returns a new consumer group (created, seeked, committed). That relates to the "DRP" ticket, and is also just passed to the previous helper function (from checkpoint to a new streaming job).

> More granular control of starting offsets
> ---
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the earliest or latest offsets available at the moment the query is started. Sometimes this is a lot of data. It would be nice to be able to do the following:
> - seek back {{X}} offsets in the stream from the moment the query starts
> - seek to user specified offsets
[jira] [Commented] (SPARK-17815) Report committed offsets
[ https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560906#comment-15560906 ] Ofir Manor commented on SPARK-17815:

I totally understand your concerns, and yes, I hope you will continue to faithfully respond to all Kafka-related messages, and yes, you should insist on fixing what you think is broken or unclear or too complex. I just don't see how it relates to this ticket, which is explicitly about helping external monitoring tools that might rely on the Kafka consumer group advancing its offsets. You have some problems with the framework, specifically with its fault-tolerance semantics, not with some specific Kafka source implementation detail: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#fault-tolerance-semantics

Since Kafka might be the first or second "real" streaming source, it does make a lot of sense to validate / improve the framework based on this feedback. I think this ticket is the wrong place for that - it will have no visibility. Maybe it should go to the dev list, or to the blocking ticket (which is about the framework), or to a new Structured Streaming JIRA (if there is a specific suggestion), or maybe to a SEP/SPIP; or, if no committer is willing to revisit or discuss it in any channel (as was the case with the Kafka source for many months), you'll eventually have to face that this is how this project is unfortunately managed. Anyway, I think I maxed out my contribution to this thread. It should be continued with the right committers, wherever they want, not with me.

> Report committed offsets
> ---
> Key: SPARK-17815
> URL: https://issues.apache.org/jira/browse/SPARK-17815
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
>
> Since we manage our own offsets, we have turned off auto-commit. However, this means that external tools are not able to report on how far behind a given streaming job is. When the user manually gives us a group.id, we should report back to it.
[jira] [Commented] (SPARK-17812) More granular control of starting offsets
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560865#comment-15560865 ] Ofir Manor commented on SPARK-17812:

Why? If you choose oldest, latest, or by timestamp, you can combine it with a topic pattern. If you explicitly provide a list of specific topic-partition-offsets, Spark should respect that. Choosing topics and choosing a starting-position method are complementary choices.
[jira] [Commented] (SPARK-17812) More granular control of starting offsets
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560820#comment-15560820 ] Ofir Manor commented on SPARK-17812:

I'll have to think about it - I am not sure it ever makes sense to provide custom offsets, skip some partitions, and expect user-configurable default offsets for the missing ones. I would actually suggest that if a user provides custom offsets, they are used as is. For example, if I provide offsets only for topic1 partitions 1, 4 and 10, then the consumer group would subscribe to only these three partitions. In other words, a custom assignment should be respected as is.
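Under the behavior proposed in this comment, the subscription would be derivable from the custom offsets alone - whatever partitions appear in the JSON are the assignment, nothing else is filled in. A sketch, reusing the topic1 partitions 1, 4 and 10 example (the offset values are made up):

```python
import json

# Offsets provided only for topic1 partitions 1, 4 and 10; per the
# proposal, those three partitions ARE the assignment -- no defaults
# are invented for the partitions that were skipped.
starting = json.loads('{"topic1": {"1": 500, "4": 0, "10": 42}}')

def implied_assignment(starting):
    """Derive the topic-partition assignment from the offsets JSON."""
    return {topic: sorted(int(p) for p in parts)
            for topic, parts in starting.items()}

assignment = implied_assignment(starting)
# {"topic1": [1, 4, 10]} -- only these three partitions get subscribed
```

This keeps the contract simple: one JSON string fully determines both what is read and where reading starts.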
[jira] [Commented] (SPARK-17815) Report committed offsets
[ https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560749#comment-15560749 ] Ofir Manor commented on SPARK-17815:

Thanks Cody, this is much clearer. (BTW - I've been bitten multiple times by HDFS corrupting files, especially with the truncate() API, but that is a different story.)

I think we are mixing two different discussions here. Structured Streaming provides a framework and an algorithm, and expects all sources and sinks to align with that. The Kafka source is just one such example (and the Kafka sink discussion is about other limits of the current framework). You have some concerns and reservations regarding the framework - both due to the partial implementation so far and due to deeper concerns (mostly complexity and its likely effects). I think the umbrella discussion (Structured Streaming Kafka source) is about conforming to the spec. This specific ticket is about an even smaller detail. Of course, given that so far there were no real opportunities for the deeper, architectural discussion (or maybe it is just my perception), it might make sense to use every opportunity to try to raise and affect the higher-level issues. But I think at least we should be clear whether we are discussing something specific to the Kafka source for Structured Streaming, or things at the framework level. (Your SIP suggestion on the mailing list - if I understand correctly - is exactly about enabling that kind of discussion, right?) Just my two cents.
[jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets
[ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560696#comment-15560696 ] Ofir Manor edited comment on SPARK-17812 at 10/9/16 10:00 PM: -- Cody, you are absolutely right that the option naming is silly and leads to a dead end. Maybe it could be fixed now, as this code hasn't been released yet. In general, I see just four useful options for a starting position: 1. Give me all messages - read all messages in the topics. 2. Ignore all current messages - read only new messages from now on. 3. Give me all messages starting from timestamp t - that could be a filter on (1), or in Kafka 0.10.1+, pushed down to Kafka (in that version, the Kafka topic can also have either broker-generated timestamps or user-provided event timestamps). 4. Give me all messages from a custom offset - for "advanced" cases. I can suggest something specific (though it will be a matter of taste, stringy or not), but generally, I think there should be a single way to specify where to start, and it should cover these four alternatives. Having a bunch of mutually exclusive options seems confusing, and giving them the wrong names is even more so. Regarding "last x offsets" - I don't really get it. It seems to assume that Kafka has a single offset space, which is quite alien to Kafka (a topic is a collection of independent, ordered partitions). For example, take a simple topic with four partitions. What is 1000 offsets back? 1. The last 1000 messages per partition? (4000 in total) 2. The last 250 messages per partition? (definitely NOT the last 1000 messages) 3. Read the last 1000 messages per partition, then merge and keep the last 1000 messages by timestamp? (provides somewhat meaningful semantics, but is still a bit nonsensical) For me, none of these makes sense, because the user is actually saying - I want random stuff and I don't care what it is... 
It is as if, for a database source, we would start with a random 1000 rows, followed by careful work to capture every change without missing any. I believe "last hour" would make a lot more sense, and if someone really wants some variation of this "last 1000 messages", they could just create a custom offset. (UPDATE) BTW Cody, I now get why you have insisted, since May(!), on consuming from Kafka based on a timestamp. It is the only option that isn't "start at a random point" but "start at a well-defined logical point". > More granular control of starting offsets > - > > Key: SPARK-17812 > URL: https://issues.apache.org/jira/browse/SPARK-17812 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Michael Armbrust > > Right now you can only run a Streaming Query starting from either the > earliest or latest offsets available at the moment the query is started. > Sometimes this is a lot of data. It would be nice to be able to do the > following: > - seek back {{X}} offsets in the stream from the moment the query starts > - seek to user specified offsets
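The four starting positions listed above can be sketched as a small model. This is an illustration only - the class and method names are hypothetical, not Spark's actual Kafka source options - showing how each choice resolves to a concrete per-partition offset:

```java
import java.util.Map;

// Hypothetical model of the four proposed starting positions for a Kafka
// source. Names are illustrative, not Spark's actual option names.
class StartingPosition {
    enum Kind { EARLIEST, LATEST, FROM_TIMESTAMP, FROM_OFFSETS }

    final Kind kind;
    final long timestamp;               // used only for FROM_TIMESTAMP
    final Map<Integer, Long> offsets;   // partition -> offset, only for FROM_OFFSETS

    private StartingPosition(Kind kind, long ts, Map<Integer, Long> offsets) {
        this.kind = kind; this.timestamp = ts; this.offsets = offsets;
    }

    static StartingPosition earliest() { return new StartingPosition(Kind.EARLIEST, -1, null); }
    static StartingPosition latest()   { return new StartingPosition(Kind.LATEST, -1, null); }
    static StartingPosition fromTimestamp(long ts) { return new StartingPosition(Kind.FROM_TIMESTAMP, ts, null); }
    static StartingPosition fromOffsets(Map<Integer, Long> o) { return new StartingPosition(Kind.FROM_OFFSETS, -1, o); }

    // Resolve to concrete per-partition offsets, given the broker's current
    // beginning/end offsets and (for timestamps) an offsetsForTimes-style lookup.
    Map<Integer, Long> resolve(Map<Integer, Long> beginning,
                               Map<Integer, Long> end,
                               Map<Integer, Long> offsetsForTimestamp) {
        switch (kind) {
            case EARLIEST:       return beginning;   // option 1: all messages
            case LATEST:         return end;         // option 2: only new messages
            case FROM_TIMESTAMP: return offsetsForTimestamp; // option 3: pushed down in 0.10.1+
            case FROM_OFFSETS:   return offsets;     // option 4: user-provided offsets
            default: throw new IllegalStateException();
        }
    }
}
```

The point of the single entry type is the one made in the comment: one way to specify where to start, covering all four alternatives, rather than several mutually exclusive options.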
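The "last X offsets" ambiguity discussed above can be made concrete with trivial arithmetic - the interpretations disagree even on the total number of messages returned for a four-partition topic:

```java
// The two simplest readings of "1000 offsets back" on a 4-partition topic.
class LastXOffsets {
    // Interpretation 1: last x messages *per partition*.
    static long totalPerPartition(long x, int partitions) {
        return x * partitions;   // 1000 per partition on 4 partitions = 4000 messages total
    }

    // Interpretation 2: x messages split evenly across partitions.
    static long perPartitionShare(long x, int partitions) {
        return x / partitions;   // 1000 / 4 = 250 per partition - not the "last 1000" at all
    }
}
```

Neither reading (nor a merge-by-timestamp of the per-partition tails) yields a well-defined "last 1000 messages" for the topic as a whole, which is the point of the objection.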
[jira] [Commented] (SPARK-17815) Report committed offsets
[ https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560661#comment-15560661 ] Ofir Manor commented on SPARK-17815: I hope others with a better understanding of Spark internals can comment, but still: 1. "The kafka commit log cant be ignored as merely for metric collection either" - that is exactly how I read this specific ticket (title and description)... The way I understand it, when starting a Structured Streaming job for the first time, as of current trunk, a new consumer group is generated, with offsets being set based on a Spark source option, not based on Kafka defaults. After a failure, the offsets of the consumer group in Kafka are ignored, on purpose, and are overridden (seek?) by the Structured Streaming infra based on its internal checkpoint. So, I don't get your comment about it. If during recovery the offsets are not set correctly or some corner case / exception is not handled, it is probably a bug in the Structured Streaming Kafka source that should be reported and fixed. 2. Regarding your WAL comment - I am not sure you are accurate. The WAL should be written to HDFS at the beginning of a batch, and a checkpoint at the end of the batch. So, I am not sure which corruption scenario you are implying. You are not referring to HDFS bugs, right? Is it just a potential vector for Spark-induced file corruption? I assume that generally, if the checkpoint is corrupted or if the WAL is corrupted, the job would fail, as it can't guarantee exactly-once.
[jira] [Commented] (SPARK-17815) Report committed offsets
[ https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15559549#comment-15559549 ] Ofir Manor commented on SPARK-17815: Good, we are on the same page, no argument that SPARK-16963 blocks this issue. My only points were about the current ticket - that reporting committed offsets should be done by default, not based on a non-default parameter, and that setting group.id or a prefix of it is a great suggestion, but currently blocked.
[jira] [Commented] (SPARK-17815) Report committed offsets
[ https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15559541#comment-15559541 ] Ofir Manor commented on SPARK-17815: As far as I understand, there is a clear single point of truth: the Structured Streaming "commit log" - the checkpoint. It holds both the source state (offsets) and the Spark state (aggregations) of successfully finished batches atomically, and it is what is used during recovery to identify the correct beginning offset in the source. The structured WAL is a technical, internal implementation detail that stores an intention to process a range of offsets before they are actually read. Spark uses it during recovery to repeat the same source end boundary for a failed batch. The data in the downstream store is about Spark output - which [version, spark partition] have landed - not about source state. Of course, it is used during Spark recovery / retry, but not as a basis for choosing offsets in the source (it is used to skip specific output version-partitions that were already written). As this ticket states, updating the Kafka consumer group offsets in Kafka is only for easier progress monitoring using Kafka-specific tools. So, it should be considered informational, after-the-fact updating just for being nice, as it won't be used for Spark recovery. If a user wants to manually recover, they should rely on the Spark checkpoint offsets. In other words, updating Kafka offsets after a batch successfully commits means that the offsets in Kafka represent which messages have been successfully processed and landed in the sink, not which messages have been read. [~marmbrus] Is my understanding correct? 
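The ordering described above - report offsets to Kafka only after a batch has committed to the checkpoint, and recover only from the checkpoint - can be sketched with a toy model. All names here are hypothetical; this is not Spark code, just an illustration of the proposed semantics:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the proposed behavior: the checkpoint is the single point of
// truth; Kafka consumer-group offsets are updated only after a successful
// batch commit, purely for monitoring. Recovery never reads them.
class OffsetReporter {
    final Map<Integer, Long> checkpoint = new HashMap<>();        // source of truth
    final Map<Integer, Long> kafkaGroupOffsets = new HashMap<>(); // informational only

    void commitBatch(Map<Integer, Long> batchEndOffsets) {
        checkpoint.putAll(batchEndOffsets);        // 1. commit to the checkpoint first
        kafkaGroupOffsets.putAll(batchEndOffsets); // 2. then report to Kafka, best-effort
    }

    // Recovery consults the checkpoint, not the Kafka consumer group.
    Map<Integer, Long> recoveryOffsets() {
        return checkpoint;
    }
}
```

With this ordering, the Kafka-side offsets always describe messages that have been fully processed and landed in the sink, never merely read.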
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15554426#comment-15554426 ] Ofir Manor commented on SPARK-15406: Thanks Michael for the checkpoint location explanation! I didn't understand it after reading the docs. That should solve the "exactly-once" source restart challenge. Regarding exactly-once to a Kafka sink - we have implemented exactly that in our product (not within a Spark job). You are right that when retrying (and only when retrying), some scanning is needed (as I wrote, you need to know which messages of this [version, partition] have landed). A lot of it can be optimized, though. Quick examples: 1. The Foreach sink open() call needs to recover that list (which messages have landed) only when retrying, so it is needed only in the first [version,*] after a failure - that could be passed as a flag to open(). 2. Hopefully the Structured Streaming checkpoint could hold the offset within the sink as part of the committed [version, partition] metadata, which could significantly cut down the actual scanning (provide a starting point). 3. The actual scanning of the sink could be stopped (at least in the case of a Kafka partition) once we read messages of [version+1], if I understand the Structured Streaming internals well enough (as a Kafka partition provides an ordering guarantee). Anyway, the Kafka community has been discussing "exactly-once" as something that will arrive, and will likely deserve a "1.0" release, so maybe that would solve itself. I see Cody has more up-to-date insights on that... > Structured streaming support for consuming from Kafka > - > > Key: SPARK-15406 > URL: https://issues.apache.org/jira/browse/SPARK-15406 > Project: Spark > Issue Type: New Feature >Reporter: Cody Koeninger > > This is the parent JIRA to track all the work for building a Kafka source > for Structured Streaming. Here is the design doc for an initial version of > the Kafka Source. 
> https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing > == Old description = > Structured streaming doesn't have support for kafka yet. I personally feel > like time based indexing would make for a much better interface, but it's > been pushed back to kafka 0.10.1 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
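The retry-time scanning described in the comment above - recover the set of messages of a [version, partition] that already landed, then skip them - amounts to a simple dedup filter. A minimal sketch, with hypothetical names (not Spark's ForeachWriter API):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// On retry of a (version, partition) task, messages already written to the
// sink are filtered out, so every message lands in the sink exactly once.
class IdempotentSink {
    final Set<String> written = new HashSet<>();  // message ids already in the sink

    // Returns only the messages that still need writing on this retry.
    List<String> filterForRetry(List<String> batch) {
        return batch.stream()
                    .filter(id -> !written.contains(id))
                    .collect(Collectors.toList());
    }

    void write(List<String> toWrite) {
        written.addAll(toWrite);
    }
}
```

The optimizations in the comment (a retry flag to open(), a sink offset in the checkpoint, stopping the scan at [version+1]) all serve to cheapen building the `written` set, not to change this filtering logic.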
[jira] [Commented] (SPARK-17815) Report committed offsets
[ https://issues.apache.org/jira/browse/SPARK-17815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553662#comment-15553662 ] Ofir Manor commented on SPARK-17815: I think this is a good idea. There is a minor confusion here, though, as setting group.id is explicitly blocked as far as I understand (it is even documented...). So, it might need rephrasing. 1. I think auto-commit should be off, and the driver should manually commit Kafka offsets after it successfully commits a batch to HDFS (when a batch is over), so monitoring will work. I think that should happen unconditionally, unless there are concrete performance / overhead concerns (committing offsets to Kafka too frequently?). 2. Regarding manually setting group.id - that would be great. If there is a concern that users might mess up (reuse the group.id by mistake), at least allow setting a prefix for it (and a way to get the actual group.id).
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553613#comment-15553613 ] Ofir Manor commented on SPARK-15406: I see three somewhat-related issues: 1. More flexibility is generally needed in stating offsets when using a Kafka source - that moved to SPARK-17812. 2. Regarding exactly-once, what I'm missing (given a transactional / idempotent sink) is the ability to restart a failed Structured Streaming job (after it crashed or was killed) by submitting a new one and asking it to "copy" the context (offsets and internal state) of another job / query. The programming guide hints that it is possible, but doesn't give an example (and I couldn't find a relevant method): "In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off". If that were possible, it would remove the need to manually monitor / manage source offsets for exactly-once. However, this is not specific to the Kafka source - it is relevant for all fault-tolerant sources. 3. Another related issue (though out of scope for this JIRA) is adding an "exactly-once" Kafka sink. Since in Kafka we can't commit a batch of messages together (like in a Foreach sink), the open() of [version, partition] can't just return a boolean - it should likely return the messages within a [version, partition] that were already written, so only those will be filtered out (or otherwise filter within the partition before process() is called). That again is not Kafka-specific, and will be useful in other non-transactional sinks.
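On point 2 above: as a later comment in this thread notes, the checkpoint location is what enables restarting a query where it left off. A schematic fragment (not a complete program; `checkpointLocation` is the real Spark option name, the rest is illustrative):

```java
// Schematic: restarting a query with the same checkpointLocation resumes
// from the previous run's offsets and state, with no manual offset tracking.
df.writeStream()
  .option("checkpointLocation", "/path/to/checkpoint")  // same path across restarts
  .format("parquet")
  .start();
```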
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553372#comment-15553372 ] Ofir Manor commented on SPARK-15406: Thanks Michael, I'll check the JIRAs, and will also post some feedback down the line once I try things out.
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553161#comment-15553161 ] Ofir Manor commented on SPARK-15406: Alright, I do hope you all find a way to resolve this and switch to a more productive relationship; I highly appreciate all your efforts. That alternative ticket is definitely pretty nasty (and recent Kafka versions make log compaction more versatile, so it might get used more often).
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15553076#comment-15553076 ] Ofir Manor commented on SPARK-15406: Thanks Cody! I missed the activity in the subtask PR... It is great to see basic support merged; I'll try it out next week. I hope that now that the "basics" are merged, you will be allowed to continue hacking on it - personally, I am looking forward to any sort of "assign" support (explicitly providing a startingOffset or providing a pre-seeked consumer group). Just curious - given that 0.10.1 with KIP-79 should be released in a couple of weeks, are you planning to add a separate Kafka 0.10.1 artifact to allow "seek to timestamp" (maybe an optional startingTimestamp instead of startingOffset)? I think you were passionate about it.
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548350#comment-15548350 ] Ofir Manor commented on SPARK-15406: Checking back - is there any progress on the API or code? I'll be happy to test once some code is posted for review.
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15488487#comment-15488487 ] Ofir Manor commented on SPARK-15406: Cody, I think you are right. Now is the right time to spend several days iterating over the design. Committing a new, half-baked API that will need to be maintained for years is exactly what we all try to avoid. Of course, v1 doesn't have to implement a lot of features, but it should be future-compatible with the rest of the planned improvements. (As a side note - I haven't seen a timeline for 2.0.1 being discussed, so I'm not sure what the expectations are regarding "v1" delivery.)
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453412#comment-15453412 ] Ofir Manor commented on SPARK-15406: For me, structured streaming is currently all about real window operations based on event time (fields in the event), not processing time (already in 2.0 with some limitations). In a future release it may also be about some new sink-related features (managing exactly-once from Spark to relational databases or HDFS, automatically doing upserts to databases). So, I just want the same Kafka features as before - the value is in the new processing capabilities; it just happens that my source of real-time events is Kafka, not Parquet files (as in 2.0). I expect a couple of things. First, some basic config control, like a pointer to Kafka (bootstrap servers), one or more topics, optionally an existing consumer group or an offset definition, and optionally a Kerberized connection. I also expect exactly-once processing from Kafka to Spark (including correctly recovering after a Spark node failure).
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453289#comment-15453289 ] Ofir Manor commented on SPARK-15406: Cody, why do you think Structured Streaming support for Kafka requires that specific feature (time-indexing)? Personally, I have a couple of objections: # Technically, I think starting a stream from a timestamp is really nice, but definitely optional. We could start by letting the user choose between latest, oldest and a user-provided Kafka offset object (offset per partition per topic), like every other Kafka consumer today. We could definitely add timestamp as another option when that is released. For me, starting from latest and from a user-provided Kafka offset is what I'd like to use, though I can see myself wanting to start from a timestamp in some cases. # Non-technically, I think this Kafka source will be very popular. So, I think wishing to support only the next (future) release of Kafka is counter-productive, as it won't work with most Kafka clusters out there after the release. Of course, supporting currently deployed Kafka clusters likely means 0.8.2.x support, which is the old consumer... So it is additional, duplicate work, but I think it is critical. WDYT?