[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/23103


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread zouzias
Github user zouzias commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235627877
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
+ metadataPath: String): String = {
+val groupIdPrefix = parameters
+  .getOrElse("group.id.prefix", "spark-kafka-source")
--- End diff --

Makes sense, fixed in 39424dd 





[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235480695
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
--- End diff --

The first argument should be on its own line as well.





[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235479872
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
+ metadataPath: String): String = {
+val groupIdPrefix = parameters
+  .getOrElse("group.id.prefix", "spark-kafka-source")
--- End diff --

The kafka.* prefix is reserved for the existing Kafka project's client configs; see
e.g. line 86. I'd just go with groupIdPrefix.
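
A minimal sketch of the namespace split described above, assuming that options prefixed with `kafka.` are handed to the Kafka client with the prefix stripped and that everything else is treated as a source-level option. The object and method names here are hypothetical and are not taken from `KafkaSourceProvider`.

```scala
import java.util.Locale

// Hypothetical helper, for illustration only.
object KafkaOptionNamespaces {
  private val ClientPrefix = "kafka."

  private def isClientKey(key: String): Boolean =
    key.toLowerCase(Locale.ROOT).startsWith(ClientPrefix)

  /** Options meant for the Kafka client, with the "kafka." prefix removed. */
  def clientConfigs(parameters: Map[String, String]): Map[String, String] =
    parameters.collect {
      case (key, value) if isClientKey(key) => key.drop(ClientPrefix.length) -> value
    }

  /** Options that belong to the source itself, such as groupIdPrefix. */
  def sourceOptions(parameters: Map[String, String]): Map[String, String] =
    parameters.filterNot { case (key, _) => isClientKey(key) }
}
```

Under this assumption, a key such as `kafka.groupIdPrefix` would land among the client configs, which is why a bare `groupIdPrefix` is the safer name.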





[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread zouzias
Github user zouzias commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235478266
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
--- End diff --

No worries, done.





[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread zouzias
Github user zouzias commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235478048
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
+ metadataPath: String): String = {
+val groupIdPrefix = parameters
+  .getOrElse("group.id.prefix", "spark-kafka-source")
--- End diff --

I named the property `kafka.groupIdPrefix` for now. Let me know if you have a 
better name suggestion.





[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235462394
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
+ metadataPath: String): String = {
+val groupIdPrefix = parameters
+  .getOrElse("group.id.prefix", "spark-kafka-source")
--- End diff --

The convention seems to be camelCase for streaming options that aren't from 
the existing kafka.blah.whatever configuration namespace, e.g. 
subscribePattern, startingOffsets, maxOffsetsPerTrigger.
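
A rough sketch of what the lookup might look like with a camelCase key, in line with the other source options named above. The key `groupIdPrefix` and the object name are illustrative, and the suffix (a random UUID plus the hash of the metadata path) is an assumption about how uniqueness is achieved, since the quoted diff does not show the rest of the method.

```scala
import java.util.UUID

// Hypothetical stand-in for the method under review; not the actual Spark code.
object GroupIdSketch {
  // Default prefix as it appears in the quoted diff.
  val DefaultPrefix = "spark-kafka-source"

  def uniqueGroupId(
      parameters: Map[String, String],
      metadataPath: String): String = {
    // camelCase key, outside the reserved kafka.* namespace
    val prefix = parameters.getOrElse("groupIdPrefix", DefaultPrefix)
    // Assumed suffix: a random UUID keeps every generated group id distinct
    s"$prefix-${UUID.randomUUID}-${metadataPath.hashCode}"
  }
}
```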





[GitHub] spark pull request #23103: [SPARK-26121] [Structured Streaming] Allow users ...

2018-11-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/23103#discussion_r235461374
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -538,6 +538,17 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
   .build()
 
+  /**
+   * Returns a unique consumer group (group.id), allowing the user to set the prefix of
+   * the consumer group
+   */
+  private def streamingUniqueGroupId(parameters: Map[String, String],
--- End diff --

Sorry, there isn't an automatic formatter for this, but use 4-space 
indentation for multi-line argument lists:

https://github.com/databricks/scala-style-guide#spacing-and-indentation
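
For illustration, a small sketch of that layout: when a parameter list does not fit on one line, each parameter goes on its own line with 4 spaces of indentation, while the method body keeps the usual 2 spaces. The object and method below are hypothetical and exist only to show the formatting.

```scala
// Hypothetical example; only the layout matters here.
object SignatureLayout {
  def describeSource(
      topic: String,
      startingOffsets: String,
      maxOffsetsPerTrigger: Long): String = {
    // Body indented 2 spaces relative to `def`, parameters 4 spaces
    s"topic=$topic, startingOffsets=$startingOffsets, maxOffsetsPerTrigger=$maxOffsetsPerTrigger"
  }
}
```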

