[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Thanks for being so helpful and patient on this one, @brkyvz. I will leave this with you now for your final ✔️ if you're happy with it :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @brkyvz could you please take a look and check whether it looks good? It would be great to merge it if you're happy with the changes.
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @brkyvz Squashed multiple commits into one for better readability. Please have a look when you get time. Thanks.
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @brkyvz Please have a look once you have time. Thanks.
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Thanks for the review @brkyvz . Updated the pull request with version 2.3.0.
[GitHub] spark issue #19427: [SparkStreaming] Reset spark.driver.bindAddress when sta...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/19427 I don't think there is an automated way. You could create a JIRA ticket and then update this PR's title to include the ticket ID and the component name.
[GitHub] spark issue #19427: Reset spark.driver.bindAddress when starting a Checkpoin...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/19427 @ssaavedra Could you also please update the title to the form `[SPARK-X][component] Title...`?
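As a rough illustration of the title shape requested above, here is a minimal sketch of a checker. `PrTitleCheck` and `isWellFormed` are hypothetical names and are not part of Spark's tooling. The pattern is an assumption inferred from the subjects in this thread: a `[SPARK-<number>]` tag, one or more component tags (with or without spaces between tags), then the title text.

```scala
// Hypothetical helper, not part of Spark's tooling: checks a PR title against
// the "[SPARK-<number>][component] Title" shape requested in this thread.
object PrTitleCheck {
  // One JIRA id tag, then one or more component tags (spaces between tags are
  // tolerated), then a non-empty title.
  private val TitlePattern = """^\[SPARK-\d+\](\s*\[[^\]]+\])+\s*\S.*$""".r

  // True only when the whole title matches the assumed convention.
  def isWellFormed(title: String): Boolean =
    TitlePattern.pattern.matcher(title).matches()
}
```

Under these assumptions a title that starts with `[SparkStreaming]` alone is rejected, because it carries no JIRA ticket ID.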
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Thanks for the review @brkyvz. Please have a look at the new PR; I have implemented the review comments. For API hygiene, I have kept both the new and the old API and marked the old API as deprecated. The rationale for the new API is to support future changes to the Kinesis consumer library: we can add new case classes in one place to handle new parameters, instead of adding more and more arguments to the Kinesis receiver function. The new API also makes the purpose of each argument clearer. I have also added new test cases that exercise the old API with both positive and negative scenarios. Please have a look and share your thoughts.
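The design argument above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from the PR. `StreamPosition` is a hypothetical stand-in for the Kinesis Client Library's `InitialPositionInStream` enum, so the sketch compiles without the AWS dependency. `ReceiverSettings` and `create` are also hypothetical. The `"2.3.0"` in the deprecation is assumed from the version mentioned elsewhere in this thread.

```scala
import java.util.Date

// Hypothetical stand-in for the KCL's InitialPositionInStream enum.
object StreamPosition extends Enumeration {
  val LATEST, TRIM_HORIZON, AT_TIMESTAMP = Value
}

// New-style API: one sealed hierarchy. A position that needs extra data
// (here a timestamp) carries it itself, so no extra receiver argument is needed.
sealed trait InitialPosition { def streamPosition: StreamPosition.Value }
case object Latest extends InitialPosition {
  val streamPosition: StreamPosition.Value = StreamPosition.LATEST
}
case object TrimHorizon extends InitialPosition {
  val streamPosition: StreamPosition.Value = StreamPosition.TRIM_HORIZON
}
final case class AtTimestamp(timestamp: Date) extends InitialPosition {
  val streamPosition: StreamPosition.Value = StreamPosition.AT_TIMESTAMP
}

// Hypothetical receiver settings; a real receiver takes many more parameters.
final case class ReceiverSettings(streamName: String, position: InitialPosition)

object ReceiverSettings {
  // New entry point: accepts any InitialPosition, including future subclasses.
  def create(streamName: String, position: InitialPosition): ReceiverSettings =
    ReceiverSettings(streamName, position)

  // Old entry point, kept for compatibility and translated to the new types.
  @deprecated("Pass an InitialPosition instead", "2.3.0")
  def create(streamName: String, position: StreamPosition.Value): ReceiverSettings =
    position match {
      case StreamPosition.LATEST       => create(streamName, Latest)
      case StreamPosition.TRIM_HORIZON => create(streamName, TrimHorizon)
      case other =>
        // The bare enum value cannot carry a timestamp, so it is rejected here.
        throw new UnsupportedOperationException(
          s"$other needs a timestamp; use AtTimestamp(...) with the new API")
    }
}
```

With this shape, supporting a new kind of starting position would mean adding one subclass of `InitialPosition` rather than another parameter on every receiver-creating function.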
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Thanks @brkyvz. I appreciate the review comments. I am facing some issues with the Scala packaging. I will keep working on it and update the patch soon.
[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/19416#discussion_r142368186 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow} +import org.apache.spark.sql.execution.ObjectOperator +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP +import org.apache.spark.sql.types.{IntegerType, LongType, StructType} + + +class FlatMapGroupsWithState_StateManager( --- End diff -- Is the underscore in the file name there for clarity?
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Hi @brkyvz can I get some love here please.
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @brkyvz Could you please check whether this addresses the last suggestions?
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @budde could you please do one last review of this one.
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137926490 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest: InitialPosition = Latest --- End diff -- Implemented a new Java wrapper for the API!
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137684968 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { --- End diff -- I've implemented the functions with this capitalized naming, but I still feel a bit salty about it :)
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Thanks for the comments @brkyvz. I will work on the changes and update the PR very soon.
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137245127 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { --- End diff -- Looks good. How would you compare the two syntaxes: `InitialPosition initialPosition = TrimHorizon.instance();`, or introducing a new Java class, KinesisInitialPosition.java, to allow `InitialPosition initialPosition = KinesisInitialPosition.TrimHorizon();`?
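The two call shapes compared in the comment above can be sketched side by side. This is a minimal illustration, not the PR's code. The thread proposes a Java class for the wrapper; a Scala object is used here only to keep the sketch in one language. The enclosing `InitialPositionSketch` object is a hypothetical container added so that the wrapper's methods can refer to the case objects by qualified name.

```scala
object InitialPositionSketch {
  sealed trait InitialPosition

  // Option 1 from the comment: every case object exposes itself as `instance`,
  // the member that a call such as TrimHorizon.instance() is meant to reach.
  case object Latest extends InitialPosition {
    val instance: InitialPosition = this
  }
  case object TrimHorizon extends InitialPosition {
    val instance: InitialPosition = this
  }

  // Option 2 from the comment: a separate wrapper with capitalized factory
  // methods, giving calls such as KinesisInitialPosition.TrimHorizon().
  object KinesisInitialPosition {
    // Qualified names are required here: inside this object the method names
    // shadow the case objects of the same name.
    def Latest(): InitialPosition = InitialPositionSketch.Latest
    def TrimHorizon(): InitialPosition = InitialPositionSketch.TrimHorizon
  }
}
```

In this sketch both `InitialPositionSketch.TrimHorizon.instance` and `InitialPositionSketch.KinesisInitialPosition.TrimHorizon()` return the same singleton, so the choice between them is about how the call reads from Java rather than about behavior.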
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137237166 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this --- End diff -- It was required so that the Java API can use `TrimHorizon.instance()`.
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r137235013 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest: InitialPosition = Latest --- End diff -- Interesting. Could you please explain why we have used this capitalization? Once() and ProcessingTime() are methods, so shouldn't they be camel-cased?
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Resolved conflict introduced by other code commits. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Could I get some love here from the committers please @brkyvz @HyukjinKwon @srowen . Would love to work on any changes if required.
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Hi @srowen, we've iterated on this patch to bring it into a good state. It needs a committer ✔️ before we can go ahead and merge it.
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Will wait for @brkyvz , @HyukjinKwon for final ✔️
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r134138994 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.util.Date + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +/** + * Trait for Kinesis's InitialPositionInStream. + * This will be overridden by more specific types. + */ +sealed trait InitialPosition { + val initialPositionInStream: InitialPositionInStream +} + +/** + * Case object for Kinesis's InitialPositionInStream.LATEST. + */ +case object Latest extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.LATEST +} + +/** + * Case object for Kinesis's InitialPositionInStream.TRIM_HORIZON. 
+ */ +case object TrimHorizon extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.TRIM_HORIZON +} + +/** + * Case object for Kinesis's InitialPositionInStream.AT_TIMESTAMP. + */ +case class AtTimestamp(timestamp: Date) extends InitialPosition { + val instance: InitialPosition = this + override val initialPositionInStream: InitialPositionInStream += InitialPositionInStream.AT_TIMESTAMP +} + +/** + * Companion object for InitialPosition that returns + * appropriate version of InitialPositionInStream. + */ +object InitialPosition { + + /** + * An instance of Latest with InitialPositionInStream.LATEST. + * @return [[Latest]] + */ + val latest : InitialPosition = Latest + + /** + * An instance of Latest with InitialPositionInStream.TRIM_HORIZON. + * @return [[TrimHorizon]] + */ + val trimHorizon : InitialPosition = TrimHorizon + + /** + * Returns instance of AtTimestamp with InitialPositionInStream.AT_TIMESTAMP. + * @return [[AtTimestamp]] + */ + def atTimestamp(timestamp: Date) : InitialPosition = AtTimestamp(timestamp) + + /** + * Returns instance of [[InitialPosition]] based on the passed [[InitialPositionInStream]]. + * This method is used in KinesisUtils for translating the InitialPositionInStream + * to InitialPosition. This function would be removed when we deprecate the KinesisUtils. + * + * @return [[InitialPosition]] + */ + def kinesisInitialPositionInStream( +initialPositionInStream: InitialPositionInStream) : InitialPosition = { --- End diff -- Addressed all the other review comments. The indentation was making it look weird, so I skipped the indentation.
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Added review suggestions @budde !
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Will update and post another request soon. Thanks.
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @brkyvz I have made the suggested modifications to the code. Please have a look when you get time. Thanks
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Thanks @budde for the review. Love the suggestions.
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @brkyvz , @budde, @HyukjinKwon could you please review this sometime.
[GitHub] spark issue #18029: [SPARK-20168][DStream] Add changes to use kinesis fetche...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Would love to work on the review feedback here @budde @brkyvz . Please have a look when you have time.
[GitHub] spark issue #18071: [SPARK-20855][Docs][DStream] Update the Spark kinesis do...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18071 Thanks @budde @srowen
[GitHub] spark issue #18071: [SPARK-20855][Docs][DStream] Update the Spark kinesis do...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18071 Updated the patch @HyukjinKwon . Thanks for the note.
[GitHub] spark issue #18071: [SPARK-20855][Docs][DStream] Update the Spark kinesis do...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18071 I noticed that now. Yes I will post an updated patch today. Thanks @HyukjinKwon
[GitHub] spark issue #18071: [SPARK-20855][WIP][DStream] Update the Spark kinesis doc...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18071 @HyukjinKwon The PR is ready. Just waiting for sign-off from the reviewers.
[GitHub] spark issue #18029: [SPARK-20168][DStream] Add changes to use kinesis fetche...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @budde @brkyvz - could I get some love here please.
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Hi, @budde , @brkyvz would love to hear your thoughts on the new patch
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 I really appreciate you taking the time to review the changes. I know balancing work and FOSS contributions is difficult :) I have taken a clean new stab at the issue, incorporating all the feedback. Let me know if it looks good. Godspeed @budde.
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @budde @brkyvz could you say whether the current patch seems OK, or whether I should make something similar to the case class/trait approach?
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @budde What would that mean for existing applications that are passing an `InitialPositionInStream.SOMETHING` to the builder? We would need to provide multiple overloaded versions of `builder.initialPositionInStream()` so that we don't break them, where the appropriate method populates the new `InitialPosition` argument and passes it down to `KinesisReceiver`.
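The overload idea in the comment above can be sketched roughly as follows. This is only an illustration under stated assumptions: `LegacyPosition` is a hypothetical stand-in for the KCL `InitialPositionInStream` enum, `SketchBuilder` is a hypothetical stand-in for `KinesisInputDStream.Builder`, and the method name `initialPosition` is made up for the example. It is not the code from the PR.

```scala
import java.util.Date

// Hypothetical stand-in for the KCL InitialPositionInStream enum (assumption).
object LegacyPosition extends Enumeration {
  val LATEST, TRIM_HORIZON, AT_TIMESTAMP = Value
}

// The new position type that would be handed down to the receiver.
sealed trait InitialPosition
case object Latest extends InitialPosition
case object TrimHorizon extends InitialPosition
final case class AtTimestamp(timestamp: Date) extends InitialPosition

// Hypothetical stand-in for the builder; only the position handling is shown.
class SketchBuilder {
  private var position: InitialPosition = Latest

  // Existing call sites keep compiling: the legacy enum is mapped to the new type.
  def initialPositionInStream(legacy: LegacyPosition.Value): SketchBuilder = {
    position = legacy match {
      case LegacyPosition.LATEST       => Latest
      case LegacyPosition.TRIM_HORIZON => TrimHorizon
      case _ =>
        // AT_TIMESTAMP cannot be expressed by the enum alone.
        throw new IllegalArgumentException(
          "AT_TIMESTAMP needs a timestamp; use initialPosition(AtTimestamp(...))")
    }
    this
  }

  // New entry point that accepts the richer type directly.
  def initialPosition(p: InitialPosition): SketchBuilder = {
    position = p
    this
  }

  // In the real builder this value would be passed on to the receiver.
  def build(): InitialPosition = position
}
```

With this shape, old callers are untouched and only callers that want `AT_TIMESTAMP` need to learn the new method.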
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Thoughts - @budde @brkyvz ?
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 For review @budde @brkyvz ^ this looks to be in a good state. @brkyvz - I still didn't quite get the code bit you shared, and would love to make changes if you could explain what you had in mind. Where exactly would this part fit?
```
import java.util.Date

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}

// Each position knows how to apply itself to the KCL configuration.
trait InitialPosition {
  def setInitialPosition(clientConf: KinesisClientLibConfiguration): KinesisClientLibConfiguration
}

case object Latest extends InitialPosition {
  override def setInitialPosition(clientConf: KinesisClientLibConfiguration): KinesisClientLibConfiguration = {
    clientConf.withInitialPositionInStream(InitialPositionInStream.LATEST)
  }
}

case object TrimHorizon extends InitialPosition {
  override def setInitialPosition(clientConf: KinesisClientLibConfiguration): KinesisClientLibConfiguration = {
    clientConf.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
  }
}

case class AtTimestamp(timestamp: Date) extends InitialPosition {
  override def setInitialPosition(clientConf: KinesisClientLibConfiguration): KinesisClientLibConfiguration = {
    clientConf.withTimestampAtInitialPositionInStream(timestamp)
  }
}
```
[GitHub] spark pull request #18029: [SPARK-20168][WIP][DStream] Add changes to use ki...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r120236128
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -193,6 +197,21 @@ object KinesisInputDStream {
 }
 /**
+ * Sets the Kinesis initial position data to the provided timestamp.
+ * Sets InitialPositionInStream to [[InitialPositionInStream.AT_TIMESTAMP]]
+ * and the timestamp to the provided value.
+ *
+ * @param timestamp Timestamp to resume the Kinesis stream from a provided
+ * timestamp.
+ * @return Reference to this [[KinesisInputDStream.Builder]]
+ */
+def withTimestampAtInitialPositionInStream(timestamp: Date) : Builder = {
--- End diff --
Got it now. Read your new comment.
[GitHub] spark pull request #18029: [SPARK-20168][WIP][DStream] Add changes to use ki...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r120236059
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -100,6 +103,7 @@ object KinesisInputDStream {
 private var endpointUrl: Option[String] = None
 private var regionName: Option[String] = None
 private var initialPositionInStream: Option[InitialPositionInStream] = None
+private var initialPositionInStreamTimestamp: Option[Date] = None
--- End diff --
Ah alright, so you're asking for another `initialPositionInStreamTimestamp`. That's similar to `withInitialPositionAtTimestamp`; I can rename that to suit this purpose. Another question: the InitialPosition gets passed to the KinesisReceiver. At the moment I was passing a timestamp along with the initial position. Are we planning to pass the `KinesisClientLibConfiguration` to the `KinesisReceiver` now?
[GitHub] spark pull request #18029: [SPARK-20168][WIP][DStream] Add changes to use ki...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r120235619
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -193,6 +197,21 @@ object KinesisInputDStream {
 }
 /**
+ * Sets the Kinesis initial position data to the provided timestamp.
+ * Sets InitialPositionInStream to [[InitialPositionInStream.AT_TIMESTAMP]]
+ * and the timestamp to the provided value.
+ *
+ * @param timestamp Timestamp to resume the Kinesis stream from a provided
+ * timestamp.
+ * @return Reference to this [[KinesisInputDStream.Builder]]
+ */
+def withTimestampAtInitialPositionInStream(timestamp: Date) : Builder = {
--- End diff --
@brkyvz `withInitialPositionAtTimestamp` is an enhancer method for the InitialPositionAtTimestamp. If provided, it will set the timestamp value along with InitialPosition.AT_TIMESTAMP. It's optional, hence `initialPositionInStream` can still be used. This will not introduce any incompatibilities in usage. Thoughts?
[GitHub] spark pull request #18029: [SPARK-20168][WIP][DStream] Add changes to use ki...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r120234200
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -100,6 +103,7 @@ object KinesisInputDStream {
 private var endpointUrl: Option[String] = None
 private var regionName: Option[String] = None
 private var initialPositionInStream: Option[InitialPositionInStream] = None
+private var initialPositionInStreamTimestamp: Option[Date] = None
--- End diff --
@brkyvz Where exactly are we planning to add these changes? Are you proposing to change the type of `private var initialPositionInStreamTimestamp: Option[Date] = None`? Wouldn't that introduce a backward incompatibility in the current builder?
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029
```
./bin/spark-shell --jars /Users/ysharma/.m2/repository/org/apache/spark/spark-streaming-kinesis-asl_2.11/2.3.0-SNAPSHOT/spark-streaming-kinesis-asl_2.11-2.3.0-SNAPSHOT.jar --packages com.amazonaws:amazon-kinesis-client:1.0.0 -Ylog-classpath

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val streamingContext = new StreamingContext(sc, Seconds(30))
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(streamingContext)
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
  .regionName("us-east-1")
  .streamName("a-very-nice-kinesis-app")
  .initialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
  .checkpointAppName("a-very-nice-kinesis-app")
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()

Error:
error: bad symbolic reference. A signature in KinesisInputDStream.class refers to term InterfaceStability in package org.apache.spark.annotation which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling KinesisInputDStream.class.
```
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @budde @brkyvz - I would appreciate any tips on how to include these external Kinesis modules in spark-shell so that I can test this directly from the shell (both Scala and Python).
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @budde Added a new method to pass the timestamp more elegantly.
[GitHub] spark pull request #18029: [SPARK-20168][WIP][DStream] Add changes to use ki...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r119986035
--- Diff: external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java ---
@@ -45,7 +46,7 @@ public void testJavaKinesisDStreamBuilder() {
 .streamName(streamName)
 .endpointUrl(endpointUrl)
 .regionName(region)
- .initialPositionInStream(initialPosition)
+ .initialPositionInStream(initialPosition, scala.Option.apply(null))
--- End diff --
@budde Not having the overloaded methods introduces this backward compatibility issue, which I didn't like much. What are your thoughts on this?
[GitHub] spark pull request #18029: [SPARK-20168][WIP][DStream] Add changes to use ki...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r119986280
--- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala ---
@@ -111,5 +110,28 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
 assert(dstream.kinesisCreds == customKinesisCreds)
 assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
 assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+
+val yesterday = DateUtils.addDays(new Date, -1)
+val dStreamFromTimestamp = builder
+.endpointUrl(customEndpointUrl)
+.regionName(customRegion)
+.initialPositionInStream(InitialPositionInStream.AT_TIMESTAMP, Some(yesterday))
--- End diff --
@budde Added an optional timestamp for resuming, but passing it as Some() doesn't seem very appealing. Passing a date directly seems more intuitive. Thoughts?
[GitHub] spark pull request #18029: [SPARK-20168][WIP][DStream] Add changes to use ki...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r119984045
--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
@@ -38,6 +40,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
 val endpointUrl: String,
 val regionName: String,
 val initialPositionInStream: InitialPositionInStream,
+val initialPositionInStreamTimestamp: Date,
--- End diff --
@budde - I had two approaches in mind while adding this functionality:
1. An additional parameter which can be set by an overloaded method in Builder.
2. A new case class wrapping the initial position with an optional timestamp.
I went ahead with implementing the first one for backward compatibility, so that users can keep using their existing builders.
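For comparison, the second approach listed above (a case class wrapping the initial position with an optional timestamp) might look roughly like this. It is a sketch under assumptions, not the PR's code: `StreamPosition` is a hypothetical stand-in for the KCL `InitialPositionInStream` enum, and `PositionWithTimestamp` and `atTimestamp` are illustrative names.

```scala
import java.util.Date

// Hypothetical stand-in for the KCL InitialPositionInStream enum (assumption).
object StreamPosition extends Enumeration {
  val LATEST, TRIM_HORIZON, AT_TIMESTAMP = Value
}

// One value carrying the position and, only where needed, a timestamp.
final case class PositionWithTimestamp(
    position: StreamPosition.Value,
    timestamp: Option[Date] = None) {
  // A timestamp is required for AT_TIMESTAMP and rejected for every other position.
  require(
    (position == StreamPosition.AT_TIMESTAMP) == timestamp.isDefined,
    s"timestamp must be set if and only if position is AT_TIMESTAMP (got $position, $timestamp)")
}

object PositionWithTimestamp {
  // Convenience constructor so callers pass a Date directly instead of Some(date).
  def atTimestamp(ts: Date): PositionWithTimestamp =
    PositionWithTimestamp(StreamPosition.AT_TIMESTAMP, Some(ts))
}
```

The trade-off is that invalid combinations fail fast at construction time, but existing builder callers would have to switch to the new type unless an overload is kept alongside it.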
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Could I get some love here please @budde @brkyvz This change is pretty useful for us and I would like to bake this patch well so others can take advantage of this feature.
[GitHub] spark issue #18071: [SPARK-20855][WIP][DStream] Update the Spark kinesis doc...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18071 Hey @budde , drawing your attention here please. I just wanted to update the docs to use your new Builder interface. A few questions off the top of my head:
- Is it compatible with the Python API as well, like KinesisUtils used to be?
- How can I test that in the PySpark shell? Trying to use KinesisInputDStream gives me "Builder not found" exceptions. I might be missing something obvious here.
[GitHub] spark pull request #18071: [SPARK-20855][WIP][DStream] Update the Spark kine...
GitHub user yssharma opened a pull request: https://github.com/apache/spark/pull/18071
[SPARK-20855][WIP][DStream] Update the Spark kinesis docs to use the KinesisInputDStream builder instead of deprecated KinesisUtils
## What changes were proposed in this pull request?
The examples and docs for Spark-Kinesis integrations use the deprecated KinesisUtils. We should update the docs to use the KinesisInputDStream builder to create DStreams.
## How was this patch tested?
The patch primarily updates the documents. The patch will also need to make changes to the Spark-Kinesis examples. The examples need to be tested.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yssharma/spark ysharma/kinesis_docs
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/18071.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #18071
commit 9c1800435ce718c8d32829417dfe8ea462ad9fca
Author: Yash Sharma <ysha...@atlassian.com>
Date: 2017-05-22T12:42:40Z
initial changes to update kinesis docs to newer version of API
commit 1e7e8790bdba4373e9bc79b0d12773505367d4ca
Author: Yash Sharma <ysha...@atlassian.com>
Date: 2017-05-23T11:56:55Z
fixing indentations
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Simplified how timestamp is passed to Kinesis for InitialPositionInStream.AT_TIMESTAMP. cc: @budde , @brkyvz
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 Commit https://github.com/apache/spark/commit/424550c8450937f78ce608ff7b18e46f41478a8a should fix the timeouts mentioned in the https://github.com/apache/spark/commit/b71a8d621ff048958dd5f10ef16cf5989026ed5f commit.
[GitHub] spark issue #18029: [SPARK-20168][WIP][DStream] Add changes to use kinesis f...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @budde @brkyvz would love to hear your thoughts if this is the best way to add this functionality
[GitHub] spark issue #18028: [DStream][DOC]Add documentation for kinesis retry config...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18028 Thanks @brkyvz
[GitHub] spark pull request #18029: [SPARK-20168][WIP][DStream] Add changes to use ki...
GitHub user yssharma opened a pull request: https://github.com/apache/spark/pull/18029
[SPARK-20168][WIP][DStream] Add changes to use kinesis fetches from specified timestamp
## What changes were proposed in this pull request?
The Kinesis client can resume from a specified timestamp when creating a stream. We should have an option to pass a timestamp in the config so that Kinesis can resume from the given timestamp. The patch introduces a new `KinesisInitialPositionInStream` that combines the `InitialPositionInStream` with the `timestamp` information, which can be used to resume Kinesis fetches from the provided timestamp.
## How was this patch tested?
todo
cc : @budde @brkyvz
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yssharma/spark ysharma/kcl_resume
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/18029.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #18029
[GitHub] spark pull request #18028: [DStream][DOC]Add documentation for kinesis retry...
GitHub user yssharma opened a pull request: https://github.com/apache/spark/pull/18028
[DStream][DOC]Add documentation for kinesis retry configurations
## What changes were proposed in this pull request?
The changes were merged as part of - https://github.com/apache/spark/pull/17467. The documentation was missed somewhere in the review iterations. Adding the documentation where it belongs.
## How was this patch tested?
Docs. Not tested.
cc @budde , @brkyvz
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yssharma/spark ysharma/kinesis_retry_docs
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/18028.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #18028
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 Awesome. Thanks @budde @brkyvz for reviews and patch improvements.
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @budde @brkyvz - Any feedback on this one, please?
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 Thanks for all the review comments, @budde @brkyvz. Pushed new changes addressing them.
[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17467#discussion_r115110773 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kinesis + +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.streaming.StreamingContext + +/** + * Configurations to pass to the [KinesisBackedBlockRDD]. + * + * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3. + * @param retryWaitTimeMs: The interval between consequent Kinesis retries. + * Defaults to 100ms. + * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request. +* Defaults to batch duration provided for streaming, +* else uses 1 if invoked directly. + */ +private[kinesis] case class KinesisReadConfigurations( + maxRetries: Int, + retryWaitTimeMs: Long, + retryTimeoutMs: Long) + +object KinesisReadConfigurations { + def apply(): KinesisReadConfigurations = { --- End diff -- It can be used in places where we don't have the spark conf. I am using this in `KinesisBackedBlockRDD`'s constructor. 
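The no-argument factory discussed above can be sketched as follows. This is a minimal illustration, not the class from the patch: `ReadConfigSketch` is a hypothetical stand-in, the first two defaults follow the Scaladoc quoted in the diff, and the timeout default is a placeholder chosen only for the example.

```scala
// Hypothetical stand-in for the case class under review.
final case class ReadConfigSketch(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long)

object ReadConfigSketch {
  // No-argument factory for call sites that have no SparkConf at hand,
  // such as a default constructor argument.
  // 3 and 100L follow the quoted Scaladoc; 10000L is an assumed placeholder.
  def apply(): ReadConfigSketch =
    ReadConfigSketch(maxRetries = 3, retryWaitTimeMs = 100L, retryTimeoutMs = 10000L)
}
```

A caller without a conf can then write `ReadConfigSketch()`, while a caller that has read user settings can still construct the case class with explicit values or adjust one field with `copy`.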
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 Thanks for the comments @budde @brkyvz. I will add the changes soon. I also liked pulling the values from the Spark conf directly, and got it working with a private val in `KinesisBackedBlockRDD` [1]. I don't mind getting the conf from the case class either, since it keeps all the configs in one place and the class acts as self-documenting code. Open to thoughts from you both. 1. https://github.com/yssharma/spark/commit/f5026b4fb1bb98a0d31cfaa6571eee896051aa2b
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @brkyvz - Thanks for the review comments. Updated the patch, please review.
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @budde would you like to share your thoughts on the new changes when you have time?
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467

@brkyvz - Added new changes:
- A case class `KinesisReadConfigurations` that holds all the Kinesis read configs in a single place
- A test class that passes the Kinesis configs in `SparkConf`, which are then used to create the Kinesis configs object in `KinesisInputDStream` and passed down to `KinesisBackedBlockRDD`
- Docs improvement

I also played with the `PrivateMethodTester` but wasn't able to access the private function `KinesisSequenceRangeIterator#retryOrTimeout`, probably because of the generics used in the function. As an alternative, I fetch the RDDs directly and check the configs passed in there. I would still like to learn how to get `retryOrTimeout` working, just out of interest. Adding the error below:
```
// KinesisSequenceRangeIterator # retryOrTimeout
val retryOrTimeoutMethod = PrivateMethod[Object]('retryOrTimeout) // <<<- Issue
val partitions = kinesisRDD.partitions.map {
  _.asInstanceOf[KinesisBackedBlockRDDPartition]
}.toSeq
seqNumRanges1.ranges.map { range =>
  val seqRangeIter = new KinesisSequenceRangeIterator(DefaultCredentials.provider.getCredentials,
    dummyEndpointUrl, dummyRegionName, range, kinesisRDD.kinesisReadConfigs)
  seqRangeIter.invokePrivate(retryOrTimeoutMethod("Passing custom message"))
}

- Kinesis read with custom configurations *** FAILED ***
  java.lang.IllegalArgumentException: Can't find a private method named: retryOrTimeout
  at org.scalatest.PrivateMethodTester$Invoker.invokePrivate(PrivateMethodTester.scala:247)
  at org.apache.spark.streaming.kinesis.KinesisStreamTests$$anonfun$7$$anonfun$apply$mcV$sp$13.apply(KinesisStreamSuite.scala:286)
  at org.apache.spark.streaming.kinesis.KinesisStreamTests$$anonfun$7$$anonfun$apply$mcV$sp$13.apply(KinesisStreamSuite.scala:281)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.streaming.kinesis.KinesisStreamTests$$anonfun$7.apply$mcV$sp(KinesisStreamSuite.scala:281)
  at org.apache.spark.streaming.kinesis.KinesisStreamTests$$anonfun$7.apply(KinesisStreamSuite.scala:237)
```
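One possible explanation for the lookup failure, not confirmed anywhere in this thread: if `retryOrTimeout` is declared with a by-name second parameter list, for example `(message: String)(body: => T)`, the compiled JVM method takes two parameters (a `String` and a `Function0`), so an invocation that supplies only a `String` would have nothing to match. The sketch below uses a hypothetical `Retrier` class with that assumed shape and plain Java reflection to show the compiled parameter list.

```scala
object PrivateLookupSketch {
  // Hypothetical class; only the shape of the private method matters here.
  // The signature is an assumption about what such a helper might look like.
  class Retrier {
    private def retryOrTimeout[T](message: String)(body: => T): T = body
  }

  // Returns the simple names of the JVM-level parameter types of the
  // compiled method. endsWith tolerates compiler name mangling.
  def jvmParameterTypes: Seq[String] =
    classOf[Retrier].getDeclaredMethods.toSeq
      .filter(_.getName.endsWith("retryOrTimeout"))
      .flatMap(m => m.getParameterTypes.toSeq.map(_.getSimpleName))
}
```

If this is indeed the cause, a reflective call would need to pass both the message and a `Function0` wrapping the body; whether `PrivateMethodTester` handles that case is worth checking against the ScalaTest documentation.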
[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17467#discussion_r112848855 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala --- @@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator( endpointUrl: String, regionId: String, range: SequenceNumberRange, -retryTimeoutMs: Int) extends NextIterator[Record] with Logging { +retryTimeoutMs: Int, +sparkConf: SparkConf) extends NextIterator[Record] with Logging { --- End diff -- Or we can pass them via the Spark conf, construct the KinesisReadConfigurations object in `KinesisInputDStream`, and pass it down to `KinesisBackedBlockRDD`.
[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17467#discussion_r112848762 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala --- @@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator( endpointUrl: String, regionId: String, range: SequenceNumberRange, -retryTimeoutMs: Int) extends NextIterator[Record] with Logging { +retryTimeoutMs: Int, +sparkConf: SparkConf) extends NextIterator[Record] with Logging { --- End diff -- And would you expect it to be passed directly to the `KinesisInputDStream`?
[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17467#discussion_r112848401 --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala --- @@ -101,6 +103,36 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) } } + testIfEnabled("Basic reading from Kinesis with modified configurations") { --- End diff -- I wasn't able to test the actual waiting of Kinesis. I haven't looked at the `PrivateMethodTester` yet to check how it could help us test how the vars are picked up. I used this test case to debug and verify that all the values are passed correctly.
[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17467#discussion_r112848373 --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala --- @@ -101,6 +103,36 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) } } + testIfEnabled("Basic reading from Kinesis with modified configurations") { +// Add Kinesis retry configurations +sc.conf.set(RETRY_WAIT_TIME_KEY, "1000ms") --- End diff -- +1
[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17467#discussion_r112848363 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala --- @@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator( endpointUrl: String, regionId: String, range: SequenceNumberRange, -retryTimeoutMs: Int) extends NextIterator[Record] with Logging { +retryTimeoutMs: Int, +sparkConf: SparkConf) extends NextIterator[Record] with Logging { --- End diff -- @brkyvz - I was thinking of not passing individual configs to the constructor, because that would just cause the parameter list to grow. Using SparkConf or a Map would let us add new configs without changing the constructor signature. I was using a Map earlier for this so that it's easy to pass more configs. What are your thoughts on Map vs. case class?
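The two options need not be exclusive: a string map can carry the user settings while a case class gives the reader typed, documented fields. The sketch below is an illustration under stated assumptions, not the patch's code: the key names, `RetryConfigs`, `parseMs`, and `fromMap` are all hypothetical, and `parseMs` is a deliberately tiny parser, not Spark's `JavaUtils.timeStringAsMs`.

```scala
object ConfigParsingSketch {
  // Key names are assumptions for illustration only.
  val RetryWaitTimeKey = "spark.streaming.kinesis.retry.waitTime"
  val RetryMaxAttemptsKey = "spark.streaming.kinesis.retry.maxAttempts"

  // Typed view of the settings; new fields are added here in one place.
  final case class RetryConfigs(maxRetries: Int, retryWaitTimeMs: Long)

  // Minimal duration parser: accepts "1000ms", "2s", or a bare number of ms.
  def parseMs(s: String): Long = {
    val t = s.trim.toLowerCase
    if (t.endsWith("ms")) t.dropRight(2).trim.toLong
    else if (t.endsWith("s")) t.dropRight(1).trim.toLong * 1000L
    else t.toLong
  }

  // Builds the typed configs from a string map, falling back to defaults
  // (3 attempts, 100 ms) when a key is absent.
  def fromMap(conf: Map[String, String]): RetryConfigs =
    RetryConfigs(
      maxRetries = conf.get(RetryMaxAttemptsKey).map(_.toInt).getOrElse(3),
      retryWaitTimeMs = conf.get(RetryWaitTimeKey).map(parseMs).getOrElse(100L))
}
```

With this shape, the map stays open-ended at the API boundary, while misspelled field names inside the reader become compile errors instead of silent fallbacks to defaults.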
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @felixcheung: would love to get more review comments to improve the patch.
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @budde - Added changes for the minor review comments and docs.
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @budde - Not sure if we can exactly test the configured timeouts. I have debugged the flow in my test case to verify that the custom values are passed down to the Kinesis-backed block RDD.
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @budde: Implemented the review changes and ran the Scala code style checks.
[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17467#discussion_r112614624 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala --- @@ -83,7 +83,8 @@ class KinesisBackedBlockRDD[T: ClassTag]( @transient private val isBlockIdValid: Array[Boolean] = Array.empty, val retryTimeoutMs: Int = 1, val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _, -val kinesisCreds: SparkAWSCredentials = DefaultCredentials +val kinesisCreds: SparkAWSCredentials = DefaultCredentials, +val kinesisConf: Map[String, String] = Map.empty --- End diff -- +1
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @budde @brkyvz - Implemented the review changes. Please review. - Using SparkConf for all the user parameters - Changed kinesisWait to be a val instead of a var - Fixed documentation
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @budde - thanks for taking the time to review it. Appreciate it.
[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17467#discussion_r112347174 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala --- @@ -249,6 +252,17 @@ object KinesisInputDStream { } /** + * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch + * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set. + * + * @param conf: Map[String, String] to use for CloudWatch authentication + */ +def kinesisConf(conf: Map[String, String]): Builder = { --- End diff -- Do you think it would be better to pass individual values to the builder rather than a map of configs? I thought a map of configs could be extended easily when we need to support new configurations, without code changes. What are your thoughts on one builder method per config value vs. one map for all configs?
[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17467#discussion_r112346805 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala --- @@ -147,6 +152,14 @@ class KinesisSequenceRangeIterator( private var lastSeqNumber: String = null private var internalIterator: Iterator[Record] = null + // variable for kinesis wait time interval between next retry + private var kinesisWaitTimeMs = JavaUtils.timeStringAsMs( --- End diff -- No, this value is modified after each wait: `kinesisWaitTimeMs *= 2 // if you have waited, then double wait time for next round`
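The doubling described above is a standard exponential backoff, which is why the wait time has to be mutable. A minimal sketch of the idea follows; `retryWithBackoff` and its parameters are hypothetical names, and the overall request timeout that the real iterator also enforces is left out to keep the example short.

```scala
import scala.util.control.NonFatal

object BackoffSketch {
  // Runs `body` up to `maxRetries` times. After each failed attempt it sleeps
  // for the current wait time and then doubles it for the next round.
  def retryWithBackoff[T](maxRetries: Int, initialWaitMs: Long)(body: => T): T = {
    require(maxRetries >= 1, "maxRetries must be at least 1")
    var waitMs = initialWaitMs // mutable on purpose: it grows after every wait
    var attempt = 0
    var result: Option[T] = None
    var lastError: Throwable = null
    while (result.isEmpty && attempt < maxRetries) {
      attempt += 1
      try {
        result = Some(body)
      } catch {
        case NonFatal(e) =>
          lastError = e
          if (attempt < maxRetries) {
            Thread.sleep(waitMs)
            waitMs *= 2 // if you have waited, double the wait for the next round
          }
      }
    }
    result.getOrElse(
      throw new RuntimeException(s"Gave up after $attempt attempts", lastError))
  }
}
```

With an initial wait of 100 ms and three attempts, the sleeps between attempts would be 100 ms and 200 ms; making the initial value configurable changes the whole schedule without touching the loop.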
[GitHub] spark issue #17506: [SPARK-20189][DStream] Fix spark kinesis testcases to re...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17506 Thanks @srowen
[GitHub] spark pull request #17506: [SPARK-20189][DStream] Fix spark kinesis testcase...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17506#discussion_r111292109 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala --- @@ -267,7 +267,7 @@ object KinesisInputDStream { getRequiredParam(checkpointAppName, "checkpointAppName"), checkpointInterval.getOrElse(ssc.graph.batchDuration), storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL), -handler, +ssc.sc.clean(handler), --- End diff -- Yes, this is required. KinesisUtils used to pass a cleaned handler when creating the stream, so we now do this in KinesisInputDStream: `val cleanedHandler = ssc.sc.clean(messageHandler)`
[GitHub] spark pull request #17506: [SPARK-20189][DStream] Fix spark kinesis testcase...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17506#discussion_r111291842 --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala --- @@ -233,11 +241,15 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun val localTestUtils = new KPLBasedKinesisTestUtils(1) localTestUtils.createStream() try { - val awsCredentials = KinesisTestUtils.getAWSCredentials() - val stream = KinesisUtils.createStream(ssc, localAppName, localTestUtils.streamName, -localTestUtils.endpointUrl, localTestUtils.regionName, InitialPositionInStream.LATEST, -Seconds(10), StorageLevel.MEMORY_ONLY, -awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) + val stream = KinesisInputDStream.builder.streamingContext(ssc) --- End diff -- Yes, that's true. The KinesisInputDStream.builder uses the default credentials if no creds are passed, and the default creds work with both AWS keys and session tokens. So "mvn test" now works in both permanent AWS key and session-token environments.
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 Waiting for review @brkyvz . Thanks.
[GitHub] spark issue #17506: [SPARK-20189][DStream] Fix spark kinesis testcases to re...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17506 @srowen do you feel this patch could be merged now? Thanks
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467

Just for info, this is the error I get when trying to use the `sc` in `KinesisBackedBlockRDD`:

`- Basic reading from Kinesis *** FAILED ***
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2284)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2058)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
...
Cause: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@60c1663c)
- field (class: org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD, name: org$apache$spark$streaming$kinesis$KinesisBackedBlockRDD$$sc, type: class org.apache.spark.SparkContext)
- object (class org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD, KinesisBackedBlockRDD[0] at BlockRDD at KinesisBackedBlockRDD.scala:90)
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@52a33c3f)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@71ed560f)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@52a33c3f))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at map at KinesisBackedBlockRDDSuite.scala:83)
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, )
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13, )
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2284)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2058)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)`
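The serialization stack above shows the RDD holding the context in a field, so serializing the RDD tries to serialize the context too. The sketch below reproduces the pattern with plain Java serialization and no Spark dependency; `DriverOnlyContext`, `HoldsContext`, `HoldsValues`, and `trySerialize` are hypothetical names used only for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationSketch {
  // Stand-in for a driver-only object such as SparkContext: not Serializable.
  class DriverOnlyContext(val settings: Map[String, String])

  // Keeping the context as a field drags it into serialization and fails.
  class HoldsContext(val ctx: DriverOnlyContext) extends Serializable

  // Copying the needed plain values out on the driver avoids the problem.
  class HoldsValues(val retryWaitTimeMs: Long) extends Serializable

  // Returns Right(byte count) on success, or Left(name of offending class).
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```

Reading the settings once on the driver and shipping only the resulting values, as a map or a small case class, is the same idea as passing a separate Kinesis config object down to the RDD.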
[GitHub] spark issue #17506: [SPARK-20189][DStream] Fix spark kinesis testcases to re...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17506 Is there anything else that can be done on this patch? It fixes all the test cases for the deprecated API, which try to use the AWS secret/ID credentials instead of the builder.
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @brkyvz - thanks for taking the time to review the patch, I appreciate it. I have implemented all your suggestions. The patch now passes a new map for the Kinesis configs and adds a mechanism to set them through the builder. As for the Spark context, I also wanted to use the SparkContext available in `KinesisBackedBlockRDD` directly (instead of creating a new config map), but the `sc` in `KinesisBackedBlockRDD` is not usable there, and trying to use it causes serialization errors. Passing a separate config map looked like the only simple way to access the Kinesis configs. The patch now does not use the `sc` at all and expects a kinesisConf to be passed to the `KinesisInputDStream` builder directly. Let me know your thoughts. Thanks again for the review comments.
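The serialization problem described above can be reproduced outside Spark. The following is a minimal sketch in plain Java, not the code from the patch: `DriverOnlyContext`, `TaskHoldingContext`, `TaskHoldingConf`, and the `retry.wait.time` key are hypothetical stand-ins. It only assumes that the context object is not `Serializable`, as the stack trace earlier in the thread reports for `SparkContext`, and shows why copying the needed settings into a plain map avoids the error.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class SerializationSketch {

    // Hypothetical stand-in for a driver-only object such as SparkContext:
    // it deliberately does NOT implement Serializable.
    static class DriverOnlyContext {
        final Map<String, String> conf = new HashMap<>();
    }

    // Keeps a reference to the driver-only object, so serializing it fails.
    static class TaskHoldingContext implements Serializable {
        private static final long serialVersionUID = 1L;
        final DriverOnlyContext context;

        TaskHoldingContext(DriverOnlyContext context) {
            this.context = context;
        }
    }

    // Keeps only a copy of the settings it needs, so it serializes cleanly.
    static class TaskHoldingConf implements Serializable {
        private static final long serialVersionUID = 1L;
        final HashMap<String, String> kinesisConf;

        TaskHoldingConf(Map<String, String> conf) {
            this.kinesisConf = new HashMap<>(conf);
        }
    }

    // Returns true if Java serialization of obj succeeds,
    // false if it hits a non-serializable object in the graph.
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DriverOnlyContext ctx = new DriverOnlyContext();
        ctx.conf.put("retry.wait.time", "100ms"); // hypothetical key and value

        System.out.println(canSerialize(new TaskHoldingContext(ctx))); // prints "false"
        System.out.println(canSerialize(new TaskHoldingConf(ctx.conf))); // prints "true"
    }
}
```

The design choice mirrors the one in the comment: the object that travels with the task carries a small serializable map rather than a reference back to the context.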
[GitHub] spark issue #17506: [SPARK-20189][DStream] Fix spark kinesis testcases to re...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17506 Thanks @srowen
[GitHub] spark issue #17506: [SPARK-20189][DStream] Fix spark kinesis testcases to re...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17506 @srowen - does the Jenkins re-test trigger automatically? If not, could I request a retest on this patch, please?
[GitHub] spark issue #17506: [SPARK-20189][DStream] Fix spark kinesis testcases to re...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17506

@srowen : It failed on the earlier patch with the KinesisTestUtils.scala changes. This version is clean. Will wait for the next automated build :)

Build logs: https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3635/console
Commit : https://github.com/apache/spark/commit/7f86f3923b4b63778f9a9b4a997367368623c7a2

```
Running Scala style checks
Scalastyle checks failed at following occurrences:
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:220: File line length exceeds 100 characters
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:242: File line length exceeds 100 characters
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:246: File line length exceeds 100 characters
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:256: File line length exceeds 100 characters
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:219:0: Use Javadoc style indentation for multiline comments
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:227:0: Use Javadoc style indentation for multiline comments
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala:234:0: Use Javadoc style indentation for multiline comments
[error] (streaming-kinesis-asl/compile:scalastyle) errors exist
[error] Total time: 20 s, completed Apr 3, 2017 5:14:05 AM
```
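The line-length failures in the log above can be caught locally before pushing. The following is a rough sketch, not Scalastyle itself: it counts characters with `String.length()`, which may differ from how Scalastyle treats tabs, and the class and method names are made up for illustration. The limit of 100 is taken from the log messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LineLengthCheck {

    // Limit reported in the build log ("File line length exceeds 100 characters").
    static final int MAX_LINE_LENGTH = 100;

    // Returns the 1-based numbers of all lines longer than MAX_LINE_LENGTH.
    static List<Integer> overlongLines(List<String> lines) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            if (lines.get(i).length() > MAX_LINE_LENGTH) {
                result.add(i + 1);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Build a 101-character line without relying on newer JDK helpers.
        String tooLong = new String(new char[101]).replace('\0', 'x');
        List<String> sample = Arrays.asList("val short = 1", tooLong, "val alsoShort = 2");

        System.out.println(overlongLines(sample)); // prints "[2]"
    }
}
```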
[GitHub] spark issue #17506: [SPARK-20189][DStream] Fix spark kinesis testcases to re...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17506 The Scala style check probably fails because of the double-spaced lines. But that is how the existing code was, so I thought of keeping it that way.
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 @srowen - Could I get some love here as well? Thanks
[GitHub] spark issue #17506: [SPARK-20189][DStream] Fix spark kinesis testcases to re...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17506 @srowen - Thanks for the feedback, I appreciate it. I have added a minimal, simplified patch that fixes the test cases that fail with the old API.
[GitHub] spark issue #17506: [SPARK-20189][DStream] Fix spark kinesis testcases to re...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17506

@srowen - Yes, there are 2 objectives for the patch:
- add new test cases for the new API (which also test AWS session tokens)
- add the ability to disable the old API test cases

For now I have added new test cases for the new API, so that we have coverage for session tokens. I could remove the part that disables the old test cases, but then running the whole test suite would produce a few failures in environments with session tokens and assumed roles, and devs might wonder why we have broken test cases. I could remove the flag option and just keep the new test cases if that suits better. Thoughts? Thanks
[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17467 Can I get some feedback here, please, @tdas @brkyvz? Thanks :)
[GitHub] spark issue #17506: [SPARK-20189][DStream] Fix spark kinesis testcases to re...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/17506

@srowen - I have made the following improvements to the patch to incorporate your points:
- Did not remove the old test cases, to keep the checks on the old API
- Added new test cases that test the new Kinesis API
- Added an option to tag the old API test cases with a flag (which can also be useful for identifying them easily in future)
- Added an option to set an env variable to disable the old API Kinesis tests; this will help devs run the complete test suites on aws-session-token based infrastructure without any changes

Let me know your thoughts. Thanks.
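The environment-variable switch mentioned in the last point could look roughly like the sketch below. This is an illustration under assumptions, not the code from the patch: the variable name `DISABLE_OLD_KINESIS_API_TESTS` and the convention that the value `1` disables the tests are both hypothetical, since the thread does not name them.

```java
import java.util.Map;

public class OldApiTestGate {

    // Hypothetical variable name; the thread does not say what the patch calls it.
    static final String DISABLE_FLAG = "DISABLE_OLD_KINESIS_API_TESTS";

    // Old-API tests run by default and are skipped only when the flag is set to "1"
    // (the "1" convention is an assumption made for this sketch).
    static boolean shouldRunOldApiTests(Map<String, String> env) {
        return !"1".equals(env.get(DISABLE_FLAG));
    }

    public static void main(String[] args) {
        // Reads the real process environment; the result depends on how it is launched.
        System.out.println(shouldRunOldApiTests(System.getenv()));
    }
}
```

Taking the environment as a `Map` parameter rather than calling `System.getenv()` inside the check keeps the gate itself easy to test.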
[GitHub] spark pull request #17506: [SPARK-20189][DStream] Fix spark kinesis testcase...
Github user yssharma commented on a diff in the pull request: https://github.com/apache/spark/pull/17506#discussion_r109304637

--- Diff: external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java ---
@@ -50,48 +56,86 @@ public void testAwsCreds() {
     String dummyRegionName = KinesisTestUtils.getRegionNameByEndpoint(dummyEndpointUrl);
     // Tests the API, does not actually test data receiving
-    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
-        dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, new Duration(2000),
-        StorageLevel.MEMORY_AND_DISK_2(), "fakeAccessKey", "fakeSecretKey");
+    BasicCredentials credentials = new BasicCredentials("fakeAccessKey", "fakeSecretKey");
+
+    KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder().streamingContext(ssc)
+        .checkpointAppName("myAppName")
+        .streamName("mySparkStream")
+        .endpointUrl(dummyEndpointUrl)
+        .regionName(dummyRegionName)
+        .initialPositionInStream(InitialPositionInStream.LATEST)
+        .checkpointInterval(new Duration(2000))
+        .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
+        .kinesisCredentials(credentials)
+        .build();
+
     ssc.stop();
   }
-  private static Function<Record, String> handler = new Function<Record, String>() {
+  public static Function1<Record, String> handler = new AbstractFunction1<Record, String>() {
--- End diff --

The old-style test cases do not work with the AWS session tokens. I could add an ENV-variable-based check if that seems like a better solution?