Re: [Spark streaming] No assigned partition error during seek

2017-11-30 Thread venkat
After your reply, I noticed the documentation note: 'Do not manually add
dependencies on org.apache.kafka artifacts (e.g. kafka-clients). The
spark-streaming-kafka-0-10 artifact has the appropriate transitive
dependencies already, and different versions may be incompatible in hard to
diagnose ways.'

Does this imply that we should not be adding kafka-clients to our jars?
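For reference, a minimal build sketch of what that note implies (sbt syntax;
the version numbers are placeholders to be matched to your cluster, not taken
from this thread):

// build.sbt sketch: declare only the Spark artifacts and let kafka-clients
// 0.10.0.1 arrive transitively. Versions below are assumptions.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
  // note: no explicit "org.apache.kafka" % "kafka-clients" entry
)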

Thanks
Venkat

On Fri, 1 Dec 2017 at 06:45 venkat  wrote:

> Yes, I use the latest Kafka clients (0.11) to determine beginning offsets
> without seek, and I also commit Kafka offsets externally.
> I don't find the Spark async commit useful for our needs.
>
> Thanks
> Venkat
>
> On Fri, 1 Dec 2017 at 02:39 Cody Koeninger  wrote:
>
>> You mentioned the 0.11 version; the latest version of the org.apache.kafka
>> kafka-clients artifact supported by DStreams is 0.10.0.1, for which it
>> has an appropriate dependency.
>>
>> Are you manually depending on a different version of the kafka-clients
>> artifact?
>>
>> On Fri, Nov 24, 2017 at 7:39 PM, venks61176  wrote:
>> > Version: 2.2 with Kafka010
>> >
>> > Hi,
>> >
>> > We are running Spark Streaming on AWS to process incoming messages on
>> > Kafka topics. All was well. Recently we wanted to migrate from the 0.8
>> > to the 0.11 version of the Spark Kafka library, and to Kafka 0.11 on
>> > the server.
>> >
>> > With this new version of the software we are intermittently hitting a
>> > 'No assignment to partition' error for a topic. I construct four
>> > DStreams with different group.ids as suggested.
>> >
>> > The main piece of code that's causing the issue is this one:
>> >
>> > if (!toSeek.isEmpty) {
>> >   // work around KAFKA-3370 when reset is none
>> >   // poll will throw if no position, i.e. auto offset reset none and no explicit position
>> >   // but cant seek to a position before poll, because poll is what gets subscription partitions
>> >   // So, poll, suppress the first exception, then seek
>> >   val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>> >   val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
>> >   try {
>> >     consumer.poll(0)
>> >   } catch {
>> >     case x: NoOffsetForPartitionException if shouldSuppress =>
>> >       logWarning("Catching NoOffsetForPartitionException since " +
>> >         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
>> >   }
>> >   toSeek.asScala.foreach { case (topicPartition, offset) =>
>> >     consumer.seek(topicPartition, offset)
>> >   }
>> > }
>> >
>> > At the start of the job, I also ensure we are supplying all required
>> > offsets correctly:
>> >
>> > private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
>> >   Map<TopicPartition, Long> offsets = new HashMap<>();
>> >   List<TopicPartition> topicPartitions =
>> >       consumer.partitionsFor(topic).stream()
>> >           .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
>> >           .collect(Collectors.toList());
>> >   Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
>> >   // pick committed offsets
>> >   for (TopicPartition topicAndPartition : topicPartitions) {
>> >     final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
>> >     Long earliestOffset = earliestOffsets.get(topicAndPartition);
>> >     if (committed != null && committed.offset() > earliestOffset) {
>> >       logger.warn("Committed offset found for: {} offset:{} -> Hence adding committed offset",
>> >           topicAndPartition, committed.offset());
>> >       offsets.put(topicAndPartition, committed.offset());
>> >     } else {
>> >       logger.warn("New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
>> >           topicAndPartition, earliestOffset);
>> >       offsets.put(topicAndPartition, earliestOffset);
>> >     }
>> >   }
>> >   return offsets;
>> > }
>> >
>> > The actual stack trace:
>> >
>> > Caused by: java.lang.IllegalStateException: No current assignment for
>> > partition genericEvents-1
>> > 2017-11-23 10:35:24,677 - at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
>> > 2017-11-23 10:35:24,677 - at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
>> > 2017-11-23 10:35:24,677 - at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
>> > 2017-11-23 10:35:24,678 - at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
>> > 2017-11-23 10:35:24,678 - at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
>> > 2017-11-23 10:35:24,678 -

Re: [Spark streaming] No assigned partition error during seek

2017-11-30 Thread venkat
Yes, I use the latest Kafka clients (0.11) to determine beginning offsets
without seek, and I also commit Kafka offsets externally.
I don't find the Spark async commit useful for our needs.
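For anyone following along, a rough sketch of that setup against the 0-10
DStream API (the broker address, topic and group.id are placeholders, and the
starting offsets would normally come from a helper such as the
getCommittedOffsets method quoted below, not an empty map):

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val ssc = new StreamingContext(new SparkConf().setAppName("external-offsets"), Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",        // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "genericEvents-group",         // placeholder
  "auto.offset.reset" -> "none",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Starting positions resolved outside Spark; empty here only to keep the sketch compilable.
val startOffsets: Map[TopicPartition, Long] = Map.empty

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("genericEvents"), kafkaParams, startOffsets)
)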

Thanks
Venkat

On Fri, 1 Dec 2017 at 02:39 Cody Koeninger  wrote:

> You mentioned the 0.11 version; the latest version of the org.apache.kafka
> kafka-clients artifact supported by DStreams is 0.10.0.1, for which it
> has an appropriate dependency.
>
> Are you manually depending on a different version of the kafka-clients
> artifact?
>
> On Fri, Nov 24, 2017 at 7:39 PM, venks61176  wrote:
> > Version: 2.2 with Kafka010
> >
> > Hi,
> >
> > We are running Spark Streaming on AWS to process incoming messages on
> > Kafka topics. All was well. Recently we wanted to migrate from the 0.8
> > to the 0.11 version of the Spark Kafka library, and to Kafka 0.11 on
> > the server.
> >
> > With this new version of the software we are intermittently hitting a
> > 'No assignment to partition' error for a topic. I construct four
> > DStreams with different group.ids as suggested.
> >
> > The main piece of code that's causing the issue is this one:
> >
> > if (!toSeek.isEmpty) {
> >   // work around KAFKA-3370 when reset is none
> >   // poll will throw if no position, i.e. auto offset reset none and no explicit position
> >   // but cant seek to a position before poll, because poll is what gets subscription partitions
> >   // So, poll, suppress the first exception, then seek
> >   val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
> >   val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
> >   try {
> >     consumer.poll(0)
> >   } catch {
> >     case x: NoOffsetForPartitionException if shouldSuppress =>
> >       logWarning("Catching NoOffsetForPartitionException since " +
> >         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
> >   }
> >   toSeek.asScala.foreach { case (topicPartition, offset) =>
> >     consumer.seek(topicPartition, offset)
> >   }
> > }
> >
> > At the start of the job, I also ensure we are supplying all required
> > offsets correctly:
> >
> > private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
> >   Map<TopicPartition, Long> offsets = new HashMap<>();
> >   List<TopicPartition> topicPartitions =
> >       consumer.partitionsFor(topic).stream()
> >           .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
> >           .collect(Collectors.toList());
> >   Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
> >   // pick committed offsets
> >   for (TopicPartition topicAndPartition : topicPartitions) {
> >     final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
> >     Long earliestOffset = earliestOffsets.get(topicAndPartition);
> >     if (committed != null && committed.offset() > earliestOffset) {
> >       logger.warn("Committed offset found for: {} offset:{} -> Hence adding committed offset",
> >           topicAndPartition, committed.offset());
> >       offsets.put(topicAndPartition, committed.offset());
> >     } else {
> >       logger.warn("New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
> >           topicAndPartition, earliestOffset);
> >       offsets.put(topicAndPartition, earliestOffset);
> >     }
> >   }
> >   return offsets;
> > }
> >
> > The actual stack trace:
> >
> > Caused by: java.lang.IllegalStateException: No current assignment for
> > partition genericEvents-1
> > 2017-11-23 10:35:24,677 - at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
> > 2017-11-23 10:35:24,677 - at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
> > 2017-11-23 10:35:24,677 - at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
> > 2017-11-23 10:35:24,678 - at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
> > 2017-11-23 10:35:24,678 - at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
> > 2017-11-23 10:35:24,678 - at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> > 2017-11-23 10:35:24,678 - at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> > 2017-11-23 10:35:24,678 - at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > 2017-11-23 10:35:24,678 - at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > 2017-11-23 10:35:24,678 - at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
> > 2017-11-23 10:35:24,679 - at
> >

Re: Kafka version support

2017-11-30 Thread Michael Armbrust
Oh good question.  I was saying that the stock structured streaming
connector should be able to talk to 0.11 or 1.0 brokers.
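For completeness, the stock connector referred to here is spark-sql-kafka-0-10;
a minimal read sketch (broker address and topic are placeholders, and an active
SparkSession named spark is assumed):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "genericEvents")               // placeholder
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")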

On Thu, Nov 30, 2017 at 1:12 PM, Cody Koeninger  wrote:

> Are you talking about the broker version, or the kafka-clients artifact
> version?
>
> On Thu, Nov 30, 2017 at 12:17 AM, Raghavendra Pandey
>  wrote:
> > Just wondering if anyone has tried spark structured streaming kafka
> > connector (2.2) with Kafka 0.11 or Kafka 1.0 version
> >
> > Thanks
> > Raghav
>


Re: Kafka version support

2017-11-30 Thread Cody Koeninger
Are you talking about the broker version, or the kafka-clients artifact version?

On Thu, Nov 30, 2017 at 12:17 AM, Raghavendra Pandey
 wrote:
> Just wondering if anyone has tried spark structured streaming kafka
> connector (2.2) with Kafka 0.11 or Kafka 1.0 version
>
> Thanks
> Raghav




Re: [Spark streaming] No assigned partition error during seek

2017-11-30 Thread Cody Koeninger
You mentioned the 0.11 version; the latest version of the org.apache.kafka
kafka-clients artifact supported by DStreams is 0.10.0.1, for which it
has an appropriate dependency.

Are you manually depending on a different version of the kafka-clients artifact?
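If something else in the build does drag in a newer client, one option is to
exclude that transitive copy so the 0.10.0.1 client the integration expects
stays on the classpath (sbt sketch; "com.example" and "some-library" stand in
for whatever dependency brings its own kafka-clients):

libraryDependencies += ("com.example" %% "some-library" % "1.0.0")
  .exclude("org.apache.kafka", "kafka-clients")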

On Fri, Nov 24, 2017 at 7:39 PM, venks61176  wrote:
> Version: 2.2 with Kafka010
>
> Hi,
>
> We are running Spark Streaming on AWS to process incoming messages on
> Kafka topics. All was well. Recently we wanted to migrate from the 0.8 to
> the 0.11 version of the Spark Kafka library, and to Kafka 0.11 on the
> server.
>
> With this new version of the software we are intermittently hitting a 'No
> assignment to partition' error for a topic. I construct four DStreams
> with different group.ids as suggested.
>
> The main piece of code that's causing the issue is this one:
>
> if (!toSeek.isEmpty) {
>   // work around KAFKA-3370 when reset is none
>   // poll will throw if no position, i.e. auto offset reset none and no explicit position
>   // but cant seek to a position before poll, because poll is what gets subscription partitions
>   // So, poll, suppress the first exception, then seek
>   val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>   val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
>   try {
>     consumer.poll(0)
>   } catch {
>     case x: NoOffsetForPartitionException if shouldSuppress =>
>       logWarning("Catching NoOffsetForPartitionException since " +
>         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
>   }
>   toSeek.asScala.foreach { case (topicPartition, offset) =>
>     consumer.seek(topicPartition, offset)
>   }
> }
>
> At the start of the job, I also ensure we are supplying all required offsets
> correctly
>
> private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
>   Map<TopicPartition, Long> offsets = new HashMap<>();
>   List<TopicPartition> topicPartitions =
>       consumer.partitionsFor(topic).stream()
>           .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
>           .collect(Collectors.toList());
>   Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
>   // pick committed offsets
>   for (TopicPartition topicAndPartition : topicPartitions) {
>     final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
>     Long earliestOffset = earliestOffsets.get(topicAndPartition);
>     if (committed != null && committed.offset() > earliestOffset) {
>       logger.warn("Committed offset found for: {} offset:{} -> Hence adding committed offset",
>           topicAndPartition, committed.offset());
>       offsets.put(topicAndPartition, committed.offset());
>     } else {
>       logger.warn("New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
>           topicAndPartition, earliestOffset);
>       offsets.put(topicAndPartition, earliestOffset);
>     }
>   }
>   return offsets;
> }
>
> The actual stack trace:
>
> Caused by: java.lang.IllegalStateException: No current assignment for
> partition genericEvents-1
> 2017-11-23 10:35:24,677 -at
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
> 2017-11-23 10:35:24,677 -at
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
> 2017-11-23 10:35:24,677 -at
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
> 2017-11-23 10:35:24,678 -at
> org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
> 2017-11-23 10:35:24,678 -at
> org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
> 2017-11-23 10:35:24,678 -at
> scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 2017-11-23 10:35:24,678 -at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 2017-11-23 10:35:24,678 -at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 2017-11-23 10:35:24,678 -at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 2017-11-23 10:35:24,678 -at
> org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
> 2017-11-23 10:35:24,679 -at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
> 2017-11-23 10:35:24,679 -at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
> 2017-11-23 10:35:24,679 -at
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
> 2017-11-23 10:35:24,679 -at
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
> 2017-

Re: Kafka version support

2017-11-30 Thread Michael Armbrust
I would expect that to work.

On Wed, Nov 29, 2017 at 10:17 PM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> Just wondering if anyone has tried spark structured streaming kafka
> connector (2.2) with Kafka 0.11 or Kafka 1.0 version
>
> Thanks
> Raghav
>


[Spark ML] : Implement the Conjugate Gradient method for ALS

2017-11-30 Thread Nate Wendt
The conjugate gradient method has been shown to be very efficient at
solving the least squares error problem in matrix factorization:
http://www.benfrederickson.com/fast-implicit-matrix-factorization/.

This post is motivated by:
https://pdfs.semanticscholar.org/bfdf/7af6cf7fd7bb5e6b6db5bbd91be11597eaf0.pdf

Implementing this in Spark could mean a significant speedup in the ALS solve
step, since the method's cost grows more slowly with the number of factors
than the default Cholesky solver. This has the potential to improve the
training phase of collaborative filtering significantly (see the sketch
below).
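To make the comparison concrete, conjugate gradient solves each per-factor
normal-equation system A x = b (A symmetric positive definite, e.g. YtY plus
regularization in implicit-feedback ALS) using only matrix-vector products
instead of a Cholesky factorization. Below is a minimal, Spark-free sketch of
that inner solver; it is illustrative only, not the implementation attached to
the JIRA ticket, and the iteration cap and tolerance are arbitrary:

// Dense conjugate gradient for A x = b, with A symmetric positive definite.
def conjugateGradient(a: Array[Array[Double]], b: Array[Double],
                      maxIters: Int = 10, tol: Double = 1e-10): Array[Double] = {
  val n = b.length
  val x = Array.fill(n)(0.0)
  val r = b.clone()   // residual r = b - A*x; x starts at zero, so r = b
  val p = r.clone()   // search direction
  var rsOld = r.map(v => v * v).sum

  var iter = 0
  while (iter < maxIters && rsOld > tol) {
    val ap = a.map(row => row.indices.map(j => row(j) * p(j)).sum)  // A * p
    val alpha = rsOld / p.indices.map(i => p(i) * ap(i)).sum        // step length
    for (i <- 0 until n) {
      x(i) += alpha * p(i)
      r(i) -= alpha * ap(i)
    }
    val rsNew = r.map(v => v * v).sum
    for (i <- 0 until n) p(i) = r(i) + (rsNew / rsOld) * p(i)       // new direction
    rsOld = rsNew
    iter += 1
  }
  x
}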

I've opened a JIRA ticket
 but thought I'd reach
out here as well since I've implemented the algorithm (for implicit
feedback) and demonstrated its correctness, but I am having trouble
actually seeing a performance speedup, likely due to incorrect handling of
RDD persistence/checkpointing. I wasn't sure the best way to reach out to
see if there were dev cycles available to collaborate on completing this
solution but I figure it has the potential to have a big impact within
Spark and MLLib. If there is interest, I can open a pull request with the
functionally correct code I have as of now.

Note: we are seeing collaborative filtering training times of over 3 hours
within Spark (4 large node instances) compared to ~8 minutes on a single
machine running the Implicit library cited above. It would be great to get
this kind of speedup within Spark and potentially benefit from the added
parallelism.

Thanks,
Nathaniel