Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5882#discussion_r29813184
  
    --- Diff: 
extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
 ---
    @@ -90,30 +95,37 @@ private object KinesisWordCountASL extends Logging {
         StreamingExamples.setStreamingLogLevels()
     
         /* Populate the appropriate variables from the given args */
    -    val Array(streamName, endpointUrl) = args
    +    val Array(appName, streamName, endpointUrl, regionName, 
awsAccessKeyId, awsSecretKey) = args
    +
    +    /* Spark Streaming batch interval */
    +    val batchInterval = Milliseconds(2000)
     
    -    /* Determine the number of shards from the stream */
    -    val kinesisClient = new AmazonKinesisClient(new 
DefaultAWSCredentialsProviderChain())
    +    /* Create the low-level Kinesis Client from the AWS Java SDK. */
    +    /* Determine the number of shards from the stream. */
    +    val kinesisClient = new AmazonKinesisClient(
    +        new BasicAWSCredentials(awsAccessKeyId, awsSecretKey))
         kinesisClient.setEndpoint(endpointUrl)
    +
         val numShards = 
kinesisClient.describeStream(streamName).getStreamDescription().getShards()
           .size()
     
         /* In this example, we're going to create 1 Kinesis 
Worker/Receiver/DStream for each shard. */
         val numStreams = numShards
     
    -    /* Setup the and SparkConfig and StreamingContext */
    -    /* Spark Streaming batch interval */
    -    val batchInterval = Milliseconds(2000)
    -    val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
    -    val ssc = new StreamingContext(sparkConfig, batchInterval)
    -
         /* Kinesis checkpoint interval.  Same as batchInterval for this 
example. */
    -    val kinesisCheckpointInterval = batchInterval
    +    val checkpointInterval = batchInterval
    +
    +    /* Setup the SparkConfig */
    +    val sparkConfig = new SparkConf().setAppName(appName)
    +
    +    /* Setup the StreamingContext */
    +    val ssc = new StreamingContext(sparkConfig, batchInterval)
     
         /* Create the same number of Kinesis DStreams/Receivers as Kinesis 
stream's shards */
         val kinesisStreams = (0 until numStreams).map { i =>
    -      KinesisUtils.createStream(ssc, streamName, endpointUrl, 
kinesisCheckpointInterval,
    -          InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    +      KinesisUtils.createStream(appName, ssc, streamName, endpointUrl, 
regionName, awsAccessKeyId,
    --- End diff --
    
    Are you sure this is within the 100 character limit?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to