[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-09-30 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r18240973
  
--- Diff: extras/kinesis-asl/src/main/resources/log4j.properties ---
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=WARN, console
--- End diff --

question: why do we need this log4j file here? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15727578
  
--- Diff: bin/run-example ---
@@ -29,7 +29,9 @@ if [ -n "$1" ]; then
 else
   echo "Usage: ./bin/run-example example-class [example-args]" 1>&2
   echo "  - set MASTER=XX to use a specific master" 1>&2
-  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
+  echo "  - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2
+  echo "     (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
+  echo "  - to run the Kinesis Spark Streaming example, make sure you build with -Pkinesis-asl" 1>&2
--- End diff --

makes sense.  i'll make sure this is addressed in streaming-kinesis.md.





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15727580
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Helper class to create Amazon Kinesis Input Stream
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
--- End diff --

removed and cleaned up imports





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15727587
  
--- Diff: extras/kinesis-asl/pom.xml ---
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the License); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an AS IS BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!--
+    Kinesis integration is not included by default due to ASL-licensed code.
+    Note:  This project - if activated - is packaged with the main Spark assembly.
--- End diff --

oh yikes.  yeah, that's just plain wrong.





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15727599
  
--- Diff: extras/kinesis-asl/pom.xml ---
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the License); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an AS IS BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!--
+    Kinesis integration is not included by default due to ASL-licensed code.
+    Note:  This project - if activated - is packaged with the main Spark assembly.
--- End diff --

Correction, this is not a nit. It's already confusing ... please change this!





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15727602
  
--- Diff: extras/kinesis-asl/pom.xml ---
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the License); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an AS IS BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!--
+    Kinesis integration is not included by default due to ASL-licensed code.
+    Note:  This project - if activated - is packaged with the main Spark assembly.
--- End diff --

Haha, didn't see your response. Good :+1:





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15727688
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
+ *   for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ *   stream-name and endpoint-url.
+ *
+ * Valid endpoint urls:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: KinesisWordCountASL stream-name endpoint-url
+ *   stream-name is the name of the Kinesis stream (ie. mySparkStream)
+ *   endpoint-url is the endpoint of the Kinesis service
+ *     (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *    $ export AWS_ACCESS_KEY_ID=your-access-key
+ *    $ export AWS_SECRET_KEY=your-secret-key
+ *    $ $SPARK_HOME/bin/run-example \
+ *        org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \
+ *        https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducerASL which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducerASL are provided in that class definition.
+ */
+object KinesisWordCountASL extends Logging {
+  def main(args: Array[String]) {
+    /**
+     * Check that all required args were passed in.
+     */
+    if (args.length < 2) {
+      System.err.println(
+        """
+          |Usage: KinesisWordCount stream-name endpoint-url
+          |    stream-name is the name of the Kinesis stream
+          |    endpoint-url is the endpoint of the Kinesis service
+          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    /** Populate the appropriate variables from the given args */
+    val Array(streamName, endpointUrl) = args
+
+    /** Determine the number of shards from the stream */
+    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+    kinesisClient.setEndpoint(endpointUrl)
+    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
+      .size()
+
+    /** In this example, we're going to create 1

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15727694
  
--- Diff: dev/audit-release/sbt_app_kinesis/build.sbt ---
@@ -0,0 +1,30 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the License); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an AS IS BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+name := "Kinesis Test"
+
+version := "1.0"
+
+scalaVersion := System.getenv.get("SCALA_VERSION")
+
+libraryDependencies += "org.apache.spark" %% "spark-core" % System.getenv.get("SPARK_VERSION")
+libraryDependencies += "org.apache.spark" %% "spark-streaming" % System.getenv.get("SPARK_VERSION")
--- End diff --

gotcha





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15727693
  
--- Diff: dev/audit-release/audit_release.py ---
@@ -105,7 +105,7 @@ def get_url(url):
     "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
     "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
     "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
-    "spark-catalyst", "spark-sql", "spark-hive"
+    "spark-catalyst", "spark-sql", "spark-hive", "kinesis-asl"
--- End diff --

changed





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50956263
  
QA tests have started for PR 1434. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17760/consoleFull





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50957227
  
QA results for PR 1434:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
public final class JavaKinesisWordCountASL {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17760/consoleFull





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50973644
  
Thank you very much @cfregly! I have merged this!!





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1434





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread nchammas
Github user nchammas commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50974810
  
This was an epic pull request. Nice work, people.





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-02 Thread pdeyhim
Github user pdeyhim commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50974841
  
Awesome work!! @tdas @cfregly





[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15685837
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessorUtils.scala ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import scala.util.Random
+
+import org.apache.spark.Logging
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+
+
+/**
+ * Helper for the KinesisRecordProcessor.
+ */
+private[kinesis] object KinesisRecordProcessorUtils extends Logging {
--- End diff --

added the companion object




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15685828
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+
+/**
+ * Implementation of KinesisRecordSerializer to convert Array[Byte] to/from String.
+ */
+class KinesisStringRecordSerializer extends KinesisRecordSerializer[String] with Logging {
--- End diff --

i removed the Serializer abstraction and am just using basic byte[] <-> String conversions
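
For readers following the thread, a minimal sketch of what plain byte[] to String conversions without a serializer abstraction could look like. This is not the code from the PR: the object name `ByteStringConversions`, both method names, and the choice of UTF-8 are assumptions made for illustration only.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper (not from the PR): converts record payloads
// directly, with no KinesisRecordSerializer-style abstraction.
object ByteStringConversions {
  // Encode a String as bytes. UTF-8 is an assumed charset.
  def toBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

  // Decode a byte payload back into a String using the same assumed charset.
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
}
```

Naming the charset explicitly on both sides keeps the round trip independent of the JVM's platform default encoding.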




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15685855
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext.rddToOrderedRDDFunctions
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the
+ *   given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ *   stream-name and endpoint-url.
+ *
+ * Valid endpoint urls:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: KinesisWordCount stream-name endpoint-url
+ *   stream-name is the name of the Kinesis stream (ie. mySparkStream)
+ *   endpoint-url is the endpoint of the Kinesis service
+ *     (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *    $ export AWS_ACCESS_KEY_ID=your-access-key
+ *    $ export AWS_SECRET_KEY=your-secret-key
+ *    $ $SPARK_HOME/bin/run-example \
+ *        org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \
+ *        https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducer which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducer are provided in that class definition.
+ */
+object KinesisWordCount extends Logging {
+  val WordSeparator = " "
+
+  def main(args: Array[String]) {
+    /**
+     * Check that all required args were passed in.
+     */
+    if (args.length < 2) {
+      System.err.println("Usage: KinesisWordCount <stream-name> <endpoint-url>")
+      System.exit(1)
+    }
+
+/**
+ * (This was lifted from the StreamingExamples.scala in order to avoid 
the dependency
+ *   on the spark-examples artifact.)
+ * Set reasonable logging levels for streaming if the user has not 
configured log4j.
+ */
+val log4jInitialized = 
Logger.getRootLogger.getAllAppenders.hasMoreElements()
+if (!log4jInitialized) {
+  /** 
+   *  We first log something to initialize Spark's default logging, 
+   *  then we 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15685887
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala
 ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext.rddToOrderedRDDFunctions
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See 
http://spark.apache.org/docs/latest/streaming-programming-guide.html for more 
details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per 
shard of the
+ *   given stream.
+ * It then starts pulling from the last checkpointed sequence number of 
the given 
+ *   stream-name and endpoint-url. 
+ *
+ * Valid endpoint urls:  
http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ * 
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared 
by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: KinesisWordCount stream-name endpoint-url
+ *   stream-name is the name of the Kinesis stream (ie. mySparkStream)
+ *   endpoint-url is the endpoint of the Kinesis service
+ * (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *$ export AWS_ACCESS_KEY_ID=your-access-key
+ *$ export AWS_SECRET_KEY=your-secret-key
+ *$ $SPARK_HOME/bin/run-example \
+ *org.apache.spark.examples.streaming.KinesisWordCount 
mySparkStream \
+ *https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducer 
which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducer are provided in that 
class definition.
+ */
+object KinesisWordCount extends Logging {
+  val WordSeparator = " "
+
+  def main(args: Array[String]) {
+    /**
+     * Check that all required args were passed in.
+     */
+    if (args.length < 2) {
+      System.err.println("Usage: KinesisWordCount <stream-name> <endpoint-url>")
+      System.exit(1)
+    }
+
+/**
+ * (This was lifted from the StreamingExamples.scala in order to avoid 
the dependency
+ *   on the spark-examples artifact.)
+ * Set reasonable logging levels for streaming if the user has not 
configured log4j.
+ */
+val log4jInitialized = 
Logger.getRootLogger.getAllAppenders.hasMoreElements()
+if (!log4jInitialized) {
+  /** 
+   *  We first log something to initialize Spark's default logging, 
+   *  then we 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15685865
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala
 ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext.rddToOrderedRDDFunctions
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See 
http://spark.apache.org/docs/latest/streaming-programming-guide.html for more 
details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per 
shard of the
+ *   given stream.
+ * It then starts pulling from the last checkpointed sequence number of 
the given 
+ *   stream-name and endpoint-url. 
+ *
+ * Valid endpoint urls:  
http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ * 
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared 
by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: KinesisWordCount stream-name endpoint-url
+ *   stream-name is the name of the Kinesis stream (ie. mySparkStream)
+ *   endpoint-url is the endpoint of the Kinesis service
+ * (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *$ export AWS_ACCESS_KEY_ID=your-access-key
+ *$ export AWS_SECRET_KEY=your-secret-key
+ *$ $SPARK_HOME/bin/run-example \
+ *org.apache.spark.examples.streaming.KinesisWordCount 
mySparkStream \
+ *https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducer 
which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducer are provided in that 
class definition.
+ */
+object KinesisWordCount extends Logging {
+  val WordSeparator = " "
+
+  def main(args: Array[String]) {
+    /**
+     * Check that all required args were passed in.
+     */
+    if (args.length < 2) {
+      System.err.println("Usage: KinesisWordCount <stream-name> <endpoint-url>")
+      System.exit(1)
+    }
+
+/**
+ * (This was lifted from the StreamingExamples.scala in order to avoid 
the dependency
+ *   on the spark-examples artifact.)
+ * Set reasonable logging levels for streaming if the user has not 
configured log4j.
+ */
+val log4jInitialized = 
Logger.getRootLogger.getAllAppenders.hasMoreElements()
+if (!log4jInitialized) {
+  /** 
+   *  We first log something to initialize Spark's default logging, 
+   *  then we 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15690371
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+
+/**
+ * Implementation of KinesisRecordSerializer to convert Array[Byte] 
to/from String.
+ */
+class KinesisStringRecordSerializer extends 
KinesisRecordSerializer[String] with Logging {
--- End diff --

Cool!




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50938687
  
QA tests have started for PR 1434. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17701/consoleFull




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50938943
  
QA results for PR 1434:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
public final class JavaKinesisWordCountASL {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17701/consoleFull




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15721090
  
--- Diff: bin/run-example ---
@@ -29,7 +29,9 @@ if [ -n "$1" ]; then
 else
   echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
   echo "  - set MASTER=XX to use a specific master" 1>&2
-  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
+  echo "  - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2
+  echo "     (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
+  echo "  - to run the Kinesis Spark Streaming example, make sure you build with -Pkinesis-asl" 1>&2

@pwendell  Is it a good idea to have this in the main bin/run-example?




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15721167
  
--- Diff: dev/audit-release/sbt_app_kinesis/build.sbt ---
@@ -0,0 +1,30 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the License); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an AS IS BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+name := "Kinesis Test"
+
+version := "1.0"
+
+scalaVersion := System.getenv.get("SCALA_VERSION")
+
+libraryDependencies += "org.apache.spark" %% "spark-core" % System.getenv.get("SPARK_VERSION")
+libraryDependencies += "org.apache.spark" %% "spark-streaming" % System.getenv.get("SPARK_VERSION")
--- End diff --

nit: we don't need spark-core and spark-streaming here, as they will be
picked up through the dependencies of spark-streaming-kinesis-asl.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15722812
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.collect.Lists;
+
+/**
+ * Java-friendly Kinesis Spark Streaming WordCount example
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more 
details
+ * on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per 
shard
+ *   for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of 
the given
+ *   stream-name and endpoint-url. 
+ *
+ * Valid endpoint urls:  
http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials 
+ *  in the following order of precedence: 
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location 
(~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: JavaKinesisWordCountASL stream-name endpoint-url
+ * stream-name is the name of the Kinesis stream (ie. 
mySparkStream)
+ * endpoint-url is the endpoint of the Kinesis service 
+ *   (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *  $ export AWS_ACCESS_KEY_ID=your-access-key
+ *  $ export AWS_SECRET_KEY=your-secret-key
+ *  $ $SPARK_HOME/bin/run-example \
+ *org.apache.spark.examples.streaming.JavaKinesisWordCountASL 
mySparkStream \
+ *https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordCountProducerASL 
which puts dummy data 
+ *   onto the Kinesis stream. 
+ * Usage instructions for KinesisWordCountProducerASL are provided in the 
class definition.
+ */
+public final class JavaKinesisWordCountASL {
+private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
+
+/**
+ * Make the constructor private to enforce singleton
+ */
+private JavaKinesisWordCountASL() {
+}
+
+public static void main(String[] args) {
+/**
--- End diff --

Inline comment style should be /* ... */. The /** form is reserved for
Scala docs only.
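
To illustrate the convention, here is a minimal sketch. The class and method
names are hypothetical and not taken from the PR. The /** ... */ form is kept
for doc comments on declarations, while notes inside a method body use
/* ... */ or //.

```java
/**
 * Doc comment on a declaration: this is where the doc-comment form belongs.
 * CommentStyleSketch is a hypothetical class used only for illustration.
 */
final class CommentStyleSketch {

    /** Doc comment on a member: returns true when both required args are present. */
    static boolean hasRequiredArgs(String[] args) {
        /* Inline note inside a method body: plain block comment, not a doc comment. */
        return args.length >= 2;
    }

    public static void main(String[] args) {
        // Short inline notes can also use line comments.
        if (!hasRequiredArgs(args)) {
            System.err.println("Usage: CommentStyleSketch <stream-name> <endpoint-url>");
        }
    }
}
```

Doc tooling only picks up the /** form when it sits directly before a
declaration, so using it for notes inside a method body adds nothing and
makes the code harder to scan.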


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15722833
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more 
details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per 
shard 
+ *   for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of 
the given 
+ *   stream-name and endpoint-url. 
+ *
+ * Valid endpoint urls:  
http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ * 
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared 
by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: KinesisWordCountASL stream-name endpoint-url
+ *   stream-name is the name of the Kinesis stream (ie. mySparkStream)
+ *   endpoint-url is the endpoint of the Kinesis service
+ * (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *$ export AWS_ACCESS_KEY_ID=your-access-key
+ *$ export AWS_SECRET_KEY=your-secret-key
+ *$ $SPARK_HOME/bin/run-example \
+ *org.apache.spark.examples.streaming.KinesisWordCountASL 
mySparkStream \
+ *https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called 
KinesisWordCountProducerASL which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducerASL are provided in that 
class definition.
+ */
+object KinesisWordCountASL extends Logging {
+  def main(args: Array[String]) {
+    /**
+     * Check that all required args were passed in.
+     */
+    if (args.length < 2) {
+      System.err.println(
+        """
+          |Usage: KinesisWordCount <stream-name> <endpoint-url>
+          |    <stream-name> is the name of the Kinesis stream
+          |    <endpoint-url> is the endpoint of the Kinesis service
+          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    /** Populate the appropriate variables from the given args */
+    val Array(streamName, endpointUrl) = args
+
+    /** Determine the number of shards from the stream */
+    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+    kinesisClient.setEndpoint(endpointUrl)
+    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
+      .size()
+
+/** In this example, we're going to create 1 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15723335
  
--- Diff: extras/kinesis-asl/src/main/resources/log4j.properties ---
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=WARN, console
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p 
%c{1}: %m%n
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{1}: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+
+# Log all Kinesis Streaming messages
+log4j.logger.org.apache.spark.examples.streaming=DEBUG
--- End diff --

These lines were removed in the PR I made to your branch.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15723363
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * This is a helper class for managing checkpoint clocks.
+ *
+ * @param checkpointInterval 
+ * @param currentClock.  Default to current SystemClock if none is passed 
in (mocking purposes)
+ */
+private[kinesis] class KinesisCheckpointState(
--- End diff --

Good you changed the name!




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15723355
  
--- Diff: extras/kinesis-asl/src/main/resources/log4j.properties ---
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
--- End diff --

This line was removed, as this property does not determine that.
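
For context: in log4j 1.x a root category value has the form
"level, appender1, appender2, ...". The line
log4j.rootCategory=WARN, console from the log4j.properties quoted earlier in
this thread therefore sets the level to WARN and attaches only the console
appender. The file appender is defined but never attached, so nothing is
written to target/unit-tests.log. The sketch below is not log4j's own parser.
It only illustrates that reading with java.util.Properties, and the class and
method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: lists the appenders attached to the root category. */
final class RootCategorySketch {

    /** Parses properties text into a Properties object. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** Returns every name after the level in the log4j.rootCategory value. */
    static List<String> rootAppenders(Properties props) {
        String[] parts = props.getProperty("log4j.rootCategory", "").split(",");
        List<String> appenders = new ArrayList<>();
        /* parts[0] is the level (e.g. WARN); the rest are appender names. */
        for (int i = 1; i < parts.length; i++) {
            String name = parts[i].trim();
            if (!name.isEmpty()) {
                appenders.add(name);
            }
        }
        return appenders;
    }

    public static void main(String[] args) {
        Properties props = load(
            "log4j.rootCategory=WARN, console\n"
            + "log4j.appender.file=org.apache.log4j.FileAppender\n"
            + "log4j.appender.file.file=target/unit-tests.log\n"
            + "log4j.appender.console=org.apache.log4j.ConsoleAppender\n");
        /* Only "console" is attached; "file" is defined but unused. */
        System.out.println(rootAppenders(props));
    }
}
```

Under this reading, the comment would only become accurate if "file" were
added to the root category value.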




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15724763
  
--- Diff: extras/kinesis-asl/src/test/resources/log4j.properties ---
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the file streaming/target/unit-tests.log
--- End diff --

This path is probably wrong.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15724783
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Helper class to create Amazon Kinesis Input Stream
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param ssc StreamingContext object
+   * @param streamName Kinesis stream name
+   * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+   *   See the Kinesis Spark Streaming documentation for more
+   *   details on the different types of checkpoints.
+   * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+   *   worker's initial starting position in the stream.
+   *   The values are either the beginning of the stream
+   *   per Kinesis' limit of 24 hours
+   *   (InitialPositionInStream.TRIM_HORIZON) or
+   *   the tip of the stream (InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+      ssc: StreamingContext,
+      streamName: String,
+      endpointUrl: String,
+      checkpointInterval: Duration,
+      initialPositionInStream: InitialPositionInStream,
+      storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
+    ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl,
+        checkpointInterval, initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param jssc Java StreamingContext object
+   * @param ssc StreamingContext object
+   * @param streamName Kinesis stream name
+   * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+   *   See the Kinesis Spark Streaming documentation for more
+   *   details on the different types of checkpoints.
+   * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+   *   worker's initial starting position in the stream.
+   *   The values are either the beginning of the stream
+   *   per Kinesis' limit of 24 hours
+   *   (InitialPositionInStream.TRIM_HORIZON) or
+   *   the tip of the stream (InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @return JavaReceiverInputDStream[Array[Byte]]   
+   *
+   * 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15724923
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.List
+
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.util.Random
+
+import org.apache.spark.Logging
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+
+/**
+ * Kinesis-specific implementation of the Kinesis Client Library (KCL) 
IRecordProcessor.
+ * This implementation operates on the Array[Byte] from the 
KinesisReceiver.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor 
upon startup.
+ *
+ * @param receiver Kinesis receiver
+ * @param workerId for logging purposes
+ * @param checkpointState represents the checkpoint state including the 
next checkpoint time.
+ *   It's injected here for mocking purposes.
+ */
+private[kinesis] class KinesisRecordProcessor(
+receiver: KinesisReceiver,
+workerId: String,
+checkpointState: KinesisCheckpointState) extends IRecordProcessor with 
Logging {
+
+  /** shardId to be populated during initialize() */
+  var shardId: String = _
+
+  /**
+   * The Kinesis Client Library calls this method during IRecordProcessor 
initialization.
+   *
+   * @param shardId assigned by the KCL to this particular RecordProcessor.
+   */
+  override def initialize(shardId: String) {
+logInfo(s"Initialize:  Initializing workerId $workerId with shardId $shardId")
+this.shardId = shardId
+  }
+
+  /**
+   * This method is called by the KCL when a batch of records is pulled 
from the Kinesis stream.
+   * This is the record-processing bridge between the KCL's 
IRecordProcessor.processRecords()
+   * and Spark Streaming's Receiver.store().
+   *
+   * @param batch list of records from the Kinesis stream shard
+   * @param checkpointer used to update Kinesis when this batch has been 
processed/stored 
+   *   in the DStream
+   */
+  override def processRecords(batch: List[Record], checkpointer: 
IRecordProcessorCheckpointer) {
+if (!receiver.isStopped()) {
+  try {
+/**
+ * Note:  If we try to store the raw ByteBuffer from 
record.getData(), the Spark Streaming
+ * Receiver.store(ByteBuffer) attempts to deserialize the 
ByteBuffer using the
+ *   internally-configured Spark serializer (kryo, etc).
+ * This is not desirable, so we instead store a raw Array[Byte] 
and decouple
+ *   ourselves from Spark's internal serialization strategy.
+ */
+batch.foreach(record =>
+  KinesisRecordProcessor.retry(receiver.store(record.getData().array()), 4, 500)
--- End diff --

Backing off by 500 ms here is probably a bad idea. Maybe reduce it to 10 ms.
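
As a rough illustration of the back-off under discussion, here is a minimal Scala sketch of a retry helper that sleeps for a random interval capped at `maxBackOffMillis` before each retry. The object name `RetrySketch` and the exact semantics are assumptions made for illustration; the body of the PR's own `KinesisRecordProcessor.retry` is not shown in this diff and may differ.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Random, Success, Try}

// Hypothetical helper, not the PR's implementation.
object RetrySketch {
  /**
   * Evaluate `expression`; on a non-fatal failure, retry up to `retriesLeft`
   * more times. Before each retry, sleep for a random time in
   * [0, maxBackOffMillis) milliseconds.
   */
  @tailrec
  def retry[T](expression: => T, retriesLeft: Int, maxBackOffMillis: Int): T =
    Try(expression) match {
      case Success(value) => value
      case Failure(_) if retriesLeft > 0 =>
        // Random.nextInt requires a positive bound, so skip the sleep for a cap of 0.
        if (maxBackOffMillis > 0) Thread.sleep(Random.nextInt(maxBackOffMillis).toLong)
        retry(expression, retriesLeft - 1, maxBackOffMillis)
      case Failure(error) => throw error
    }
}
```

In this sketch, 4 retries with a cap of 500 could stall the calling thread for close to two seconds on one failing record, while a cap of 10 bounds the total sleep at under 40 ms.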




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15726162
  
--- Diff: project/SparkBuild.scala ---
@@ -62,7 +62,7 @@ object SparkBuild extends PomBuild {
 var isAlphaYarn = false
 var profiles: mutable.Seq[String] = mutable.Seq.empty
 if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) {
-  println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.")
+  println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.")
--- End diff --

There is nothing in that JIRA. No monty at all.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15726168
  
--- Diff: project/SparkBuild.scala ---
@@ -62,7 +62,7 @@ object SparkBuild extends PomBuild {
 var isAlphaYarn = false
 var profiles: mutable.Seq[String] = mutable.Seq.empty
 if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) {
-  println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.")
+  println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.")
--- End diff --

But I think I get the point: the advice in this statement is wrong.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50951626
  
QA tests have started for PR 1434. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17736/consoleFull




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15726533
  
--- Diff: 
extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.collect.Lists;
+
+/**
+ * Java-friendly Kinesis Spark Streaming WordCount example
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more 
details
+ * on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per 
shard
+ *   for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of 
the given
+ *   stream-name and endpoint-url. 
+ *
+ * Valid endpoint urls:  
http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials 
+ *  in the following order of precedence: 
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location 
(~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: JavaKinesisWordCountASL stream-name endpoint-url
+ * stream-name is the name of the Kinesis stream (ie. 
mySparkStream)
+ * endpoint-url is the endpoint of the Kinesis service 
+ *   (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *  $ export AWS_ACCESS_KEY_ID=your-access-key
+ *  $ export AWS_SECRET_KEY=your-secret-key
+ *  $ $SPARK_HOME/bin/run-example \
+ *org.apache.spark.examples.streaming.JavaKinesisWordCountASL 
mySparkStream \
+ *https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordCountProducerASL 
which puts dummy data 
+ *   onto the Kinesis stream. 
+ * Usage instructions for KinesisWordCountProducerASL are provided in the 
class definition.
+ */
+public final class JavaKinesisWordCountASL {
+private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
+
+/*
+ * Make the constructor private to enforce singleton
+ */
+private JavaKinesisWordCountASL() {
+}
+
+public static void main(String[] args) {
+/*
+ * Check that all required args were passed in.
+ */
+if (args.length < 2) {
+  System.err.println(
+  |Usage: KinesisWordCount stream-name endpoint-url\n +
+  |stream-name is the name of the Kinesis stream\n +
+  |endpoint-url is the endpoint of the 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15726872
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Helper class to create Amazon Kinesis Input Stream
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
--- End diff --

This does not need to extend Logging.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-08-01 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15726898
  
--- Diff: bin/run-example ---
@@ -29,7 +29,9 @@ if [ -n "$1" ]; then
 else
   echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
   echo "  - set MASTER=XX to use a specific master" 1>&2
-  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
+  echo "  - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2
+  echo "     (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
+  echo "  - to run the Kinesis Spark Streaming example, make sure you build with -Pkinesis-asl" 1>&2
--- End diff --

Hey I'd prefer not to have this in this short text here. We can add a note 
in the docs for this.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15631461
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param appName Kinesis Application Name.  Kinesis Apps are mapped to 
Kinesis Streams 
+   *   by the Kinesis Client Library.  If you change the App name or 
Stream name, 
+   *   the KCL will throw errors.
+   * @param stream Kinesis Stream Name
+   * @param endpoint url of Kinesis service
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not 
Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the 
different types 
+   *   of checkpoints.
+   * @param initialPositionInStream in the absence of Kinesis checkpoint 
info, this is the 
+   *   worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit 
of 24 hours 
+   *   (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream 
+   *   (InitialPositionInStream.LATEST).
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, 
this presents the risk 
+   *   of processing records more than once.
+   * @param storageLevel The default is StorageLevel.MEMORY_AND_DISK_2 
which replicates in-memory 
+   *   and on-disk to 2 nodes total (primary and secondary)
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+  ssc: StreamingContext,
+  appName: String,
+  stream: String,
+  endpoint: String,
+  checkpointIntervalMillis: Long,
+  initialPositionInStream: InitialPositionInStream,
+  storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
+ssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, 
checkpointIntervalMillis, 
+initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a 
Kinesis stream.
+   *
+   * @param JavaStreamingContext object
+   * @param appName Kinesis Application Name.  Kinesis Apps are mapped to 
Kinesis Streams 
+   *   by the Kinesis Client Library.  If you change the App name or 
Stream name, 
+   *   the KCL will throw errors.
+   * @param stream Kinesis Stream Name
+   * @param endpoint url of Kinesis service
--- End diff --

Can you give a couple of example endpoints to give an idea of what they look like?
`@param endpoint Url of the Kinesis service (e.g. https://kinesis.us-west-2.amazon.com.or.something.like.that)`
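
For a concrete idea of the shape being asked for, here is a small Scala sketch that builds a regional endpoint URL. The helper name `endpointFor` is hypothetical, and the `https://kinesis.<region>.amazonaws.com` pattern is assumed from the `us-east-1` example used elsewhere in this PR; some AWS partitions use a different domain suffix.

```scala
// Hypothetical helper for illustration only; not part of the PR.
object KinesisEndpointSketch {
  // Build the endpoint URL for a region name such as "us-east-1".
  def endpointFor(region: String): String =
    s"https://kinesis.$region.amazonaws.com"
}
```

For example, `KinesisEndpointSketch.endpointFor("us-west-2")` yields `https://kinesis.us-west-2.amazonaws.com`.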




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15651823
  
--- Diff: project/SparkBuild.scala ---
@@ -62,7 +62,7 @@ object SparkBuild extends PomBuild {
 var isAlphaYarn = false
 var profiles: mutable.Seq[String] = mutable.Seq.empty
 if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) {
-  println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.")
+  println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.")
--- End diff --

that -P wasn't working.  new jira for the full monty:  
https://issues.apache.org/jira/browse/SPARK-2770




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15651971
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param appName Kinesis Application Name.  Kinesis Apps are mapped to 
Kinesis Streams 
+   *   by the Kinesis Client Library.  If you change the App name or 
Stream name, 
+   *   the KCL will throw errors.
+   * @param stream Kinesis Stream Name
+   * @param endpoint url of Kinesis service
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not 
Spark checkpointing).
--- End diff --

fixed




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15653238
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param appName Kinesis Application Name.  Kinesis Apps are mapped to Kinesis Streams
+   *   by the Kinesis Client Library.  If you change the App name or Stream name,
+   *   the KCL will throw errors.
+   * @param stream Kinesis Stream Name
+   * @param endpoint url of Kinesis service
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the different types
+   *   of checkpoints.
+   * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the
+   *   worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit of 24 hours
+   *   (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream
+   *   (InitialPositionInStream.LATEST).
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, this presents the risk
--- End diff --

right, i knew you would ask about this!  

scala can't differentiate between the two overloaded methods with default 
arguments in this situation:  
https://groups.google.com/forum/#!msg/scala-user/FyQK3-cqfaY/fXLHr8QsW_0J

it's totes weird, i know.

i'll remove storageLevel to simplify.  i'd like to keep 
initialPositionInStream in case there's a need to differentiate.
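
for anyone following along, here is a minimal sketch of one form of the restriction. it is not the KinesisUtils API: `createStream`, `DefaultLevel` and the plain-string storage levels are hypothetical stand-ins. the rejected shape is shown only in comments, since it does not compile, and the explicit overloads below it are one possible workaround.

```scala
// Sketch only: `createStream` is a hypothetical stand-in, not the KinesisUtils API.
object OverloadDefaultsSketch {
  // Scala rejects a pair like the following, because at most one overloaded
  // alternative of a method may define default arguments:
  //
  //   def createStream(name: String, level: String = "MEMORY_ONLY"): String = ...
  //   def createStream(name: String, interval: Long, level: String = "MEMORY_ONLY"): String = ...
  //
  // One workaround: spell out each arity explicitly and delegate.
  val DefaultLevel = "MEMORY_AND_DISK_2" // assumed default, for illustration only

  def createStream(name: String): String =
    createStream(name, DefaultLevel)

  def createStream(name: String, level: String): String =
    s"$name@$level"

  def main(args: Array[String]): Unit = {
    println(createStream("mySparkStream"))
    println(createStream("mySparkStream", "MEMORY_ONLY"))
  }
}
```

dropping the defaulted parameter, as proposed above, sidesteps the problem in the same way: every overload then has a distinct, fully explicit parameter list.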




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15654371
  
--- Diff: make-distribution.sh ---
@@ -196,6 +196,8 @@ cp -r $FWDIR/bin $DISTDIR
 cp -r $FWDIR/python $DISTDIR
 cp -r $FWDIR/sbin $DISTDIR
 cp -r $FWDIR/ec2 $DISTDIR
+cp -r $FWDIR/extras/kinesis-asl/bin $DISTDIR
--- End diff --

@pwendell Can you take a look?




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread cfregly
Github user cfregly commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50786500
  
ah, gotcha matei.  the examples aren't part of the core, so they can depend 
on external libs.  i'll make the change.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15656394
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.net.InetAddress
+import java.util.UUID
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver)
+ *   as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers
+ *   to run within a Spark Executor.
+ *
+ * @param appName Kinesis Application Name.  Kinesis apps are mapped to Kinesis streams
+ *   by the Kinesis Client Library.  If you change the app name or stream name,
+ *   the KCL will throw errors.
+ * @param stream Kinesis stream name
+ * @param endpoint url of Kinesis service
+ * @param checkpointIntervalMillis for Kinesis checkpointing (not Spark checkpointing).
+ *   See the Kinesis Spark Streaming documentation for more details on the different types
+ * of checkpoints.
+ * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the worker's initial
--- End diff --

This line is more than 100 chars.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50792452
  
QA tests have started for PR 1434. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17589/consoleFull




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50792565
  
QA results for PR 1434:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
public final class JavaKinesisWordCount {
class KinesisStringRecordSerializer extends KinesisRecordSerializer[String] with Logging {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17589/consoleFull




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15657271
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+
+/**
+ * Implementation of KinesisRecordSerializer to convert Array[Byte] to/from String.
+ */
+class KinesisStringRecordSerializer extends KinesisRecordSerializer[String] with Logging {
--- End diff --

Is this class supposed to be publicly visible? If not, do the expected :)
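
One reading of "the expected" is a package-qualified access modifier, as the PR already uses for `KinesisRecordProcessorUtils`. The sketch below assumes that reading; the package `sketch.kinesis` and every type in it are hypothetical and are not the PR's classes.

```scala
// Sketch only: package and type names are hypothetical, not the PR's classes.
package sketch.kinesis {
  import java.nio.charset.StandardCharsets

  // Public contract that callers outside the package program against.
  trait RecordSerializerSketch[T] {
    def serialize(value: T): Array[Byte]
    def deserialize(bytes: Array[Byte]): T
  }

  // Visible only inside sketch.kinesis; code elsewhere cannot name this class.
  private[kinesis] class StringRecordSerializerSketch extends RecordSerializerSketch[String] {
    def serialize(value: String): Array[Byte] = value.getBytes(StandardCharsets.UTF_8)
    def deserialize(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
  }

  // Public entry point that hands out the hidden implementation via the trait.
  object SerializersSketch {
    val string: RecordSerializerSketch[String] = new StringRecordSerializerSketch
  }
}

object VisibilitySketchApp {
  def main(args: Array[String]): Unit = {
    val s = sketch.kinesis.SerializersSketch.string
    println(s.deserialize(s.serialize("hello kinesis")))
  }
}
```

If the examples need to construct the serializer directly, the class has to stay public, which is the other possible answer to the question above.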




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15657421
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessorUtils.scala ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import scala.util.Random
+
+import org.apache.spark.Logging
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+
+
+/**
+ * Helper for the KinesisRecordProcessor.
+ */
+private[kinesis] object KinesisRecordProcessorUtils extends Logging {
--- End diff --

This is a nitpick, but utility functions like these can often go in the
companion object of the same class, so this could have been added to `object
KinesisRecordProcessor`.
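
A minimal sketch of the companion-object layout, assuming a retry-style helper. `RecordProcessorSketch` and `retry` are hypothetical names chosen for illustration and are not taken from the PR.

```scala
// Sketch only: hypothetical names, not the PR's KinesisRecordProcessor.
class RecordProcessorSketch(val workerId: String) {
  import RecordProcessorSketch._

  // Counts the non-blank records, retrying the whole batch on failure.
  def process(records: Seq[String]): Int =
    retry(3) { records.map(_.trim).count(_.nonEmpty) }
}

// Companion object: same name, same file. The helper lives here instead of
// in a separate *Utils object.
object RecordProcessorSketch {
  // Runs `body`, re-running it on an Exception while attempts remain;
  // the last failure is rethrown.
  def retry[T](attempts: Int)(body: => T): T =
    try body
    catch {
      case _: Exception if attempts > 1 => retry(attempts - 1)(body)
    }
}

object CompanionSketchApp {
  def main(args: Array[String]): Unit =
    println(new RecordProcessorSketch("worker-1").process(Seq("a", " ", "b")))
}
```

The class and its companion can see each other's private members, so a helper moved this way can also be made `private` without a package qualifier.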




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15668060
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext.rddToOrderedRDDFunctions
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the
+ *   given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ *   stream-name and endpoint-url.
+ *
+ * Valid endpoint urls:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: KinesisWordCount stream-name endpoint-url
+ *   stream-name is the name of the Kinesis stream (ie. mySparkStream)
+ *   endpoint-url is the endpoint of the Kinesis service
+ * (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *    $ export AWS_ACCESS_KEY_ID=your-access-key
+ *    $ export AWS_SECRET_KEY=your-secret-key
+ *    $ $SPARK_HOME/bin/run-example \
+ *        org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \
+ *        https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducer which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducer are provided in that class definition.
+ */
+object KinesisWordCount extends Logging {
+  val WordSeparator = " "
+
+  def main(args: Array[String]) {
+/**
+ * Check that all required args were passed in.
+ */
+if (args.length < 2) {
+  System.err.println("Usage: KinesisWordCount stream-name endpoint-url")
+  System.exit(1)
+}
+
+/**
+ * (This was lifted from the StreamingExamples.scala in order to avoid the dependency
+ *   on the spark-examples artifact.)
+ * Set reasonable logging levels for streaming if the user has not configured log4j.
+ */
+val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements()
+if (!log4jInitialized) {
+  /** 
+   *  We first log something to initialize Spark's default logging, 
+   *  then we 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15668098
  
--- Diff: examples/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Milliseconds;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kinesis.KinesisRecordSerializer;
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+/**
+ * Java-friendly Kinesis Spark Streaming WordCount example
+ *
+ * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details
+ * on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard
+ *   of the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ *   stream-name and endpoint-url.
+ *
+ * Valid endpoint urls:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ *  in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: JavaKinesisWordCount stream-name endpoint-url
+ * stream-name is the name of the Kinesis stream (ie. mySparkStream)
+ * endpoint-url is the endpoint of the Kinesis service
+ *   (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *  $ export AWS_ACCESS_KEY_ID=your-access-key
+ *  $ export AWS_SECRET_KEY=your-secret-key
+ *  $ $SPARK_HOME/bin/run-example \
+ *      org.apache.spark.examples.streaming.JavaKinesisWordCount mySparkStream \
+ *      https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordCountProducer which puts dummy data
+ *   onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducer are provided in the class definition.
+ */
+public final class JavaKinesisWordCount {
+private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+private static final Logger logger = Logger.getLogger(JavaKinesisWordCount.class);
+
+/**
+ * Make the constructor private to enforce singleton
+ */
+private JavaKinesisWordCount() {
+}
+
+public static void main(String[] args) {
+/**
+ * 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15668154
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala ---
@@ -0,0 +1,369 @@

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15668266
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala ---
@@ -0,0 +1,369 @@

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50819992
  
@pwendell Can you take a look at the pom, and other build stuff?




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15670682
  
--- Diff: dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala ---
@@ -47,7 +47,14 @@ object SimpleApp {
   System.exit(-1)
 }
 if (foundGanglia) {
-  println("Ganglia sink was loaded via spark-core")
+  println("Ganglia sink was loaded via spark-ganglia-lgpl")
+  System.exit(-1)
+}
+
+// Remove kinesis from default build due to ASL license issue
+val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
+if (foundKinesis) {
+  println("Kinesis was loaded via kinesis-asl")
--- End diff --

This statement should be "Kinesis was loaded via spark-core".




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15670661
  
--- Diff: dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala ---
@@ -47,7 +47,14 @@ object SimpleApp {
   System.exit(-1)
 }
 if (foundGanglia) {
-  println("Ganglia sink was loaded via spark-core")
+  println("Ganglia sink was loaded via spark-ganglia-lgpl")
--- End diff --

This should remain as "spark-core". The check verifies that depending on
spark-core alone does not bring in Ganglia, so the original statement is
correct.
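
The audit app in the diff is written in Scala and uses `Try(Class.forName(...)).isSuccess`. As a rough illustration of the same idea, here is a minimal Java sketch: it probes the classpath for classes that should only arrive via optional modules and fails if any is found. The helper name `isOnClasspath` and the class `ClasspathAudit` are hypothetical, and the Ganglia sink class name is an assumption (it is not shown in the quoted diff).

```java
// Sketch only: a Java rendering of the classpath audit idea, not the code in the PR.
class ClasspathAudit {

    /** Returns true if the named class can be loaded from the current classpath. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Class is absent, or present but not loadable: treat both as "not found".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed class name for the Ganglia sink; KinesisUtils is taken from the diff.
        String ganglia = "org.apache.spark.metrics.sink.GangliaSink";
        String kinesis = "org.apache.spark.streaming.kinesis.KinesisUtils";

        // The app depends only on spark-core, so finding either class means the
        // optional module leaked in through spark-core's dependencies.
        if (isOnClasspath(ganglia)) {
            System.out.println("Ganglia sink was loaded via spark-core");
            System.exit(-1);
        }
        if (isOnClasspath(kinesis)) {
            System.out.println("Kinesis was loaded via spark-core");
            System.exit(-1);
        }
        System.out.println("No optional modules found on the classpath");
    }
}
```

The message names spark-core because that is the only dependency the audit app declares; the check passes precisely when the optional classes cannot be found.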




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15670918
  
--- Diff: assembly/pom.xml ---
@@ -186,6 +186,16 @@
   </dependencies>
 </profile>
 <profile>
+  <id>kinesis-asl</id>
+  <dependencies>
+<dependency>
+  <groupId>org.apache.spark</groupId>
+  <artifactId>kinesis-asl_${scala.binary.version}</artifactId>
--- End diff --

The artifact id should be `spark-streaming-kinesis-asl` because the published
Maven artifacts should have "spark" in the name (to make it clear that it's
related to Spark).
The profile id can be `kinesis-asl`, as it is used only at Spark compile
time, when the person is already dealing with Spark.
Sorry if this wasn't clear.
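
To illustrate the distinction: a downstream project would see only the published artifact id, roughly as in the fragment below, while the profile id appears only on the Spark build command line (for example `mvn -Pkinesis-asl ...`). This is a sketch under assumptions: the artifact id is the one proposed in this comment, and both `scala.binary.version` and `spark.version` are placeholder properties that the consuming project would have to define.

```xml
<!-- Sketch only: coordinates as proposed in this review comment. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
  <!-- spark.version is a hypothetical property defined by the consuming project -->
  <version>${spark.version}</version>
</dependency>
```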




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15567656
  
--- Diff: 
extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Milliseconds;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.dstream.DStream;
+import org.apache.spark.streaming.kinesis.KinesisRecordSerializer;
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+/**
+ * Java-friendly Kinesis Spark Streaming WordCount example
+ *
+ * See 
http://spark.apache.org/docs/latest/streaming-programming-guide.html for more 
details on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per 
shard of the given stream.
+ * It then starts pulling from the tip of the given stream-name and 
endpoint-url at the given batch-interval.
+ * Because we're pulling from the tip (InitialPositionInStream.LATEST), 
only new stream data will be picked up after the KinesisReceiver starts.
+ * This could lead to missed records if data is added to the stream while 
no KinesisReceivers are running.
+ * In production, you'll want to switch to 
InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis 
limit) of previous stream data 
+ *  depending on the checkpoint frequency.
+ * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing 
of records depending on the checkpoint frequency.
+ * Record processing should be idempotent when possible.
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials in the following order of precedence: 
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location 
(~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: JavaKinesisWordCount stream-name endpoint-url 
batch-interval
+ * stream-name is the name of the Kinesis stream (ie. 
mySparkStream)
+ * endpoint-url is the endpoint of the Kinesis service (ie. 
https://kinesis.us-east-1.amazonaws.com)
+ * batch-interval is the batch interval in milliseconds (ie. 
1000ms)
+ *
+ * Example:
+ *  $ export AWS_ACCESS_KEY_ID=your-access-key
+ *  $ export AWS_SECRET_KEY=your-secret-key
+ *$ bin/run-kinesis-example  \
+ *org.apache.spark.examples.streaming.JavaKinesisWordCount 
mySparkStream https://kinesis.us-east-1.amazonaws.com 1000
+ *
+ * There is a companion helper class 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15567741
  
--- Diff: 
extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Milliseconds;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.dstream.DStream;
+import org.apache.spark.streaming.kinesis.KinesisRecordSerializer;
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+/**
+ * Java-friendly Kinesis Spark Streaming WordCount example
+ *
+ * See 
http://spark.apache.org/docs/latest/streaming-programming-guide.html for more 
details on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per 
shard of the given stream.
+ * It then starts pulling from the tip of the given stream-name and 
endpoint-url at the given batch-interval.
+ * Because we're pulling from the tip (InitialPositionInStream.LATEST), 
only new stream data will be picked up after the KinesisReceiver starts.
+ * This could lead to missed records if data is added to the stream while 
no KinesisReceivers are running.
+ * In production, you'll want to switch to 
InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis 
limit) of previous stream data 
+ *  depending on the checkpoint frequency.
+ * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing 
of records depending on the checkpoint frequency.
+ * Record processing should be idempotent when possible.
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials in the following order of precedence: 
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location 
(~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: JavaKinesisWordCount stream-name endpoint-url 
batch-interval
+ * stream-name is the name of the Kinesis stream (ie. 
mySparkStream)
+ * endpoint-url is the endpoint of the Kinesis service (ie. 
https://kinesis.us-east-1.amazonaws.com)
+ * batch-interval is the batch interval in milliseconds (ie. 
1000ms)
+ *
+ * Example:
+ *  $ export AWS_ACCESS_KEY_ID=your-access-key
+ *  $ export AWS_SECRET_KEY=your-secret-key
+ *$ bin/run-kinesis-example  \
+ *org.apache.spark.examples.streaming.JavaKinesisWordCount 
mySparkStream https://kinesis.us-east-1.amazonaws.com 1000
+ *
+ * There is a companion helper class 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15568250
  
--- Diff: 
extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala
 ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer
+import org.apache.spark.streaming.kinesis.KinesisUtils
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+import scala.util.Random
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See 
http://spark.apache.org/docs/latest/streaming-programming-guide.html for more 
details on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per 
shard of the given stream.
+ * It then starts pulling from the tip of the given stream-name and 
endpoint-url at the given batch-interval.
+ * Because we're pulling from the tip (InitialPositionInStream.LATEST), 
only new stream data will be picked up after the KinesisReceiver starts.
+ * This could lead to missed records if data is added to the stream while 
no KinesisReceivers are running.
+ * In production, you'll want to switch to 
InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis 
limit) of previous stream data 
+ *  depending on the checkpoint frequency.
+ *
+ * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing 
of records depending on the checkpoint frequency.
+ * Record processing should be idempotent when possible.
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared 
by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: KinesisWordCount stream-name endpoint-url batch-interval
+ *   stream-name is the name of the Kinesis stream (ie. mySparkStream)
+ *   endpoint-url is the endpoint of the Kinesis service (ie. 
https://kinesis.us-east-1.amazonaws.com)
+ *   batch-interval is the batch interval in millis (ie. 1000ms)
+ *
+ * Example:
+ *  $ export AWS_ACCESS_KEY_ID=your-access-key
+ *  $ export AWS_SECRET_KEY=your-secret-key
+ *$ bin/run-kinesis-example \
+ *org.apache.spark.examples.streaming.KinesisWordCount 
mySparkStream https://kinesis.us-east-1.amazonaws.com 100
+ *
+ * There is a companion helper class below called KinesisWordCountProducer 
which puts dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducer are provided in that 
class definition.
+ */
+object KinesisWordCount extends Logging {
+  val WordSeparator = " "
+
+  def main(args: Array[String]) {
+/**
+ * Check that all required args were passed in.
+ */
+if (args.length < 3) {
+  System.err.println("Usage: KinesisWordCount <stream-name> 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15568742
  
--- Diff: 
extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala
 ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer
+import org.apache.spark.streaming.kinesis.KinesisUtils
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+import scala.util.Random
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See 
http://spark.apache.org/docs/latest/streaming-programming-guide.html for more 
details on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per 
shard of the given stream.
+ * It then starts pulling from the tip of the given stream-name and 
endpoint-url at the given batch-interval.
+ * Because we're pulling from the tip (InitialPositionInStream.LATEST), 
only new stream data will be picked up after the KinesisReceiver starts.
+ * This could lead to missed records if data is added to the stream while 
no KinesisReceivers are running.
+ * In production, you'll want to switch to 
InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis 
limit) of previous stream data 
+ *  depending on the checkpoint frequency.
+ *
+ * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing 
of records depending on the checkpoint frequency.
+ * Record processing should be idempotent when possible.
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared 
by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: KinesisWordCount stream-name endpoint-url batch-interval
+ *   stream-name is the name of the Kinesis stream (ie. mySparkStream)
+ *   endpoint-url is the endpoint of the Kinesis service (ie. 
https://kinesis.us-east-1.amazonaws.com)
+ *   batch-interval is the batch interval in millis (ie. 1000ms)
+ *
+ * Example:
+ *  $ export AWS_ACCESS_KEY_ID=your-access-key
+ *  $ export AWS_SECRET_KEY=your-secret-key
+ *$ bin/run-kinesis-example \
+ *org.apache.spark.examples.streaming.KinesisWordCount 
mySparkStream https://kinesis.us-east-1.amazonaws.com 100
+ *
+ * There is a companion helper class below called KinesisWordCountProducer 
which puts dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducer are provided in that 
class definition.
+ */
+object KinesisWordCount extends Logging {
+  val WordSeparator = " "
+
+  def main(args: Array[String]) {
+/**
+ * Check that all required args were passed in.
+ */
+if (args.length < 3) {
+  System.err.println("Usage: KinesisWordCount <stream-name> 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15568964
  
--- Diff: 
extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import org.apache.spark.Logging
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import scala.util.Random
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ */
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not 
Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the 
different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, 
this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the 
worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit 
of 24 hours (InitialPositionInStream.TRIM_HORIZON)
+   *   or the tip of the stream using InitialPositionInStream.LATEST.
+   * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates 
in-memory and on-disk to 2 nodes total (primary and secondary)
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+ssc: StreamingContext,
--- End diff --

Incorrect formatting.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15568976
  
--- Diff: 
extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import org.apache.spark.Logging
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import scala.util.Random
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ */
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not 
Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the 
different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, 
this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the 
worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit 
of 24 hours (InitialPositionInStream.TRIM_HORIZON)
+   *   or the tip of the stream using InitialPositionInStream.LATEST.
+   * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates 
in-memory and on-disk to 2 nodes total (primary and secondary)
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+ssc: StreamingContext,
+app: String,
+stream: String,
+endpoint: String,
+checkpointIntervalMillis: Long,
+initialPositionInStream: InitialPositionInStream = 
InitialPositionInStream.TRIM_HORIZON,
+storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): 
ReceiverInputDStream[Array[Byte]] = {
+
+ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, 
checkpointIntervalMillis, initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a 
Kinesis stream.
+   *
+   * @param JavaStreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not 
Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the 
different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, 
this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the 
worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15615786
  
--- Diff: 
extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.net.InetAddress
+import java.util.UUID
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+import java.nio.ByteBuffer
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor.
+ *
+ * @param app name
+ * @param Kinesis stream name
+ * @param endpoint url of Kinesis service
+ * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+ *   See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
+ * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream.
+ *   The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON)
+ *  or the tip of the stream using InitialPositionInStream.LATEST.
+ * @param persistence strategy for RDDs and DStreams.
+ */
+private[streaming] class KinesisReceiver(
+  app: String,
+  stream: String,
+  endpoint: String,
+  checkpointIntervalMillis: Long,
+  initialPositionInStream: InitialPositionInStream,
+  storageLevel: StorageLevel)
+  extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
+
+  /**
+   *  The lazy val's below will get instantiated in the remote Executor after the closure is shipped to the Spark Worker.
+   *  These are all lazy because they're from third-party Amazon libraries and are not Serializable.
+   *  If they're not marked lazy, they will cause NotSerializableExceptions when they're shipped to the Spark Worker.
+   */
+
+  /**
+   *  workerId is lazy because we want the address of the actual Worker where the code runs - not the Driver's ip address.
+   *  This makes a difference when running in a cluster.
+   */
+  lazy val workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID()
+
+  /**
+   * This impl uses the DefaultAWSCredentialsProviderChain per the following url:
+   * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
+   *  and searches for credentials in the following order of precedence:
+   * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+   * Java System Properties - aws.accessKeyId and aws.secretKey
+   * Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
+   * Instance profile credentials delivered through the Amazon EC2 metadata service
+   */
+  lazy val 
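
The lookup order listed in the comment above is a chain-of-responsibility pattern: each provider is tried in turn and the first one that yields credentials wins. A minimal Scala sketch of that idea follows. `Credentials`, `Provider`, `fromLookup`, and `resolve` are hypothetical names chosen for illustration and are not the AWS SDK's API; only the first two providers from the comment are modelled.

```scala
// Hypothetical model of a credentials provider chain; not the AWS SDK API.
final case class Credentials(accessKeyId: String, secretKey: String)

object ProviderChainSketch {
  // A provider either yields credentials or declines with None.
  type Provider = () => Option[Credentials]

  // Builds a provider that reads both values through a lookup function,
  // for example an environment or system-property lookup.
  def fromLookup(lookup: String => Option[String], idName: String, secretName: String): Provider =
    () => for {
      id <- lookup(idName)
      secret <- lookup(secretName)
    } yield Credentials(id, secret)

  // Tries providers in order and returns the first credentials found.
  // The iterator is lazy, so later providers are not consulted once an
  // earlier one has succeeded.
  def resolve(providers: Seq[Provider]): Option[Credentials] =
    providers.iterator.map(p => p()).collectFirst { case Some(c) => c }

  // Environment variables first, then Java system properties, as in the comment.
  val defaultChain: Seq[Provider] = Seq(
    fromLookup(sys.env.get, "AWS_ACCESS_KEY_ID", "AWS_SECRET_KEY"),
    fromLookup(sys.props.get, "aws.accessKeyId", "aws.secretKey")
  )
}
```

Passing the lookup as a function keeps the ordering logic testable without touching the real environment.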

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15616231
  
--- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import org.apache.spark.Logging
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import scala.util.Random
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ */
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON)
+   *   or the tip of the stream using InitialPositionInStream.LATEST.
+   * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary)
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+      ssc: StreamingContext,
+      app: String,
+      stream: String,
+      endpoint: String,
+      checkpointIntervalMillis: Long,
+      initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = {
+
+    ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param JavaStreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream.
+   * The values are either the beginning of the stream 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15616323
  
--- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.net.InetAddress
+import java.util.UUID
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+import java.nio.ByteBuffer
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor.
+ *
+ * @param app name
+ * @param Kinesis stream name
+ * @param endpoint url of Kinesis service
+ * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+ *   See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
+ * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream.
+ *   The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON)
+ *  or the tip of the stream using InitialPositionInStream.LATEST.
+ * @param persistence strategy for RDDs and DStreams.
+ */
+private[streaming] class KinesisReceiver(
+  app: String,
+  stream: String,
+  endpoint: String,
+  checkpointIntervalMillis: Long,
+  initialPositionInStream: InitialPositionInStream,
+  storageLevel: StorageLevel)
+  extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
+
+  /**
+   *  The lazy val's below will get instantiated in the remote Executor after the closure is shipped to the Spark Worker.
+   *  These are all lazy because they're from third-party Amazon libraries and are not Serializable.
+   *  If they're not marked lazy, they will cause NotSerializableExceptions when they're shipped to the Spark Worker.
+   */
+
+  /**
+   *  workerId is lazy because we want the address of the actual Worker where the code runs - not the Driver's ip address.
+   *  This makes a difference when running in a cluster.
+   */
+  lazy val workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID()
+
+  /**
+   * This impl uses the DefaultAWSCredentialsProviderChain per the following url:
+   * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
+   *  and searches for credentials in the following order of precedence:
+   * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+   * Java System Properties - aws.accessKeyId and aws.secretKey
+   * Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
+   * Instance profile credentials delivered through the Amazon EC2 metadata service
+   */
+  lazy val 
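
The comments in the diff above explain that the non-Serializable Amazon objects are held in lazy vals so that they are created on the executor rather than shipped from the driver. A minimal sketch of that pattern in plain Scala follows, with an in-memory serialization round trip standing in for the shipping step. `NonSerializableClient`, `LazyHolder`, and `LazyFieldSketch` are hypothetical names, not classes from the pull request. The sketch uses `@transient lazy val`, which differs from the plain `lazy val` in the diff: as far as I understand, a plain lazy val avoids the exception only while it is still uninitialized at serialization time, whereas `@transient` also covers the case where it has already been forced.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a third-party client that is not Serializable.
class NonSerializableClient(val host: String)

// Hypothetical receiver-like class; only the field pattern matters here.
class LazyHolder(val host: String) extends Serializable {
  // @transient excludes the field from serialization even after it has been
  // initialized; lazy defers construction until first access on each JVM.
  @transient lazy val client: NonSerializableClient = new NonSerializableClient(host)
}

object LazyFieldSketch {
  // Serializes and deserializes a value in memory, roughly what happens
  // when an object is shipped from the driver to an executor.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After the round trip, the copy builds its own client on first access instead of reusing the original's instance.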

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15617751
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -467,6 +468,62 @@ For more details on these additional sources, see the corresponding [API documen
 Furthermore, you can also implement your own custom receiver for your sources. See the
 [Custom Receiver Guide](streaming-custom-receivers.html).
 
+### Kinesis
--- End diff --

moved




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15617745
  
--- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.net.InetAddress
+import java.util.UUID
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+import java.nio.ByteBuffer
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor.
+ *
+ * @param app name
--- End diff --

updated




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15617770
  
--- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import org.apache.spark.Logging
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import scala.util.Random
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ */
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON)
+   *   or the tip of the stream using InitialPositionInStream.LATEST.
+   * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary)
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+      ssc: StreamingContext,
+      app: String,
+      stream: String,
+      endpoint: String,
+      checkpointIntervalMillis: Long,
+      initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = {
+
+    ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param JavaStreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream.
--- End diff --

fixed
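
The `checkpointIntervalMillis` parameter in the quoted diff controls how often Kinesis checkpoints are written. One way to picture interval-based checkpointing is a small state holder that says whether enough time has passed since the last checkpoint. The sketch below is a guess at the general shape, not code from the pull request: `IntervalCheckpointer` and its methods are hypothetical names, and the clock is passed in as a function so that it can be driven manually in tests.

```scala
// Hypothetical helper; the names are illustrative and not taken from the pull request.
class IntervalCheckpointer(intervalMillis: Long, now: () => Long) {
  require(intervalMillis > 0, "checkpoint interval must be positive")

  // Earliest time at which the next checkpoint is allowed.
  private var nextCheckpointAt: Long = now() + intervalMillis

  // True once the interval has elapsed since the last checkpoint.
  def shouldCheckpoint(): Boolean = now() >= nextCheckpointAt

  // Call after a successful checkpoint to start the next interval.
  def advance(): Unit = {
    nextCheckpointAt = now() + intervalMillis
  }
}
```

Under this model, records handled after the most recent checkpoint would be read again after a restart, which is consistent with the Scaladoc's warning about processing records more than once.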



[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15617760
  
--- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import org.apache.spark.Logging
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import scala.util.Random
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ */
+object KinesisUtils extends Logging {
--- End diff --

done




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15617773
  
--- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import org.apache.spark.Logging
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import scala.util.Random
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ */
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, this presents the risk of processing records more than once.
--- End diff --

fixed
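
The Scaladoc line under discussion describes the trade-off behind the TRIM_HORIZON default: with no Kinesis checkpoint, starting at the oldest retained record avoids skipping data but may replay it, while starting at the tip skips the backlog. A toy model of that rule, assuming a worker resumes just after its last checkpoint when one exists, could look like this. `InitialPosition`, `TrimHorizon`, `Latest`, and `recordsToRead` are hypothetical names for illustration and are not the KCL API.

```scala
// Hypothetical model of the start-position rule; not the KCL API.
sealed trait InitialPosition
case object TrimHorizon extends InitialPosition // oldest record still retained
case object Latest extends InitialPosition      // tip of the stream

object StartPositionSketch {
  // retained: sequence numbers still held by the stream, oldest first.
  // checkpoint: last sequence number recorded as processed, if any.
  // Returns the already-retained records a restarting worker would read.
  def recordsToRead(
      retained: Vector[Long],
      checkpoint: Option[Long],
      initial: InitialPosition): Vector[Long] =
    checkpoint match {
      // A checkpoint takes priority over the initial position.
      case Some(seq) => retained.filter(_ > seq)
      case None => initial match {
        case TrimHorizon => retained     // replay everything still retained
        case Latest => Vector.empty      // skip the backlog, read only new data
      }
    }
}
```

In this model the initial position only matters on the very first run or after checkpoint data is lost, which matches the "in the absence of Kinesis checkpoint info" wording in the diff.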




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15618463
  
--- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import org.apache.spark.Logging
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import scala.util.Random
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ */
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON)
+   *   or the tip of the stream using InitialPositionInStream.LATEST.
+   * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary)
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+      ssc: StreamingContext,
+      app: String,
+      stream: String,
+      endpoint: String,
+      checkpointIntervalMillis: Long,
+      initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = {
+
+    ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param JavaStreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream.
+   * The values are either the beginning of the stream 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15618507
  
--- Diff: project/SparkBuild.scala ---
@@ -60,9 +60,13 @@ object SparkBuild extends PomBuild {
     var isAlphaYarn = false
     var profiles: mutable.Seq[String] = mutable.Seq.empty
     if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) {
-      println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.")
+      println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.")
       profiles ++= Seq("spark-ganglia-lgpl")
     }
+    if (Properties.envOrNone("SPARK_KINESIS_ASL").isDefined) {
--- End diff --

removed




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15618503
  
--- Diff: extras/spark-kinesis-asl/src/test/resources/log4j.properties ---
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=WARN, console
+
--- End diff --

matched




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15618546
  
--- Diff: 
extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Milliseconds;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.dstream.DStream;
+import org.apache.spark.streaming.kinesis.KinesisRecordSerializer;
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+/**
+ * Java-friendly Kinesis Spark Streaming WordCount example
+ *
+ * See 
http://spark.apache.org/docs/latest/streaming-programming-guide.html for more 
details on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per 
shard of the given stream.
+ * It then starts pulling from the tip of the given stream-name and 
endpoint-url at the given batch-interval.
+ * Because we're pulling from the tip (InitialPositionInStream.LATEST), 
only new stream data will be picked up after the KinesisReceiver starts.
+ * This could lead to missed records if data is added to the stream while 
no KinesisReceivers are running.
+ * In production, you'll want to switch to 
InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis 
limit) of previous stream data 
+ *  depending on the checkpoint frequency.
+ * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing 
of records depending on the checkpoint frequency.
+ * Record processing should be idempotent when possible.
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials in the following order of precedence: 
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location 
(~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: JavaKinesisWordCount stream-name endpoint-url 
batch-interval
+ * stream-name is the name of the Kinesis stream (ie. 
mySparkStream)
+ * endpoint-url is the endpoint of the Kinesis service (ie. 
https://kinesis.us-east-1.amazonaws.com)
+ * batch-interval is the batch interval in milliseconds (ie. 
1000ms)
+ *
+ * Example:
+ *  $ export AWS_ACCESS_KEY_ID=your-access-key
+ *  $ export AWS_SECRET_KEY=your-secret-key
+ *$ bin/run-kinesis-example  \
+ *org.apache.spark.examples.streaming.JavaKinesisWordCount 
mySparkStream https://kinesis.us-east-1.amazonaws.com 1000
+ *
+ * There is a companion helper 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15618561
  
--- Diff: 
extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala
 ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer
+import org.apache.spark.streaming.kinesis.KinesisUtils
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+import scala.util.Random
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See 
http://spark.apache.org/docs/latest/streaming-programming-guide.html for more 
details on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per 
shard of the given stream.
+ * It then starts pulling from the tip of the given stream-name and 
endpoint-url at the given batch-interval.
+ * Because we're pulling from the tip (InitialPositionInStream.LATEST), 
only new stream data will be picked up after the KinesisReceiver starts.
+ * This could lead to missed records if data is added to the stream while 
no KinesisReceivers are running.
+ * In production, you'll want to switch to 
InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis 
limit) of previous stream data 
+ *  depending on the checkpoint frequency.
+ *
+ * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing 
of records depending on the checkpoint frequency.
+ * Record processing should be idempotent when possible.
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared 
by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: KinesisWordCount stream-name endpoint-url batch-interval
+ *   stream-name is the name of the Kinesis stream (ie. mySparkStream)
+ *   endpoint-url is the endpoint of the Kinesis service (ie. 
https://kinesis.us-east-1.amazonaws.com)
+ *   batch-interval is the batch interval in millis (ie. 1000ms)
+ *
+ * Example:
+ *  $ export AWS_ACCESS_KEY_ID=your-access-key
+ *  $ export AWS_SECRET_KEY=your-secret-key
+ *$ bin/run-kinesis-example \
+ *org.apache.spark.examples.streaming.KinesisWordCount 
mySparkStream https://kinesis.us-east-1.amazonaws.com 100
+ *
+ * There is a companion helper class below called KinesisWordCountProducer 
which puts dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducer are provided in that 
class definition.
+ */
+object KinesisWordCount extends Logging {
+  val WordSeparator =  
+
+  def main(args: Array[String]) {
+/**
+ * Check that all required args were passed in.
+ */
+if (args.length < 3) {
+  System.err.println(Usage: KinesisWordCount stream-name 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15620896
  
--- Diff: 
extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import org.apache.spark.Logging
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import scala.util.Random
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ */
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not 
Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the 
different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, 
this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the 
worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit 
of 24 hours (InitialPositionInStream.TRIM_HORIZON)
+   *   or the tip of the stream using InitialPositionInStream.LATEST.
+   * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates 
in-memory and on-disk to 2 nodes total (primary and secondary)
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+ssc: StreamingContext,
+app: String,
+stream: String,
+endpoint: String,
+checkpointIntervalMillis: Long,
+initialPositionInStream: InitialPositionInStream = 
InitialPositionInStream.TRIM_HORIZON,
+storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): 
ReceiverInputDStream[Array[Byte]] = {
+
+ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, 
checkpointIntervalMillis, initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a 
Kinesis stream.
+   *
+   * @param JavaStreamingContext object
+   * @param app name
+   * @param stream name
+   * @param endpoint
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not 
Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the 
different types of checkpoints.
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, 
this presents the risk of processing records more than once.
+   * @param in the absence of Kinesis checkpoint info, this is the 
worker's initial starting position in the stream.
+   * The values are either the beginning of the stream 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50701651
  
QA tests have started for PR 1434. This patch merges cleanly.
View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17530/consoleFull




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15623324
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param appName Kinesis Application Name.  Kinesis Apps are mapped to 
Kinesis Streams 
--- End diff --

Should be `@param ssc StreamingContext object`.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15623331
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
--- End diff --

Should be `@param ssc StreamingContext object`.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15623426
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param appName Kinesis Application Name.  Kinesis Apps are mapped to 
Kinesis Streams 
+   *   by the Kinesis Client Library.  If you change the App name or 
Stream name, 
+   *   the KCL will throw errors.
+   * @param stream Kinesis Stream Name
+   * @param endpoint url of Kinesis service
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not 
Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the 
different types 
+   *   of checkpoints.
+   * @param initialPositionInStream in the absence of Kinesis checkpoint 
info, this is the 
+   *   worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit 
of 24 hours 
+   *   (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream 
+   *   (InitialPositionInStream.LATEST).
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, 
this presents the risk 
--- End diff --

This is wrong now, as there is no default. Why were the default values 
removed? If there were issues with Java compatibility, then it is better to 
create alternate versions of createStream with fewer parameters (basically 
defining the default parameters explicitly).
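
A minimal sketch of what such alternate versions could look like from the Java side. Everything here is a hypothetical stand-in so that it compiles without Spark or the AWS SDK: `InitialPosition` and `Storage` stand in for `InitialPositionInStream` and `StorageLevel`, `StreamSettings` stands in for the returned stream, and the `ssc` argument is left out. The defaults (TRIM_HORIZON and MEMORY_AND_DISK_2) are the ones the earlier revision of this file used.

```java
class KinesisUtilsSketch {
    // Hypothetical stand-in for InitialPositionInStream.
    enum InitialPosition { TRIM_HORIZON, LATEST }

    // Hypothetical stand-in for StorageLevel.
    enum Storage { MEMORY_AND_DISK_2, MEMORY_ONLY }

    // Hypothetical stand-in for the created stream; it only records the arguments.
    static final class StreamSettings {
        final String appName;
        final String stream;
        final String endpoint;
        final long checkpointIntervalMillis;
        final InitialPosition initialPosition;
        final Storage storageLevel;

        StreamSettings(String appName, String stream, String endpoint,
                       long checkpointIntervalMillis,
                       InitialPosition initialPosition, Storage storageLevel) {
            this.appName = appName;
            this.stream = stream;
            this.endpoint = endpoint;
            this.checkpointIntervalMillis = checkpointIntervalMillis;
            this.initialPosition = initialPosition;
            this.storageLevel = storageLevel;
        }
    }

    // Full version: every parameter is explicit.
    static StreamSettings createStream(String appName, String stream, String endpoint,
                                       long checkpointIntervalMillis,
                                       InitialPosition initialPosition,
                                       Storage storageLevel) {
        return new StreamSettings(appName, stream, endpoint,
                checkpointIntervalMillis, initialPosition, storageLevel);
    }

    // Shorter version: the defaults are spelled out here instead of relying on
    // Scala default arguments, which Java callers cannot use directly.
    static StreamSettings createStream(String appName, String stream, String endpoint,
                                       long checkpointIntervalMillis) {
        return createStream(appName, stream, endpoint, checkpointIntervalMillis,
                InitialPosition.TRIM_HORIZON, Storage.MEMORY_AND_DISK_2);
    }

    public static void main(String[] args) {
        StreamSettings s = createStream("myApp", "mySparkStream",
                "https://kinesis.us-east-1.amazonaws.com", 1000L);
        System.out.println(s.initialPosition + " " + s.storageLevel);
    }
}
```

With overloads like these, the doc comment can keep stating the defaults, because they are defined in exactly one place (the shorter overload) and are visible to both Scala and Java callers.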






[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15623520
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
--- End diff --

Better to make it more explicit: `Utility / Helper class to create input 
stream with Amazon Kinesis`.
No need to add lower-level details like "reusable methods" and so on.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50704814
  
Chris, thanks for this very thorough PR. One thought I have is that maybe 
we should put the example in the `examples` package, and make that depend on 
Kinesis. The Twitter example is in there. Now this would make the examples 
package depend on ASL, but I don't think anyone links to that directly, or we 
can even decide not to publish it to Maven. And to differentiate this file from 
the others, you could add ASL in its file name.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15623527
  
--- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param appName Kinesis Application Name.  Kinesis Apps are mapped to 
Kinesis Streams 
+   *   by the Kinesis Client Library.  If you change the App name or 
Stream name, 
+   *   the KCL will throw errors.
+   * @param stream Kinesis Stream Name
+   * @param endpoint url of Kinesis service
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not 
Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the 
different types 
+   *   of checkpoints.
+   * @param initialPositionInStream in the absence of Kinesis checkpoint 
info, this is the 
+   *   worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit 
of 24 hours 
+   *   (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream 
+   *   (InitialPositionInStream.LATEST).
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, 
this presents the risk 
+   *   of processing records more than once.
+   * @param storageLevel The default is StorageLevel.MEMORY_AND_DISK_2 
which replicates in-memory 
+   *   and on-disk to 2 nodes total (primary and secondary)
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+  ssc: StreamingContext,
+  appName: String,
+  stream: String,
+  endpoint: String,
+  checkpointIntervalMillis: Long,
+  initialPositionInStream: InitialPositionInStream,
+  storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
+ssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, 
checkpointIntervalMillis, 
+initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param JavaStreamingContext object
+   * @param appName Kinesis Application Name.  Kinesis Apps are mapped to Kinesis Streams
+   *   by the Kinesis Client Library.  If you change the App name or Stream name,
+   *   the KCL will throw errors.
+   * @param stream Kinesis Stream Name
+   * @param endpoint url of Kinesis service
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the different types
+   *   of checkpoints.
+   * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the
+   *   worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit of 24 hours
+   *   (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream
+   *   (InitialPositionInStream.LATEST).
+   * The default is TRIM_HORIZON to 

[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15623534
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param appName Kinesis Application Name.  Kinesis Apps are mapped to Kinesis Streams
+   *   by the Kinesis Client Library.  If you change the App name or Stream name,
+   *   the KCL will throw errors.
+   * @param stream Kinesis Stream Name
+   * @param endpoint url of Kinesis service
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+   * See the Kinesis Spark Streaming documentation for more details on the different types
+   *   of checkpoints.
+   * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the
+   *   worker's initial starting position in the stream.
+   * The values are either the beginning of the stream per Kinesis' limit of 24 hours
+   *   (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream
+   *   (InitialPositionInStream.LATEST).
+   * The default is TRIM_HORIZON to avoid potential data loss.  However, this presents the risk
+   *   of processing records more than once.
+   * @param storageLevel The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory
+   *   and on-disk to 2 nodes total (primary and secondary)
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+  ssc: StreamingContext,
+  appName: String,
+  stream: String,
+  endpoint: String,
+  checkpointIntervalMillis: Long,
+  initialPositionInStream: InitialPositionInStream,
+  storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
+ssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis,
+initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param JavaStreamingContext object
--- End diff --

Same comment as with ssc, should be `@param ssc StreamingContext object`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15623553
  
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Facade to create the Scala-based or Java-based streams.
+ * Also, contains a reusable utility methods.
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param StreamingContext object
+   * @param appName Kinesis Application Name.  Kinesis Apps are mapped to Kinesis Streams
+   *   by the Kinesis Client Library.  If you change the App name or Stream name,
+   *   the KCL will throw errors.
+   * @param stream Kinesis Stream Name
+   * @param endpoint url of Kinesis service
+   * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
--- End diff --

Please set the parameter names correctly. Otherwise the Scaladoc will not 
render correctly.

`@param checkpointIntervalMillis  Kinesis checkpoint interval in 
milliseconds`
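
To illustrate the point about matching names, here is a minimal sketch. It assumes a hypothetical `ScaladocParamSketch.describe` helper that is not part of this PR; the names are for illustration only. Each `@param` tag starts with the exact identifier used in the signature, which is how Scaladoc associates a description with its parameter:

```scala
// Hypothetical stand-in, not the PR's KinesisUtils.createStream.
object ScaladocParamSketch {
  /**
   * Build a short description of a Kinesis stream configuration.
   *
   * @param appName Kinesis application name
   * @param stream Kinesis stream name
   * @param checkpointIntervalMillis Kinesis checkpoint interval in milliseconds
   * @return a human-readable summary of the three settings
   */
  def describe(appName: String, stream: String, checkpointIntervalMillis: Long): String =
    s"$appName reads $stream, checkpointing every $checkpointIntervalMillis ms"
}
```

A tag such as `@param checkpoint interval (millis) ...` would be read as documenting a parameter called `checkpoint`, which does not exist in the signature.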




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-30 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15623547
  
--- Diff: project/SparkBuild.scala ---
@@ -62,7 +62,7 @@ object SparkBuild extends PomBuild {
 var isAlphaYarn = false
 var profiles: mutable.Seq[String] = mutable.Seq.empty
 if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) {
-  println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.")
+  println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.")
--- End diff --

As per Patrick's point, we probably don't need to change the name of this 
profile, right?




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50443974
  
@pdeyhim can you take a look over this too when you have a chance?




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50511469
  
Jenkins, this is ok to test. 




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50511485
  
Jenkins, test this please.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50512322
  
QA tests have started for PR 1434. This patch DID NOT merge cleanly!
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17365/consoleFull




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50525716
  
QA results for PR 1434:
- This patch PASSES unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17365/consoleFull




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/1434#issuecomment-50530784
  
Hey @cfregly, 

My apologies for being so late to review this. It seems the PR needs to be 
merged with master again, probably because of changes to pom.xml. Could you 
merge with master once more?






[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15549163
  
--- Diff: extras/spark-kinesis-asl/pom.xml ---
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
--- End diff --

Can you rename the directory `spark-kinesis-asl` to just `kinesis`? It's 
already in the Spark code base, so it does not need another `spark` in the 
directory name. :)




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15549239
  
--- Diff: bin/run-kinesis-example ---
@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+
--- End diff --

@pdeyhim explained to me that this script is necessary because the Kinesis 
jar is not included in the Spark jar. That makes sense. 
But it's best not to put such module-specific scripts in the main spark/bin 
directory. It would be better to put this in `extras/kinesis/bin/`.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15549518
  
--- Diff: extras/spark-kinesis-asl/pom.xml ---
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
--- End diff --

Aah, I understand, you must have followed the existing `spark-ganglia-lgpl` 
directory name.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15552530
  
--- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.net.InetAddress
+import java.util.UUID
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+import java.nio.ByteBuffer
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor.
+ *
+ * @param app name
--- End diff --

What is this app name? Please add to comments.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15552628
  
--- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.net.InetAddress
+import java.util.UUID
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+import java.nio.ByteBuffer
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor.
+ *
+ * @param app name
+ * @param Kinesis stream name
+ * @param endpoint url of Kinesis service
+ * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
+ *   See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
+ * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream.
+ *   The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON)
+ *  or the tip of the stream using InitialPositionInStream.LATEST.
+ * @param persistence strategy for RDDs and DStreams.
+ */
+private[streaming] class KinesisReceiver(
+  app: String,
--- End diff --

The formatting is incorrect: the parameters need 4 spaces of indentation 
here. Please see the Spark style guide:
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
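
As a rough sketch of the layout the style guide asks for (the class below is hypothetical and only mirrors the shape of the constructor under review, it is not the PR's `KinesisReceiver`): parameters that wrap onto their own lines are indented 4 spaces, while members in the class body keep the usual 2 spaces.

```scala
// Hypothetical class used only to show indentation; not taken from the PR.
class ReceiverConfigSketch(
    app: String,                       // constructor parameters: 4-space indent
    stream: String,
    checkpointIntervalMillis: Long) {

  // Body members: normal 2-space indent.
  def summary: String = s"$app:$stream:$checkpointIntervalMillis"
}
```

The extra indentation keeps the parameter list visually distinct from the first line of the class body.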




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15554012
  
--- Diff: extras/spark-kinesis-asl/pom.xml ---
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
--- End diff --

Okay, I spoke to @pwendell and there is no need to have `spark-` as part 
of the directory name or the profile name. Can you make it `kinesis-asl` 
in all places?



