Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-13 Thread Cody Koeninger
Preferred locations are only advisory; you can still get tasks scheduled on other executors. You can try bumping up the size of the cache to see if that is causing the issue you're seeing. On Nov 13, 2016 12:47, "Ivan von Nagy" wrote: > As the code iterates through the parallel
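
A minimal sketch (Scala) of the cache-size tuning suggested here, using the spark-streaming-kafka-0-10 cache properties; the capacities and app name are only illustrative:

    // Raise the cached-consumer capacity so parallel KafkaRDDs on the same
    // executor do not evict each other's consumers.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-cache-tuning") // hypothetical app name
      .set("spark.streaming.kafka.consumer.cache.initialCapacity", "32")
      .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")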

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-13 Thread Ivan von Nagy
As the code iterates through the parallel list, it is processing up to 8 KafkaRDD at a time. Each has its own unique topic and consumer group now. Every topic has 4 partitions, so technically there should never be more than 32 CachedKafkaConsumers. However, this seems to not be the case as we are
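
A quick check (Scala) of the consumer arithmetic described here, assuming one cached consumer per (group id, topic, partition) on the executors:

    // 8 KafkaRDDs processed in parallel, each on its own topic/group,
    // each topic with 4 partitions.
    val parallelRdds = 8
    val partitionsPerTopic = 4
    val expectedConsumers = parallelRdds * partitionsPerTopic // at most 32 CachedKafkaConsumers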

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-12 Thread Cody Koeninger
You should not be getting consumer churn on executors at all, that's the whole point of the cache. How many partitions are you trying to process per executor? http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies gives instructions on the default size of
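
A minimal direct-stream sketch (Scala) following the LocationStrategies docs linked above; the broker address, topic, and group id are placeholders, and `sc` is assumed to be an existing SparkContext:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val ssc = new StreamingContext(sc, Seconds(30))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "example-group",
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // PreferConsistent spreads partitions evenly across executors, which then
    // reuse the cached consumer for each (group, topic, partition).
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("example-topic"), kafkaParams))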

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-12 Thread Ivan von Nagy
Hi Sean, Thanks for responding. We have run our jobs with internal parallel processing for well over a year (Spark 1.5, 1.6 and Kafka 0.8.2.2) and did not encounter any of these issues until upgrading to Spark 2.0.1 and Kafka 0.10 clients. If we process serially, then we sometimes get the

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-12 Thread Sean McKibben
How are you iterating through your RDDs in parallel? In the past (Spark 1.5.2) when I've had actions being performed on multiple RDDs concurrently using futures, I've encountered some pretty bad behavior in Spark, especially during job retries. Very difficult to explain things, like records
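
A sketch (Scala) of the "actions on multiple RDDs concurrently using futures" pattern being described; `rdds` and `process` are hypothetical:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import org.apache.spark.rdd.RDD

    // Each future submits an independent Spark job; a failure and retry in one
    // job can interleave with the others, which is the behavior discussed here.
    def runConcurrently[T](rdds: Seq[RDD[T]])(process: RDD[T] => Unit): Unit = {
      val jobs = rdds.map(rdd => Future(process(rdd)))
      Await.result(Future.sequence(jobs), 1.hour) // propagates the first failure
    }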

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-12 Thread Ivan von Nagy
The code was changed to use a unique group for each KafkaRDD that was created (see Nov 10 post). There is no KafkaRDD being reused. The basic logic (see Nov 10 post for example) is: get a list of channels, iterate through them in parallel, load a KafkaRDD using a given topic and a consumer group

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-11 Thread Cody Koeninger
It is already documented that you must use a different group id, which as far as I can tell you are still not doing. On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" wrote: > Yeah, the KafkaRDD cannot be reused. It's better to document it. > > On Thu, Nov 10, 2016 at 8:26

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-10 Thread Shixiong(Ryan) Zhu
Yeah, the KafkaRDD cannot be reused. It's better to document it. On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy wrote: > Ok, I have split the KafkaRDD logic to each use their own group and bumped > the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms > ends up

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-10 Thread Ivan von Nagy
Ok, I have split the KafkaRDD logic to each use their own group and bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms ends up with a timeout and exception, so I am still perplexed on that one. The new error I am getting now is a `ConcurrentModificationException` when
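
For reference, the poll timeout being tuned here is the executor-side consumer setting in spark-streaming-kafka-0-10; a sketch (Scala) of the 10-second value described above:

    import org.apache.spark.SparkConf

    // Timeout (in milliseconds) used by the cached consumers when polling on the executors.
    val conf = new SparkConf()
      .set("spark.streaming.kafka.consumer.poll.ms", "10000")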

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-07 Thread Cody Koeninger
There definitely is Kafka documentation indicating that you should use a different consumer group for logically different subscribers, this is really basic to Kafka: http://kafka.apache.org/documentation#intro_consumers As for your comment that "commit async after each RDD, which is not really
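
The "commit async after each RDD" being discussed corresponds to the CanCommitOffsets API in the 0-10 integration; a sketch (Scala) assuming `stream` is a direct stream from KafkaUtils.createDirectStream:

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd ...
      // Commits are queued and sent on a later poll; they are not transactional.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }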

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-07 Thread Ivan von Nagy
With our stream version, we update the offsets for only the partition we are operating on. We even break down the partition into smaller batches and then update the offsets after each batch within the partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not
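
A sketch (Scala) of the per-partition offset handling described here, based on the documented HasOffsetRanges pattern; `stream` is assumed to come from KafkaUtils.createDirectStream, and `writeBatch`/`saveOffset` are hypothetical sinks:

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka010.HasOffsetRanges

    // Hypothetical sinks: write a small batch of records, then record how far
    // this partition has been processed.
    def writeBatch(batch: Seq[ConsumerRecord[String, String]]): Unit = ???
    def saveOffset(topic: String, partition: Int, untilOffset: Long): Unit = ???

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { records =>
        val range = offsetRanges(TaskContext.get.partitionId) // this task's topic-partition
        records.grouped(1000).foreach(writeBatch)             // smaller batches within the partition
        saveOffset(range.topic, range.partition, range.untilOffset)
      }
    }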

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-07 Thread Cody Koeninger
Someone can correct me, but I'm pretty sure Spark dstreams (in general, not just kafka) have been progressing on to the next batch after a given batch aborts for quite some time now. Yet another reason I put offsets in my database transactionally. My jobs throw exceptions if the offset in the DB
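
A sketch (Scala/JDBC) of the "offsets in the database, committed transactionally with the results" pattern described here, including the fail-fast check when the stored offset does not match the batch's starting offset. The JDBC URL, table, and column names are hypothetical:

    import java.sql.DriverManager
    import org.apache.spark.streaming.kafka010.OffsetRange

    def commitWithOffsets(range: OffsetRange, rows: Iterator[String]): Unit = {
      val conn = DriverManager.getConnection("jdbc:postgresql://db:5432/app") // hypothetical
      try {
        conn.setAutoCommit(false)

        // Throw if someone else has moved the offset for this topic/partition.
        val check = conn.prepareStatement(
          "SELECT until_offset FROM kafka_offsets WHERE topic = ? AND kafka_partition = ? FOR UPDATE")
        check.setString(1, range.topic)
        check.setInt(2, range.partition)
        val rs = check.executeQuery()
        if (rs.next() && rs.getLong(1) != range.fromOffset) {
          throw new IllegalStateException(
            s"Stored offset ${rs.getLong(1)} != expected ${range.fromOffset} for $range")
        }

        val insert = conn.prepareStatement("INSERT INTO results (value) VALUES (?)")
        rows.foreach { v => insert.setString(1, v); insert.executeUpdate() }

        val update = conn.prepareStatement(
          "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND kafka_partition = ?")
        update.setLong(1, range.untilOffset)
        update.setString(2, range.topic)
        update.setInt(3, range.partition)
        update.executeUpdate()

        conn.commit() // results and offsets succeed or fail together
      } catch {
        case e: Exception => conn.rollback(); throw e
      } finally {
        conn.close()
      }
    }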

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-07 Thread Sean McKibben
I've been encountering the same kinds of timeout issues as Ivan, using the "Kafka Stream" approach that he is using, except I'm storing my offsets manually from the driver to Zookeeper in the Kafka 0.8 format. I haven't yet implemented the KafkaRDD approach, and therefore don't have the

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Cody Koeninger
So basically what I am saying is - increase poll.ms - use a separate group id everywhere - stop committing offsets under the covers That should eliminate all of those as possible causes, and then we can see if there are still issues. As far as 0.8 vs 0.10, Spark doesn't require you to assign or
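
The three recommendations, expressed as a configuration sketch (Scala); the broker, group naming, and values are illustrative:

    import java.util.UUID
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf

    // 1) Increase the executor-side poll timeout.
    val conf = new SparkConf()
      .set("spark.streaming.kafka.consumer.poll.ms", "10000")

    // 2) A separate group id for every logical consumer, and
    // 3) no offset commits happening under the covers.
    def kafkaParamsFor(channel: String): Map[String, Object] = Map(
      "bootstrap.servers"  -> "broker1:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> s"storage-group-$channel-${UUID.randomUUID()}",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )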

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Ivan von Nagy
Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a single distinct topic. For example, the group would be something like "storage-group", and the topics would be "storage-channel1", and "storage-channel2". In each thread a KafkaConsumer is started, assigned the partitions
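
A sketch (Scala, plain Kafka 0.10 client) of the per-thread consumer setup described here: each thread owns its KafkaConsumer (the client is not thread-safe), assigns the partitions of one topic, and seeks to its starting offsets. The broker, offsets, and per-topic group id are placeholders:

    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    def readChannel(topic: String, startOffsets: Map[Int, Long]): Unit = {
      val props = new Properties()
      props.put("bootstrap.servers", "broker1:9092")
      props.put("group.id", s"storage-group-$topic") // a distinct group per topic, per the later fix
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("enable.auto.commit", "false")

      val consumer = new KafkaConsumer[String, String](props)
      try {
        val partitions = startOffsets.keys.map(p => new TopicPartition(topic, p)).toList
        consumer.assign(partitions.asJava)
        startOffsets.foreach { case (p, offset) => consumer.seek(new TopicPartition(topic, p), offset) }
        val records = consumer.poll(10000L) // 0.10 client: timeout in milliseconds
        records.asScala.foreach(r => println(s"${r.topic}-${r.partition} @ ${r.offset}"))
      } finally {
        consumer.close()
      }
    }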

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Cody Koeninger
So just to be clear, the answers to my questions are - you are not using different group ids, you're using the same group id everywhere - you are committing offsets manually Right? If you want to eliminate network or kafka misbehavior as a source, tune poll.ms upwards even higher. You must

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread vonnagy
Here are some examples and details of the scenarios. The KafkaRDD is the most error-prone to polling timeouts and concurrent modification errors. *Using KafkaRDD* - This takes a list of channels and processes them in parallel using the KafkaRDD directly. They all use the same consumer group
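
A sketch (Scala) of this "Using KafkaRDD" scenario: one KafkaUtils.createRDD per channel, processed in parallel. The offsets, topic names, and parallel collection are placeholders, and `kafkaParamsFor` is the hypothetical helper from the earlier config sketch (ideally returning a distinct group.id per channel, per the rest of the thread):

    import scala.collection.JavaConverters._
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka010.{KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    def processChannels(sc: SparkContext, channels: Seq[String]): Unit = {
      channels.par.foreach { channel =>
        val topic = s"storage-$channel"
        val offsetRanges = Array( // topic, partition, from (inclusive), until (exclusive)
          OffsetRange(topic, 0, 0L, 1000L),
          OffsetRange(topic, 1, 0L, 1000L),
          OffsetRange(topic, 2, 0L, 1000L),
          OffsetRange(topic, 3, 0L, 1000L)
        )
        val rdd = KafkaUtils.createRDD[String, String](
          sc, kafkaParamsFor(channel).asJava, offsetRanges, PreferConsistent)
        println(s"$topic: ${rdd.count()} records") // stand-in for the real processing
      }
    }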

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Cody Koeninger
- are you using different group ids for the different streams? - are you manually committing offsets? - what are the values of your kafka-related settings? On Fri, Nov 4, 2016 at 12:20 PM, vonnagy wrote: > I am getting the issues using Spark 2.0.1 and Kafka 0.10. I have two jobs,