[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r18240973

--- Diff: extras/kinesis-asl/src/main/resources/log4j.properties ---
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=WARN, console
--- End diff --

question: why do we need this log4j file here?

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15727578

--- Diff: bin/run-example ---
@@ -29,7 +29,9 @@ if [ -n "$1" ]; then
 else
   echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
   echo "  - set MASTER=XX to use a specific master" 1>&2
-  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
+  echo "  - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2
+  echo "     (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
+  echo "  - to run the Kinesis Spark Streaming example, make sure you build with -Pkinesis-asl" 1>&2
--- End diff --

makes sense. i'll make sure this is addressed in streaming-kinesis.md.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15727580

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Helper class to create Amazon Kinesis Input Stream
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils extends Logging {
+  /**
--- End diff --

removed and cleaned up imports
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15727587

--- Diff: extras/kinesis-asl/pom.xml ---
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!--
+    Kinesis integration is not included by default due to ASL-licensed code.
+    Note: This project - if activated - is packaged with the main Spark assembly.
--- End diff --

oh yikes. yeah, that's just plain wrong.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15727599

--- Diff: extras/kinesis-asl/pom.xml ---
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!--
+    Kinesis integration is not included by default due to ASL-licensed code.
+    Note: This project - if activated - is packaged with the main Spark assembly.
--- End diff --

Correction, this is not a nit. It's already confusing ... please change this!
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15727602

--- Diff: extras/kinesis-asl/pom.xml ---
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!--
+    Kinesis integration is not included by default due to ASL-licensed code.
+    Note: This project - if activated - is packaged with the main Spark assembly.
--- End diff --

Haha, didn't see your response. Good :+1:
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15727688

--- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
+ *   for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ *   stream-name and endpoint-url.
+ *
+ * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: KinesisWordCountASL <stream-name> <endpoint-url>
+ *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
+ *   <endpoint-url> is the endpoint of the Kinesis service
+ *     (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *    $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *    $ export AWS_SECRET_KEY=<your-secret-key>
+ *    $ $SPARK_HOME/bin/run-example \
+ *        org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \
+ *        https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducerASL which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducerASL are provided in that class definition.
+ */
+object KinesisWordCountASL extends Logging {
+  def main(args: Array[String]) {
+    /**
+     * Check that all required args were passed in.
+     */
+    if (args.length < 2) {
+      System.err.println(
+        """
+          |Usage: KinesisWordCount <stream-name> <endpoint-url>
+          |    <stream-name> is the name of the Kinesis stream
+          |    <endpoint-url> is the endpoint of the Kinesis service
+          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    /** Populate the appropriate variables from the given args */
+    val Array(streamName, endpointUrl) = args
+
+    /** Determine the number of shards from the stream */
+    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+    kinesisClient.setEndpoint(endpointUrl)
+    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
+      .size()
+
+    /** In this example, we're going to create 1
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15727694

--- Diff: dev/audit-release/sbt_app_kinesis/build.sbt ---
@@ -0,0 +1,30 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+name := "Kinesis Test"
+
+version := "1.0"
+
+scalaVersion := System.getenv.get("SCALA_VERSION")
+
+libraryDependencies += "org.apache.spark" %% "spark-core" % System.getenv.get("SPARK_VERSION")
+libraryDependencies += "org.apache.spark" %% "spark-streaming" % System.getenv.get("SPARK_VERSION")
--- End diff --

gotcha
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15727693

--- Diff: dev/audit-release/audit_release.py ---
@@ -105,7 +105,7 @@ def get_url(url):
     "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
     "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
     "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
-    "spark-catalyst", "spark-sql", "spark-hive"
+    "spark-catalyst", "spark-sql", "spark-hive", "kinesis-asl"
--- End diff --

changed
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1434#issuecomment-50956263

QA tests have started for PR 1434. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17760/consoleFull
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1434#issuecomment-50957227

QA results for PR 1434:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
public final class JavaKinesisWordCountASL {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17760/consoleFull
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1434#issuecomment-50973644

Thank you very much @cfregly! I have merged this!!
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1434
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user nchammas commented on the pull request:

    https://github.com/apache/spark/pull/1434#issuecomment-50974810

This was an epic pull request. Nice work, people.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user pdeyhim commented on the pull request:

    https://github.com/apache/spark/pull/1434#issuecomment-50974841

Awesome work!! @tdas @cfregly
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15685837

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessorUtils.scala ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import scala.util.Random
+
+import org.apache.spark.Logging
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+
+
+/**
+ * Helper for the KinesisRecordProcessor.
+ */
+private[kinesis] object KinesisRecordProcessorUtils extends Logging {
--- End diff --

added the companion object
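
The quoted hunk only shows the imports of this helper (scala.util.Random next to the KCL exception types), so its body is not visible here. One plausible use of those imports is a retry with a randomized back-off when a call is throttled. The following is a hypothetical sketch of that general technique using only the Scala standard library; the object name, the method name, and the isRetryable predicate are assumptions for illustration and are not taken from the PR.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Random, Success, Try}

// Hypothetical sketch: not the code from the pull request.
object RetrySketch {
  /**
   * Evaluate `expression`, making at most `attemptsLeft` attempts in total.
   * Between attempts, sleep for a random duration below `maxBackOffMillis`
   * (which must be greater than 0). Failures that are not retryable, or that
   * happen on the last attempt, are rethrown to the caller.
   */
  @tailrec
  def retryWithRandomBackoff[T](
      attemptsLeft: Int,
      maxBackOffMillis: Int,
      isRetryable: Throwable => Boolean)(expression: => T): T = {
    Try(expression) match {
      case Success(value) => value
      case Failure(e) if isRetryable(e) && attemptsLeft > 1 =>
        // Randomized back-off so that concurrent workers do not retry in lockstep.
        Thread.sleep(Random.nextInt(maxBackOffMillis).toLong)
        retryWithRandomBackoff(attemptsLeft - 1, maxBackOffMillis, isRetryable)(expression)
      case Failure(e) => throw e
    }
  }
}
```

In a Kinesis setting the predicate would presumably match exceptions such as ThrottlingException, but that wiring is an assumption and is left out so the sketch stays free of external dependencies.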
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15685828

--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+
+/**
+ * Implementation of KinesisRecordSerializer to convert Array[Byte] to/from String.
+ */
+class KinesisStringRecordSerializer extends KinesisRecordSerializer[String] with Logging {
--- End diff --

i removed the Serializer abstraction and am just using basic byte[] <-> String conversions
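
The plain byte[] to String conversion mentioned in this comment can be sketched with the standard library alone. This is a minimal illustration, assuming UTF-8 payloads; the object and method names are hypothetical and do not come from the PR.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper names, for illustration only.
object ByteStringConversions {
  // Decode a record payload (raw bytes) into a String, assuming UTF-8.
  def bytesToString(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)

  // Encode a String back into bytes, again assuming UTF-8.
  def stringToBytes(s: String): Array[Byte] =
    s.getBytes(StandardCharsets.UTF_8)
}
```

Naming the charset explicitly keeps the round trip independent of the JVM's platform default encoding, which could otherwise differ between the producer and the receiver.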
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15685855

--- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext.rddToOrderedRDDFunctions
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the
+ *   given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ *   stream-name and endpoint-url.
+ *
+ * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: KinesisWordCount <stream-name> <endpoint-url>
+ *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
+ *   <endpoint-url> is the endpoint of the Kinesis service
+ *     (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *    $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *    $ export AWS_SECRET_KEY=<your-secret-key>
+ *    $ $SPARK_HOME/bin/run-example \
+ *        org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \
+ *        https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducer which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducer are provided in that class definition.
+ */
+object KinesisWordCount extends Logging {
+  val WordSeparator = " "
+
+  def main(args: Array[String]) {
+    /**
+     * Check that all required args were passed in.
+     */
+    if (args.length < 2) {
+      System.err.println("Usage: KinesisWordCount <stream-name> <endpoint-url>")
+      System.exit(1)
+    }
+
+    /**
+     * (This was lifted from the StreamingExamples.scala in order to avoid the dependency
+     *   on the spark-examples artifact.)
+     * Set reasonable logging levels for streaming if the user has not configured log4j.
+     */
+    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements()
+    if (!log4jInitialized) {
+      /**
+       * We first log something to initialize Spark's default logging,
+       * then we
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1434#discussion_r15685887

--- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext.rddToOrderedRDDFunctions
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the
+ *   given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ *   stream-name and endpoint-url.
+ *
+ * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: KinesisWordCount <stream-name> <endpoint-url>
+ *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
+ *   <endpoint-url> is the endpoint of the Kinesis service
+ *     (ie. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *    $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *    $ export AWS_SECRET_KEY=<your-secret-key>
+ *    $ $SPARK_HOME/bin/run-example \
+ *        org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \
+ *        https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducer which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducer are provided in that class definition.
+ */
+object KinesisWordCount extends Logging {
+  val WordSeparator = " "
+
+  def main(args: Array[String]) {
+    /**
+     * Check that all required args were passed in.
+     */
+    if (args.length < 2) {
+      System.err.println("Usage: KinesisWordCount <stream-name> <endpoint-url>")
+      System.exit(1)
+    }
+
+    /**
+     * (This was lifted from the StreamingExamples.scala in order to avoid the dependency
+     *   on the spark-examples artifact.)
+     * Set reasonable logging levels for streaming if the user has not configured log4j.
+     */
+    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements()
+    if (!log4jInitialized) {
+      /**
+       * We first log something to initialize Spark's default logging,
+       * then we
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15685865 --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,369 @@
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15690371 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging + +/** + * Implementation of KinesisRecordSerializer to convert Array[Byte] to/from String. + */ +class KinesisStringRecordSerializer extends KinesisRecordSerializer[String] with Logging { --- End diff -- Cool! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50938687
QA tests have started for PR 1434. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17701/consoleFull
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50938943
QA results for PR 1434:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
public final class JavaKinesisWordCountASL {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17701/consoleFull
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15721090 --- Diff: bin/run-example --- @@ -29,7 +29,9 @@ if [ -n "$1" ]; then else echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2 echo "  - set MASTER=XX to use a specific master" 1>&2 - echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2 + echo "  - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2 + echo "    (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2 + echo "  - to run the Kinesis Spark Streaming example, make sure you build with -Pkinesis-asl" 1>&2 --- End diff -- @pwendell Is it a good idea to have this in the main bin/run-example?
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15721167 --- Diff: dev/audit-release/sbt_app_kinesis/build.sbt --- @@ -0,0 +1,30 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the License); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an AS IS BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +name := "Kinesis Test" + +version := "1.0" + +scalaVersion := System.getenv.get("SCALA_VERSION") + +libraryDependencies += "org.apache.spark" %% "spark-core" % System.getenv.get("SPARK_VERSION") +libraryDependencies += "org.apache.spark" %% "spark-streaming" % System.getenv.get("SPARK_VERSION") --- End diff -- nit: don't need spark-core and spark-streaming, as they will be picked up through the dependencies of spark-streaming-kinesis-asl
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15722812 --- Diff: examples/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java --- @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.examples.streaming; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kinesis.KinesisUtils; + +import scala.Tuple2; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.google.common.collect.Lists; + +/** + * Java-friendly Kinesis Spark Streaming WordCount example + * + * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details + * on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard + * for the given stream. + * It then starts pulling from the last checkpointed sequence number of the given + * stream-name and endpoint-url. 
+ * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: JavaKinesisWordCountASL stream-name endpoint-url + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + * $ export AWS_ACCESS_KEY_ID=your-access-key + * $ export AWS_SECRET_KEY=your-secret-key + * $ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data + * onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducerASL are provided in the class definition. + */ +public final class JavaKinesisWordCountASL { +private static final Pattern WORD_SEPARATOR = Pattern.compile(" "); +private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class); + +/** + * Make the constructor private to enforce singleton + */ +private JavaKinesisWordCountASL() { +} + +public static void main(String[] args) { +/** --- End diff -- Inline comment style should be /* ... */; /** is reserved only for Scaladoc.
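A short illustration of the convention in the comment above, written as a minimal Java sketch. In a Java file the same rule would apply to Javadoc: the `/** ... */` form goes on declarations, and `/* ... */` (or `//`) goes inside method bodies. The class name `ArgCheckExample` and its helper method are hypothetical and are not part of the PR; the argument check only mirrors the one in the quoted diff.

```java
/**
 * Doc comment on a class declaration, so the double-asterisk form is appropriate.
 * ArgCheckExample is a hypothetical name used only for this sketch.
 */
final class ArgCheckExample {

    /**
     * Doc comment on a method declaration.
     *
     * @param args command-line arguments
     * @return true when both stream-name and endpoint-url are present
     */
    static boolean hasRequiredArgs(String[] args) {
        /* Inline block comment: explains a step inside the body, not an API. */
        return args.length >= 2;
    }

    public static void main(String[] args) {
        /* Check that all required args were passed in. */
        if (!hasRequiredArgs(args)) {
            System.err.println("Usage: ArgCheckExample <stream-name> <endpoint-url>");
            System.exit(1);
        }
        // A single-line comment also works for short inline notes.
        System.out.println("stream=" + args[0] + ", endpoint=" + args[1]);
    }
}
```

Documentation tools only pick up the `/** ... */` blocks attached to declarations, which is presumably why the reviewer wants the inline ones switched to the plain form.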
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15722833 --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala --- @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer + +import scala.util.Random + +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.kinesis.KinesisUtils + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.model.PutRecordRequest + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details on + * the Kinesis Spark Streaming integration. 
+ * + * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard + * for the given stream. + * It then starts pulling from the last checkpointed sequence number of the given + * stream-name and endpoint-url. + * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCountASL stream-name endpoint-url + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + *$ export AWS_ACCESS_KEY_ID=your-access-key + *$ export AWS_SECRET_KEY=your-secret-key + *$ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class below called KinesisWordCountProducerASL which puts + * dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducerASL are provided in that class definition. + */ +object KinesisWordCountASL extends Logging { + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length 2) { + System.err.println( + + |Usage: KinesisWordCount stream-name endpoint-url + |stream-name is the name of the Kinesis stream + |endpoint-url is the endpoint of the Kinesis service + | (e.g. 
https://kinesis.us-east-1.amazonaws.com) +.stripMargin) + System.exit(1) +} + +StreamingExamples.setStreamingLogLevels() + +/** Populate the appropriate variables from the given args */ +val Array(streamName, endpointUrl) = args + +/** Determine the number of shards from the stream */ +val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) +kinesisClient.setEndpoint(endpointUrl) +val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards() + .size() + +/** In this example, we're going to create 1
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15723335 --- Diff: extras/kinesis-asl/src/main/resources/log4j.properties --- @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=WARN, console + +# File appender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Console appender +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Settings to quiet third party logs that are too verbose +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO +log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO + +# Log all 
Kinesis Streaming messages +log4j.logger.org.apache.spark.examples.streaming=DEBUG --- End diff -- These lines were removed in the PR I made to your branch.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15723363 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.SystemClock + +/** + * This is a helper class for managing checkpoint clocks. + * + * @param checkpointInterval + * @param currentClock. Default to current SystemClock if none is passed in (mocking purposes) + */ +private[kinesis] class KinesisCheckpointState( --- End diff -- Good you changed the name!
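The Scaladoc quoted above only hints at how such a helper is used, and the class body is not shown in this thread. One way a checkpoint-clock helper could work, sketched in Java under stated assumptions: it records the next checkpoint time as "now + interval", reports when that time has passed, and is advanced by one interval after each checkpoint. The class name `CheckpointTimer`, its methods, and the use of `LongSupplier` as the injectable clock are all hypothetical; this is not the code under review.

```java
import java.util.function.LongSupplier;

/* Hypothetical sketch of an interval-based checkpoint timer with an injectable clock. */
final class CheckpointTimer {
    private final long intervalMs;
    private final LongSupplier clock;   // injectable so tests can supply a fake clock
    private long nextCheckpointMs;

    /* Production use: default to the system clock when none is passed in. */
    CheckpointTimer(long intervalMs) {
        this(intervalMs, System::currentTimeMillis);
    }

    /* Test use: pass any LongSupplier that returns the current time in milliseconds. */
    CheckpointTimer(long intervalMs, LongSupplier clock) {
        this.intervalMs = intervalMs;
        this.clock = clock;
        this.nextCheckpointMs = clock.getAsLong() + intervalMs;
    }

    /* True once the current time has moved past the scheduled checkpoint time. */
    boolean shouldCheckpoint() {
        return clock.getAsLong() > nextCheckpointMs;
    }

    /* Push the scheduled checkpoint time forward by one interval. */
    void advanceCheckpoint() {
        nextCheckpointMs += intervalMs;
    }

    public static void main(String[] args) {
        long[] now = {0L};  // mutable fake time, in milliseconds
        CheckpointTimer timer = new CheckpointTimer(1000L, () -> now[0]);
        System.out.println(timer.shouldCheckpoint());  // false: interval not yet elapsed
        now[0] = 1001L;
        System.out.println(timer.shouldCheckpoint());  // true: past the first checkpoint time
        timer.advanceCheckpoint();
        System.out.println(timer.shouldCheckpoint());  // false: next checkpoint is at 2000
    }
}
```

Passing the clock in, rather than reading the system time directly, is what makes the "mocking purposes" note in the Scaladoc practical: a test can move time forward without sleeping.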
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15723355 --- Diff: extras/kinesis-asl/src/main/resources/log4j.properties --- @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log --- End diff -- This line was removed, as this property does not determine that.
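To illustrate why the quoted comment line was misleading: in log4j 1.x properties syntax, the value of `log4j.rootCategory` is a level followed by a comma-separated list of appender names, so `WARN, console` attaches only the console appender, and a file appender that is defined but not listed receives nothing from the root logger. The sketch below uses only `java.util.Properties` (not log4j itself) to read those names; `RootCategoryCheck` and `rootAppenders` are hypothetical names introduced for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/* Hypothetical helper: lists the appenders named in log4j.rootCategory. */
final class RootCategoryCheck {

    /* Returns the appender names that follow the level in log4j.rootCategory. */
    static List<String> rootAppenders(String config) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(config));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String[] parts = props.getProperty("log4j.rootCategory", "").split(",");
        List<String> appenders = new ArrayList<>();
        /* parts[0] is the level (e.g. WARN); the remaining entries are appender names. */
        for (int i = 1; i < parts.length; i++) {
            String name = parts[i].trim();
            if (!name.isEmpty()) {
                appenders.add(name);
            }
        }
        return appenders;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
            "log4j.rootCategory=WARN, console",
            "log4j.appender.file=org.apache.log4j.FileAppender",
            "log4j.appender.file.file=target/unit-tests.log",
            "log4j.appender.console=org.apache.log4j.ConsoleAppender");
        System.out.println(rootAppenders(config));  // prints [console]
    }
}
```

With the settings from the diff, the `file` appender never appears in the result, so nothing is routed to `target/unit-tests.log` by the root category.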
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15724763 --- Diff: extras/kinesis-asl/src/test/resources/log4j.properties --- @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the file streaming/target/unit-tests.log --- End diff -- This path is probably wrong.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15724783 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Helper class to create Amazon Kinesis Input Stream + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. 
+ * + * @param sscStreamingContext object + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + *See the Kinesis Spark Streaming documentation for more + *details on the different types of checkpoints. + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param storageLevel Storage level to use for storing the received objects + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( + ssc: StreamingContext, + streamName: String, + endpointUrl: String, + checkpointInterval: Duration, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = { +ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, +checkpointInterval, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param jssc Java StreamingContext object + * @param sscStreamingContext object + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + *See the Kinesis Spark Streaming documentation for more + *details on the different types of checkpoints. + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param storageLevel Storage level to use for storing the received objects + * + * @return JavaReceiverInputDStream[Array[Byte]] + * + *
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15724923 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.List + +import scala.collection.JavaConversions.asScalaBuffer +import scala.util.Random + +import org.apache.spark.Logging + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason +import com.amazonaws.services.kinesis.model.Record + +/** + * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor. 
+ * This implementation operates on the Array[Byte] from the KinesisReceiver. + * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup. + * + * @param receiver Kinesis receiver + * @param workerId for logging purposes + * @param checkpointState represents the checkpoint state including the next checkpoint time. + * It's injected here for mocking purposes. + */ +private[kinesis] class KinesisRecordProcessor( +receiver: KinesisReceiver, +workerId: String, +checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging { + + /** shardId to be populated during initialize() */ + var shardId: String = _ + + /** + * The Kinesis Client Library calls this method during IRecordProcessor initialization. + * + * @param shardId assigned by the KCL to this particular RecordProcessor. + */ + override def initialize(shardId: String) { +logInfo(sInitialize: Initializing workerId $workerId with shardId $shardId) +this.shardId = shardId + } + + /** + * This method is called by the KCL when a batch of records is pulled from the Kinesis stream. + * This is the record-processing bridge between the KCL's IRecordProcessor.processRecords() + * and Spark Streaming's Receiver.store(). + * + * @param batch list of records from the Kinesis stream shard + * @param checkpointer used to update Kinesis when this batch has been processed/stored + * in the DStream + */ + override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) { +if (!receiver.isStopped()) { + try { +/** + * Note: If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming + * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the + * internally-configured Spark serializer (kryo, etc). + * This is not desirable, so we instead store a raw Array[Byte] and decouple + * ourselves from Spark's internal serialization strategy. 
+ */ +batch.foreach(record => + KinesisRecordProcessor.retry(receiver.store(record.getData().array()), 4, 500) --- End diff -- Backing off by 500 here is probably a bad idea. Maybe reduce it to 10 ms.
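For readers following the back-off discussion, here is a minimal sketch of what a retry helper with a random back-off could look like. It is not the code under review: the object name `RetrySketch`, the parameter names, and the reading of the last two arguments as "retries left" and "maximum back-off in milliseconds" are all assumptions made for illustration. With a small upper bound such as 10, a failed store would block the record-processing thread only briefly.

```scala
import scala.util.{Failure, Random, Success, Try}

object RetrySketch {
  /**
   * Hypothetical helper (not the PR's implementation): evaluates `expression`
   * and, on failure, sleeps a random 0..maxBackOffMillis milliseconds before
   * trying again, up to `retriesLeft` additional attempts.
   */
  def retry[T](expression: => T, retriesLeft: Int, maxBackOffMillis: Int): T = {
    Try(expression) match {
      case Success(value) => value
      case Failure(_) if retriesLeft > 0 =>
        // Random back-off keeps several workers from retrying in lock-step.
        Thread.sleep(Random.nextInt(maxBackOffMillis + 1).toLong)
        retry(expression, retriesLeft - 1, maxBackOffMillis)
      case Failure(e) =>
        // Out of retries: surface the last failure to the caller.
        throw e
    }
  }
}
```

In this sketch the worst-case added latency is roughly `retriesLeft * maxBackOffMillis`, which is one way to compare the two candidate values.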
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15726162 --- Diff: project/SparkBuild.scala --- @@ -62,7 +62,7 @@ object SparkBuild extends PomBuild { var isAlphaYarn = false var profiles: mutable.Seq[String] = mutable.Seq.empty if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { - println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") + println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") --- End diff -- There is nothing in that JIRA. no monty at all.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15726168 --- Diff: project/SparkBuild.scala --- @@ -62,7 +62,7 @@ object SparkBuild extends PomBuild { var isAlphaYarn = false var profiles: mutable.Seq[String] = mutable.Seq.empty if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { - println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") + println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") --- End diff -- But I think I get the point, that this is the wrong advice statement.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50951626 QA tests have started for PR 1434. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17736/consoleFull
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15726533 --- Diff: extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java --- @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.examples.streaming; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kinesis.KinesisUtils; + +import scala.Tuple2; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.google.common.collect.Lists; + +/** + * Java-friendly Kinesis Spark Streaming WordCount example + * + * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details + * on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard + * for the given stream. + * It then starts pulling from the last checkpointed sequence number of the given + * stream-name and endpoint-url. 
+ * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: JavaKinesisWordCountASL stream-name endpoint-url + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + * $ export AWS_ACCESS_KEY_ID=your-access-key + * $ export AWS_SECRET_KEY=your-secret-key + * $ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data + * onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducerASL are provided in the class definition. + */ +public final class JavaKinesisWordCountASL { +private static final Pattern WORD_SEPARATOR = Pattern.compile( ); +private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class); + +/* + * Make the constructor private to enforce singleton + */ +private JavaKinesisWordCountASL() { +} + +public static void main(String[] args) { +/* + * Check that all required args were passed in. + */ +if (args.length 2) { + System.err.println( + |Usage: KinesisWordCount stream-name endpoint-url\n + + |stream-name is the name of the Kinesis stream\n + + |endpoint-url is the endpoint of the
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15726872 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Helper class to create Amazon Kinesis Input Stream + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** --- End diff -- This does not need to extend Logging.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15726898 --- Diff: bin/run-example --- @@ -29,7 +29,9 @@ if [ -n $1 ]; then else echo "Usage: ./bin/run-example example-class [example-args]" 1>&2 echo " - set MASTER=XX to use a specific master" 1>&2 - echo " - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2 + echo " - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2 + echo " (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2 + echo " - to run the Kinesis Spark Streaming example, make sure you build with -Pkinesis-asl" 1>&2 --- End diff -- Hey I'd prefer not to have this in this short text here. We can add a note in the docs for this.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15631461 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. 
If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types + * of checkpoints. + * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream + * (InitialPositionInStream.LATEST). + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk + * of processing records more than once. + * @param storageLevel The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory + * and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( + ssc: StreamingContext, + appName: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = { +ssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis, +initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service --- End diff -- Can you give couple of examples of endpoints to give an idea what they look like `@param endpoint Url of the Kinesis service (e.g. 
https://kinesis.us-west-2.amazon.com.or.something.like.that)`
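To make the request for endpoint examples concrete, the sketch below builds endpoint URLs of the form already quoted elsewhere in this thread (https://kinesis.us-east-1.amazonaws.com). The object `KinesisEndpointSketch` and the method `endpointFor` are hypothetical names, not part of the PR, and the pattern assumes a standard AWS region; other partitions may use a different domain suffix.

```scala
object KinesisEndpointSketch {
  /**
   * Hypothetical helper: returns the regional Kinesis endpoint URL for a
   * standard AWS region name such as "us-east-1" or "us-west-2".
   */
  def endpointFor(region: String): String = {
    // Loose sanity check on the region name; an assumption, not an AWS rule.
    require(region.matches("[a-z]{2}(-[a-z]+)+-\\d"), s"unexpected region name: $region")
    s"https://kinesis.$region.amazonaws.com"
  }
}
```

A Scaladoc entry could then cite one or two of these values directly, for example `KinesisEndpointSketch.endpointFor("us-west-2")`.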
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15651823 --- Diff: project/SparkBuild.scala --- @@ -62,7 +62,7 @@ object SparkBuild extends PomBuild { var isAlphaYarn = false var profiles: mutable.Seq[String] = mutable.Seq.empty if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { - println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") + println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") --- End diff -- that -P wasn't working. new jira for the full monty: https://issues.apache.org/jira/browse/SPARK-2770
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15651971 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param appName Kinesis Application Name. 
Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). --- End diff -- fixed
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15653238 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param appName Kinesis Application Name. 
Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types + * of checkpoints. + * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream + * (InitialPositionInStream.LATEST). + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk --- End diff -- right, i knew you would ask about this! scala can't differentiate between the two overloaded methods with default arguments in this situation: https://groups.google.com/forum/#!msg/scala-user/FyQK3-cqfaY/fXLHr8QsW_0J it's totes weird, i know. i'll remove storageLevel to simplify. i'd like to keep initialPositionInStream in case there's a need to differentiate.
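The overloading limitation mentioned here can be illustrated with a small stand-alone sketch. As far as I understand it, the Scala compiler allows only one overloaded alternative of a method to declare default arguments, so a Scala-facing and a Java-facing `createStream` cannot both default `storageLevel`. The types `ScalaCtx` and `JavaCtx` below are hypothetical stand-ins for the two streaming contexts, and the string storage level is a simplification for illustration only.

```scala
object OverloadSketch {
  // Hypothetical stand-ins for the Scala and Java streaming context types.
  final case class ScalaCtx(name: String)
  final case class JavaCtx(name: String)

  // Declaring a default on BOTH overloads is rejected by the compiler, e.g.:
  //   def createStream(ctx: ScalaCtx, stream: String, level: String = "MEMORY_AND_DISK_2"): String
  //   def createStream(ctx: JavaCtx,  stream: String, level: String = "MEMORY_AND_DISK_2"): String
  // so this sketch spells out every argument on each overload instead.

  def createStream(ctx: ScalaCtx, stream: String, level: String): String =
    s"scala:${ctx.name}:$stream:$level"

  def createStream(ctx: JavaCtx, stream: String, level: String): String =
    s"java:${ctx.name}:$stream:$level"
}
```

Dropping the defaults, as proposed in the comment above, keeps both entry points while making callers state their choice explicitly.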
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15654371 --- Diff: make-distribution.sh --- @@ -196,6 +196,8 @@ cp -r $FWDIR/bin $DISTDIR cp -r $FWDIR/python $DISTDIR cp -r $FWDIR/sbin $DISTDIR cp -r $FWDIR/ec2 $DISTDIR +cp -r $FWDIR/extras/kinesis-asl/bin $DISTDIR --- End diff -- @pwendell Can you take a look?
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50786500 ah, gotcha matei. the examples aren't part of the core, so they can depend on external libs. i'll make the change.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15656394 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver + +import com.amazonaws.auth.AWSCredentialsProvider +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. 
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) + * as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers + * to run within a Spark Executor. + * + * @param appName Kinesis Application Name. Kinesis apps are mapped to Kinesis streams + * by the Kinesis Client Library. If you change the app name or stream name, + * the KCL will throw errors. + * @param stream Kinesis stream name + * @param endpoint url of Kinesis service + * @param checkpointIntervalMillis for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types + * of checkpoints. + * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the worker's initial --- End diff -- This line is more than 100 chars.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50792452 QA tests have started for PR 1434. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17589/consoleFull
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50792565 QA results for PR 1434:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
public final class JavaKinesisWordCount {
class KinesisStringRecordSerializer extends KinesisRecordSerializer[String] with Logging {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17589/consoleFull
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15657271 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging + +/** + * Implementation of KinesisRecordSerializer to convert Array[Byte] to/from String. + */ +class KinesisStringRecordSerializer extends KinesisRecordSerializer[String] with Logging { --- End diff -- Is this class supposed to be publicly visible? If not, do the expected :)
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15657421 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessorUtils.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import scala.util.Random + +import org.apache.spark.Logging + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException + + +/** + * Helper for the KinesisRecordProcessor. + */ +private[kinesis] object KinesisRecordProcessorUtils extends Logging { --- End diff -- This is a nitpick, but utility functions like these can often be added to the companion object of the same class, so this could have been added to `object KinesisRecordProcessor`.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15668060 --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer + +import scala.util.Random + +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext.rddToOrderedRDDFunctions +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.model.PutRecordRequest + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on + * the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the + * given stream. + * It then starts pulling from the last checkpointed sequence number of the given + * stream-name and endpoint-url. 
+ * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount stream-name endpoint-url + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + *$ export AWS_ACCESS_KEY_ID=your-access-key + *$ export AWS_SECRET_KEY=your-secret-key + *$ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class below called KinesisWordCountProducer which puts + * dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. + */ +object KinesisWordCount extends Logging { + val WordSeparator = " " + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length < 2) { + System.err.println("Usage: KinesisWordCount stream-name endpoint-url") + System.exit(1) +} + +/** + * (This was lifted from the StreamingExamples.scala in order to avoid the dependency + * on the spark-examples artifact.) + * Set reasonable logging levels for streaming if the user has not configured log4j. + */ +val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements() +if (!log4jInitialized) { + /** + * We first log something to initialize Spark's default logging, + * then we
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15668098 --- Diff: examples/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.examples.streaming; + +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Milliseconds; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kinesis.KinesisRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisUtils; + +import scala.Tuple2; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +/** + * Java-friendly Kinesis Spark Streaming WordCount example + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details + * on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard + * of the given stream. + * It then starts pulling from the last checkpointed sequence number of the given + * stream-name and endpoint-url. 
+ * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: JavaKinesisWordCount stream-name endpoint-url + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + * $ export AWS_ACCESS_KEY_ID=your-access-key + * $ export AWS_SECRET_KEY=your-secret-key + * $ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.JavaKinesisWordCount mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class called KinesisWordCountProducer which puts dummy data + * onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in the class definition. + */ +public final class JavaKinesisWordCount { +private static final Pattern WORD_SEPARATOR = Pattern.compile(" "); +private static final Logger logger = Logger.getLogger(JavaKinesisWordCount.class); + +/** + * Make the constructor private to enforce singleton + */ +private JavaKinesisWordCount() { +} + +public static void main(String[] args) { +/** + *
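The order of precedence listed in the doc comment above can be illustrated with a small stand-alone sketch. This is not the AWS SDK's implementation of DefaultAWSCredentialsProviderChain; the class name `CredentialSourceSketch` and the method `firstAvailableSource` are hypothetical, and only the first two lookups (environment variables, then Java system properties) are modeled. The environment is passed in as a map so the logic can be exercised without touching the real process environment.

```java
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical illustration of "first source that has both keys wins".
 * It mirrors only the first two steps of the precedence list quoted above
 * and is NOT the AWS SDK's DefaultAWSCredentialsProviderChain.
 */
final class CredentialSourceSketch {
    private CredentialSourceSketch() {
    }

    /** Returns the name of the first source that supplies both keys, or "none". */
    static String firstAvailableSource(Map<String, String> env, Properties sysProps) {
        // 1. Environment variables are consulted first.
        if (isSet(env.get("AWS_ACCESS_KEY_ID")) && isSet(env.get("AWS_SECRET_KEY"))) {
            return "environment";
        }
        // 2. Java system properties are consulted next.
        if (isSet(sysProps.getProperty("aws.accessKeyId"))
                && isSet(sysProps.getProperty("aws.secretKey"))) {
            return "system-properties";
        }
        // 3. and 4. (profile file, EC2 instance profile) are not modeled here.
        return "none";
    }

    private static boolean isSet(String value) {
        return value != null && !value.isEmpty();
    }
}
```

Calling `CredentialSourceSketch.firstAvailableSource(System.getenv(), System.getProperties())` would report which of the two modeled sources is populated on the current machine.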
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15668154 --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer + +import scala.util.Random + +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext.rddToOrderedRDDFunctions +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.model.PutRecordRequest + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on + * the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the + * given stream. + * It then starts pulling from the last checkpointed sequence number of the given + * stream-name and endpoint-url. 
+ * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount stream-name endpoint-url + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + *$ export AWS_ACCESS_KEY_ID=your-access-key + *$ export AWS_SECRET_KEY=your-secret-key + *$ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class below called KinesisWordCountProducer which puts + * dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. + */ +object KinesisWordCount extends Logging { + val WordSeparator = " " + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length < 2) { + System.err.println("Usage: KinesisWordCount stream-name endpoint-url") + System.exit(1) +} + +/** + * (This was lifted from the StreamingExamples.scala in order to avoid the dependency + * on the spark-examples artifact.) + * Set reasonable logging levels for streaming if the user has not configured log4j. + */ +val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements() +if (!log4jInitialized) { + /** + * We first log something to initialize Spark's default logging, + * then we
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15668266 --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer + +import scala.util.Random + +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext.rddToOrderedRDDFunctions +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.model.PutRecordRequest + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on + * the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the + * given stream. + * It then starts pulling from the last checkpointed sequence number of the given + * stream-name and endpoint-url. 
+ * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount stream-name endpoint-url + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + *$ export AWS_ACCESS_KEY_ID=your-access-key + *$ export AWS_SECRET_KEY=your-secret-key + *$ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class below called KinesisWordCountProducer which puts + * dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. + */ +object KinesisWordCount extends Logging { + val WordSeparator = " " + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length < 2) { + System.err.println("Usage: KinesisWordCount stream-name endpoint-url") + System.exit(1) +} + +/** + * (This was lifted from the StreamingExamples.scala in order to avoid the dependency + * on the spark-examples artifact.) + * Set reasonable logging levels for streaming if the user has not configured log4j. + */ +val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements() +if (!log4jInitialized) { + /** + * We first log something to initialize Spark's default logging, + * then we
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50819992 @pwendell Can you take a look at the pom, and other build stuff?
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15670682 --- Diff: dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala --- @@ -47,7 +47,14 @@ object SimpleApp { System.exit(-1) } if (foundGanglia) { - println("Ganglia sink was loaded via spark-core") + println("Ganglia sink was loaded via spark-ganglia-lgpl") + System.exit(-1) +} + +// Remove kinesis from default build due to ASL license issue +val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess +if (foundKinesis) { + println("Kinesis was loaded via kinesis-asl") --- End diff -- This statement should be "Kinesis was loaded via spark-core".
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15670661 --- Diff: dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala --- @@ -47,7 +47,14 @@ object SimpleApp { System.exit(-1) } if (foundGanglia) { - println("Ganglia sink was loaded via spark-core") + println("Ganglia sink was loaded via spark-ganglia-lgpl") --- End diff -- This should remain as spark-core. The check verifies that depending on spark-core does not bring in Ganglia, so the original statement is correct.
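The audit check discussed in the two comments above boils down to "try to load a class by name and treat success as a failure of the audit". A minimal Java sketch of that idea follows; the class `ClasspathAuditSketch` and method `isLoadable` are hypothetical names for illustration and are not part of the PR, which does the equivalent in Scala with `Try(Class.forName(...)).isSuccess`.

```java
/**
 * Hypothetical helper showing the classpath probe used by the audit app:
 * a class that should NOT be pulled in by a dependency must fail to load.
 */
final class ClasspathAuditSketch {
    private ClasspathAuditSketch() {
    }

    /** Returns true if the named class can be loaded from the current classpath. */
    static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            // The class is absent, which is the desired outcome for an excluded module.
            return false;
        }
    }
}
```

In an audit run that depends only on spark-core, `isLoadable("org.apache.spark.streaming.kinesis.KinesisUtils")` would be expected to return false; a true result would mean the ASL-licensed module leaked into the default build.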
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15670918 --- Diff: assembly/pom.xml --- @@ -186,6 +186,16 @@ </dependencies> </profile> <profile> + <id>kinesis-asl</id> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>kinesis-asl_${scala.binary.version}</artifactId> --- End diff -- The artifact id should be `spark-streaming-kinesis-asl`, because the published Maven dependencies should have "spark" in the name (to make it clear that it's related to Spark). The profile id can be `kinesis-asl`, as it is used only at Spark compile time, when the person is already dealing with Spark. Sorry if this wasn't clear.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15567656 --- Diff: extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.examples.streaming; + +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Milliseconds; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.dstream.DStream; +import org.apache.spark.streaming.kinesis.KinesisRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisUtils; + +import scala.Tuple2; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +/** + * Java-friendly Kinesis Spark Streaming WordCount example + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the given stream. + * It then starts pulling from the tip of the given stream-name and endpoint-url at the given batch-interval. + * Because we're pulling from the tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts. 
+ * This could lead to missed records if data is added to the stream while no KinesisReceivers are running. + * In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data + * depending on the checkpoint frequency. + * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency. + * Record processing should be idempotent when possible. + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: JavaKinesisWordCount stream-name endpoint-url batch-interval + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * batch-interval is the batch interval in milliseconds (ie. 1000ms) + * + * Example: + * $ export AWS_ACCESS_KEY_ID=your-access-key + * $ export AWS_SECRET_KEY=your-secret-key + *$ bin/run-kinesis-example \ + *org.apache.spark.examples.streaming.JavaKinesisWordCount mySparkStream https://kinesis.us-east-1.amazonaws.com 1000 + * + * There is a companion helper class
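The doc comment above notes that reading from TRIM_HORIZON may replay records and that processing should be idempotent when possible. One common way to get there is to remember which sequence numbers have already been applied. The sketch below is an assumption-laden illustration, not code from the PR: `IdempotentProcessorSketch` is a hypothetical class, it keeps its state in an unbounded in-memory set, and a real receiver would need durable, bounded state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch: skip records whose sequence number was already applied,
 * so a replay after restarting from an older checkpoint has no extra effect.
 */
final class IdempotentProcessorSketch {
    private final Set<String> seenSequenceNumbers = new HashSet<>();
    private final List<String> applied = new ArrayList<>();

    /** Applies the record once; returns false if this sequence number was seen before. */
    boolean process(String sequenceNumber, String data) {
        // Set.add returns false when the element is already present.
        if (!seenSequenceNumbers.add(sequenceNumber)) {
            return false;
        }
        applied.add(data);
        return true;
    }

    /** Records that were actually applied, in arrival order. */
    List<String> applied() {
        return applied;
    }
}
```

With this shape, feeding the same (sequence number, data) pair twice leaves `applied()` unchanged after the first call, which is the property the comment asks for.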
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15567741 --- Diff: extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.examples.streaming; + +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Milliseconds; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.dstream.DStream; +import org.apache.spark.streaming.kinesis.KinesisRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisUtils; + +import scala.Tuple2; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +/** + * Java-friendly Kinesis Spark Streaming WordCount example + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the given stream. + * It then starts pulling from the tip of the given stream-name and endpoint-url at the given batch-interval. + * Because we're pulling from the tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts. 
+ * This could lead to missed records if data is added to the stream while no KinesisReceivers are running. + * In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data + * depending on the checkpoint frequency. + * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency. + * Record processing should be idempotent when possible. + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: JavaKinesisWordCount stream-name endpoint-url batch-interval + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * batch-interval is the batch interval in milliseconds (ie. 1000ms) + * + * Example: + * $ export AWS_ACCESS_KEY_ID=your-access-key + * $ export AWS_SECRET_KEY=your-secret-key + *$ bin/run-kinesis-example \ + *org.apache.spark.examples.streaming.JavaKinesisWordCount mySparkStream https://kinesis.us-east-1.amazonaws.com 1000 + * + * There is a companion helper class
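The javadoc quoted above notes that TRIM_HORIZON can replay records and that record processing should be idempotent when possible. A minimal sketch of one way to get there, assuming each record carries a unique sequence number: remember which sequence numbers were already applied and skip repeats. The class IdempotentWordCounter and its methods are hypothetical and not part of the PR; a real job would also need to bound or persist the set of seen sequence numbers.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: counts words while ignoring records that were already applied. */
public class IdempotentWordCounter {
    // Sequence numbers of records that have already been applied (assumed unique per record).
    private final Set<String> seenSequenceNumbers = new HashSet<>();
    private final Map<String, Integer> counts = new HashMap<>();

    /** Applies a record at most once; returns false if it was a replayed duplicate. */
    public boolean process(String sequenceNumber, String payload) {
        if (!seenSequenceNumbers.add(sequenceNumber)) {
            return false; // already applied, so a replay changes nothing
        }
        for (String word : payload.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return true;
    }

    public int count(String word) {
        return counts.getOrDefault(word, 0);
    }

    public static void main(String[] args) {
        IdempotentWordCounter counter = new IdempotentWordCounter();
        counter.process("seq-1", "spark kinesis spark");
        counter.process("seq-1", "spark kinesis spark"); // simulated replay after a restart
        System.out.println(counter.count("spark")); // prints 2, not 4
    }
}
```

Keying on the sequence number keeps the check independent of the payload, so two distinct records with identical text are still both counted.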
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15568250 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.model.PutRecordRequest +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.dstream.DStream + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the given stream. + * It then starts pulling from the tip of the given stream-name and endpoint-url at the given batch-interval. + * Because we're pulling from the tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts. + * This could lead to missed records if data is added to the stream while no KinesisReceivers are running. + * In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data + * depending on the checkpoint frequency. 
+ * + * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency. + * Record processing should be idempotent when possible. + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount stream-name endpoint-url batch-interval + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * batch-interval is the batch interval in millis (ie. 1000ms) + * + * Example: + * $ export AWS_ACCESS_KEY_ID=your-access-key + * $ export AWS_SECRET_KEY=your-secret-key + *$ bin/run-kinesis-example \ + *org.apache.spark.examples.streaming.KinesisWordCount mySparkStream https://kinesis.us-east-1.amazonaws.com 100 + * + * There is a companion helper class below called KinesisWordCountProducer which puts dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. + */ +object KinesisWordCount extends Logging { + val WordSeparator = " " + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length < 3) { + System.err.println("Usage: KinesisWordCount stream-name
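The precedence order described in the quoted comment amounts to trying each source in turn and taking the first one that yields a value. The sketch below illustrates only that first-match idea for the access key, using the environment variable and system property names from the comment. CredentialChainSketch is a hypothetical class, the environment and properties are passed in so the logic can be exercised without real credentials, and this is not a description of how DefaultAWSCredentialsProviderChain is implemented internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;

/** Hypothetical sketch of first-match lookup across an ordered list of sources. */
public class CredentialChainSketch {

    /** Returns the value from the first provider that has one, in list order. */
    public static Optional<String> firstAvailable(List<Supplier<Optional<String>>> providers) {
        for (Supplier<Optional<String>> provider : providers) {
            Optional<String> value = provider.get();
            if (value.isPresent()) {
                return value;
            }
        }
        return Optional.empty();
    }

    /** Looks in the given environment first, then in the given system properties. */
    public static Optional<String> resolveAccessKey(Map<String, String> env, Properties props) {
        List<Supplier<Optional<String>>> providers = new ArrayList<>();
        providers.add(() -> Optional.ofNullable(env.get("AWS_ACCESS_KEY_ID")));
        providers.add(() -> Optional.ofNullable(props.getProperty("aws.accessKeyId")));
        return firstAvailable(providers);
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        Properties props = new Properties();
        props.setProperty("aws.accessKeyId", "key-from-properties");
        // Only the system property is set, so it is used.
        System.out.println(resolveAccessKey(env, props).orElse("none"));
        // Once the environment variable is present, it takes precedence.
        env.put("AWS_ACCESS_KEY_ID", "key-from-env");
        System.out.println(resolveAccessKey(env, props).orElse("none"));
    }
}
```

The profile file and EC2 instance profile sources from the comment are left out here; they would be further entries appended to the same list.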
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15568742 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.model.PutRecordRequest +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.dstream.DStream + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the given stream. + * It then starts pulling from the tip of the given stream-name and endpoint-url at the given batch-interval. + * Because we're pulling from the tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts. + * This could lead to missed records if data is added to the stream while no KinesisReceivers are running. + * In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data + * depending on the checkpoint frequency. 
+ * + * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency. + * Record processing should be idempotent when possible. + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount stream-name endpoint-url batch-interval + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * batch-interval is the batch interval in millis (ie. 1000ms) + * + * Example: + * $ export AWS_ACCESS_KEY_ID=your-access-key + * $ export AWS_SECRET_KEY=your-secret-key + *$ bin/run-kinesis-example \ + *org.apache.spark.examples.streaming.KinesisWordCount mySparkStream https://kinesis.us-east-1.amazonaws.com 100 + * + * There is a companion helper class below called KinesisWordCountProducer which puts dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. + */ +object KinesisWordCount extends Logging { + val WordSeparator = " " + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length < 3) { + System.err.println("Usage: KinesisWordCount stream-name
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15568964 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( +ssc: StreamingContext, --- End diff -- Incorrect formatting.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15568976 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( +ssc: StreamingContext, +app: String, +stream: String, +endpoint: String, +checkpointIntervalMillis: Long, +initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, +storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = { + +ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15615786 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import java.nio.ByteBuffer +import org.apache.spark.streaming.util.SystemClock + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. 
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name + * @param Kinesis stream name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * @param persistence strategy for RDDs and DStreams. + */ +private[streaming] class KinesisReceiver( + app: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel) + extends Receiver[Array[Byte]](storageLevel) with Logging { receiver => + + /** + * The lazy val's below will get instantiated in the remote Executor after the closure is shipped to the Spark Worker. + * These are all lazy because they're from third-party Amazon libraries and are not Serializable. + * If they're not marked lazy, they will cause NotSerializableExceptions when they're shipped to the Spark Worker. + */ + + /** + * workerId is lazy because we want the address of the actual Worker where the code runs - not the Driver's ip address. + * This makes a difference when running in a cluster. 
+ */ + lazy val workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID() + + /** + * This impl uses the DefaultAWSCredentialsProviderChain per the following url: + * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html + * and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI + * Instance profile credentials delivered through the Amazon EC2 metadata service + */ + lazy val
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15616231 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( +ssc: StreamingContext, +app: String, +stream: String, +endpoint: String, +checkpointIntervalMillis: Long, +initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, +storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = { + +ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15616323 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import java.nio.ByteBuffer +import org.apache.spark.streaming.util.SystemClock + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. 
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name + * @param Kinesis stream name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * @param persistence strategy for RDDs and DStreams. + */ +private[streaming] class KinesisReceiver( + app: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel) + extends Receiver[Array[Byte]](storageLevel) with Logging { receiver = + + /** + * The lazy val's below will get instantiated in the remote Executor after the closure is shipped to the Spark Worker. + * These are all lazy because they're from third-party Amazon libraries and are not Serializable. + * If they're not marked lazy, they will cause NotSerializableExceptions when they're shipped to the Spark Worker. + */ + + /** + * workerId is lazy because we want the address of the actual Worker where the code runs - not the Driver's ip address. + * This makes a difference when running in a cluster. 
+ */ + lazy val workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID() + + /** + * This impl uses the DefaultAWSCredentialsProviderChain per the following url: + * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html + * and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI + * Instance profile credentials delivered through the Amazon EC2 metadata service + */ + lazy val
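The comment in the quoted diff says the receiver's fields are lazy so that the non-serializable Amazon objects are created on the executor instead of being shipped from the driver. A minimal, self-contained sketch of that mechanism follows. It uses plain Java serialization in one JVM, and NonSerializableClient, ReceiverLike and LazyValSketch are hypothetical stand-ins rather than the KCL or Spark classes, so it only illustrates the idea and is not the PR's code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a third-party client that is NOT Serializable.
class NonSerializableClient(val endpoint: String)

// Hypothetical stand-in for the receiver: only the constructor argument needs to be serialized.
class ReceiverLike(val endpoint: String) extends java.io.Serializable {
  // Not evaluated until first access, so the backing field is still empty at serialization time.
  lazy val client: NonSerializableClient = new NonSerializableClient(endpoint)
}

object LazyValSketch {
  // Serialize and deserialize in-process, mimicking shipping an object to another JVM.
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(new ReceiverLike("https://kinesis.us-east-1.amazonaws.com"))
    // The client is constructed here, on the "receiving" side, on first access.
    println(shipped.client.endpoint)
  }
}
```

If the lazy val were accessed before serialization, the write would fail because the field would then hold a non-serializable instance; marking such fields @transient as well is a common safeguard.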
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15617751 --- Diff: docs/streaming-programming-guide.md --- @@ -467,6 +468,62 @@ For more details on these additional sources, see the corresponding [API documen Furthermore, you can also implement your own custom receiver for your sources. See the [Custom Receiver Guide](streaming-custom-receivers.html). +### Kinesis --- End diff -- moved
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15617745 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import java.nio.ByteBuffer +import org.apache.spark.streaming.util.SystemClock + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. 
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name --- End diff -- updated
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15617770 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( +ssc: StreamingContext, +app: String, +stream: String, +endpoint: String, +checkpointIntervalMillis: Long, +initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, +storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = { + +ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. --- End diff -- fixed --- If your project is
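The createStream signature quoted above gives initialPositionInStream and storageLevel default values, so a Scala caller only passes them when overriding. The sketch below shows how such defaults resolve at the call site. It keeps the parameter order of the quoted signature minus the StreamingContext, and Position, Level and StreamConfig are hypothetical stand-ins for the real KCL and Spark types, so this is an illustration under those assumptions and not the PR's API.

```scala
object DefaultArgsSketch {
  // Hypothetical stand-ins for InitialPositionInStream and StorageLevel.
  sealed trait Position
  case object TrimHorizon extends Position
  case object Latest extends Position

  sealed trait Level
  case object MemoryAndDisk2 extends Level
  case object MemoryOnly extends Level

  // Records the arguments a call resolved to, so the applied defaults are visible.
  final case class StreamConfig(
      app: String,
      stream: String,
      endpoint: String,
      checkpointIntervalMillis: Long,
      position: Position,
      level: Level)

  // Defaults mirror the ones named in the quoted diff (TRIM_HORIZON, MEMORY_AND_DISK_2).
  def createStream(
      app: String,
      stream: String,
      endpoint: String,
      checkpointIntervalMillis: Long,
      position: Position = TrimHorizon,
      level: Level = MemoryAndDisk2): StreamConfig =
    StreamConfig(app, stream, endpoint, checkpointIntervalMillis, position, level)

  def main(args: Array[String]): Unit = {
    val endpoint = "https://kinesis.us-east-1.amazonaws.com"
    // Both defaults apply.
    println(createStream("myApp", "mySparkStream", endpoint, 2000L))
    // Override only the starting position, by name.
    println(createStream("myApp", "mySparkStream", endpoint, 2000L, position = Latest))
  }
}
```

Scala default arguments cannot be used from Java call sites, which is presumably one reason the diff also carries a separate Java-friendly method.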
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15617760 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { --- End diff -- done
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15617773 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. --- End diff -- fixed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15618463 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( +ssc: StreamingContext, +app: String, +stream: String, +endpoint: String, +checkpointIntervalMillis: Long, +initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, +storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = { + +ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15618507 --- Diff: project/SparkBuild.scala --- @@ -60,9 +60,13 @@ object SparkBuild extends PomBuild { var isAlphaYarn = false var profiles: mutable.Seq[String] = mutable.Seq.empty if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { - println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") + println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") profiles ++= Seq("spark-ganglia-lgpl") } +if (Properties.envOrNone("SPARK_KINESIS_ASL").isDefined) { --- End diff -- removed
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15618503 --- Diff: extras/spark-kinesis-asl/src/test/resources/log4j.properties --- @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=WARN, console + --- End diff -- matched
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15618546 --- Diff: extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.examples.streaming; + +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Milliseconds; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.dstream.DStream; +import org.apache.spark.streaming.kinesis.KinesisRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisUtils; + +import scala.Tuple2; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +/** + * Java-friendly Kinesis Spark Streaming WordCount example + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the given stream. + * It then starts pulling from the tip of the given stream-name and endpoint-url at the given batch-interval. + * Because we're pulling from the tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts. 
+ * This could lead to missed records if data is added to the stream while no KinesisReceivers are running. + * In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data + * depending on the checkpoint frequency. + * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency. + * Record processing should be idempotent when possible. + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: JavaKinesisWordCount stream-name endpoint-url batch-interval + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * batch-interval is the batch interval in milliseconds (ie. 1000ms) + * + * Example: + * $ export AWS_ACCESS_KEY_ID=your-access-key + * $ export AWS_SECRET_KEY=your-secret-key + *$ bin/run-kinesis-example \ + *org.apache.spark.examples.streaming.JavaKinesisWordCount mySparkStream https://kinesis.us-east-1.amazonaws.com 1000 + * + * There is a companion helper
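The quoted Javadoc lists the order in which DefaultAWSCredentialsProviderChain looks for credentials. The "first provider that answers wins" behaviour can be sketched in a few lines. The names below (CredentialChainSketch, Credentials, keyPair, resolve) are hypothetical and only the first two steps of the list are modelled; the real chain lives in the AWS SDK and is not reproduced here.

```scala
object CredentialChainSketch {
  final case class Credentials(accessKey: String, secretKey: String)

  // A provider either yields credentials or declines by returning None.
  type Provider = () => Option[Credentials]

  // Builds a provider that needs both keys to be present in the given lookup.
  def keyPair(lookup: String => Option[String], accessName: String, secretName: String): Provider =
    () =>
      for {
        access <- lookup(accessName)
        secret <- lookup(secretName)
      } yield Credentials(access, secret)

  // Tries providers in order and stops at the first one that yields credentials.
  def resolve(providers: Seq[Provider]): Option[Credentials] =
    providers.iterator.map(p => p()).collectFirst { case Some(c) => c }

  def main(args: Array[String]): Unit = {
    val chain = Seq(
      keyPair(sys.env.get, "AWS_ACCESS_KEY_ID", "AWS_SECRET_KEY"),
      keyPair(sys.props.get, "aws.accessKeyId", "aws.secretKey")
      // The profile-file and EC2 instance-profile steps are omitted from this sketch.
    )
    println(if (resolve(chain).isDefined) "credentials found" else "no credentials found")
  }
}
```

Because the lookup is passed in as a function, the same code can be exercised with plain maps instead of the process environment.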
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15618561 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.model.PutRecordRequest +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.dstream.DStream + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the given stream. + * It then starts pulling from the tip of the given stream-name and endpoint-url at the given batch-interval. + * Because we're pulling from the tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts. + * This could lead to missed records if data is added to the stream while no KinesisReceivers are running. + * In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data + * depending on the checkpoint frequency. 
+ * + * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency. + * Record processing should be idempotent when possible. + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount stream-name endpoint-url batch-interval + * stream-name is the name of the Kinesis stream (ie. mySparkStream) + * endpoint-url is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * batch-interval is the batch interval in millis (ie. 1000ms) + * + * Example: + * $ export AWS_ACCESS_KEY_ID=your-access-key + * $ export AWS_SECRET_KEY=your-secret-key + *$ bin/run-kinesis-example \ + *org.apache.spark.examples.streaming.KinesisWordCount mySparkStream https://kinesis.us-east-1.amazonaws.com 100 + * + * There is a companion helper class below called KinesisWordCountProducer which puts dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. + */ +object KinesisWordCount extends Logging { + val WordSeparator = + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length 3) { + System.err.println(Usage: KinesisWordCount stream-name
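The quoted comment warns that TRIM_HORIZON can replay records and recommends idempotent processing. One way to get that property is to remember which records have already been applied, sketched below for a word count. This assumes each record carries a unique identifier such as a sequence number; Record, State and applyRecord are hypothetical names for illustration and do not come from the PR.

```scala
object IdempotentCountSketch {
  // Hypothetical record shape: a unique sequence number plus a text payload.
  final case class Record(sequenceNumber: String, text: String)

  // State carries the sequence numbers already applied, so replays become no-ops.
  final case class State(seen: Set[String], counts: Map[String, Int])

  val empty: State = State(Set.empty, Map.empty)

  def applyRecord(state: State, record: Record): State =
    if (state.seen.contains(record.sequenceNumber)) state
    else {
      val words = record.text.split(" ").filter(_.nonEmpty)
      val updated = words.foldLeft(state.counts) { (acc, word) =>
        acc.updated(word, acc.getOrElse(word, 0) + 1)
      }
      State(state.seen + record.sequenceNumber, updated)
    }

  def main(args: Array[String]): Unit = {
    val batch = Seq(Record("1", "spark kinesis"), Record("2", "spark"))
    val once = batch.foldLeft(empty)(applyRecord)
    // Replaying the same batch, as might happen after a restart, leaves the counts unchanged.
    val twice = (batch ++ batch).foldLeft(empty)(applyRecord)
    println(once.counts == twice.counts)
  }
}
```

The set of seen identifiers grows without bound in this sketch; a real job would need to expire old entries or rely on an idempotent sink instead.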
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15620896 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( +ssc: StreamingContext, +app: String, +stream: String, +endpoint: String, +checkpointIntervalMillis: Long, +initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, +storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = { + +ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50701651 QA tests have started for PR 1434. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17530/consoleFull
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15623324 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param appName Kinesis Application Name. 
Kinesis Apps are mapped to Kinesis Streams --- End diff -- This should be `@param ssc StreamingContext object`.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15623331 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. 
+ * + * @param StreamingContext object --- End diff -- This should be `@param ssc StreamingContext object`.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15623426 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. 
If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types + * of checkpoints. + * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream + * (InitialPositionInStream.LATEST). + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk --- End diff -- This is wrong now, as there is no default. Why were the default values removed? If there were issues with Java compatibility, then it is better to create alternate versions of createStream with fewer parameters (that is, defining the default parameters explicitly).
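The suggestion above (explicit overloads instead of Scala default arguments) could be sketched roughly as follows. This is only an illustration under stated assumptions: `InitialPosition`, `StreamConfig`, and `KinesisUtilsSketch` are hypothetical stand-ins so the snippet compiles without Spark or the AWS SDK, and they are not the types used in the PR.

```scala
// Hypothetical stand-ins for InitialPositionInStream and the receiver
// configuration; they only mirror the names that appear in the diff.
sealed trait InitialPosition
case object TrimHorizon extends InitialPosition
case object Latest extends InitialPosition

final case class StreamConfig(
    appName: String,
    stream: String,
    endpoint: String,
    checkpointIntervalMillis: Long,
    initialPosition: InitialPosition,
    storageLevel: String)

object KinesisUtilsSketch {
  // Full version: every parameter is passed explicitly.
  def createStream(
      appName: String,
      stream: String,
      endpoint: String,
      checkpointIntervalMillis: Long,
      initialPosition: InitialPosition,
      storageLevel: String): StreamConfig =
    StreamConfig(appName, stream, endpoint, checkpointIntervalMillis,
      initialPosition, storageLevel)

  // Shorter overload: the defaults are written out here, so a Java caller
  // sees an ordinary method with fewer parameters.
  def createStream(
      appName: String,
      stream: String,
      endpoint: String,
      checkpointIntervalMillis: Long): StreamConfig =
    createStream(appName, stream, endpoint, checkpointIntervalMillis,
      TrimHorizon, "MEMORY_AND_DISK_2")
}

object OverloadDemo {
  def main(args: Array[String]): Unit = {
    val cfg = KinesisUtilsSketch.createStream(
      "myApp", "mySparkStream", "https://kinesis.us-east-1.amazonaws.com", 2000L)
    println(cfg)
  }
}
```

Scala default arguments compile to synthetic helper methods that are awkward to call from Java, which is presumably the compatibility issue the comment alludes to; plain overloads avoid that at the cost of some repetition.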
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15623520 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. --- End diff -- Better to make it more explicit. `Utility / Helper class to create input stream with Amazon Kinesis` No need to add lower level details like reusable methods and stuff --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50704814 Chris, thanks for this very thorough PR. One thought I have is that maybe we should put the example in the `examples` package, and make that depend on Kinesis. The Twitter example is in there. Now this would make the examples package depend on ASL, but I don't think anyone links to that directly, or we can even decide not to publish it to Maven. And to differentiate this file from the others, you could add ASL in its file name.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15623527 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. 
If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types + * of checkpoints. + * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream + * (InitialPositionInStream.LATEST). + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk + * of processing records more than once. + * @param storageLevel The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory + * and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( + ssc: StreamingContext, + appName: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = { +ssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis, +initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). 
+ * See the Kinesis Spark Streaming documentation for more details on the different types + * of checkpoints. + * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream + * (InitialPositionInStream.LATEST). + * The default is TRIM_HORIZON to
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15623534 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. 
If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types + * of checkpoints. + * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream + * (InitialPositionInStream.LATEST). + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk + * of processing records more than once. + * @param storageLevel The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory + * and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( + ssc: StreamingContext, + appName: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = { +ssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis, +initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object --- End diff -- Same comment as with ssc: this should be `@param ssc StreamingContext object`.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15623553 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. 
If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). --- End diff -- Please set the field names correctly; otherwise the Scaladoc will not work right. For example: `@param checkpointIntervalMillis Kinesis checkpoint interval in milliseconds`
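To illustrate the point about field names, here is a minimal sketch of a Scaladoc comment in which each `@param` tag starts with the exact parameter name from the signature. The object `ScaladocParamSketch` and its `describe` method are hypothetical and exist only for this example; they are not part of the PR.

```scala
object ScaladocParamSketch {
  /**
   * Summarize a Kinesis stream configuration (illustrative only).
   *
   * Each tag below begins with the identifier used in the signature, which
   * is how Scaladoc associates a description with its parameter.
   *
   * @param appName Kinesis application name
   * @param stream Kinesis stream name
   * @param endpoint URL of the Kinesis service
   * @param checkpointIntervalMillis Kinesis checkpoint interval in milliseconds
   * @return a human-readable summary of the configuration
   */
  def describe(
      appName: String,
      stream: String,
      endpoint: String,
      checkpointIntervalMillis: Long): String =
    s"$appName reads $stream at $endpoint, " +
      s"checkpointing every $checkpointIntervalMillis ms"

  def main(args: Array[String]): Unit =
    println(describe("myApp", "mySparkStream",
      "https://kinesis.us-east-1.amazonaws.com", 2000L))
}
```

A tag such as `@param checkpoint interval (millis) ...` would be read as documenting a parameter named `checkpoint`, which does not exist in the signature.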
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15623547 --- Diff: project/SparkBuild.scala --- @@ -62,7 +62,7 @@ object SparkBuild extends PomBuild { var isAlphaYarn = false var profiles: mutable.Seq[String] = mutable.Seq.empty if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { - println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") + println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") --- End diff -- As per Patrick's point, we probably don't need to change the name of this profile, right?
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50443974 @pdeyhim can you take a look over this too when you have a chance?
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50511469 Jenkins, this is ok to test.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50511485 Jenkins, test this please.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50512322 QA tests have started for PR 1434. This patch DID NOT merge cleanly! View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17365/consoleFull
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50525716 QA results for PR 1434: - This patch PASSES unit tests. For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17365/consoleFull
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50530784 Hey @cfregly, My apologies for being so late to review this. Seems like the PR requires merging with the master, probably because of changes to pom.xml. Would you get a chance to merge with the master once again?
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15549163 --- Diff: extras/spark-kinesis-asl/pom.xml --- @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> --- End diff -- Can you rename the directory `spark-kinesis-asl` to just `kinesis`? It's already in the Spark code base, so it does not need another `spark` in the directory name. :)
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15549239 --- Diff: bin/run-kinesis-example --- @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + --- End diff -- @pdeyhim explained to me that this script is necessary because the jar for Kinesis is not included in the Spark jar. That makes sense. But it's best not to put such module-specific scripts in the main spark/bin directory. It's best to put this in `extras/kinesis/bin/`.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15549518 --- Diff: extras/spark-kinesis-asl/pom.xml --- @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> --- End diff -- Aah, I understand, you must have followed the existing `spark-ganglia-gpl` directory name.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15552530 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import java.nio.ByteBuffer +import org.apache.spark.streaming.util.SystemClock + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. 
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name --- End diff -- What is this app name? Please add to comments.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15552628 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import java.nio.ByteBuffer +import org.apache.spark.streaming.util.SystemClock + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. 
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name + * @param Kinesis stream name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * @param persistence strategy for RDDs and DStreams. + */ +private[streaming] class KinesisReceiver( + app: String, --- End diff -- The formatting is incorrect: 4 spaces are necessary here. Please see the Spark style guide: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
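For readers following the two comments above, here is a minimal sketch of what the requested layout could look like: constructor parameters indented by 4 spaces, class body indented by 2, and each `@param` tag starting with the actual parameter name. The class `KinesisReceiverSketch`, its parameter names, and the description given for `appName` are hypothetical stand-ins chosen for illustration; they are not the code in this PR, and the sketch deliberately avoids any Spark or KCL dependency.

```scala
/**
 * Hypothetical stand-in for the receiver under review; not the PR's actual class.
 *
 * @param appName name of the Kinesis application (assumed meaning of "app name")
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service endpoint
 * @param checkpointIntervalMillis interval for Kinesis checkpointing, in milliseconds
 */
class KinesisReceiverSketch(
    appName: String,
    streamName: String,
    endpointUrl: String,
    checkpointIntervalMillis: Long) {

  // Members of the class body use 2-space indentation.
  def describe: String =
    s"$appName reading $streamName at $endpointUrl every $checkpointIntervalMillis ms"
}

object KinesisReceiverSketchDemo extends App {
  // Placeholder values only; nothing here contacts AWS.
  val receiver = new KinesisReceiverSketch("myApp", "myStream", "https://example.invalid", 2000L)
  println(receiver.describe)
}
```

Naming each parameter in its `@param` tag also gives a natural place to explain what the application name is for, which is what the earlier comment on this file asks for.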
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15554012 --- Diff: extras/spark-kinesis-asl/pom.xml --- @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> --- End diff -- Okay, I spoke to @pwendell and there is no need to have `spark-` as part of the directory name and profile name. So can you make it `kinesis-asl` in all places?