http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala deleted file mode 100644 index 15ac588..0000000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ /dev/null @@ -1,560 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kinesis - -import scala.reflect.ClassTag - -import com.amazonaws.regions.RegionUtils -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import com.amazonaws.services.kinesis.model.Record - -import org.apache.spark.api.java.function.{Function => JFunction} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, StreamingContext} -import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.ReceiverInputDStream - -object KinesisUtils { - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets the AWS credentials. - * - * @param ssc StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. 
- * @param messageHandler A custom message handler that can generate a generic output from a - * Kinesis `Record`, which contains both message data, and metadata. - */ - def createStream[T: ClassTag]( - ssc: StreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: Record => T): ReceiverInputDStream[T] = { - val cleanedHandler = ssc.sc.clean(messageHandler) - // Setting scope to override receiver stream's scope of "receiver stream" - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, - cleanedHandler, None) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * The given AWS credentials will get saved in DStream checkpoints if checkpointing - * is enabled. Make sure that your checkpoint directory is secure. - * - * @param ssc StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - * @param messageHandler A custom message handler that can generate a generic output from a - * Kinesis `Record`, which contains both message data, and metadata. - * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) - * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) - */ - // scalastyle:off - def createStream[T: ClassTag]( - ssc: StreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: Record => T, - awsAccessKeyId: String, - awsSecretKey: String): ReceiverInputDStream[T] = { - // scalastyle:on - val cleanedHandler = ssc.sc.clean(messageHandler) - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, - cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. 
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets the AWS credentials. - * - * @param ssc StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - */ - def createStream( - ssc: StreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = { - // Setting scope to override receiver stream's scope of "receiver stream" - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, - defaultMessageHandler, None) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * The given AWS credentials will get saved in DStream checkpoints if checkpointing - * is enabled. Make sure that your checkpoint directory is secure. - * - * @param ssc StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. 
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) - * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) - */ - def createStream( - ssc: StreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - awsAccessKeyId: String, - awsSecretKey: String): ReceiverInputDStream[Array[Byte]] = { - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, - defaultMessageHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * - * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets AWS credentials. - * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch. - * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name - * in [[org.apache.spark.SparkConf]]. - * - * @param ssc StreamingContext object - * @param streamName Kinesis stream name - * @param endpointUrl Endpoint url of Kinesis service - * (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param storageLevel Storage level to use for storing the received objects - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - */ - @deprecated("use other forms of createStream", "1.4.0") - def createStream( - ssc: StreamingContext, - streamName: String, - endpointUrl: String, - checkpointInterval: Duration, - initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel - ): ReceiverInputDStream[Array[Byte]] = { - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, - getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName, - checkpointInterval, storageLevel, defaultMessageHandler, None) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets the AWS credentials. 
- * - * @param jssc Java StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - * @param messageHandler A custom message handler that can generate a generic output from a - * Kinesis `Record`, which contains both message data, and metadata. - * @param recordClass Class of the records in DStream - */ - def createStream[T]( - jssc: JavaStreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: JFunction[Record, T], - recordClass: Class[T]): JavaReceiverInputDStream[T] = { - implicit val recordCmt: ClassTag[T] = ClassTag(recordClass) - val cleanedHandler = jssc.sparkContext.clean(messageHandler.call(_)) - createStream[T](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, - initialPositionInStream, checkpointInterval, storageLevel, cleanedHandler) - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * The given AWS credentials will get saved in DStream checkpoints if checkpointing - * is enabled. Make sure that your checkpoint directory is secure. - * - * @param jssc Java StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. 
- * @param messageHandler A custom message handler that can generate a generic output from a - * Kinesis `Record`, which contains both message data, and metadata. - * @param recordClass Class of the records in DStream - * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) - * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) - */ - // scalastyle:off - def createStream[T]( - jssc: JavaStreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: JFunction[Record, T], - recordClass: Class[T], - awsAccessKeyId: String, - awsSecretKey: String): JavaReceiverInputDStream[T] = { - // scalastyle:on - implicit val recordCmt: ClassTag[T] = ClassTag(recordClass) - val cleanedHandler = jssc.sparkContext.clean(messageHandler.call(_)) - createStream[T](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, - initialPositionInStream, checkpointInterval, storageLevel, cleanedHandler, - awsAccessKeyId, awsSecretKey) - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets the AWS credentials. - * - * @param jssc Java StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - */ - def createStream( - jssc: JavaStreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[Array[Byte]] = { - createStream[Array[Byte]](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, - initialPositionInStream, checkpointInterval, storageLevel, defaultMessageHandler(_)) - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * The given AWS credentials will get saved in DStream checkpoints if checkpointing - * is enabled. Make sure that your checkpoint directory is secure. 
- * - * @param jssc Java StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) - * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) - */ - def createStream( - jssc: JavaStreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - awsAccessKeyId: String, - awsSecretKey: String): JavaReceiverInputDStream[Array[Byte]] = { - createStream[Array[Byte]](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, - initialPositionInStream, checkpointInterval, storageLevel, - defaultMessageHandler(_), awsAccessKeyId, awsSecretKey) - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets AWS credentials. - * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch. - * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in - * [[org.apache.spark.SparkConf]]. - * - * @param jssc Java StreamingContext object - * @param streamName Kinesis stream name - * @param endpointUrl Endpoint url of Kinesis service - * (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param storageLevel Storage level to use for storing the received objects - * StorageLevel.MEMORY_AND_DISK_2 is recommended. 
- */ - @deprecated("use other forms of createStream", "1.4.0") - def createStream( - jssc: JavaStreamingContext, - streamName: String, - endpointUrl: String, - checkpointInterval: Duration, - initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[Array[Byte]] = { - createStream( - jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel) - } - - private def getRegionByEndpoint(endpointUrl: String): String = { - RegionUtils.getRegionByEndpoint(endpointUrl).getName() - } - - private def validateRegion(regionName: String): String = { - Option(RegionUtils.getRegion(regionName)).map { _.getName }.getOrElse { - throw new IllegalArgumentException(s"Region name '$regionName' is not valid") - } - } - - private[kinesis] def defaultMessageHandler(record: Record): Array[Byte] = { - if (record == null) return null - val byteBuffer = record.getData() - val byteArray = new Array[Byte](byteBuffer.remaining()) - byteBuffer.get(byteArray) - byteArray - } -} - -/** - * This is a helper class that wraps the methods in KinesisUtils into more Python-friendly class and - * function so that it can be easily instantiated and called from Python's KinesisUtils. - */ -private class KinesisUtilsPythonHelper { - - def getInitialPositionInStream(initialPositionInStream: Int): InitialPositionInStream = { - initialPositionInStream match { - case 0 => InitialPositionInStream.LATEST - case 1 => InitialPositionInStream.TRIM_HORIZON - case _ => throw new IllegalArgumentException( - "Illegal InitialPositionInStream. Please use " + - "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON") - } - } - - def createStream( - jssc: JavaStreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: Int, - checkpointInterval: Duration, - storageLevel: StorageLevel, - awsAccessKeyId: String, - awsSecretKey: String - ): JavaReceiverInputDStream[Array[Byte]] = { - if (awsAccessKeyId == null && awsSecretKey != null) { - throw new IllegalArgumentException("awsSecretKey is set but awsAccessKeyId is null") - } - if (awsAccessKeyId != null && awsSecretKey == null) { - throw new IllegalArgumentException("awsAccessKeyId is set but awsSecretKey is null") - } - if (awsAccessKeyId == null && awsSecretKey == null) { - KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName, - getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel) - } else { - KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName, - getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel, - awsAccessKeyId, awsSecretKey) - } - } - -}
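The scaladoc blocks in the deleted KinesisUtils.scala above document the byte-array overload of KinesisUtils.createStream. As a rough usage sketch (not part of the deleted file), the snippet below uses placeholder app, stream, and endpoint names and passes no key pair, so credentials fall back to the DefaultAWSCredentialsProviderChain as the docs describe:

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

object KinesisReadSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KinesisReadSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // All literals below are placeholders chosen for illustration.
    val stream = KinesisUtils.createStream(
      ssc,
      "myKinesisApp",                             // KCL application name, backs the DynamoDB lease table
      "mySparkStream",                            // Kinesis stream name
      "https://kinesis.us-east-1.amazonaws.com",  // Kinesis endpoint URL
      "us-east-1",                                // region used for DynamoDB and CloudWatch
      InitialPositionInStream.LATEST,             // start at the tip when no checkpoint exists
      Seconds(2),                                 // KCL checkpoint interval
      StorageLevel.MEMORY_AND_DISK_2)             // storage level recommended by the scaladoc

    // Records arrive as Array[Byte]; decode to strings and print a sample per batch.
    stream.map(bytes => new String(bytes, "UTF-8")).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The richer overloads shown above follow the same shape, either adding a messageHandler: Record => T to keep record metadata or an explicit AWS key pair (which, per the note in the scaladoc, is saved into DStream checkpoints when checkpointing is enabled).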
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java deleted file mode 100644 index 5c2371c..0000000 --- a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis; - -import com.amazonaws.services.kinesis.model.Record; -import org.junit.Test; - -import org.apache.spark.api.java.function.Function; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.LocalJavaStreamingContext; -import org.apache.spark.streaming.api.java.JavaDStream; - -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; - -/** - * Demonstrate the use of the KinesisUtils Java API - */ -public class JavaKinesisStreamSuite extends LocalJavaStreamingContext { - @Test - public void testKinesisStream() { - // Tests the API, does not actually test data receiving - JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", new Duration(2000), - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()); - - ssc.stop(); - } - - - private static Function<Record, String> handler = new Function<Record, String>() { - @Override - public String call(Record record) { - return record.getPartitionKey() + "-" + record.getSequenceNumber(); - } - }; - - @Test - public void testCustomHandler() { - // Tests the API, does not actually test data receiving - JavaDStream<String> kinesisStream = KinesisUtils.createStream(ssc, "testApp", "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", "us-west-2", InitialPositionInStream.LATEST, - new Duration(2000), StorageLevel.MEMORY_AND_DISK_2(), handler, String.class); - - ssc.stop(); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/test/resources/log4j.properties b/extras/kinesis-asl/src/test/resources/log4j.properties deleted file mode 100644 index edbecda..0000000 --- a/extras/kinesis-asl/src/test/resources/log4j.properties +++ /dev/null @@ -1,27 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license 
agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.spark-project.jetty=WARN http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala deleted file mode 100644 index fdb270e..0000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kinesis - -import java.nio.ByteBuffer - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, KinesisProducerConfiguration, UserRecordResult} -import com.google.common.util.concurrent.{FutureCallback, Futures} - -private[kinesis] class KPLBasedKinesisTestUtils extends KinesisTestUtils { - override protected def getProducer(aggregate: Boolean): KinesisDataGenerator = { - if (!aggregate) { - new SimpleDataGenerator(kinesisClient) - } else { - new KPLDataGenerator(regionName) - } - } -} - -/** A wrapper for the KinesisProducer provided in the KPL. 
*/ -private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataGenerator { - - private lazy val producer: KPLProducer = { - val conf = new KinesisProducerConfiguration() - .setRecordMaxBufferedTime(1000) - .setMaxConnections(1) - .setRegion(regionName) - .setMetricsLevel("none") - - new KPLProducer(conf) - } - - override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = { - val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]() - data.foreach { num => - val str = num.toString - val data = ByteBuffer.wrap(str.getBytes()) - val future = producer.addUserRecord(streamName, str, data) - val kinesisCallBack = new FutureCallback[UserRecordResult]() { - override def onFailure(t: Throwable): Unit = {} // do nothing - - override def onSuccess(result: UserRecordResult): Unit = { - val shardId = result.getShardId - val seqNumber = result.getSequenceNumber() - val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId, - new ArrayBuffer[(Int, String)]()) - sentSeqNumbers += ((num, seqNumber)) - } - } - Futures.addCallback(future, kinesisCallBack) - } - producer.flushSync() - shardIdToSeqNumbers.toMap - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala deleted file mode 100644 index 2555332..0000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.kinesis - -import org.scalatest.BeforeAndAfterEach - -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException} -import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} - -abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) - extends KinesisFunSuite with BeforeAndAfterEach with LocalSparkContext { - - private val testData = 1 to 8 - - private var testUtils: KinesisTestUtils = null - private var shardIds: Seq[String] = null - private var shardIdToData: Map[String, Seq[Int]] = null - private var shardIdToSeqNumbers: Map[String, Seq[String]] = null - private var shardIdToDataAndSeqNumbers: Map[String, Seq[(Int, String)]] = null - private var shardIdToRange: Map[String, SequenceNumberRange] = null - private var allRanges: Seq[SequenceNumberRange] = null - - private var blockManager: BlockManager = null - - override def beforeAll(): Unit = { - super.beforeAll() - runIfTestsEnabled("Prepare KinesisTestUtils") { - testUtils = new KPLBasedKinesisTestUtils() - testUtils.createStream() - - shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData) - require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards") - - shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq - shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }} - shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }} - shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) => - val seqNumRange = SequenceNumberRange( - testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last) - (shardId, seqNumRange) - } - allRanges = shardIdToRange.values.toSeq - } - } - - override def beforeEach(): Unit = { - super.beforeEach() - val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite") - sc = new SparkContext(conf) - blockManager = sc.env.blockManager - } - - override def afterAll(): Unit = { - try { - if (testUtils != null) { - testUtils.deleteStream() - } - } finally { - super.afterAll() - } - } - - testIfEnabled("Basic reading from Kinesis") { - // Verify all data using multiple ranges in a single RDD partition - val receivedData1 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName, - testUtils.endpointUrl, fakeBlockIds(1), - Array(SequenceNumberRanges(allRanges.toArray)) - ).map { bytes => new String(bytes).toInt }.collect() - assert(receivedData1.toSet === testData.toSet) - - // Verify all data using one range in each of the multiple RDD partitions - val receivedData2 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName, - testUtils.endpointUrl, fakeBlockIds(allRanges.size), - allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray - ).map { bytes => new String(bytes).toInt }.collect() - assert(receivedData2.toSet === testData.toSet) - - // Verify ordering within each partition - val receivedData3 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName, - testUtils.endpointUrl, fakeBlockIds(allRanges.size), - allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray - ).map { bytes => new String(bytes).toInt }.collectPartitions() - assert(receivedData3.length === allRanges.size) - for (i <- 0 until allRanges.size) { - assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId)) - } - } - - testIfEnabled("Read data available in both block manager and Kinesis") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, 
numPartitionsInKinesis = 2) - } - - testIfEnabled("Read data available only in block manager, not in Kinesis") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0) - } - - testIfEnabled("Read data available only in Kinesis, not in block manager") { - testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2) - } - - testIfEnabled("Read data available partially in block manager, rest in Kinesis") { - testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 1) - } - - testIfEnabled("Test isBlockValid skips block fetching from block manager") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0, - testIsBlockValid = true) - } - - testIfEnabled("Test whether RDD is valid after removing blocks from block anager") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2, - testBlockRemove = true) - } - - /** - * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager - * and the rest to a write ahead log, and then reading reading it all back using the RDD. - * It can also test if the partitions that were read from the log were again stored in - * block manager. - * - * - * - * @param numPartitions Number of partitions in RDD - * @param numPartitionsInBM Number of partitions to write to the BlockManager. - * Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager - * @param numPartitionsInKinesis Number of partitions to write to the Kinesis. - * Partitions (numPartitions - 1 - numPartitionsInKinesis) to - * (numPartitions - 1) will be written to Kinesis - * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching - * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with - * reads falling back to the WAL - * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4 - * - * numPartitionsInBM = 3 - * |------------------| - * | | - * 0 1 2 3 4 - * | | - * |-------------------------| - * numPartitionsInKinesis = 4 - */ - private def testRDD( - numPartitions: Int, - numPartitionsInBM: Int, - numPartitionsInKinesis: Int, - testIsBlockValid: Boolean = false, - testBlockRemove: Boolean = false - ): Unit = { - require(shardIds.size > 1, "Need at least 2 shards to test") - require(numPartitionsInBM <= shardIds.size, - "Number of partitions in BlockManager cannot be more than the Kinesis test shards available") - require(numPartitionsInKinesis <= shardIds.size, - "Number of partitions in Kinesis cannot be more than the Kinesis test shards available") - require(numPartitionsInBM <= numPartitions, - "Number of partitions in BlockManager cannot be more than that in RDD") - require(numPartitionsInKinesis <= numPartitions, - "Number of partitions in Kinesis cannot be more than that in RDD") - - // Put necessary blocks in the block manager - val blockIds = fakeBlockIds(numPartitions) - blockIds.foreach(blockManager.removeBlock(_)) - (0 until numPartitionsInBM).foreach { i => - val blockData = shardIdToData(shardIds(i)).iterator.map { _.toString.getBytes() } - blockManager.putIterator(blockIds(i), blockData, StorageLevel.MEMORY_ONLY) - } - - // Create the necessary ranges to use in the RDD - val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)( - SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))) - val realRanges = Array.tabulate(numPartitionsInKinesis) { i => - val range = shardIdToRange(shardIds(i + 
(numPartitions - numPartitionsInKinesis))) - SequenceNumberRanges(Array(range)) - } - val ranges = (fakeRanges ++ realRanges) - - - // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not - require( - blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty), - "Expected blocks not in BlockManager" - ) - - require( - blockIds.drop(numPartitionsInBM).forall(blockManager.get(_).isEmpty), - "Unexpected blocks in BlockManager" - ) - - // Make sure that the right sequence `numPartitionsInKinesis` are configured, and others are not - require( - ranges.takeRight(numPartitionsInKinesis).forall { - _.ranges.forall { _.streamName == testUtils.streamName } - }, "Incorrect configuration of RDD, expected ranges not set: " - ) - - require( - ranges.dropRight(numPartitionsInKinesis).forall { - _.ranges.forall { _.streamName != testUtils.streamName } - }, "Incorrect configuration of RDD, unexpected ranges set" - ) - - val rdd = new KinesisBackedBlockRDD[Array[Byte]]( - sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges) - val collectedData = rdd.map { bytes => - new String(bytes).toInt - }.collect() - assert(collectedData.toSet === testData.toSet) - - // Verify that the block fetching is skipped when isBlockValid is set to false. - // This is done by using a RDD whose data is only in memory but is set to skip block fetching - // Using that RDD will throw exception, as it skips block fetching even if the blocks are in - // in BlockManager. - if (testIsBlockValid) { - require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager") - require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis") - val rdd2 = new KinesisBackedBlockRDD[Array[Byte]]( - sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges, - isBlockIdValid = Array.fill(blockIds.length)(false)) - intercept[SparkException] { - rdd2.collect() - } - } - - // Verify that the RDD is not invalid after the blocks are removed and can still read data - // from write ahead log - if (testBlockRemove) { - require(numPartitions === numPartitionsInKinesis, - "All partitions must be in WAL for this test") - require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test") - rdd.removeBlocks() - assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet) - } - } - - /** Generate fake block ids */ - private def fakeBlockIds(num: Int): Array[BlockId] = { - Array.tabulate(num) { i => new StreamBlockId(0, i) } - } -} - -class WithAggregationKinesisBackedBlockRDDSuite - extends KinesisBackedBlockRDDTests(aggregateTestData = true) - -class WithoutAggregationKinesisBackedBlockRDDSuite - extends KinesisBackedBlockRDDTests(aggregateTestData = false) http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala deleted file mode 100644 index e1499a8..0000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import java.util.concurrent.{ExecutorService, TimeoutException} - -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.concurrent.duration._ -import scala.language.postfixOps - -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer -import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar - -import org.apache.spark.streaming.{Duration, TestSuiteBase} -import org.apache.spark.util.ManualClock - -class KinesisCheckpointerSuite extends TestSuiteBase - with MockitoSugar - with BeforeAndAfterEach - with PrivateMethodTester - with Eventually { - - private val workerId = "dummyWorkerId" - private val shardId = "dummyShardId" - private val seqNum = "123" - private val otherSeqNum = "245" - private val checkpointInterval = Duration(10) - private val someSeqNum = Some(seqNum) - private val someOtherSeqNum = Some(otherSeqNum) - - private var receiverMock: KinesisReceiver[Array[Byte]] = _ - private var checkpointerMock: IRecordProcessorCheckpointer = _ - private var kinesisCheckpointer: KinesisCheckpointer = _ - private var clock: ManualClock = _ - - private val checkpoint = PrivateMethod[Unit]('checkpoint) - - override def beforeEach(): Unit = { - receiverMock = mock[KinesisReceiver[Array[Byte]]] - checkpointerMock = mock[IRecordProcessorCheckpointer] - clock = new ManualClock() - kinesisCheckpointer = new KinesisCheckpointer(receiverMock, checkpointInterval, workerId, clock) - } - - test("checkpoint is not called twice for the same sequence number") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) - kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) - - verify(checkpointerMock, times(1)).checkpoint(anyString()) - } - - test("checkpoint is called after sequence number increases") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) - .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) - kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) - kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) - - verify(checkpointerMock, times(1)).checkpoint(seqNum) - verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) - } - - test("should checkpoint if we have exceeded the checkpoint interval") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) - .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) - - kinesisCheckpointer.setCheckpointer(shardId, 
checkpointerMock) - clock.advance(5 * checkpointInterval.milliseconds) - - eventually(timeout(1 second)) { - verify(checkpointerMock, times(1)).checkpoint(seqNum) - verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) - } - } - - test("shouldn't checkpoint if we have not exceeded the checkpoint interval") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - - kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) - clock.advance(checkpointInterval.milliseconds / 2) - - verify(checkpointerMock, never()).checkpoint(anyString()) - } - - test("should not checkpoint for the same sequence number") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - - kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) - - clock.advance(checkpointInterval.milliseconds * 5) - eventually(timeout(1 second)) { - verify(checkpointerMock, atMost(1)).checkpoint(anyString()) - } - } - - test("removing checkpointer checkpoints one last time") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - - kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock) - verify(checkpointerMock, times(1)).checkpoint(anyString()) - } - - test("if checkpointing is going on, wait until finished before removing and checkpointing") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) - .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) - when(checkpointerMock.checkpoint(anyString)).thenAnswer(new Answer[Unit] { - override def answer(invocations: InvocationOnMock): Unit = { - clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2) - } - }) - - kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) - clock.advance(checkpointInterval.milliseconds) - eventually(timeout(1 second)) { - verify(checkpointerMock, times(1)).checkpoint(anyString()) - } - // don't block test thread - val f = Future(kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock))( - ExecutionContext.global) - - intercept[TimeoutException] { - Await.ready(f, 50 millis) - } - - clock.advance(checkpointInterval.milliseconds / 2) - eventually(timeout(1 second)) { - verify(checkpointerMock, times(2)).checkpoint(anyString()) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala deleted file mode 100644 index ee428f3..0000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import org.apache.spark.SparkFunSuite - -/** - * Helper class that runs Kinesis real data transfer tests or - * ignores them based on env variable is set or not. - */ -trait KinesisFunSuite extends SparkFunSuite { - import KinesisTestUtils._ - - /** Run the test if environment variable is set or ignore the test */ - def testIfEnabled(testName: String)(testBody: => Unit) { - if (shouldRunTests) { - test(testName)(testBody) - } else { - ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody) - } - } - - /** Run the give body of code only if Kinesis tests are enabled */ - def runIfTestsEnabled(message: String)(body: => Unit): Unit = { - if (shouldRunTests) { - body - } else { - ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")() - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala deleted file mode 100644 index fd15b6c..0000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.streaming.kinesis - -import java.nio.ByteBuffer -import java.nio.charset.StandardCharsets -import java.util.Arrays - -import com.amazonaws.services.kinesis.clientlibrary.exceptions._ -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason -import com.amazonaws.services.kinesis.model.Record -import org.mockito.Matchers._ -import org.mockito.Matchers.{eq => meq} -import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfter, Matchers} -import org.scalatest.mock.MockitoSugar - -import org.apache.spark.streaming.{Duration, TestSuiteBase} -import org.apache.spark.util.Utils - -/** - * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor - */ -class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter - with MockitoSugar { - - val app = "TestKinesisReceiver" - val stream = "mySparkStream" - val endpoint = "endpoint-url" - val workerId = "dummyWorkerId" - val shardId = "dummyShardId" - val seqNum = "dummySeqNum" - val checkpointInterval = Duration(10) - val someSeqNum = Some(seqNum) - - val record1 = new Record() - record1.setData(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8))) - val record2 = new Record() - record2.setData(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8))) - val batch = Arrays.asList(record1, record2) - - var receiverMock: KinesisReceiver[Array[Byte]] = _ - var checkpointerMock: IRecordProcessorCheckpointer = _ - - override def beforeFunction(): Unit = { - receiverMock = mock[KinesisReceiver[Array[Byte]]] - checkpointerMock = mock[IRecordProcessorCheckpointer] - } - - test("check serializability of SerializableAWSCredentials") { - Utils.deserialize[SerializableAWSCredentials]( - Utils.serialize(new SerializableAWSCredentials("x", "y"))) - } - - test("process records including store and set checkpointer") { - when(receiverMock.isStopped()).thenReturn(false) - - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) - recordProcessor.initialize(shardId) - recordProcessor.processRecords(batch, checkpointerMock) - - verify(receiverMock, times(1)).isStopped() - verify(receiverMock, times(1)).addRecords(shardId, batch) - verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock) - } - - test("shouldn't store and update checkpointer when receiver is stopped") { - when(receiverMock.isStopped()).thenReturn(true) - - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) - recordProcessor.processRecords(batch, checkpointerMock) - - verify(receiverMock, times(1)).isStopped() - verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record])) - verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock)) - } - - test("shouldn't update checkpointer when exception occurs during store") { - when(receiverMock.isStopped()).thenReturn(false) - when( - receiverMock.addRecords(anyString, anyListOf(classOf[Record])) - ).thenThrow(new RuntimeException()) - - intercept[RuntimeException] { - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) - recordProcessor.initialize(shardId) - recordProcessor.processRecords(batch, checkpointerMock) - } - - verify(receiverMock, times(1)).isStopped() - verify(receiverMock, times(1)).addRecords(shardId, batch) - verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock)) - } - - test("shutdown should checkpoint 
if the reason is TERMINATE") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) - recordProcessor.initialize(shardId) - recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE) - - verify(receiverMock, times(1)).removeCheckpointer(meq(shardId), meq(checkpointerMock)) - } - - - test("shutdown should not checkpoint if the reason is something other than TERMINATE") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) - recordProcessor.initialize(shardId) - recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE) - recordProcessor.shutdown(checkpointerMock, null) - - verify(receiverMock, times(2)).removeCheckpointer(meq(shardId), - meq[IRecordProcessorCheckpointer](null)) - } - - test("retry success on first attempt") { - val expectedIsStopped = false - when(receiverMock.isStopped()).thenReturn(expectedIsStopped) - - val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100) - assert(actualVal == expectedIsStopped) - - verify(receiverMock, times(1)).isStopped() - } - - test("retry success on second attempt after a Kinesis throttling exception") { - val expectedIsStopped = false - when(receiverMock.isStopped()) - .thenThrow(new ThrottlingException("error message")) - .thenReturn(expectedIsStopped) - - val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100) - assert(actualVal == expectedIsStopped) - - verify(receiverMock, times(2)).isStopped() - } - - test("retry success on second attempt after a Kinesis dependency exception") { - val expectedIsStopped = false - when(receiverMock.isStopped()) - .thenThrow(new KinesisClientLibDependencyException("error message")) - .thenReturn(expectedIsStopped) - - val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100) - assert(actualVal == expectedIsStopped) - - verify(receiverMock, times(2)).isStopped() - } - - test("retry failed after a shutdown exception") { - when(checkpointerMock.checkpoint()).thenThrow(new ShutdownException("error message")) - - intercept[ShutdownException] { - KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100) - } - - verify(checkpointerMock, times(1)).checkpoint() - } - - test("retry failed after an invalid state exception") { - when(checkpointerMock.checkpoint()).thenThrow(new InvalidStateException("error message")) - - intercept[InvalidStateException] { - KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100) - } - - verify(checkpointerMock, times(1)).checkpoint() - } - - test("retry failed after unexpected exception") { - when(checkpointerMock.checkpoint()).thenThrow(new RuntimeException("error message")) - - intercept[RuntimeException] { - KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100) - } - - verify(checkpointerMock, times(1)).checkpoint() - } - - test("retry failed after exhausing all retries") { - val expectedErrorMessage = "final try error message" - when(checkpointerMock.checkpoint()) - .thenThrow(new ThrottlingException("error message")) - .thenThrow(new ThrottlingException(expectedErrorMessage)) - - val exception = intercept[RuntimeException] { - KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100) - } - exception.getMessage().shouldBe(expectedErrorMessage) - - verify(checkpointerMock, times(2)).checkpoint() - } -} 
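The retry tests above drive KinesisRecordProcessor.retryRandom with a by-name expression, a retry budget and a maximum back-off in milliseconds, but the processor itself is not part of this diff. The following is only a rough sketch of the retry-with-random-back-off behaviour those tests imply; the parameter names, the back-off formula and the exact set of retriable exceptions are assumptions, not the deleted implementation.

import scala.util.Random

import com.amazonaws.services.kinesis.clientlibrary.exceptions.{KinesisClientLibDependencyException, ThrottlingException}

object RetryRandomSketch {
  // Evaluate `expression`; on a throttling or KCL dependency error, sleep for a random
  // interval bounded by maxBackOffMillis and try again while retries remain. Anything
  // else (ShutdownException, InvalidStateException, unexpected exceptions), as well as
  // the final failed attempt, propagates to the caller, which is what the
  // "retry failed ..." tests above assert.
  def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = {
    try {
      expression
    } catch {
      case _: ThrottlingException | _: KinesisClientLibDependencyException
          if numRetriesLeft > 1 =>
        Thread.sleep(Random.nextInt(maxBackOffMillis).toLong)
        retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
    }
  }
}

Under this sketch, a call with numRetriesLeft = 2 and maxBackOffMillis = 100 absorbs a single ThrottlingException or KinesisClientLibDependencyException and returns the second attempt's result, matching the "retry success on second attempt" cases.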
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala deleted file mode 100644 index ca5d13d..0000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import scala.collection.mutable -import scala.concurrent.duration._ -import scala.language.postfixOps -import scala.util.Random - -import com.amazonaws.regions.RegionUtils -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import com.amazonaws.services.kinesis.model.Record -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.scalatest.Matchers._ -import org.scalatest.concurrent.Eventually - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.network.util.JavaUtils -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.{StorageLevel, StreamBlockId} -import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.ReceiverInputDStream -import org.apache.spark.streaming.kinesis.KinesisTestUtils._ -import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult -import org.apache.spark.streaming.scheduler.ReceivedBlockInfo -import org.apache.spark.util.Utils - -abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite - with Eventually with BeforeAndAfter with BeforeAndAfterAll { - - // This is the name that KCL will use to save metadata to DynamoDB - private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}" - private val batchDuration = Seconds(1) - - // Dummy parameters for API testing - private val dummyEndpointUrl = defaultEndpointUrl - private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName() - private val dummyAWSAccessKey = "dummyAccessKey" - private val dummyAWSSecretKey = "dummySecretKey" - - private var testUtils: KinesisTestUtils = null - private var ssc: StreamingContext = null - private var sc: SparkContext = null - - override def beforeAll(): Unit = { - val conf = new SparkConf() - .setMaster("local[4]") - .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name - sc = new SparkContext(conf) - - runIfTestsEnabled("Prepare KinesisTestUtils") { - testUtils = new KPLBasedKinesisTestUtils() - testUtils.createStream() - } - } - - 
override def afterAll(): Unit = { - if (ssc != null) { - ssc.stop() - } - if (sc != null) { - sc.stop() - } - if (testUtils != null) { - // Delete the Kinesis stream as well as the DynamoDB table generated by - // Kinesis Client Library when consuming the stream - testUtils.deleteStream() - testUtils.deleteDynamoDBTable(appName) - } - } - - before { - ssc = new StreamingContext(sc, batchDuration) - } - - after { - if (ssc != null) { - ssc.stop(stopSparkContext = false) - ssc = null - } - if (testUtils != null) { - testUtils.deleteDynamoDBTable(appName) - } - } - - test("KinesisUtils API") { - // Tests the API, does not actually test data receiving - val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream", - dummyEndpointUrl, Seconds(2), - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) - val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", - dummyEndpointUrl, dummyRegionName, - InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2) - val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", - dummyEndpointUrl, dummyRegionName, - InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2, - dummyAWSAccessKey, dummyAWSSecretKey) - } - - test("RDD generation") { - val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream", - dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2), - StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey) - assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]]) - - val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]] - val time = Time(1000) - - // Generate block info data for testing - val seqNumRanges1 = SequenceNumberRanges( - SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")) - val blockId1 = StreamBlockId(kinesisStream.id, 123) - val blockInfo1 = ReceivedBlockInfo( - 0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None)) - - val seqNumRanges2 = SequenceNumberRanges( - SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb")) - val blockId2 = StreamBlockId(kinesisStream.id, 345) - val blockInfo2 = ReceivedBlockInfo( - 0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None)) - - // Verify that the generated KinesisBackedBlockRDD has the all the right information - val blockInfos = Seq(blockInfo1, blockInfo2) - val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos) - nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]] - val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]] - assert(kinesisRDD.regionName === dummyRegionName) - assert(kinesisRDD.endpointUrl === dummyEndpointUrl) - assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds) - assert(kinesisRDD.awsCredentialsOption === - Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey))) - assert(nonEmptyRDD.partitions.size === blockInfos.size) - nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] } - val partitions = nonEmptyRDD.partitions.map { - _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq - assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2)) - assert(partitions.map { _.blockId } === Seq(blockId1, blockId2)) - assert(partitions.forall { _.isBlockIdValid === true }) - - // Verify that KinesisBackedBlockRDD is generated even when there are no blocks - val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty) - emptyRDD shouldBe a 
[KinesisBackedBlockRDD[Array[Byte]]] - emptyRDD.partitions shouldBe empty - - // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid - blockInfos.foreach { _.setBlockIdInvalid() } - kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition => - assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false) - } - } - - - /** - * Test the stream by sending data to a Kinesis stream and receiving from it. - * This test is not run by default as it requires AWS credentials that the test - * environment may not have. Even if there is AWS credentials available, the user - * may not want to run these tests to avoid the Kinesis costs. To enable this test, - * you must have AWS credentials available through the default AWS provider chain, - * and you have to set the system environment variable RUN_KINESIS_TESTS=1 . - */ - testIfEnabled("basic operation") { - val awsCredentials = KinesisTestUtils.getAWSCredentials() - val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] - stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => - collected ++= rdd.collect() - logInfo("Collected = " + collected.mkString(", ")) - } - ssc.start() - - val testData = 1 to 10 - eventually(timeout(120 seconds), interval(10 second)) { - testUtils.pushData(testData, aggregateTestData) - assert(collected === testData.toSet, "\nData received does not match data sent") - } - ssc.stop(stopSparkContext = false) - } - - testIfEnabled("custom message handling") { - val awsCredentials = KinesisTestUtils.getAWSCredentials() - def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 - val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, addFive, - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - - stream shouldBe a [ReceiverInputDStream[_]] - - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] - stream.foreachRDD { rdd => - collected ++= rdd.collect() - logInfo("Collected = " + collected.mkString(", ")) - } - ssc.start() - - val testData = 1 to 10 - eventually(timeout(120 seconds), interval(10 second)) { - testUtils.pushData(testData, aggregateTestData) - val modData = testData.map(_ + 5) - assert(collected === modData.toSet, "\nData received does not match data sent") - } - ssc.stop(stopSparkContext = false) - } - - testIfEnabled("failure recovery") { - val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) - val checkpointDir = Utils.createTempDir().getAbsolutePath - - ssc = new StreamingContext(sc, Milliseconds(1000)) - ssc.checkpoint(checkpointDir) - - val awsCredentials = KinesisTestUtils.getAWSCredentials() - val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])] - - val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - - // Verify that the generated RDDs are 
KinesisBackedBlockRDDs, and collect the data in each batch - kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => { - val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]] - val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq - collectedData.synchronized { - collectedData(time) = (kRdd.arrayOfseqNumberRanges, data) - } - }) - - ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint - ssc.start() - - def numBatchesWithData: Int = - collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) } - - def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty - - // Run until there are at least 10 batches with some data in them - // If this times out because numBatchesWithData is empty, then its likely that foreachRDD - // function failed with exceptions, and nothing got added to `collectedData` - eventually(timeout(2 minutes), interval(1 seconds)) { - testUtils.pushData(1 to 5, aggregateTestData) - assert(isCheckpointPresent && numBatchesWithData > 10) - } - ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused - - // Restart the context from checkpoint and verify whether the - logInfo("Restarting from checkpoint") - ssc = new StreamingContext(checkpointDir) - ssc.start() - val recoveredKinesisStream = ssc.graph.getInputStreams().head - - // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges - // and return the same data - collectedData.synchronized { - val times = collectedData.keySet - times.foreach { time => - val (arrayOfSeqNumRanges, data) = collectedData(time) - val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]] - rdd shouldBe a[KinesisBackedBlockRDD[_]] - - // Verify the recovered sequence ranges - val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]] - assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size) - arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) => - assert(expected.ranges.toSeq === found.ranges.toSeq) - } - - // Verify the recovered data - assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data) - } - } - ssc.stop() - } -} - -class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true) - -class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false) http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/spark-ganglia-lgpl/pom.xml ---------------------------------------------------------------------- diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml deleted file mode 100644 index bfb9279..0000000 --- a/extras/spark-ganglia-lgpl/pom.xml +++ /dev/null @@ -1,49 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -~ Licensed to the Apache Software Foundation (ASF) under one or more -~ contributor license agreements. See the NOTICE file distributed with -~ this work for additional information regarding copyright ownership. -~ The ASF licenses this file to You under the Apache License, Version 2.0 -~ (the "License"); you may not use this file except in compliance with -~ the License. 
You may obtain a copy of the License at -~ -~ http://www.apache.org/licenses/LICENSE-2.0 -~ -~ Unless required by applicable law or agreed to in writing, software -~ distributed under the License is distributed on an "AS IS" BASIS, -~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -~ See the License for the specific language governing permissions and -~ limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <!-- Ganglia integration is not included by default due to LGPL-licensed code --> - <groupId>org.apache.spark</groupId> - <artifactId>spark-ganglia-lgpl_2.11</artifactId> - <packaging>jar</packaging> - <name>Spark Ganglia Integration</name> - - <properties> - <sbt.project.name>ganglia-lgpl</sbt.project.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-ganglia</artifactId> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala ---------------------------------------------------------------------- diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala deleted file mode 100644 index 3b1880e..0000000 --- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.metrics.sink - -import java.util.Properties -import java.util.concurrent.TimeUnit - -import com.codahale.metrics.MetricRegistry -import com.codahale.metrics.ganglia.GangliaReporter -import info.ganglia.gmetric4j.gmetric.GMetric -import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode - -import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem - -class GangliaSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { - val GANGLIA_KEY_PERIOD = "period" - val GANGLIA_DEFAULT_PERIOD = 10 - - val GANGLIA_KEY_UNIT = "unit" - val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS - - val GANGLIA_KEY_MODE = "mode" - val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST - - // TTL for multicast messages. If listeners are X hops away in network, must be at least X. - val GANGLIA_KEY_TTL = "ttl" - val GANGLIA_DEFAULT_TTL = 1 - - val GANGLIA_KEY_HOST = "host" - val GANGLIA_KEY_PORT = "port" - - def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop)) - - if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { - throw new Exception("Ganglia sink requires 'host' property.") - } - - if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) { - throw new Exception("Ganglia sink requires 'port' property.") - } - - val host = propertyToOption(GANGLIA_KEY_HOST).get - val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt - val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) - val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) - .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) - val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) - .getOrElse(GANGLIA_DEFAULT_PERIOD) - val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT) - .map(u => TimeUnit.valueOf(u.toUpperCase)) - .getOrElse(GANGLIA_DEFAULT_UNIT) - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - - val ganglia = new GMetric(host, port, mode, ttl) - val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .build(ganglia) - - override def start() { - reporter.start(pollPeriod, pollUnit) - } - - override def stop() { - reporter.stop() - } - - override def report() { - reporter.report() - } -} -
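For context, this sink is not constructed directly by user code: Spark's metrics system instantiates it from conf/metrics.properties and hands it the keys left after the instance/sink prefix is stripped, which is why the class above reads plain "host", "port", "period" and so on. A metrics.properties entry along the following lines would exercise it; the host and port values are placeholders, the spark-ganglia-lgpl module defined in the pom above must be on the classpath, and every key other than host and port falls back to the GANGLIA_DEFAULT_* values shown in the source.

# Report all instances' metrics to a Ganglia gmond; 'host' and 'port' are mandatory.
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=239.2.11.71
*.sink.ganglia.port=8649
*.sink.ganglia.period=10
*.sink.ganglia.unit=seconds
*.sink.ganglia.mode=multicast
*.sink.ganglia.ttl=1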