[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16114





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-09 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r91774523
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -221,6 +221,12 @@ private[kinesis] class KinesisReceiver[T](
     }
   }
 
+  /** Return the current rate limit defined in [[BlockGenerator]]. */
+  private[kinesis] def getCurrentLimit: Int = {
+    assert(blockGenerator != null)
--- End diff --

I would be okay keeping it if we add a useful error message in case this assertion doesn't hold, e.g.
`assert(blockGenerator != null, "Expected blockGenerator to be set for the receiver before the processor received records")`

or something like that
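
For reference, a minimal sketch of how the accessor could read with such a message (assuming, as the capping discussion later in this thread suggests, that `BlockGenerator.getCurrentLimit` returns a `Long` that is clamped to `Int` range):

```
/** Return the current rate limit defined in [[BlockGenerator]]. */
private[kinesis] def getCurrentLimit: Int = {
  // A descriptive message makes the failure mode obvious if onStart has not run yet.
  assert(blockGenerator != null,
    "Expected blockGenerator to be set for the receiver before the processor received records")
  // The generator's limit is a Long; clamp it so it can be used as an index bound.
  math.min(blockGenerator.getCurrentLimit, Int.MaxValue).toInt
}
```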





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-07 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r91419246
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -68,7 +68,16 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
     if (!receiver.isStopped()) {
       try {
-        receiver.addRecords(shardId, batch)
+        // Limit the number of records processed from the Kinesis stream. This is because the KCL
+        // cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+        // in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+        // to 10 and a producer aggregates two records into one message, the worker possibly gets
+        // 20 records every time the callback function is called.
+        val maxRecords = receiver.getCurrentLimit
+        for (start <- 0 until batch.size by maxRecords) {
+          val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size))
+          receiver.addRecords(shardId, miniBatch)
+        }
         logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
--- End diff --

okay





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-07 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r91339332
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -68,7 +68,16 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
     if (!receiver.isStopped()) {
       try {
-        receiver.addRecords(shardId, batch)
+        // Limit the number of records processed from the Kinesis stream. This is because the KCL
+        // cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+        // in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+        // to 10 and a producer aggregates two records into one message, the worker possibly gets
+        // 20 records every time the callback function is called.
+        val maxRecords = receiver.getCurrentLimit
+        for (start <- 0 until batch.size by maxRecords) {
+          val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size))
+          receiver.addRecords(shardId, miniBatch)
+        }
         logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
--- End diff --

I would leave this statement inside the `for` loop, because IIRC `addRecords` will be a blocking call, as the records need to be written to the WAL
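
A minimal sketch of the arrangement being suggested, using the names from the diff above, with the log emitted per mini-batch inside the loop:

```
val maxRecords = receiver.getCurrentLimit
for (start <- 0 until batch.size by maxRecords) {
  val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size))
  // addRecords may block until the mini-batch has been written to the WAL,
  // so logging inside the loop reports progress as each mini-batch lands.
  receiver.addRecords(shardId, miniBatch)
  logDebug(s"Stored: Worker $workerId stored ${miniBatch.size} records for shardId $shardId")
}
```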





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-06 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r91210953
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -68,9 +69,16 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
     if (!receiver.isStopped()) {
       try {
-        receiver.addRecords(shardId, batch)
-        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
-        receiver.setCheckpointer(shardId, checkpointer)
+        // Limit the number of records processed from the Kinesis stream. This is because the KCL
+        // cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+        // in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+        // to 10 and a producer aggregates two records into one message, the worker possibly gets
+        // 20 records every time the callback function is called.
+        batch.asScala.grouped(receiver.getCurrentLimit).foreach { batch =>
--- End diff --

yea, I also think that when `maxRecords` is small and `batch` is large, the many iterations cause a little overhead. So, I restored the code to the previous Java-style one.





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-06 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r91208525
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -68,9 +69,16 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
     if (!receiver.isStopped()) {
       try {
-        receiver.addRecords(shardId, batch)
-        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
-        receiver.setCheckpointer(shardId, checkpointer)
+        // Limit the number of records processed from the Kinesis stream. This is because the KCL
+        // cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+        // in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+        // to 10 and a producer aggregates two records into one message, the worker possibly gets
+        // 20 records every time the callback function is called.
+        batch.asScala.grouped(receiver.getCurrentLimit).foreach { batch =>
+          receiver.addRecords(shardId, batch.asJava)
+          logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+          receiver.setCheckpointer(shardId, checkpointer)
--- End diff --

thanks, I'll fix





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-06 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r91206300
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -68,9 +69,16 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
     if (!receiver.isStopped()) {
       try {
-        receiver.addRecords(shardId, batch)
-        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
-        receiver.setCheckpointer(shardId, checkpointer)
+        // Limit the number of records processed from the Kinesis stream. This is because the KCL
+        // cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+        // in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+        // to 10 and a producer aggregates two records into one message, the worker possibly gets
+        // 20 records every time the callback function is called.
+        batch.asScala.grouped(receiver.getCurrentLimit).foreach { batch =>
+          receiver.addRecords(shardId, batch.asJava)
+          logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+          receiver.setCheckpointer(shardId, checkpointer)
--- End diff --

Yeah, that's what I suspected at https://github.com/apache/spark/pull/16114#discussion_r90756702 -- thanks for confirming





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-06 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r91203809
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -68,9 +69,16 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
     if (!receiver.isStopped()) {
       try {
-        receiver.addRecords(shardId, batch)
-        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
-        receiver.setCheckpointer(shardId, checkpointer)
+        // Limit the number of records processed from the Kinesis stream. This is because the KCL
+        // cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+        // in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+        // to 10 and a producer aggregates two records into one message, the worker possibly gets
+        // 20 records every time the callback function is called.
+        batch.asScala.grouped(receiver.getCurrentLimit).foreach { batch =>
+          receiver.addRecords(shardId, batch.asJava)
+          logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+          receiver.setCheckpointer(shardId, checkpointer)
--- End diff --

this should be outside, after the `foreach`





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-06 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r91184750
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -68,9 +69,16 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
     if (!receiver.isStopped()) {
       try {
-        receiver.addRecords(shardId, batch)
-        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
-        receiver.setCheckpointer(shardId, checkpointer)
+        // Limit the number of records processed from the Kinesis stream. This is because the KCL
+        // cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+        // in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+        // to 10 and a producer aggregates two records into one message, the worker possibly gets
+        // 20 records every time the callback function is called.
+        batch.asScala.grouped(receiver.getCurrentLimit).foreach { batch =>
--- End diff --

Sorry, one last comment -- `batch` is used for both the overall data set and each subset. They should be named differently for clarity.

It's also my fault for not realizing the collections here were Java, not Scala, and you have to convert to use the nice Scala idiom. I think it's OK, as the conversion just wraps and does not copy the collection, but it does bear being careful about performance here.
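
A small self-contained sketch of the wrap-not-copy behavior (using `scala.collection.JavaConverters`, the converter API current at the time of this PR):

```
import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.JavaConverters._

val javaList: JList[String] = new JArrayList[String]()
javaList.add("a")
javaList.add("b")

// asScala returns a view that delegates to javaList; no elements are copied.
val scalaView = javaList.asScala
javaList.add("c")
assert(scalaView.size == 3)  // the view sees the later mutation

// grouped(), by contrast, materializes each emitted sub-collection,
// which is where the per-mini-batch overhead comes from.
scalaView.grouped(2).foreach(group => println(group.asJava))
```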





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r90758322
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -56,6 +56,27 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
     logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }
 
+  private def addRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    receiver.addRecords(shardId, batch)
+    logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+    receiver.setCheckpointer(shardId, checkpointer)
--- End diff --

yea, you're right: this code overwrites the `checkpointer` every time the callback function is called (maybe every 1 sec.). I'm not sure what the original author intended, but it seems wasteful. I'm also not sure it's worth fixing, though, and that fix is out of scope for this JIRA. If necessary, I'd be pleased to fix it in follow-up work.





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r90758182
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -56,6 +56,27 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
     logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }
 
+  private def addRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    receiver.addRecords(shardId, batch)
+    logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+    receiver.setCheckpointer(shardId, checkpointer)
+  }
+
+  /**
+   * Limit the number of records processed from the Kinesis stream. This is because the KCL
+   * cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+   * in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+   * to 10 and a producer aggregates two records into one message, the worker possibly gets
+   * 20 records every time the callback function is called.
+   */
+  private def processRecordsWithLimit(
+      batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    val maxRecords = receiver.getCurrentLimit
+    for (start <- 0 until batch.size by maxRecords) {
--- End diff --

Actually, since each Kinesis shard has strict read throughput limits (http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), `batch.size` hardly ever exceeds `Int.MaxValue / 2`. But since I like your idea in terms of code clarity, I fixed it.
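
For a rough sense of the scale involved (a back-of-the-envelope sketch; the per-call cap is the documented Kinesis limit of the time, and the aggregation factor of 100 is an arbitrary illustrative assumption):

```
// A single GetRecords call returns at most 10,000 KCL records (or 10 MB).
val maxKclRecordsPerCall = 10000
// Even if every KCL record carried 100 aggregated user records,
// the deaggregated batch would be ~10^6 records:
val worstCaseBatchSize = maxKclRecordsPerCall * 100L
assert(worstCaseBatchSize < Int.MaxValue / 2)  // ~1.07e9, several orders of magnitude away
```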





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-03 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r90756702
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -56,6 +56,27 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
     logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }
 
+  private def addRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    receiver.addRecords(shardId, batch)
+    logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+    receiver.setCheckpointer(shardId, checkpointer)
--- End diff --

BTW is this supposed to be called on every batch or once at the end? I don't know how it works.





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-03 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r90756693
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -56,6 +56,27 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
     logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }
 
+  private def addRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    receiver.addRecords(shardId, batch)
+    logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+    receiver.setCheckpointer(shardId, checkpointer)
+  }
+
+  /**
+   * Limit the number of records processed from the Kinesis stream. This is because the KCL
+   * cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+   * in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+   * to 10 and a producer aggregates two records into one message, the worker possibly gets
+   * 20 records every time the callback function is called.
+   */
+  private def processRecordsWithLimit(
+      batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    val maxRecords = receiver.getCurrentLimit
+    for (start <- 0 until batch.size by maxRecords) {
--- End diff --

Hm, it just occurred to me that you would have a problem here if `batch.size` and `maxRecords` were both over `Int.MaxValue / 2`, and `maxRecords` were a bit smaller than `batch.size`: the addition below overflows.

It seems like a corner case, but I note above that you already defensively capped `maxRecords` at `Int.MaxValue`, so maybe it's less unlikely than it sounds.

You can fix it by letting the addition and min comparison take place over longs and then converting back to int.

Alternatively, I think this is even simpler in Scala, though I imagine there's some extra overhead here:

```
batch.grouped(maxRecords).foreach(batch => addRecords(batch, checkpointer))
```

I don't know of a good reviewer for this component, but I think I'm comfortable merging a straightforward change like this.
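
A minimal sketch of the over-longs fix described above (an illustration against the loop in the diff, not the patch itself):

```
val maxRecords = receiver.getCurrentLimit
for (start <- 0 until batch.size by maxRecords) {
  // Do the addition and min over Longs so start + maxRecords cannot wrap around,
  // then convert back: the result is <= batch.size, so it always fits in an Int.
  val end = math.min(start.toLong + maxRecords.toLong, batch.size.toLong).toInt
  addRecords(batch.subList(start, end), checkpointer)
}
```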





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r90754922
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -56,6 +56,31 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
     logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }
 
+  private def addRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    receiver.addRecords(shardId, batch)
+    logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+    receiver.setCheckpointer(shardId, checkpointer)
+  }
+
+  /**
+   * Limit the number of records processed from the Kinesis stream. This is because the KCL
+   * cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+   * in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+   * to 10 and a producer aggregates two records into one message, the worker possibly gets
+   * 20 records every time the callback function is called.
+   */
+  private def processRecordsWithLimit(
+      batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    val maxRecords = receiver.getCurrentLimit
+    if (batch.size() <= maxRecords) {
+      addRecords(batch, checkpointer)
--- End diff --

Aha, I see. I'll fix, thanks!





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-03 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r90754731
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -56,6 +56,31 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
     logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }
 
+  private def addRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    receiver.addRecords(shardId, batch)
+    logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+    receiver.setCheckpointer(shardId, checkpointer)
+  }
+
+  /**
+   * Limit the number of records processed from the Kinesis stream. This is because the KCL
+   * cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+   * in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+   * to 10 and a producer aggregates two records into one message, the worker possibly gets
+   * 20 records every time the callback function is called.
+   */
+  private def processRecordsWithLimit(
+      batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    val maxRecords = receiver.getCurrentLimit
+    if (batch.size() <= maxRecords) {
+      addRecords(batch, checkpointer)
--- End diff --

I think the for loop even takes care of this case, but no big deal either way. It seems like a reasonable change.





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-02 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r90601340
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -221,6 +221,12 @@ private[kinesis] class KinesisReceiver[T](
     }
   }
 
+  /** Return the current rate limit defined in [[BlockGenerator]]. */
+  private[kinesis] def getCurrentLimit: Int = {
+    assert(blockGenerator != null)
--- End diff --

I just added this assertion to match other parts, such as `assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!")`, because both are initialized in `onStart`. But I have no strong opinion on this, and it's okay with me to remove it.





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-02 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r90600966
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -56,6 +56,38 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
     logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }
 
+  private def addRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    receiver.addRecords(shardId, batch)
+    logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+    receiver.setCheckpointer(shardId, checkpointer)
+  }
+
+  /**
+   * Limit the number of records processed from the Kinesis stream. This is because the KCL
+   * cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+   * in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+   * to 10 and a producer aggregates two records into one message, the worker possibly gets
+   * 20 records every time the callback function is called.
+   */
+  private def processRecordsWithLimit(
+      batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    val maxRecords = receiver.getCurrentLimit
+    if (batch.size() <= maxRecords) {
+      addRecords(batch, checkpointer)
+    } else {
+      val numIter = batch.size / maxRecords
--- End diff --

Thanks! I'll fix





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r90600774
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -221,6 +221,12 @@ private[kinesis] class KinesisReceiver[T](
     }
   }
 
+  /** Return the current rate limit defined in [[BlockGenerator]]. */
+  private[kinesis] def getCurrentLimit: Int = {
+    assert(blockGenerator != null)
--- End diff --

This is pretty trivial, but do we use runtime assertions in general in the project? The next line already fails when it's null, whether assertions are on or not.





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16114#discussion_r90600716
  
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -56,6 +56,38 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
     logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }
 
+  private def addRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    receiver.addRecords(shardId, batch)
+    logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+    receiver.setCheckpointer(shardId, checkpointer)
+  }
+
+  /**
+   * Limit the number of records processed from the Kinesis stream. This is because the KCL
+   * cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+   * in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+   * to 10 and a producer aggregates two records into one message, the worker possibly gets
+   * 20 records every time the callback function is called.
+   */
+  private def processRecordsWithLimit(
+      batch: List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
+    val maxRecords = receiver.getCurrentLimit
+    if (batch.size() <= maxRecords) {
+      addRecords(batch, checkpointer)
+    } else {
+      val numIter = batch.size / maxRecords
--- End diff --

Is this clause a bit simpler as ...

```
for (start <- 0 until batch.size by maxRecords) {
  addRecords(batch.subList(start, math.min(start + maxRecords, batch.size)), checkpointer)
}
```





[GitHub] spark pull request #16114: [SPARK-18620][Streaming][Kinesis] Flatten input r...

2016-12-01 Thread maropu
GitHub user maropu opened a pull request:

https://github.com/apache/spark/pull/16114

[SPARK-18620][Streaming][Kinesis] Flatten input rates in timeline for streaming + kinesis

## What changes were proposed in this pull request?
This PR makes the input rates in the timeline flatter for Spark Streaming + Kinesis.
Since Kinesis workers fetch records and push them into block generators in bulk, the timeline in the web UI shows many spikes when `maxRates` is applied (see Figure 1 below). This fix splits the fetched input records across multiple `addRecords` calls.

Figure 1: `maxRates=500` applied in vanilla Spark
https://cloud.githubusercontent.com/assets/692303/20823861/4602f300-b89b-11e6-95f3-164a37061305.png

Figure 2: `maxRates=500` applied in Spark with my patch
https://cloud.githubusercontent.com/assets/692303/20823882/6c46352c-b89b-11e6-81ab-afd8abfe0cfe.png

## How was this patch tested?
Added tests to check that input records are split into multiple `addRecords` calls.
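
For reference, the core of the change as it stood after the review rounds above (a sketch assembled from the diffs in this thread; the surrounding try/catch retry logic in `processRecords` is unchanged and elided):

```
// Inside KinesisRecordProcessor.processRecords, replacing the single addRecords call:
val maxRecords = receiver.getCurrentLimit
for (start <- 0 until batch.size by maxRecords) {
  val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size))
  receiver.addRecords(shardId, miniBatch)
  logDebug(s"Stored: Worker $workerId stored ${miniBatch.size} records for shardId $shardId")
}
receiver.setCheckpointer(shardId, checkpointer)
```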

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maropu/spark SPARK-18620

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16114.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16114





