[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15679





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85642639
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -141,9 +214,11 @@ For data stores that support transactions, saving offsets in the same transactio

    // begin from the offsets committed to the database
    val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
-     new TopicPartition(resultSet.string("topic")), resultSet.int("partition")) -> resultSet.long("offset")
+     new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
    }.toMap

+   import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
--- End diff --

done





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85642607
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -165,6 +240,36 @@ For data stores that support transactions, saving offsets in the same transactio
    }

+   // The details depend on your data store, but the general idea looks like this
+
+   // begin from the offsets committed to the database
+   Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+   for (resultSet: selectOffsetsFromYourDatabase)
+     fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
+   }
+
+   JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
+     streamingContext,
+     LocationStrategies.PreferConsistent(),
+     ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
+   );
+
+   stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+     @Override
+     public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+       OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+       Object results = yourCalculation(rdd);
+
+       yourTransactionBlock {
--- End diff --

well received; thanks!





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85642029
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -165,6 +240,36 @@ For data stores that support transactions, saving offsets in the same transactio
    }

+   // The details depend on your data store, but the general idea looks like this
+
+   // begin from the offsets committed to the database
+   Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+   for (resultSet: selectOffsetsFromYourDatabase)
+     fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
+   }
+
+   JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
+     streamingContext,
+     LocationStrategies.PreferConsistent(),
+     ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
+   );
+
+   stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+     @Override
+     public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+       OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+       Object results = yourCalculation(rdd);
+
+       yourTransactionBlock {
--- End diff --

I agree with Sean; this would probably be clearer if it were changed to comments like:

// begin your transaction
...
// end your transaction
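
For reference, a minimal sketch of what the revised snippet could look like once `yourTransactionBlock` is replaced by comments; `yourCalculation` and the offset bookkeeping remain the doc's placeholders, not real APIs:

    stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
      @Override
      public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        Object results = yourCalculation(rdd);  // placeholder for user logic

        // begin your transaction

        // update results
        // update offsets where the end of existing offsets matches
        //   the beginning of this batch of offsets
        // assert that offsets were updated correctly

        // end your transaction
      }
    });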





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85641731
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -120,15 +184,24 @@ Kafka has an offset commit API that stores offsets in a special Kafka topic.  By

    stream.foreachRDD { rdd =>
-     val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // some time later, after outputs have completed
-     stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
+     stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

 As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.

+   stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+     @Override
+     public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+       OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+       // some time later, after outputs have completed
+       ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
--- End diff --

Thanks Cody. Sorry for not being clear, but my point was that the Java Kafka input stream does not implement `CanCommitOffsets`, so it has to delegate `commitAsync(...)` explicitly to `stream.inputDStream()`, which is a Scala input stream that does implement `CanCommitOffsets`.

Should we return a Java Kafka input DStream that also implements `CanCommitOffsets` from `createDirectStream()` in Java?
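
To spell out the distinction being discussed (a sketch, assuming the 0.10 connector API quoted above, where `stream` is the `JavaInputDStream` returned by `createDirectStream`):

    // Casting the Java wrapper directly fails at runtime with a
    // ClassCastException, because the JavaInputDStream wrapper itself
    // does not implement CanCommitOffsets:
    //   ((CanCommitOffsets) stream).commitAsync(offsetRanges);

    // Delegating to the wrapped Scala DStream works, since that is
    // the object that implements CanCommitOffsets:
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);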








[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85641432
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -120,15 +184,24 @@ Kafka has an offset commit API that stores offsets in a special Kafka topic.  By

    stream.foreachRDD { rdd =>
-     val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // some time later, after outputs have completed
-     stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
+     stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

 As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.

+   stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+     @Override
+     public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+       OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+       // some time later, after outputs have completed
+       ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
--- End diff --

I think it's far too late to fix those issues at this point. DStreams return an RDD, not a parameterized type. KafkaUtils methods return DStreams and RDDs, not an implementation-specific type.





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85640168
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -165,6 +240,36 @@ For data stores that support transactions, saving offsets in the same transactio
    }

+   // The details depend on your data store, but the general idea looks like this
+
+   // begin from the offsets committed to the database
+   Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+   for (resultSet: selectOffsetsFromYourDatabase)
+     fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
+   }
+
+   JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
+     streamingContext,
+     LocationStrategies.PreferConsistent(),
+     ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
+   );
+
+   stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+     @Override
+     public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+       OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+       Object results = yourCalculation(rdd);
+
+       yourTransactionBlock {
--- End diff --

Oh, just to keep it consistent with the Scala counterpart.





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85640159
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -165,6 +240,36 @@ For data stores that support transactions, saving offsets in the same transactio
    }

+   // The details depend on your data store, but the general idea looks like this
+
+   // begin from the offsets committed to the database
+   Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+   for (resultSet: selectOffsetsFromYourDatabase)
--- End diff --

done





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85640157
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -103,6 +153,20 @@ Note that you cannot use `PreferBrokers`, because without the stream there is no
    }

+   stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+     @Override
+     public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+       final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+       rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
+         @Override
+         public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) throws Exception {
--- End diff --

done -- my bad





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85640154
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -85,6 +121,20 @@ If you have a use case that is better suited to batch processing, you can create


+   // Import dependencies and create kafka params as in Create Direct Stream above
+
+   OffsetRange[] offsetRanges = new OffsetRange[]{
--- End diff --

done -- thanks!





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85640143
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -44,6 +44,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea
 Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)
 
 
+   import java.util.*;
+   import org.apache.spark.SparkConf;
+   import org.apache.spark.TaskContext;
+   import org.apache.spark.api.java.*;
+   import org.apache.spark.api.java.function.*;
+   import org.apache.spark.streaming.api.java.*;
+   import org.apache.spark.streaming.kafka010.*;
+   import org.apache.kafka.clients.consumer.ConsumerRecord;
+   import org.apache.kafka.common.TopicPartition;
+   import org.apache.kafka.common.serialization.StringDeserializer;
+   import scala.Tuple2;
+   
+   Map<String, Object> kafkaParams = new HashMap<>();
--- End diff --

A: yes -- please see 
https://github.com/apache/kafka/blob/0.10.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539
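
That also explains the `Object` value type: Kafka consumer configs are heterogeneous. A sketch of how the map gets populated (the broker address is a placeholder):

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");          // String
    kafkaParams.put("key.deserializer", StringDeserializer.class);   // Class
    kafkaParams.put("value.deserializer", StringDeserializer.class); // Class
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
    kafkaParams.put("enable.auto.commit", false);                    // Boolean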





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85638955
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -165,6 +240,36 @@ For data stores that support transactions, saving offsets in the same transactio
    }

+   // The details depend on your data store, but the general idea looks like this
+
+   // begin from the offsets committed to the database
+   Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+   for (resultSet: selectOffsetsFromYourDatabase)
--- End diff --

Space before colon
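
That is, something along these lines; `Row` is a hypothetical stand-in for whatever record type the data-store API returns, and the opening brace implied by the snippet's stray `}` is restored:

    // `Row` is hypothetical -- substitute your data-store's record type
    for (Row resultSet : selectOffsetsFromYourDatabase) {
      fromOffsets.put(
        new TopicPartition(resultSet.string("topic"), resultSet.int("partition")),
        resultSet.long("offset"));
    }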





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85638952
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -103,6 +153,20 @@ Note that you cannot use `PreferBrokers`, because without the stream there is no
    }

+   stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+     @Override
+     public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+       final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+       rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
+         @Override
+         public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) throws Exception {
--- End diff --

Can this actually throw `Exception`?
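
For the record, Spark's `org.apache.spark.api.java.function.VoidFunction` does declare `call(T) throws Exception`, so the clause is legal; but an override whose body throws nothing checked can simply drop it. A sketch:

    // The interface, as declared in Spark's Java API:
    //   public interface VoidFunction<T> extends Serializable {
    //     void call(T t) throws Exception;
    //   }
    // An implementation may narrow the throws clause away:
    rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
      @Override
      public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
        // nothing here throws a checked exception
      }
    });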





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85638945
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -85,6 +121,20 @@ If you have a use case that is better suited to batch processing, you can create


+   // Import dependencies and create kafka params as in Create Direct Stream above
+
+   OffsetRange[] offsetRanges = new OffsetRange[]{
+     // topic, partition, inclusive starting offset, exclusive ending offset
+     OffsetRange.create("test", 0, 0, 100),
+     OffsetRange.create("test", 1, 0, 100)
+   };
+
+   JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.<String, String>createRDD(
--- End diff --

I don't think `<String, String>` is needed after `KafkaUtils` in Java 7?
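
A sketch of the call without the explicit type witness; generic method inference from the assignment target works even on Java 7 (parameter order follows the snippet above, with `javaSparkContext` assumed in scope):

    JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
      javaSparkContext,
      kafkaParams,
      offsetRanges,
      LocationStrategies.PreferConsistent()
    );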





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85638958
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -165,6 +240,36 @@ For data stores that support transactions, saving offsets in the same transactio
    }

+   // The details depend on your data store, but the general idea looks like this
+
+   // begin from the offsets committed to the database
+   Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+   for (resultSet: selectOffsetsFromYourDatabase)
+     fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
+   }
+
+   JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
+     streamingContext,
+     LocationStrategies.PreferConsistent(),
+     ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
+   );
+
+   stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+     @Override
+     public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+       OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+       Object results = yourCalculation(rdd);
+
+       yourTransactionBlock {
--- End diff --

What is this syntax? I don't think this is Java.





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85638937
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -85,6 +121,20 @@ If you have a use case that is better suited to batch processing, you can create


+   // Import dependencies and create kafka params as in Create Direct Stream above
+
+   OffsetRange[] offsetRanges = new OffsetRange[]{

Nit: you can just write `... = { ... }` in Java for a static array 
declaration.
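
That is, a sketch of the nit applied; the element type comes from the declaration:

    OffsetRange[] offsetRanges = {
      // topic, partition, inclusive starting offset, exclusive ending offset
      OffsetRange.create("test", 0, 0, 100),
      OffsetRange.create("test", 1, 0, 100)
    };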





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15679#discussion_r85638966
  
--- Diff: docs/streaming-kafka-0-10-integration.md ---
@@ -44,6 +44,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea
 Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)
 
 
+   import java.util.*;
+   import org.apache.spark.SparkConf;
+   import org.apache.spark.TaskContext;
+   import org.apache.spark.api.java.*;
+   import org.apache.spark.api.java.function.*;
+   import org.apache.spark.streaming.api.java.*;
+   import org.apache.spark.streaming.kafka010.*;
+   import org.apache.kafka.clients.consumer.ConsumerRecord;
+   import org.apache.kafka.common.TopicPartition;
+   import org.apache.kafka.common.serialization.StringDeserializer;
+   import scala.Tuple2;
+   
+   Map<String, Object> kafkaParams = new HashMap<>();
--- End diff --

Q: is the map value type really Object for the Kafka API?





[GitHub] spark pull request #15679: [SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] A...

2016-10-28 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/15679

[SPARK-16312][Follow-up][STREAMING][KAFKA][DOC] Add java code snippet for Kafka 0.10 integration doc

## What changes were proposed in this pull request?

Added Java code snippets for the Kafka 0.10 integration doc.

## How was this patch tested?

SKIP_API=1 jekyll build

## Screenshot


![kafka-doc](https://cloud.githubusercontent.com/assets/15843379/19826272/bf0d8a4c-9db8-11e6-9e40-1396723df4bc.png)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark kafka-010-examples

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15679.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15679


commit 9dc8e0fa6e679eb511b70a1796d9ae127661a95e
Author: Liwei Lin 
Date:   2016-10-28T09:07:42Z

Add Java code snippet for Kafka guide



