[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17467


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-05 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r115110773
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
+ * Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+* Defaults to batch duration provided for streaming,
+* else uses 1 if invoked directly.
+ */
+private[kinesis] case class KinesisReadConfigurations(
+ maxRetries: Int,
+ retryWaitTimeMs: Long,
+ retryTimeoutMs: Long)
+
+object KinesisReadConfigurations {
+  def apply(): KinesisReadConfigurations = {
--- End diff --

It can be used in places where we don't have the Spark conf. I am using this in `KinesisBackedBlockRDD`'s constructor.
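A minimal sketch of the pattern described above, under stated assumptions: a plain `Map[String, String]` stands in for `SparkConf`, the hypothetical `FakeBlockRDD` stands in for `KinesisBackedBlockRDD`, `parseTimeMs` is a hypothetical stand-in for `JavaUtils.timeStringAsMs`, and the key strings are assumed (the diff only refers to them as `RETRY_*_KEY`). The no-argument `apply()` supplies defaults where no configuration is reachable, such as a constructor default argument, while the second overload derives values from configuration.

```scala
// Fields mirror the diff; private[kinesis] is dropped so this compiles standalone.
case class KinesisReadConfigurations(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long)

object KinesisReadConfigurations {
  // Assumed key strings; the diff only names RETRY_MAX_ATTEMPTS_KEY / RETRY_WAIT_TIME_KEY.
  val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"

  // Hypothetical stand-in for JavaUtils.timeStringAsMs: accepts "100ms", "2s" or bare millis.
  def parseTimeMs(s: String): Long = {
    val t = s.trim.toLowerCase
    if (t.endsWith("ms")) t.dropRight(2).trim.toLong
    else if (t.endsWith("s")) t.dropRight(1).trim.toLong * 1000L
    else t.toLong
  }

  // Defaults taken from the diff, for callers with no configuration at hand.
  def apply(): KinesisReadConfigurations =
    KinesisReadConfigurations(maxRetries = 3, retryWaitTimeMs = 100L, retryTimeoutMs = 1L)

  // Derive values from a key/value config, falling back to the defaults above;
  // the timeout follows the batch duration, as in the PR.
  def apply(conf: Map[String, String], batchDurationMs: Long): KinesisReadConfigurations = {
    val defaults = apply()
    KinesisReadConfigurations(
      maxRetries =
        conf.get(RETRY_MAX_ATTEMPTS_KEY).map(_.trim.toInt).getOrElse(defaults.maxRetries),
      retryWaitTimeMs =
        conf.get(RETRY_WAIT_TIME_KEY).map(parseTimeMs).getOrElse(defaults.retryWaitTimeMs),
      retryTimeoutMs = batchDurationMs)
  }
}

// Hypothetical stand-in for KinesisBackedBlockRDD: the default argument is where a
// no-argument apply() helps, since no configuration is available at that point.
class FakeBlockRDD(
    val streamName: String,
    val kinesisReadConfigs: KinesisReadConfigurations = KinesisReadConfigurations())
```

For example, `new FakeBlockRDD("dummyStream").kinesisReadConfigs` falls back to the defaults, while the stream-side code would call the configuration-aware overload.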





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-05 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r115039620
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -60,12 +61,19 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
   val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
   logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
   s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
+
+  /**
+   * Construct the Kinesis read configs from streaming context
+   * and pass to KinesisBackedBlockRDD
+   */
+  val kinesisReadConfigs = KinesisReadConfigurations(ssc)
+
   new KinesisBackedBlockRDD(
 context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
 isBlockIdValid = isBlockIdValid,
-retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
 messageHandler = messageHandler,
-kinesisCreds = kinesisCreds)
+kinesisCreds = kinesisCreds,
+kinesisReadConfigs = kinesisReadConfigs)
--- End diff --

+1





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114922000
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -60,12 +61,19 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
   val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
   logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
   s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
+
+  /**
+   * Construct the Kinesis read configs from streaming context
+   * and pass to KinesisBackedBlockRDD
+   */
+  val kinesisReadConfigs = KinesisReadConfigurations(ssc)
+
   new KinesisBackedBlockRDD(
 context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
 isBlockIdValid = isBlockIdValid,
-retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
 messageHandler = messageHandler,
-kinesisCreds = kinesisCreds)
+kinesisCreds = kinesisCreds,
+kinesisReadConfigs = kinesisReadConfigs)
--- End diff --

I think it would be sufficient to change this to

```scala
  kinesisReadConfigs = KinesisReadConfigurations(ssc))
```

and omit lines 65-70. I don't think a comment is necessary here; the code is pretty straightforward.





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114855947
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
+ * Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+* Defaults to batch duration provided for streaming,
+* else uses 1 if invoked directly.
+ */
+private[kinesis] case class KinesisReadConfigurations(
+ maxRetries: Int,
+ retryWaitTimeMs: Long,
+ retryTimeoutMs: Long)
+
+object KinesisReadConfigurations {
+  def apply(): KinesisReadConfigurations = {
+KinesisReadConfigurations(3, 100, 1)
--- End diff --

+1





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114855848
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
--- End diff --

+1





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114856082
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
+ * Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+* Defaults to batch duration provided for streaming,
+* else uses 1 if invoked directly.
+ */
+private[kinesis] case class KinesisReadConfigurations(
+ maxRetries: Int,
+ retryWaitTimeMs: Long,
+ retryTimeoutMs: Long)
+
+object KinesisReadConfigurations {
+  def apply(): KinesisReadConfigurations = {
--- End diff --

Actually, do we even need this?
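One way to drop the hand-written no-argument `apply()`, sketched here as an assumption rather than what the PR adopted: give the case-class parameters default values (taken from the `(3, 100, 1)` call in the diff), so the compiler-generated `apply` already accepts zero arguments. `private[kinesis]` is omitted so the snippet compiles outside the package.

```scala
// Hypothetical alternative: defaults live on the constructor itself, so
// KinesisReadConfigurations() needs no companion-object apply().
case class KinesisReadConfigurations(
    maxRetries: Int = 3,
    retryWaitTimeMs: Long = 100L,
    retryTimeoutMs: Long = 1L)
```

Callers can then override a single field by name, e.g. `KinesisReadConfigurations(maxRetries = 5)`. The trade-off is that the defaults sit in the constructor signature instead of in named constants that tests or docs can reference.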





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114856307
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
 ---
@@ -234,6 +235,53 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 ssc.stop(stopSparkContext = false)
   }
 
+  test("Kinesis read with custom configurations") {
+try {
+  ssc.sc.conf.set(RETRY_WAIT_TIME_KEY, "2000ms")
+  ssc.sc.conf.set(RETRY_MAX_ATTEMPTS_KEY, "5")
+
+  val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
+  .checkpointAppName(appName)
+  .streamName("dummyStream")
+  .endpointUrl(dummyEndpointUrl)
+  .regionName(dummyRegionName)
+  .initialPositionInStream(InitialPositionInStream.LATEST)
+  .checkpointInterval(Seconds(10))
+  .storageLevel(StorageLevel.MEMORY_ONLY)
+  .build()
+  .asInstanceOf[KinesisInputDStream[Array[Byte]]]
+
+  val time = Time(1000)
+  // Generate block info data for testing
+  val seqNumRanges1 = SequenceNumberRanges(
+SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
+  val blockId1 = StreamBlockId(kinesisStream.id, 123)
+  val blockInfo1 = ReceivedBlockInfo(
+0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
+
+  val seqNumRanges2 = SequenceNumberRanges(
+SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
+  val blockId2 = StreamBlockId(kinesisStream.id, 345)
+  val blockInfo2 = ReceivedBlockInfo(
+0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
+
+  // Verify that the generated KinesisBackedBlockRDD has the all the right information
+  val blockInfos = Seq(blockInfo1, blockInfo2)
+
+  val kinesisRDD =
+kinesisStream.createBlockRDD(time, blockInfos).asInstanceOf[KinesisBackedBlockRDD[_]]
+
+  assert(kinesisRDD.kinesisReadConfigs.retryWaitTimeMs === 2000)
+  assert(kinesisRDD.kinesisReadConfigs.maxRetries === 5)
+  assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
+
+  ssc.sc.conf.remove(RETRY_WAIT_TIME_KEY)
--- End diff --

These also need to be in the `finally` block.
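A minimal sketch of the cleanup the review asks for, assuming a tiny `FakeConf` stand-in for `SparkConf` and a hypothetical `withConfEntries` helper (not a Spark API). Because the removal runs in `finally`, a failing assertion in the body can no longer leave `RETRY_WAIT_TIME_KEY` or `RETRY_MAX_ATTEMPTS_KEY` set for later tests.

```scala
import scala.collection.mutable

// Minimal stand-in for SparkConf, covering only set/remove/lookup.
final class FakeConf {
  private val entries = mutable.Map.empty[String, String]
  def set(key: String, value: String): FakeConf = { entries(key) = value; this }
  def remove(key: String): FakeConf = { entries -= key; this }
  def getOption(key: String): Option[String] = entries.get(key)
}

// Hypothetical helper: apply the entries, run the body, and always remove the
// entries afterwards, even if the body throws. Note that it removes rather than
// restores any values that were set before the call.
def withConfEntries[T](conf: FakeConf, pairs: (String, String)*)(body: => T): T = {
  pairs.foreach { case (k, v) => conf.set(k, v) }
  try body
  finally pairs.foreach { case (k, _) => conf.remove(k) }
}
```

In the test above, the body would hold the stream construction and the three assertions, with `ssc.sc.conf` taking the place of `FakeConf`.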





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114855912
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
+ * Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+* Defaults to batch duration provided for streaming,
+* else uses 1 if invoked directly.
+ */
+private[kinesis] case class KinesisReadConfigurations(
+ maxRetries: Int,
--- End diff --

+1





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114856216
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
+ * Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+* Defaults to batch duration provided for streaming,
+* else uses 1 if invoked directly.
+ */
+private[kinesis] case class KinesisReadConfigurations(
+ maxRetries: Int,
+ retryWaitTimeMs: Long,
+ retryTimeoutMs: Long)
+
+object KinesisReadConfigurations {
--- End diff --

`private object`





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114850001
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
+ * Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+* Defaults to batch duration provided for streaming,
--- End diff --

*nit*: You're missing a space here and on the following line





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114850114
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
+ * Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+* Defaults to batch duration provided for streaming,
+* else uses 1 if invoked directly.
+ */
+private[kinesis] case class KinesisReadConfigurations(
+ maxRetries: Int,
--- End diff --

Incorrect indentation here; it should be 2 soft tabs (4 spaces).





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114851170
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
+ * Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+* Defaults to batch duration provided for streaming,
+* else uses 1 if invoked directly.
+ */
+private[kinesis] case class KinesisReadConfigurations(
+ maxRetries: Int,
+ retryWaitTimeMs: Long,
+ retryTimeoutMs: Long)
+
+object KinesisReadConfigurations {
+  def apply(): KinesisReadConfigurations = {
+KinesisReadConfigurations(3, 100, 1)
--- End diff --

I would use constants and named parameters here too, e.g.

```scala
def apply(): KinesisReadConfigurations = KinesisReadConfigurations(
  maxRetries = DEFAULT_MAX_RETRIES,
  ...
```
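A fuller sketch of that suggestion, under assumptions: `DEFAULT_MAX_RETRIES` is the reviewer's name, while `DEFAULT_RETRY_WAIT_TIME_MS` and `DEFAULT_RETRY_TIMEOUT_MS` are hypothetical names introduced here; the values are the ones hard-coded in the diff (`3, 100, 1`). `private[kinesis]` is omitted so it compiles standalone.

```scala
// Field layout as in the diff.
case class KinesisReadConfigurations(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long)

object KinesisReadConfigurations {
  // DEFAULT_MAX_RETRIES follows the review comment; the other two names are hypothetical.
  val DEFAULT_MAX_RETRIES: Int = 3
  val DEFAULT_RETRY_WAIT_TIME_MS: Long = 100L
  val DEFAULT_RETRY_TIMEOUT_MS: Long = 1L

  // Named arguments make the role of each default visible at the call site.
  def apply(): KinesisReadConfigurations = KinesisReadConfigurations(
    maxRetries = DEFAULT_MAX_RETRIES,
    retryWaitTimeMs = DEFAULT_RETRY_WAIT_TIME_MS,
    retryTimeoutMs = DEFAULT_RETRY_TIMEOUT_MS)
}
```

Named constants also let the configuration-driven factory and the tests share one source of truth for the fallback values.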





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-04 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114851396
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
--- End diff --

*nit*: should be `[[KinesisBackedBlockRDD]]`
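Taking the formatting nits on this declaration together (a space after each leading `*`, 4-space parameter indentation, and `[[...]]` Scaladoc link syntax), the declaration might look like the sketch below. Assumptions: `private[kinesis]` is omitted so the snippet compiles outside `org.apache.spark.streaming.kinesis`, the `@param` tags drop the trailing colon the diff uses, and "consequent" is reworded to "consecutive".

```scala
/**
 * Configurations to pass to the [[KinesisBackedBlockRDD]].
 *
 * @param maxRetries The maximum number of attempts to be made to Kinesis. Defaults to 3.
 * @param retryWaitTimeMs The interval between consecutive Kinesis retries. Defaults to 100ms.
 * @param retryTimeoutMs The timeout in milliseconds for a Kinesis request. Defaults to the
 *                       batch duration provided for streaming, else uses 1 if invoked directly.
 */
case class KinesisReadConfigurations(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long)
```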





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114586962
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
 ---
@@ -234,6 +235,50 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 ssc.stop(stopSparkContext = false)
   }
 
+  test("Kinesis read with custom configurations") {
+ssc.sc.conf.set(RETRY_WAIT_TIME_KEY, "2000ms")
+ssc.sc.conf.set(RETRY_MAX_ATTEMPTS_KEY, "5")
+
+val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
+.checkpointAppName(appName)
+.streamName("dummyStream")
+.endpointUrl(dummyEndpointUrl)
+.regionName(dummyRegionName)
+.initialPositionInStream(InitialPositionInStream.LATEST)
+.checkpointInterval(Seconds(10))
+.storageLevel(StorageLevel.MEMORY_ONLY)
+.build()
+.asInstanceOf[KinesisInputDStream[Array[Byte]]]
+
+val time = Time(1000)
+// Generate block info data for testing
+val seqNumRanges1 = SequenceNumberRanges(
+  SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
+val blockId1 = StreamBlockId(kinesisStream.id, 123)
+val blockInfo1 = ReceivedBlockInfo(
+  0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
+
+val seqNumRanges2 = SequenceNumberRanges(
+  SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
+val blockId2 = StreamBlockId(kinesisStream.id, 345)
+val blockInfo2 = ReceivedBlockInfo(
+  0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
+
+// Verify that the generated KinesisBackedBlockRDD has the all the right information
+val blockInfos = Seq(blockInfo1, blockInfo2)
+
+val kinesisRDD =
+  kinesisStream.createBlockRDD(time, blockInfos).asInstanceOf[KinesisBackedBlockRDD[_]]
+
+assert(kinesisRDD.kinesisReadConfigs.retryWaitTimeMs === 2000)
+assert(kinesisRDD.kinesisReadConfigs.maxRetries === 5)
+assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
+
+ssc.sc.conf.remove(RETRY_WAIT_TIME_KEY)
--- End diff --

Mind putting these in a `try`/`finally` block?
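A minimal sketch of the `try`/`finally` shape being asked for, so the keys are cleaned up even when an assertion fails. To stay self-contained it uses a `mutable.Map[String, String]` as a stand-in for `SparkConf`; `ConfCleanupSketch` and `withConf` are hypothetical names, not existing Spark test utilities.

```scala
import scala.collection.mutable

object ConfCleanupSketch {
  // Stand-in for SparkConf in this sketch; the real test mutates ssc.sc.conf.
  type Conf = mutable.Map[String, String]

  // Hypothetical helper: set the given keys, run `body`, and always restore
  // the previous state in `finally`, even if `body` throws.
  def withConf[T](conf: Conf)(pairs: (String, String)*)(body: => T): T = {
    // Remember what each key held before (None if it was unset).
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try {
      body
    } finally {
      previous.foreach {
        case (k, Some(old)) => conf(k) = old
        case (k, None)      => conf.remove(k)
      }
    }
  }
}
```

Restoring the previous value, rather than only removing the key, keeps the helper safe when a key was already set before the test started.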



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114586572
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+
+/**
+ * Configurations to pass to the [KinesisBackedBlockRDD].
+ *
+ * @param maxRetriesOption : The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMsOption : The interval between consequent Kinesis retries.
+ *  Defaults to 100ms.
+ * @param retryTimeoutMsOption : The timeout in milliseconds for a Kinesis request.
+ * Defaults to batch duration provided for streaming,
+ * else uses 1 if invoked directly.
+ */
+private[kinesis]
+case class KinesisReadConfigurations (
--- End diff --

How about something like this?
```scala
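package org.apache.spark.streaming.kinesis

import org.apache.spark.network.util.JavaUtils
import org.apache.spark.streaming.StreamingContext

private[kinesis] case class KinesisReadConfigurations(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long)

private[kinesis] object KinesisReadConfigurations {
  // Used when the RDD is created directly, without a StreamingContext.
  def apply(): KinesisReadConfigurations = {
    KinesisReadConfigurations(3, 100, 1)
  }

  // Reads user overrides from the SparkConf; the timeout follows the batch duration.
  def apply(ssc: StreamingContext): KinesisReadConfigurations = {
    KinesisReadConfigurations(
      maxRetries = ssc.sc.getConf.getInt(RETRY_MAX_ATTEMPTS_KEY, DEFAULT_MAX_RETRIES),
      retryWaitTimeMs = JavaUtils.timeStringAsMs(
        ssc.sc.getConf.get(RETRY_WAIT_TIME_KEY, DEFAULT_RETRY_WAIT_TIME)),
      retryTimeoutMs = ssc.graph.batchDuration.milliseconds)
  }

  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
  val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
  val DEFAULT_MAX_RETRIES = 3
  val DEFAULT_RETRY_WAIT_TIME = "100ms"
}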
```
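To see the fallback behaviour of the suggested `apply(ssc)` without a running `StreamingContext`, here is a dependency-free sketch. Assumptions: a `Map[String, String]` stands in for `SparkConf`, the batch duration is passed in as milliseconds, and `parseDurationMs` is a deliberately simplified stand-in for `JavaUtils.timeStringAsMs` that only understands `ms`, `s`, and bare numbers. `ReadConfigsSketch` and `fromConf` are hypothetical names.

```scala
object ReadConfigsSketch {
  final case class ReadConfigs(maxRetries: Int, retryWaitTimeMs: Long, retryTimeoutMs: Long)

  val RetryWaitTimeKey = "spark.streaming.kinesis.retry.waitTime"
  val RetryMaxAttemptsKey = "spark.streaming.kinesis.retry.maxAttempts"
  val DefaultMaxRetries = 3
  val DefaultRetryWaitTime = "100ms"

  // Simplified duration parser: "<n>ms", "<n>s", or a bare number of ms.
  // "ms" is checked first because it also ends with "s".
  def parseDurationMs(s: String): Long = {
    val t = s.trim.toLowerCase
    if (t.endsWith("ms")) t.dropRight(2).trim.toLong
    else if (t.endsWith("s")) t.dropRight(1).trim.toLong * 1000L
    else t.toLong
  }

  // Same shape as the suggested apply(ssc): user settings win, otherwise
  // defaults apply; the timeout is taken from the batch duration.
  def fromConf(conf: Map[String, String], batchDurationMs: Long): ReadConfigs =
    ReadConfigs(
      maxRetries = conf.get(RetryMaxAttemptsKey).map(_.trim.toInt).getOrElse(DefaultMaxRetries),
      retryWaitTimeMs = parseDurationMs(conf.getOrElse(RetryWaitTimeKey, DefaultRetryWaitTime)),
      retryTimeoutMs = batchDurationMs)
}
```

With an empty map this yields `ReadConfigs(3, 100, <batch duration>)`, which matches the defaults documented in this PR.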



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114584488
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -284,17 +282,12 @@ class KinesisSequenceRangeIterator(
 result.getOrElse {
   if (isTimedOut) {
 throw new SparkException(
-  s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
+  s"Timed out after ${kinesisReadConfigs.retryTimeoutMs} ms while "
+  + "$message, last exception: ", lastError)
--- End diff --

also please move `+` to line above



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114584422
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -284,17 +282,12 @@ class KinesisSequenceRangeIterator(
 result.getOrElse {
   if (isTimedOut) {
 throw new SparkException(
-  s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
+  s"Timed out after ${kinesisReadConfigs.retryTimeoutMs} ms while "
+  + "$message, last exception: ", lastError)
--- End diff --

The second literal also needs the `s` interpolator: `s"$message`
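Taken together, this comment and the one above about moving `+` ask for two changes: the `+` at the end of the first line, and an `s` prefix on the second literal, because without it `"$message"` is emitted verbatim. A minimal standalone sketch of both versions; the `timeoutMessage` and `buggyMessage` helpers are hypothetical, since in the PR the string is built inline in the `SparkException` call.

```scala
object TimeoutMessageSketch {
  // Both literals use the `s` interpolator. Ending the first line with `+`
  // keeps this one expression; a line starting with `+` here would not
  // continue it.
  def timeoutMessage(retryTimeoutMs: Long, message: String): String =
    s"Timed out after $retryTimeoutMs ms while " +
      s"$message, last exception: "

  // The diff as written: the second literal is a plain string, so
  // "$message" is not substituted.
  def buggyMessage(retryTimeoutMs: Long, message: String): String =
    s"Timed out after $retryTimeoutMs ms while " + "$message, last exception: "
}
```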



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114584091
  
--- Diff: docs/streaming-kinesis-integration.md ---
@@ -216,3 +216,7 @@ de-aggregate records during consumption.
 - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
   - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
   - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
+
+ Kinesis retry configuration
+ - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit 'ThroughputExceededExceptions', when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms".
+ - `spark.streaming.kinesis.retry.maxAttempts` : Max number of retries for Kinesis fetches. This config can also be used to tackle the Kinesis `ThroughputExceededExceptions` in scenarios mentioned above. It can be increased to have more number of retries for Kinesis reads. Default is 3.
--- End diff --

Ditto: `ProvisionedThroughputExceededException`s rather than `ThroughputExceededExceptions`.
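The two properties documented in this diff govern how many attempts a throttled Kinesis read gets and how long to sleep between them. A minimal, dependency-free sketch of such a loop, under these assumptions: `ThroughputExceeded` is a hypothetical stand-in for the AWS SDK's `ProvisionedThroughputExceededException`; `maxAttempts` counts the first attempt too; the wait is fixed rather than growing; and `sleep` is injected so a test can observe the configured wait instead of really blocking. This is not the PR's `KinesisSequenceRangeIterator` code, which also enforces a timeout.

```scala
object RetrySketch {
  // Hypothetical stand-in for the AWS SDK's ProvisionedThroughputExceededException.
  final class ThroughputExceeded(msg: String) extends RuntimeException(msg)

  // Runs `body` at most `maxAttempts` times in total. Only ThroughputExceeded
  // is retried; any other exception propagates immediately. `sleep` is
  // injected so tests can record the waits instead of actually blocking.
  def retry[T](
      maxAttempts: Int,
      waitTimeMs: Long,
      sleep: Long => Unit = ms => Thread.sleep(ms))(body: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")

    @annotation.tailrec
    def loop(attempt: Int): T = {
      val outcome: Either[ThroughputExceeded, T] =
        try Right(body)
        catch {
          // On the last attempt the guard fails and the exception escapes.
          case e: ThroughputExceeded if attempt < maxAttempts => Left(e)
        }
      outcome match {
        case Right(value) => value
        case Left(_) =>
          sleep(waitTimeMs) // fixed wait before the next attempt
          loop(attempt + 1)
      }
    }

    loop(1)
  }
}
```

Injecting `sleep` is one way to assert that a configured wait time is actually used, rather than only checking that reads eventually succeed.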



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-05-03 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r114584052
  
--- Diff: docs/streaming-kinesis-integration.md ---
@@ -216,3 +216,7 @@ de-aggregate records during consumption.
 - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
   - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
   - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
+
+ Kinesis retry configuration
+ - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit 'ThroughputExceededExceptions', when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms".
--- End diff --

The exception is called `ProvisionedThroughputExceededException`, so this should read `ProvisionedThroughputExceededException`s.



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112849184
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
 endpointUrl: String,
 regionId: String,
 range: SequenceNumberRange,
-retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+retryTimeoutMs: Int,
+sparkConf: SparkConf) extends NextIterator[Record] with Logging {
--- End diff --

I prefer the latter. Create it in `KinesisInputDStream` and pass it down to `KinesisBackedBlockRDD`.



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112848855
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
 endpointUrl: String,
 regionId: String,
 range: SequenceNumberRange,
-retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+retryTimeoutMs: Int,
+sparkConf: SparkConf) extends NextIterator[Record] with Logging {
--- End diff --

Or we can pass them via SparkConf, construct the `KinesisReadConfigurations` object in `KinesisInputDStream`, and pass it down to `KinesisBackedBlockRDD`.



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112848762
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
 endpointUrl: String,
 regionId: String,
 range: SequenceNumberRange,
-retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+retryTimeoutMs: Int,
+sparkConf: SparkConf) extends NextIterator[Record] with Logging {
--- End diff --

And would you expect it to be passed directly to `KinesisInputDStream`?



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112848595
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
 endpointUrl: String,
 regionId: String,
 range: SequenceNumberRange,
-retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+retryTimeoutMs: Int,
+sparkConf: SparkConf) extends NextIterator[Record] with Logging {
--- End diff --

I would prefer a specialized case class, something like:
```scala
case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long,
  retryTimeoutMs: Long)
```



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112848401
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
 ---
@@ -101,6 +103,36 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
 }
   }
 
+  testIfEnabled("Basic reading from Kinesis with modified configurations") {
--- End diff --

I wasn't able to test the actual Kinesis retry waiting. I haven't looked at `PrivateMethodTester` yet to check how it could help us test which values are picked up.
I used this test case to debug and verify that all the values are passed correctly.



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112848373
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
 ---
@@ -101,6 +103,36 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
 }
   }
 
+  testIfEnabled("Basic reading from Kinesis with modified configurations") {
+// Add Kinesis retry configurations
+sc.conf.set(RETRY_WAIT_TIME_KEY, "1000ms")
--- End diff --

+1



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112848363
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
 endpointUrl: String,
 regionId: String,
 range: SequenceNumberRange,
-retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+retryTimeoutMs: Int,
+sparkConf: SparkConf) extends NextIterator[Record] with Logging {
--- End diff --

@brkyvz - I was trying to avoid passing individual configs to the constructor, because that list would just keep growing. Using SparkConf or a Map would let us add new configs without changing the constructor. I was using a Map earlier for this, so that it's easy to pass more configs.
What are your thoughts on Map vs. case class?
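On the Map vs. case class question, a small sketch of the trade-off. The key `"retryWaitTimeMs"` and the names `ConfigShapeSketch`, `retryWaitFromMap`, and `ReadConfigs` are hypothetical; the defaults 3 and 100 ms mirror the ones discussed in this thread. With a Map, every consumer repeats the key, the parsing, and the default, and a misspelled key silently falls back to the default. A case class parses once and exposes typed fields, and giving new fields default values lets it grow without breaking call sites that do not set them.

```scala
object ConfigShapeSketch {
  // Map-based: each consumer repeats the key, the parsing, and the default.
  def retryWaitFromMap(configs: Map[String, String]): Long =
    configs.get("retryWaitTimeMs").map(_.toLong).getOrElse(100L)

  // Case-class based: values are parsed once and consumers read typed fields.
  // A field with a default value can be added later without breaking
  // existing constructor calls that omit it.
  final case class ReadConfigs(
      retryTimeoutMs: Long,
      maxRetries: Int = 3,
      retryWaitTimeMs: Long = 100L)
}
```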




[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112841794
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
 endpointUrl: String,
 regionId: String,
 range: SequenceNumberRange,
-retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+retryTimeoutMs: Int,
+sparkConf: SparkConf) extends NextIterator[Record] with Logging {
--- End diff --

I wouldn't pass the `SparkConf` all the way down here. See how `retryTimeoutMs` has been passed in explicitly above. You can do one of two things:
 1. Pass each of them one by one
 2. Evaluate all the configurations in `KinesisBackedBlockRDD` or one level higher and use a `case class` such as `KinesisReadConfigurations`



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112841862
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
 ---
@@ -101,6 +103,36 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
 }
   }
 
+  testIfEnabled("Basic reading from Kinesis with modified configurations") {
--- End diff --

I don't see how this test actually tests the configuration setting. It just tests that things work, not that the configurations are actually picked up.





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112841727
  
--- Diff: docs/streaming-kinesis-integration.md ---
@@ -216,3 +216,7 @@ de-aggregate records during consumption.
 - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
   - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
   - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
+
+ Kinesis retry configurations
+ - `spark.streaming.kinesis.retry.waitTime` : SparkConf for wait time between Kinesis retries (in milliseconds). Default is "100ms".
--- End diff --

Example: `Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit 'ThroughputExceededExceptions', when consuming faster than 2 MB/s. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions.`



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112841869
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
 ---
@@ -101,6 +103,36 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
 }
   }
 
+  testIfEnabled("Basic reading from Kinesis with modified configurations") {
+// Add Kinesis retry configurations
+sc.conf.set(RETRY_WAIT_TIME_KEY, "1000ms")
--- End diff --

we need to clean these up after the test



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112841668
  
--- Diff: docs/streaming-kinesis-integration.md ---
@@ -216,3 +216,7 @@ de-aggregate records during consumption.
 - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
   - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
   - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
+
+ Kinesis retry configurations
+ - `spark.streaming.kinesis.retry.waitTime` : SparkConf for wait time between Kinesis retries (in milliseconds). Default is "100ms".
--- End diff --

`SparkConf for` is redundant. I would try to focus on when people should actually tweak these, and why these confs are important in the first place.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-23 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112839963
  
--- Diff: docs/streaming-kinesis-integration.md ---
@@ -216,3 +216,7 @@ de-aggregate records during consumption.
 - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
   - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
   - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
+
+ Kinesis retry configurations
--- End diff --

*nit:* I think "configuration" sounds more natural here than "configurations".



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112816898
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -295,6 +306,23 @@ class KinesisSequenceRangeIterator(
 
 private[streaming]
 object KinesisSequenceRangeIterator {
-  val MAX_RETRIES = 3
-  val MIN_RETRY_WAIT_TIME_MS = 100
+  /**
+   * The maximum number of attempts to be made to kinesis. Defaults to 3.
+   */
+  val MAX_RETRIES = "3"
+
+  /**
+   * The interval between consequent kinesis retries. Defaults to 100ms.
+   */
+  val MIN_RETRY_WAIT_TIME_MS = "100ms"
+
+  /**
+   * Key for configuring the retry wait time for kinesis. The values can be passed to SparkConf.
--- End diff --

*nit:* I'd make the following tweaks here:

```scala
/**
 * SparkConf key for configuring the wait time to use before retrying a Kinesis attempt.
 */
```



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112816922
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -295,6 +306,23 @@ class KinesisSequenceRangeIterator(
 
 private[streaming]
 object KinesisSequenceRangeIterator {
-  val MAX_RETRIES = 3
-  val MIN_RETRY_WAIT_TIME_MS = 100
+  /**
+   * The maximum number of attempts to be made to kinesis. Defaults to 3.
+   */
+  val MAX_RETRIES = "3"
+
+  /**
+   * The interval between consequent kinesis retries. Defaults to 100ms.
+   */
+  val MIN_RETRY_WAIT_TIME_MS = "100ms"
+
+  /**
+   * Key for configuring the retry wait time for kinesis. The values can be passed to SparkConf.
+   */
+  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
+
+  /**
+   * Key for configuring the number of retries for kinesis. The values can be passed to SparkConf.
--- End diff --

*nit:* I'd make the following tweaks here:

```scala
/**
 * SparkConf key for configuring the maximum number of retries used when attempting a Kinesis
 * request.
 */
```



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112817123
  
--- Diff: docs/streaming-kinesis-integration.md ---
@@ -216,3 +216,7 @@ de-aggregate records during consumption.
 - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
   - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
   - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
+
+- Kinesis retry configurations
--- End diff --

@brkyvz or another Spark committer might have better suggestions here, but 
I would suggest making this section a new heading (rather than part of 
**Kinesis Checkpointing**) and adding a brief explanatory sentence, e.g.:

```
 Kinesis retry configuration
- A Kinesis DStream will retry any failed request to the Kinesis API. The following SparkConf properties can be set in order to customize the behavior of the retry logic:
```

followed by the rest of your changes here.

This also reminds me that I owe @brkyvz a change to add docs for the stream 
builder interface here :)





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112816822
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -295,6 +306,23 @@ class KinesisSequenceRangeIterator(
 
 private[streaming]
 object KinesisSequenceRangeIterator {
-  val MAX_RETRIES = 3
-  val MIN_RETRY_WAIT_TIME_MS = 100
+  /**
+   * The maximum number of attempts to be made to kinesis. Defaults to 3.
+   */
+  val MAX_RETRIES = "3"
+
+  /**
+   * The interval between consequent kinesis retries. Defaults to 100ms.
--- End diff --

*nit:* **K**inesis





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-22 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112816810
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -295,6 +306,23 @@ class KinesisSequenceRangeIterator(
 
 private[streaming]
 object KinesisSequenceRangeIterator {
-  val MAX_RETRIES = 3
-  val MIN_RETRY_WAIT_TIME_MS = 100
+  /**
+   * The maximum number of attempts to be made to kinesis. Defaults to 3.
--- End diff --

*nit:* **K**inesis





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112764788
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -147,6 +153,17 @@ class KinesisSequenceRangeIterator(
   private var lastSeqNumber: String = null
   private var internalIterator: Iterator[Record] = null
 
+  // variable for kinesis wait time interval between next retry
+  private val kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
+Try {sparkConf.get("spark.streaming.kinesis.retry.waitTime")}
--- End diff --

This complexity isn't necessary. You can achieve the same effect by using 
an alternate form of 
[```SparkConf.get()```](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf@get(key:String,defaultValue:String):String):

```scala
private val kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
  sparkConf.get("spark.streaming.kinesis.retry.waitTime", MIN_RETRY_WAIT_TIME_MS))
```
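A minimal sketch of the difference, assuming a plain immutable `Map[String, String]` as a stand-in for `SparkConf` so that it runs without Spark on the classpath (`ConfLookup` is a hypothetical name). `Map.getOrElse(key, default)` plays the role of `SparkConf.get(key, defaultValue)`: the default is supplied to the lookup itself, so no exception has to be thrown and then caught.

```scala
import scala.util.Try

object ConfLookup {
  // Stand-in for SparkConf (assumption): an immutable key/value map.
  type Conf = Map[String, String]

  // Pattern under review: Map.apply throws NoSuchElementException for a
  // missing key, and Try recovers by substituting the default.
  def viaTry(conf: Conf, key: String, default: String): String =
    Try(conf(key)).getOrElse(default)

  // Suggested pattern: hand the default to the lookup itself, mirroring
  // SparkConf.get(key, defaultValue); nothing is thrown or caught.
  def viaDefault(conf: Conf, key: String, default: String): String =
    conf.getOrElse(key, default)
}
```

Both forms return the same value; the second simply states the intent directly.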





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112764350
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -112,7 +116,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
   val credentials = kinesisCreds.provider.getCredentials
   partition.seqNumberRanges.ranges.iterator.flatMap { range =>
 new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
-  range, retryTimeoutMs).map(messageHandler)
+  range, retryTimeoutMs, sparkConf
+).map(messageHandler)
--- End diff --

*nit:* Move this to end of previous line





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112765111
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -17,21 +17,24 @@
 
 package org.apache.spark.streaming.kinesis
 
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-import scala.util.control.NonFatal
-
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.AWSCredentials
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
-
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition}
 import org.apache.spark.storage.BlockId
 import org.apache.spark.util.NextIterator
 
+import scala.collection.JavaConverters._
--- End diff --

Why change the ordering of this import group? I don't think this is 
consistent with the scalastyle for this project.





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112765206
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -17,21 +17,24 @@
 
 package org.apache.spark.streaming.kinesis
 
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-import scala.util.control.NonFatal
-
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.AWSCredentials
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
-
--- End diff --

I think this newline should be kept to be consistent with the project's 
scalastyle. Have you been running style checks when testing this change?





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112766374
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -147,6 +153,17 @@ class KinesisSequenceRangeIterator(
   private var lastSeqNumber: String = null
   private var internalIterator: Iterator[Record] = null
 
+  // variable for kinesis wait time interval between next retry
+  private val kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
+Try {sparkConf.get("spark.streaming.kinesis.retry.waitTime")}
--- End diff --

It may also be useful to declare these keys as public constants in a 
sensible location such as the [companion object to 
```KinesisInputDStream```](https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala#L84),
 e.g.:

```scala
object KinesisInputDStream {
...
  /**
   * Relevant doc
   */
  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
 
  /**
   * Relevant doc
   */
  val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
...
```

This will make things a little less brittle for users who want to 
dynamically fill in SparkConf values in their apps. You would also be able to use 
these constants in unit tests here.
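A minimal sketch of that suggestion. `KinesisRetryKeys` and `RetryKeysUsage` are hypothetical names standing in for the `KinesisInputDStream` companion object and for application code; only the two key strings come from this thread, and a `Map` stands in for `SparkConf`.

```scala
// Hypothetical stand-in for the KinesisInputDStream companion object.
object KinesisRetryKeys {
  /** SparkConf key for the wait time between Kinesis retries, e.g. "100ms". */
  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"

  /** SparkConf key for the maximum number of retries of a Kinesis request. */
  val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
}

// Hypothetical application code; a Map stands in for SparkConf.
object RetryKeysUsage {
  import KinesisRetryKeys._

  // Refer to the constants instead of retyping the string literals.
  val settings: Map[String, String] = Map(
    RETRY_WAIT_TIME_KEY -> "1000ms",
    RETRY_MAX_ATTEMPTS_KEY -> "5"
  )
}
```

With Spark on the classpath, the same pairs would be applied through `SparkConf.set(key, value)`. A misspelled constant then fails at compile time instead of producing a key that is stored but never read.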





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112766633
  
--- Diff: 
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
 ---
@@ -101,6 +101,37 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
 }
   }
 
+  testIfEnabled("Basic reading from Kinesis with modified configurations") {
+// Add Kinesis retry configurations
+sc.conf.set("spark.streaming.kinesis.retry.waitTime", "1000ms")
+sc.conf.set("spark.streaming.kinesis.retry.maxAttempts", "5")
+
+// Verify all data using multiple ranges in a single RDD partition
+val receivedData1 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
+  testUtils.endpointUrl, fakeBlockIds(1),
+  Array(SequenceNumberRanges(allRanges.toArray)),
+  sparkConf = sc.getConf).map { bytes => new String(bytes).toInt }.collect()
+assert(receivedData1.toSet === testData.toSet)
+
+// Verify all data using one range in each of the multiple RDD partitions
+val receivedData2 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
+  testUtils.endpointUrl, fakeBlockIds(allRanges.size),
+  allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray,
+  sparkConf = sc.getConf).map { bytes => new String(bytes).toInt }.collect()
+assert(receivedData2.toSet === testData.toSet)
+
+// Verify ordering within each partition
+val receivedData3 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
+  testUtils.endpointUrl, fakeBlockIds(allRanges.size),
+  allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray,
+  sparkConf = sc.getConf
+).map { bytes => new String(bytes).toInt }.collectPartitions()
--- End diff --

*nit:* move this to the end of previous line





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112764808
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -147,6 +153,17 @@ class KinesisSequenceRangeIterator(
   private var lastSeqNumber: String = null
   private var internalIterator: Iterator[Record] = null
 
+  // variable for kinesis wait time interval between next retry
+  private val kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
+Try {sparkConf.get("spark.streaming.kinesis.retry.waitTime")}
+  .getOrElse(MIN_RETRY_WAIT_TIME_MS)
+  )
+
+  // variable for kinesis max retry attempts
+  private val kinesisMaxRetries =
+Try {sparkConf.get("spark.streaming.kinesis.retry.maxAttempts")}
--- End diff --

See above





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-21 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112764344
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -83,7 +86,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
 @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
 val retryTimeoutMs: Int = 1,
 val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
-val kinesisCreds: SparkAWSCredentials = DefaultCredentials
+val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
+val sparkConf: SparkConf = new SparkConf()
--- End diff --

Why does this need to be provided as a constructor parameter? You'll want 
to use the global ```SparkConf``` for the context via ```sc.getConf```. To 
avoid bringing ```sc``` into the serialized closure for the ```compute()``` 
method and raising an exception you can alias it as a private field in this 
class:

```scala
private val sparkConf: SparkConf = sc.getConf
```
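To see why the alias matters, here is a self-contained sketch using Java serialization. `FakeContext` is a hypothetical, deliberately non-serializable stand-in for `SparkContext`. The two classes stand in for an RDD that either keeps the context in an ordinary field or marks it `@transient` and copies the configuration into a private field of its own; only the second shape can be serialized.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext; deliberately not Serializable.
final class FakeContext(val conf: Map[String, String])

// Keeps the context in an ordinary field, so serializing this object drags it along.
final class HoldsContext(val sc: FakeContext) extends Serializable {
  def waitTime: String =
    sc.conf.getOrElse("spark.streaming.kinesis.retry.waitTime", "100ms")
}

// Marks the context @transient and copies the configuration into its own field,
// so code running after deserialization only ever touches `sparkConf`.
final class CopiesConf(@transient private val sc: FakeContext) extends Serializable {
  private val sparkConf: Map[String, String] = sc.conf

  def waitTime: String =
    sparkConf.getOrElse("spark.streaming.kinesis.retry.waitTime", "100ms")
}

object SerializationCheck {
  // True if Java serialization succeeds, as it must when a task is shipped to executors.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```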





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-21 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112614624
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -83,7 +83,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
 @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
 val retryTimeoutMs: Int = 1,
 val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
-val kinesisCreds: SparkAWSCredentials = DefaultCredentials
+val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
+val kinesisConf: Map[String, String] = Map.empty
--- End diff --

+1





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-20 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112566462
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -147,6 +152,14 @@ class KinesisSequenceRangeIterator(
   private var lastSeqNumber: String = null
   private var internalIterator: Iterator[Record] = null
 
+  // variable for kinesis wait time interval between next retry
+  private var kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
--- End diff --

I don't think you want to do this: ```kinesisWaitTimeMs``` is never reset 
to the default value after the retry loop exits. I think you should make this 
a ```val``` and introduce a ```var``` initialized to its value within 
```retryOrTimeout()``` to store the wait time for each retry iteration.
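A minimal sketch of that shape, using hypothetical names (`RetryingClient`, and a `retryOrTimeout` that is not Spark's method). The configured initial wait is an immutable constructor value. Each call copies it into a local `var` that doubles after every wait, so a later call starts again from the configured value. As a simplification, it retries on any non-fatal exception.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch of exponential backoff with a per-call wait time.
final class RetryingClient(initialWaitTimeMs: Long, maxAttempts: Int, timeoutMs: Long) {

  def retryOrTimeout[T](message: String)(body: => T): T = {
    val startTimeMs = System.currentTimeMillis()
    var attempts = 0
    // Local copy: doubling it never changes initialWaitTimeMs, so every call
    // starts its backoff from the configured value.
    var waitTimeMs = initialWaitTimeMs
    var result: Option[T] = None
    var lastError: Throwable = null

    def timedOut: Boolean = System.currentTimeMillis() - startTimeMs >= timeoutMs

    while (result.isEmpty && attempts < maxAttempts && !timedOut) {
      if (attempts > 0) { // wait only before a retry, never before the first attempt
        Thread.sleep(waitTimeMs)
        waitTimeMs *= 2
      }
      try {
        result = Some(body)
      } catch {
        case NonFatal(e) => lastError = e // simplification: every non-fatal error is retried
      }
      attempts += 1
    }
    result.getOrElse(throw new RuntimeException(
      s"Gave up $message after $attempts attempt(s)", lastError))
  }
}
```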





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-20 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112565900
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -249,6 +252,17 @@ object KinesisInputDStream {
 }
 
 /**
+  * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch
+  * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
+  *
+  * @param conf: Map[String, String] to use for CloudWatch authentication
+  */
+def kinesisConf(conf: Map[String, String]): Builder = {
--- End diff --

If you want the extensibility of a key/value map for configs then I would 
go the route of getting a solution that uses ```SparkConf``` to do that in 
order to use the existing facilities provided by Spark. It doesn't make sense 
to me to introduce a key/value map just for Kinesis, especially since the 
naming of your keys (e.g. ```spark.streaming.kinesis.retry.waitTime```) would 
indicate to most users that these are ```SparkConf``` params, not a 
Kinesis-specific mapping that must be manually set up and passed to the Kinesis 
stream builder.





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-19 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112347174
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -249,6 +252,17 @@ object KinesisInputDStream {
 }
 
 /**
+  * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch
+  * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
+  *
+  * @param conf: Map[String, String] to use for CloudWatch authentication
+  */
+def kinesisConf(conf: Map[String, String]): Builder = {
--- End diff --

Do you think it would be better to pass each value to the builder rather than 
a map of configs? I thought a map of configs could easily be extended to 
support new configurations without code changes.
What are your thoughts on one builder method per config vs. one map for all 
configs?





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-19 Thread yssharma
Github user yssharma commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112346805
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -147,6 +152,14 @@ class KinesisSequenceRangeIterator(
   private var lastSeqNumber: String = null
   private var internalIterator: Iterator[Record] = null
 
+  // variable for kinesis wait time interval between next retry
+  private var kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
--- End diff --

No, this value is modified after each wait:
`kinesisWaitTimeMs *= 2  // if you have waited, then double wait time for next round`





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-19 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112344999
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -249,6 +252,17 @@ object KinesisInputDStream {
 }
 
 /**
+  * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch
+  * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
+  *
+  * @param conf: Map[String, String] to use for CloudWatch authentication
+  */
+def kinesisConf(conf: Map[String, String]): Builder = {
--- End diff --

If using ```SparkConf``` to store these custom config values doesn't end up 
being feasible then I'd strongly prefer that we follow the existing approach 
and have separate builder methods for setting the retry wait time and max 
attempts.
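A sketch of what separate builder methods could look like. `KinesisStreamBuilderSketch`, `retryWaitTimeMs`, `retryMaxAttempts` and `RetryConfig` are hypothetical names, not the Kinesis builder's API. The fallback values (100 ms and 3) are the defaults quoted in this thread.

```scala
// Hypothetical names throughout; not the actual Kinesis stream builder API.
final case class RetryConfig(waitTimeMs: Long, maxAttempts: Int)

final class KinesisStreamBuilderSketch {
  private var waitTimeMsOpt: Option[Long] = None
  private var maxAttemptsOpt: Option[Int] = None

  /** Sets the wait time before the first retry of a failed Kinesis request. */
  def retryWaitTimeMs(ms: Long): KinesisStreamBuilderSketch = {
    require(ms > 0, "retry wait time must be positive")
    waitTimeMsOpt = Some(ms)
    this
  }

  /** Sets the maximum number of attempts for a Kinesis request. */
  def retryMaxAttempts(n: Int): KinesisStreamBuilderSketch = {
    require(n > 0, "max attempts must be positive")
    maxAttemptsOpt = Some(n)
    this
  }

  // Options left unset fall back to the defaults quoted in this thread.
  def buildRetryConfig(): RetryConfig =
    RetryConfig(waitTimeMsOpt.getOrElse(100L), maxAttemptsOpt.getOrElse(3))
}
```

Compared with a `Map[String, String]`, typed setters catch misspelled option names and wrong value types at compile time, at the cost of one method per option.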





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-19 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112344746
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
 ---
@@ -249,6 +252,17 @@ object KinesisInputDStream {
 }
 
 /**
+  * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch
--- End diff --

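Documentation is not correct: this method sets a Kinesis configuration map, not the credentials used for CloudWatch authentication.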





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-19 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112343978
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -147,6 +152,14 @@ class KinesisSequenceRangeIterator(
   private var lastSeqNumber: String = null
   private var internalIterator: Iterator[Record] = null
 
+  // variable for kinesis wait time interval between next retry
+  private var kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
--- End diff --

Can't this be a ```val```?





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-19 Thread budde
Github user budde commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r112343115
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -83,7 +83,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
 @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
 val retryTimeoutMs: Int = 1,
 val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
-val kinesisCreds: SparkAWSCredentials = DefaultCredentials
+val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
+val kinesisConf: Map[String, String] = Map.empty
--- End diff --

+1. I think reading the config values from ```sc``` will be a much cleaner 
approach.





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-07 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r110437376
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -251,21 +255,22 @@ class KinesisSequenceRangeIterator(
 
   /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
   private def retryOrTimeout[T](message: String)(body: => T): T = {
-import KinesisSequenceRangeIterator._
-
-var startTimeMs = System.currentTimeMillis()
+val startTimeMs = System.currentTimeMillis()
 var retryCount = 0
-var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
+var kinesisWaitTimeMs =
+  kinesisConfigs.getOrElse("spark.streaming.kinesis.retry.wait.time", "100").toInt
--- End diff --

define this once during class initialization as well





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-07 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r110437471
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -292,9 +297,3 @@ class KinesisSequenceRangeIterator(
 }
   }
 }
-
-private[streaming]
-object KinesisSequenceRangeIterator {
--- End diff --

keep the default values here please
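A minimal sketch of keeping the defaults in the companion object while the class reads its configuration once, at construction. `RangeIteratorSketch` is a hypothetical stand-in for `KinesisSequenceRangeIterator`, and a `Map` stands in for `SparkConf`. The key names and default values are the ones quoted elsewhere in this thread.

```scala
// Hypothetical stand-in for KinesisSequenceRangeIterator; a Map replaces SparkConf.
class RangeIteratorSketch(conf: Map[String, String]) {
  import RangeIteratorSketch._

  // Resolved once when the object is built, not on every request.
  val maxRetries: Int = conf.getOrElse(MaxRetriesKey, DefaultMaxRetries).toInt
  val waitTime: String = conf.getOrElse(WaitTimeKey, DefaultWaitTime)
}

// Companion object: the single place where key names and default values live.
object RangeIteratorSketch {
  val MaxRetriesKey = "spark.streaming.kinesis.retry.maxAttempts"
  val WaitTimeKey = "spark.streaming.kinesis.retry.waitTime"
  val DefaultMaxRetries = "3"
  val DefaultWaitTime = "100ms"
}
```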





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-07 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r110437682
  
--- Diff: docs/streaming-kinesis-integration.md ---
@@ -216,3 +216,7 @@ de-aggregate records during consumption.
 - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
   - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
   - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
+
+- Kinesis retry configurations
+ - `spark.streaming.kinesis.retry.wait.time` : Config for wait time 
between Kinesis retries (in milliseconds). Default is 100 ms.
--- End diff --

We can accept duration strings, such as `100ms` or `1s`; I would prefer we use those instead of a bare millisecond count.
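
Spark already parses such strings (for example via `SparkConf.getTimeAsMs` or `JavaUtils.timeStringAsMs`). The self-contained function below only approximates that behaviour to illustrate the idea. `DurationStrings.toMs` is hypothetical: it supports just a subset of suffixes and treats a bare number as milliseconds.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical, simplified duration parser; not Spark's implementation.
object DurationStrings {
  // Suffixes this sketch accepts, mapped to their time unit.
  private val units: Map[String, TimeUnit] = Map(
    "ms" -> TimeUnit.MILLISECONDS,
    "s" -> TimeUnit.SECONDS,
    "m" -> TimeUnit.MINUTES,
    "min" -> TimeUnit.MINUTES,
    "h" -> TimeUnit.HOURS,
    "d" -> TimeUnit.DAYS)

  // A non-negative integer followed by an optional lowercase suffix.
  private val Pattern = """([0-9]+)([a-z]+)?""".r

  /** Parses "100ms", "1s", "2min", ...; a bare number is read as milliseconds. */
  def toMs(str: String): Long = str.trim.toLowerCase match {
    case Pattern(num, null) => num.toLong
    case Pattern(num, suffix) =>
      val unit = units.getOrElse(suffix,
        throw new NumberFormatException(s"Invalid time suffix: $suffix"))
      unit.toMillis(num.toLong)
    case _ =>
      throw new NumberFormatException(s"Invalid duration string: $str")
  }
}
```

Inside Spark itself, the wait time would more likely be read as `sc.getConf.getTimeAsMs("spark.streaming.kinesis.retry.wait.time", "100ms")`, so that both `100ms` and `1s` work without custom parsing.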





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-07 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r110437838
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -83,7 +83,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
 @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
 val retryTimeoutMs: Int = 1,
 val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
-val kinesisCreds: SparkAWSCredentials = DefaultCredentials
+val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
+val kinesisConf: Map[String, String] = Map.empty
--- End diff --

Do you need this to be provided as a `Map`? You already have `sc` being passed in, and it carries all the configurations.





[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

2017-04-07 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/17467#discussion_r110437249
  
--- Diff: 
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 ---
@@ -251,21 +255,22 @@ class KinesisSequenceRangeIterator(
 
   /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
   private def retryOrTimeout[T](message: String)(body: => T): T = {
-import KinesisSequenceRangeIterator._
-
-var startTimeMs = System.currentTimeMillis()
+val startTimeMs = System.currentTimeMillis()
 var retryCount = 0
-var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
+var kinesisWaitTimeMs =
  kinesisConfigs.getOrElse("spark.streaming.kinesis.retry.wait.time", "100").toInt
+val kinesisMaxRetries =
--- End diff --

Define this once, during class initialization, rather than on every call to `retryOrTimeout`.
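
A minimal sketch of that suggestion. The retry settings are fixed when the object is constructed, and `retryOrTimeout` only reads them while retrying with exponential backoff and a timeout. The `BackoffRetrier` class and its parameter names are hypothetical. The diff does not show which exceptions the real helper retries on, so this sketch assumes any non-fatal exception is retryable.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch: settings are constructor parameters, so they are
// resolved once per instance instead of on every retryOrTimeout call.
class BackoffRetrier(
    maxRetries: Int,         // retries allowed after the first attempt
    initialWaitTimeMs: Long, // first backoff interval
    maxWaitTimeMs: Long,     // cap on the backoff interval
    retryTimeoutMs: Long) {  // overall time budget for all attempts

  /** Runs `body`, retrying with exponential backoff until it succeeds,
    * the retry budget is spent, or the timeout elapses. */
  def retryOrTimeout[T](message: String)(body: => T): T = {
    val startTimeMs = System.currentTimeMillis()
    var retryCount = 0
    var waitTimeMs = initialWaitTimeMs
    var result: Option[T] = None
    var lastError: Throwable = null

    def timedOut: Boolean =
      System.currentTimeMillis() - startTimeMs > retryTimeoutMs

    while (result.isEmpty && retryCount <= maxRetries && !timedOut) {
      try {
        result = Some(body)
      } catch {
        case NonFatal(e) =>
          lastError = e
          retryCount += 1
          // Sleep only if another attempt is allowed, then double the wait.
          if (retryCount <= maxRetries) {
            Thread.sleep(waitTimeMs)
            waitTimeMs = math.min(waitTimeMs * 2, maxWaitTimeMs)
          }
      }
    }
    result.getOrElse(throw new RuntimeException(
      s"Gave up after $retryCount failed attempts: $message", lastError))
  }
}
```

Because the budget and intervals are constructor arguments, each call pays no configuration lookup. The same instance can also be shared by every Kinesis request the iterator makes.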

