[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-97359613

So can you try the receiver-based Kafka stream using `createStream`? If this is a py4j problem, I assume that will fail as well. Originally we used `MapConverter` explicitly to convert the Python dict into a Java Map, but now this conversion seems to be done implicitly.
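A minimal sketch of that smoke test, assuming a running StreamingContext `ssc` and placeholder ZooKeeper/topic values; `createStream` also passes a Python dict across the py4j gateway, so if the implicit conversion is broken it should fail the same way:

```python
from pyspark.streaming.kafka import KafkaUtils

# Receiver-based stream: topics is a dict of topic -> number of partitions,
# so this exercises the same dict-to-java.util.Map conversion as
# createDirectStream's kafkaParams argument.
stream = KafkaUtils.createStream(ssc, "localhost:2181", "test-group",
                                 {"testopic": 1})
```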
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-97374537

It might be a py4j problem. I'm not an expert on py4j; what is your insight, @davies? Maybe you could report a bug in JIRA so others can take a crack at this problem.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user Arttii commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-97357834

I don't think this is a bug, but rather a problem on my side. I am just wondering which parts could affect this. As far as I can tell, nothing in KafkaUtils or the PythonHelper class handles any of the conversion logic, so this looks like a py4j thing. At least I think so.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user Arttii commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-97353029

Hi, I seem to be having a problem with this. The default py4j dict and set conversion does not seem to be working on my machine (Windows 8.1, Anaconda Python 2.7 64-bit, Java 1.7). I had to hack kafka.py a bit and include the old conversion code in there for this to work. Any idea what the problem might be?
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-97355450

Would you please paste your stack trace?
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user Arttii commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-97357470

This is the normal WordCount example running with local[*]:

```
15/04/29 10:53:48 INFO SparkContext: Running Spark version 1.3.1
15/04/29 10:53:49 INFO SecurityManager: Changing view acls to: a.topchyan
15/04/29 10:53:49 INFO SecurityManager: Changing modify acls to: a.topchyan
15/04/29 10:53:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(a.topchyan); users with modify permissions: Set(a.topchyan)
15/04/29 10:53:49 INFO Slf4jLogger: Slf4jLogger started
15/04/29 10:53:49 INFO Remoting: Starting remoting
15/04/29 10:53:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@dellhr57l12.replynet.prv:52706]
15/04/29 10:53:49 INFO Utils: Successfully started service 'sparkDriver' on port 52706.
15/04/29 10:53:49 INFO SparkEnv: Registering MapOutputTracker
15/04/29 10:53:49 INFO SparkEnv: Registering BlockManagerMaster
15/04/29 10:53:49 INFO DiskBlockManager: Created local directory at C:\Users\ATOPCH~1.REP\AppData\Local\Temp\spark-af5177e7-d148-4013-a097-113c8f92ed63\blockmgr-c45e19f6-a825-4763-a94e-eea19dbb1f26
15/04/29 10:53:49 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/04/29 10:53:49 INFO HttpFileServer: HTTP File server directory is C:\Users\ATOPCH~1.REP\AppData\Local\Temp\spark-8474fc1d-7c66-4b25-b304-2fd743061967\httpd-eba80b64-1a5a-49c7-9bd2-6f7a4be543d7
15/04/29 10:53:49 INFO HttpServer: Starting HTTP Server
15/04/29 10:53:49 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/29 10:53:49 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52707
15/04/29 10:53:49 INFO Utils: Successfully started service 'HTTP file server' on port 52707.
15/04/29 10:53:49 INFO SparkEnv: Registering OutputCommitCoordinator
15/04/29 10:53:50 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/29 10:53:50 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/04/29 10:53:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/04/29 10:53:50 INFO SparkUI: Started SparkUI at http://DELLHR57L12.replynet.prv:4040
15/04/29 10:53:50 INFO Executor: Starting executor ID driver on host localhost
15/04/29 10:53:50 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkdri...@dellhr57l12.replynet.prv:52706/user/HeartbeatReceiver
15/04/29 10:53:50 INFO NettyBlockTransferService: Server created on 52745
15/04/29 10:53:50 INFO BlockManagerMaster: Trying to register BlockManager
15/04/29 10:53:50 INFO BlockManagerMasterActor: Registering block manager localhost:52745 with 265.4 MB RAM, BlockManagerId(driver, localhost, 52745)
15/04/29 10:53:50 INFO BlockManagerMaster: Registered BlockManager
Traceback (most recent call last):
  File "D:\workingdir\app\driver.py", line 33, in <module>
    kvs = KafkaUtils.createDirectStream(ssc, ["testopic"], {"metadata.broker.list": "obiwan:9092,r2d2:9092,vader:9092"})
  File "C:\spark\python\pyspark\streaming\kafka.py", line 126, in createDirectStream
    jstream = helper.createDirectStream(ssc._jssc, kafkaParams, jtopics, jfromOffsets)
  File "C:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 529, in __call__
    [get_command_part(arg, self.pool) for arg in new_args])
  File "C:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 265, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'dict' object has no attribute '_get_object_id'
Press any key to continue . . .
```

If I pass in

```python
jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
```

it seems to work fine.
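For reference, a sketch of the full explicit-conversion workaround described here, mirroring the converters the PR itself uses (`SetConverter` for topics, `MapConverter` for kafkaParams); the helper lookup follows the same pattern as kafka.py, and the broker/topic names are placeholders:

```python
from py4j.java_collections import MapConverter, SetConverter

kafka_params = {"metadata.broker.list": "obiwan:9092,r2d2:9092,vader:9092"}
topics = ["testopic"]

gateway_client = ssc.sparkContext._gateway._gateway_client
jparam = MapConverter().convert(kafka_params, gateway_client)
jtopics = SetConverter().convert(topics, gateway_client)

# Hand pre-converted Java objects to the JVM-side helper so py4j never has
# to convert a raw Python dict/list itself.
helper_class = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
    .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
helper = helper_class.newInstance()
jstream = helper.createDirectStream(ssc._jssc, jparam, jtopics)
```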
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-97354820

Hi @Arttii, maybe it is a problem with your local environment. My machine is Ubuntu 14.04 with Python 2.7.6 and it looks OK on my side; there's also no problem in the Jenkins environment, where the Python version is 2.6.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user Arttii commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-97362965

I did try it; it also fails. The Py4J docs say that dict and set are converted to HashMap and HashSet by default. This does not seem to be happening, which is really weird. So I just put in the direct conversion and everything works. I might try the py4j forums or something. This is fairly weird.
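One plausible explanation, sketched below: Py4J only performs the implicit dict/set conversion when the gateway was created with `auto_convert=True`; with the default `auto_convert=False`, passing a raw dict produces exactly the `AttributeError: 'dict' object has no attribute '_get_object_id'` traceback shown above. This assumes a standalone Py4J gateway rather than the one Spark's launcher creates:

```python
from py4j.java_gateway import JavaGateway
from py4j.java_collections import MapConverter

# With auto_convert=True, a Python dict passed to a Java method is
# implicitly converted to a java.util.HashMap, and a set to a HashSet.
gateway = JavaGateway(auto_convert=True)

# With the default auto_convert=False, the conversion must be explicit:
jmap = MapConverter().convert({"metadata.broker.list": "obiwan:9092"},
                              gateway._gateway_client)
```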
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-97542899

I believe @davies will be slow in responding as he is on vacation. Pinging @joshrosen instead.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-96947833

Alright! Merging it. Thanks! :)
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/4723
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-96947367

LGTM. Will merge when tests pass.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-96947387

[Test build #31115 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31115/consoleFull) for PR 4723 at commit [`a1fe97c`](https://github.com/apache/spark/commit/a1fe97c496f1441e0d2a5879e257c7e569f2e541).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
  * `class TopicAndPartition(object):`
  * `class Broker(object):`
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-96580759

Hi @davies and @tdas, I've hit a problem converting a Python `int` into a Java `Long`. The Java API in KafkaUtils requires the offset as a `Long`. This is simple in Python 2, which has a built-in `long` type that py4j maps to Java `Long` automatically, but Python 3 only has `int`, which py4j maps to Java `Integer`, and I'm not sure how to support `Long` in Python 3. A simple solution is to change all the Java-Python interfaces to use `Integer`, but then they could not represent very large offsets. I'm not sure whether there is any other solution. Sorry for the dumb question, and thanks a lot in advance.
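A small sketch of the mapping problem described here; `to_gateway_offset` is a hypothetical helper, not part of the PR:

```python
import sys

def to_gateway_offset(offset):
    # On Python 2, coerce to the built-in `long` so py4j maps the value to
    # java.lang.Long, matching the KafkaUtils signature. Python 3 has no
    # `long` builtin; a plain int that fits in 32 bits is mapped by py4j
    # 0.8.x to java.lang.Integer, which is the mismatch discussed above.
    if sys.version_info[0] == 2:
        return long(offset)  # noqa: F821 -- `long` exists only on Python 2
    return offset
```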
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-96866693

@JoshRosen Any thoughts on this?
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-96885628

Offline conversation with @JoshRosen: if it works for Python 2, then mark the corresponding unit test so that it is skipped on Python 3. Take a look at https://github.com/apache/spark/blob/master/python/pyspark/tests.py#L944
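A minimal sketch of that kind of version-gated skip, assuming a hypothetical test class; the decorator and condition use the standard library's `unittest` and are not necessarily the exact form used in tests.py:

```python
import sys
import unittest

class KafkaStreamTests(unittest.TestCase):

    @unittest.skipIf(sys.version_info[0] >= 3,
                     "py4j maps Python 3 int to java.lang.Integer, so Long "
                     "offsets cannot be passed through yet")
    def test_direct_stream_from_offsets(self):
        pass  # exercise createDirectStream with explicit fromOffsets here
```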
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-96889905

Aaah, I forgot to mention: please open up a JIRA regarding this PySpark problem. It's most likely a Py4J problem, and hence needs to be followed up with Py4J. And Python 3 support is still experimental, so it's okay for now. There are other unit tests in Spark that are skipped for Python 3.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-96892260

OK, thanks a lot for your suggestion, I will open a JIRA about this issue.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-96886946

Thanks a lot @tdas for your comments, but users who want to use this API in Python 3 will still hit this problem; by skipping the unit test we only sidestep the problem rather than actually solving it.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-96910058

[Test build #31115 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31115/consoleFull) for PR 4723 at commit [`a1fe97c`](https://github.com/apache/spark/commit/a1fe97c496f1441e0d2a5879e257c7e569f2e541).
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-95846092

Looks almost good, except for the comments on the API. Other than that, I took a detailed pass on everything else and it looks good.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r29031149

--- Diff: python/pyspark/streaming/kafka.py ---

```
@@ -70,7 +71,195 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, topics, kafkaParams,
+                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+        """
+        .. note:: Experimental
+
+        Create an input stream that directly pulls messages from a Kafka Broker.
+
+        This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
+        in each batch duration and processed without storing.
+
+        This does not use Zookeeper to store offsets. The consumed offsets are tracked
+        by the stream itself. For interoperability with Kafka monitoring tools that depend on
+        Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+        You can access the offsets used in each batch from the generated RDDs (see
+
+        To recover from driver failures, you have to enable checkpointing in the StreamingContext.
+        The information on consumed offset can be recovered from the checkpoint.
+        See the programming guide for details (constraints, etc.).
+
+        :param ssc: StreamingContext object
+        :param topics: list of topic_name to consume.
+        :param kafkaParams: Additional params for Kafka
+        :param keyDecoder: A function used to decode key (default is utf8_decoder)
+        :param valueDecoder: A function used to decode value (default is utf8_decoder)
+        :return: A DStream object
+        """
+        if not isinstance(topics, list):
+            raise TypeError("topics should be list")
+        if not isinstance(kafkaParams, dict):
+            raise TypeError("kafkaParams should be dict")
+
+        jtopics = SetConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
+        jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
+
+        try:
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            jstream = helper.createDirectStream(ssc._jssc, jparam, jtopics)
+        except Py4JJavaError, e:
+            if 'ClassNotFoundException' in str(e.java_exception):
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStreamFromOffset(ssc, kafkaParams, fromOffsets,
+                                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+        """
+        .. note:: Experimental
+
+        Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
+
+        This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
+        in each batch duration and processed without storing.
+
+        This does not use Zookeeper to store offsets. The consumed offsets are tracked
+        by the stream itself. For interoperability with Kafka monitoring tools that depend on
+        Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+        You can access the offsets used in each batch from the generated RDDs (see
+
+        To recover from driver failures, you have to enable checkpointing in the StreamingContext.
+        The information on consumed offset can be recovered from the checkpoint.
+        See the programming guide for details (constraints, etc.).
+
+        :param ssc: StreamingContext object.
+        :param kafkaParams: Additional params for Kafka.
+        :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting
+            point of the stream.
+        :param
```
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r29031104

--- Diff: python/pyspark/streaming/kafka.py ---

```
+    @staticmethod
+    def createDirectStreamFromOffset(ssc, kafkaParams, fromOffsets,
+                                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
```

--- End diff --

I thought about this a little bit, but I think we should follow the precedent set by `createStream` and the other Python APIs, where there is only one method with many optional parameters. So instead of having `createDirectStream` and `createDirectStreamFromOffsets`, let's just have `createDirectStream` with another optional parameter `fromOffsets`. `fromOffsets` should have the same keys as in topics; otherwise throw an error.
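A sketch of the merged API this suggests; the `fromOffsets` validation assumes the Python `TopicAndPartition` wrapper exposes its topic name (hypothetical attribute here), and the final code in the PR may differ:

```python
@staticmethod
def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
                       keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
    # One public method with an optional parameter, mirroring createStream:
    # fromOffsets=None means "start from the default offsets"; otherwise
    # start from the given per-partition offsets.
    if fromOffsets is not None:
        # Hypothetical attribute: assumes TopicAndPartition keeps the topic
        # name it was constructed with.
        offset_topics = set(tp._topic for tp in fromOffsets)
        if offset_topics != set(topics):
            raise ValueError("fromOffsets must cover exactly the subscribed topics")
    # ... remainder as in the existing createDirectStream implementation
```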
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r29031516

--- Diff: python/pyspark/streaming/kafka.py ---

```
+    @staticmethod
+    def createDirectStreamFromOffset(ssc, kafkaParams, fromOffsets,
+                                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
```

--- End diff --

Since Python does not support method overloading, I used a different method name to differentiate them. I will change it to the way you mentioned.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r29029779

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---

```
@@ -558,4 +560,100 @@ private class KafkaUtilsPythonHelper {
       topics,
       storageLevel)
   }
+
+  def createRDD(
+      jsc: JavaSparkContext,
+      kafkaParams: JMap[String, String],
+      offsetRanges: JList[OffsetRange]): JavaPairRDD[Array[Byte], Array[Byte]] = {
+    KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+      jsc,
+      classOf[Array[Byte]],
+      classOf[Array[Byte]],
+      classOf[DefaultDecoder],
+      classOf[DefaultDecoder],
+      kafkaParams,
+      offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())))
+  }
+
+  def createRDD(
+      jsc: JavaSparkContext,
+      kafkaParams: JMap[String, String],
+      offsetRanges: JList[OffsetRange],
+      leaders: JMap[TopicAndPartition, Broker]): JavaPairRDD[Array[Byte], Array[Byte]] = {
+    val messageHandler = new JFunction[MessageAndMetadata[Array[Byte], Array[Byte]],
+        (Array[Byte], Array[Byte])] {
+      def call(t1: MessageAndMetadata[Array[Byte], Array[Byte]]): (Array[Byte], Array[Byte]) =
+        (t1.key(), t1.message())
+    }
+
+    val jrdd = KafkaUtils.createRDD[
+      Array[Byte],
+      Array[Byte],
+      DefaultDecoder,
+      DefaultDecoder,
+      (Array[Byte], Array[Byte])](
+        jsc,
+        classOf[Array[Byte]],
+        classOf[Array[Byte]],
+        classOf[DefaultDecoder],
+        classOf[DefaultDecoder],
+        classOf[(Array[Byte], Array[Byte])],
+        kafkaParams,
+        offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())),
+        leaders,
+        messageHandler
+      )
+    new JavaPairRDD(jrdd.rdd)
+  }
+
+  def createDirectStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JSet[String]): JavaPairInputDStream[Array[Byte], Array[Byte]] = {
+    KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+      jssc,
+      classOf[Array[Byte]],
+      classOf[Array[Byte]],
+      classOf[DefaultDecoder],
+      classOf[DefaultDecoder],
+      kafkaParams,
+      topics)
+  }
+
+  def createDirectStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      fromOffsets: JMap[TopicAndPartition, JLong])
+    : JavaPairInputDStream[Array[Byte], Array[Byte]] = {
```

--- End diff --

I think we generally don't put ':' on the next line. Either

```
      fromOffsets: JMap[TopicAndPartition, JLong]): JavaPairInputDStream[Array[Byte], Array[Byte]] =
```

or

```
      fromOffsets: JMap[TopicAndPartition, JLong]
    ): JavaPairInputDStream[Array[Byte], Array[Byte]] =
```
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r29030076

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---

```
+  def createDirectStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      fromOffsets: JMap[TopicAndPartition, JLong])
+    : JavaPairInputDStream[Array[Byte], Array[Byte]] = {
+    val messageHandler = new JFunction[MessageAndMetadata[Array[Byte], Array[Byte]],
+        (Array[Byte], Array[Byte])] {
+      def call(t1: MessageAndMetadata[Array[Byte], Array[Byte]]): (Array[Byte], Array[Byte]) =
+        (t1.key(), t1.message())
+    }
+
+    val jstream = KafkaUtils.createDirectStream[
+      Array[Byte],
+      Array[Byte],
+      DefaultDecoder,
+      DefaultDecoder,
+      (Array[Byte], Array[Byte])](
+        jssc,
+        classOf[Array[Byte]],
+        classOf[Array[Byte]],
+        classOf[DefaultDecoder],
+        classOf[DefaultDecoder],
+        classOf[(Array[Byte], Array[Byte])],
+        kafkaParams,
+        fromOffsets,
+        messageHandler)
+    new JavaPairInputDStream(jstream.inputDStream)
+  }
+
+  def createOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
+    : OffsetRange = OffsetRange.create(topic, partition, fromOffset, untilOffset)
```

--- End diff --

Same comment as above.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user mattaylor commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-95312036

Any more news on this one?
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-95377844

Just waiting for review :).
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93380621

[Test build #30340 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30340/consoleFull) for PR 4723 at commit [`defbad7`](https://github.com/apache/spark/commit/defbad7993c21b8530b94db0db5811728d0a614b).
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93378234

Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93429833

[Test build #30340 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30340/consoleFull) for PR 4723 at commit [`defbad7`](https://github.com/apache/spark/commit/defbad7993c21b8530b94db0db5811728d0a614b).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
  * `class TopicAndPartition(object):`
  * `class Broker(object):`
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93429881

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30340/
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93611808

@tdas, I've added more APIs with unit tests. Would you please take a look? Thanks a lot.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93219403

[Test build #30320 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30320/consoleFull) for PR 4723 at commit [`aaf4c5a`](https://github.com/apache/spark/commit/aaf4c5a4a06cd3fe9cf44e48dbfa6d209a4e75f1).
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93227401

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30310/
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93227369

[Test build #30310 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30310/consoleFull) for PR 4723 at commit [`dc0cf6f`](https://github.com/apache/spark/commit/dc0cf6ffdd6f4c4c58a47f69ecef3f9103caef4f).

* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
  * `class TopicAndPartition(object):`
  * `class Broker(object):`
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93232943

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30308/
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93232903

[Test build #30308 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30308/consoleFull) for PR 4723 at commit [`9da49be`](https://github.com/apache/spark/commit/9da49be0cf2e569a9c871dd7bbb3aee7820f9e0e).

* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
  * `class TopicAndPartition(object):`
  * `class Broker(object):`
* This patch **adds the following new dependencies:**
  * `snappy-java-1.1.1.7.jar`
* This patch **removes the following dependencies:**
  * `snappy-java-1.1.1.6.jar`
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93274741

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30326/
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93274690

[Test build #30326 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30326/consoleFull) for PR 4723 at commit [`defbad7`](https://github.com/apache/spark/commit/defbad7993c21b8530b94db0db5811728d0a614b).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
  * `class TopicAndPartition(object):`
  * `class Broker(object):`
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93248588

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30320/
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93248559

[Test build #30320 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30320/consoleFull) for PR 4723 at commit [`aaf4c5a`](https://github.com/apache/spark/commit/aaf4c5a4a06cd3fe9cf44e48dbfa6d209a4e75f1).

* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
  * `class TopicAndPartition(object):`
  * `class Broker(object):`
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93283025

Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93284311

[Test build #30333 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30333/consoleFull) for PR 4723 at commit [`defbad7`](https://github.com/apache/spark/commit/defbad7993c21b8530b94db0db5811728d0a614b).
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93244981

[Test build #30326 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30326/consoleFull) for PR 4723 at commit [`defbad7`](https://github.com/apache/spark/commit/defbad7993c21b8530b94db0db5811728d0a614b).
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93325703

[Test build #30333 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30333/consoleFull) for PR 4723 at commit [`defbad7`](https://github.com/apache/spark/commit/defbad7993c21b8530b94db0db5811728d0a614b).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
  * `class TopicAndPartition(object):`
  * `class Broker(object):`
* This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93325759 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30333/ Test FAILed.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93205694 [Test build #30310 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30310/consoleFull) for PR 4723 at commit [`dc0cf6f`](https://github.com/apache/spark/commit/dc0cf6ffdd6f4c4c58a47f69ecef3f9103caef4f).
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-93203535 [Test build #30308 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30308/consoleFull) for PR 4723 at commit [`9da49be`](https://github.com/apache/spark/commit/9da49be0cf2e569a9c871dd7bbb3aee7820f9e0e).
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r28296922
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
--- End diff --
Hi @tdas, these days I'm trying to enable a custom `MessageAndMetadata => R` function in the related Python API. I have tried several solutions, but so far without success. Do you have any suggestions?
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r28301651
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
--- End diff --
Let's leave that for another PR. That sounds complicated and needs careful thought.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r28301930
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
--- End diff --
OK, I will leave this to another PR. Basically this solution is not so complicated: write (topic, partition, key, message, offset) into a byte array with a controlled format, and have the Python code unpack the byte array with the same format into the original data and construct an object to pass to the message handler function.
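To make the packing scheme described above concrete, here is a hypothetical sketch (not code from this PR) of the Python-side unpacking. It assumes the Scala side writes the variable-length fields with 4-byte big-endian length prefixes, followed by a 4-byte partition and an 8-byte offset; the exact layout and names are assumptions for illustration:

```python
import struct
from collections import namedtuple

# Hypothetical record mirroring Kafka's MessageAndMetadata fields.
MessageAndMetadata = namedtuple(
    "MessageAndMetadata", ["topic", "partition", "offset", "key", "message"])


def _read_field(buf, pos):
    # Each variable-length field is prefixed with a 4-byte big-endian length.
    (length,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    return buf[pos:pos + length], pos + length


def unpack_message_and_metadata(buf):
    # Assumed layout: topic, key, message (length-prefixed), then
    # partition (int32) and offset (int64), all big-endian.
    topic, pos = _read_field(buf, 0)
    key, pos = _read_field(buf, pos)
    message, pos = _read_field(buf, pos)
    partition, offset = struct.unpack_from(">iq", buf, pos)
    return MessageAndMetadata(topic, partition, offset, key, message)
```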
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r28301541
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
--- End diff --
Currently I have a solution: serialize the content of `MessageAndMetadata` into a byte array in the Scala code, then unpack it and reconstruct a Python `MessageAndMetadata` object in the Python code, so the handler can be supported. What do you think of this solution?
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r28301414
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
--- End diff --
That is quite tricky. And it's not even clear to me what the demand for this will be from the Python API. What I wanted to have in the Python API is the ability to specify starting offsets.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r28302015
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
--- End diff --
I agree that as such it is not a complicated thing; it's just that this way of doing things is non-standard practice (unless you show me some examples of this being done in other Python Kafka libraries) and therefore should be added only if there is a good use case / demand for it.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r28302149
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
--- End diff --
OK, that makes sense. I will leave out handler support in the Python API unless someone has a specific requirement for it.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-92159344 Yes, will do. I'm now working on adding more APIs related to DirectKafkaInputStream, as well as unit tests.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-91734374 Yes, only after unit tests cover this API. @jerryshao, can you add unit tests for this?
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-77786791 Thanks @tdas for your review. Maybe we should first figure out a way to test the Kafka Python API.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r25986002
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
--- End diff --
We should also allow creating a direct stream with offsets and specifying optional leaders; that is, all the stuff the advanced version of `createDirectStream` supports. The only thing that we cannot easily support in Python is the custom `MessageAndMetadata => R` function. Other than that, we should be able to do the rest.
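For illustration, a hedged sketch of how specifying starting offsets might look from Python, given a running StreamingContext `ssc`. The `fromOffsets` parameter name follows the shape of the Scala API and is an assumption relative to the diff quoted here; `TopicAndPartition` is the wrapper class this patch adds:

```python
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

# Start consuming topic "test" from offset 0 of partition 0 instead of
# relying on the "auto.offset.reset" Kafka parameter (names assumed).
fromOffsets = {TopicAndPartition("test", 0): 0}
stream = KafkaUtils.createDirectStream(
    ssc, ["test"], {"metadata.broker.list": "localhost:9092"},
    fromOffsets=fromOffsets)
```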
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-77655852 This is looking good, but unfortunately it's hard to say for sure due to the lack of unit tests. We have to test the Kafka Python API through Python unit tests. I can open a separate JIRA for that, and it will be good to have that framework set up first.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r25986066
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
         except Py4JJavaError, e:
             # TODO: use --jar once it also work on driver
             if 'ClassNotFoundException' in str(e.java_exception):
-                print
+                KafkaUtils._printErrorMsg(ssc.sparkContext)
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
+                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+
--- End diff --
Also, taking a broker list is different from the Scala / Java API. We should keep this consistent with the Scala / Java API.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-77102323 [Test build #28254 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28254/consoleFull) for PR 4723 at commit [`c301951`](https://github.com/apache/spark/commit/c3019515e3636568db6e625705b774f792c8bc2b).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-77108853 [Test build #28254 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28254/consoleFull) for PR 4723 at commit [`c301951`](https://github.com/apache/spark/commit/c3019515e3636568db6e625705b774f792c8bc2b).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-77108858 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28254/ Test PASSed.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-76512903 [Test build #28111 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28111/consoleFull) for PR 4723 at commit [`1b6e873`](https://github.com/apache/spark/commit/1b6e873602785c5e5c78ee23d77725d2c51129fc).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-76512906 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28111/ Test PASSed.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-76510453 [Test build #28111 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28111/consoleFull) for PR 4723 at commit [`1b6e873`](https://github.com/apache/spark/commit/1b6e873602785c5e5c78ee23d77725d2c51129fc).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75994230 Hi @davies, mind taking a look at this again? I've addressed your comments, though some duplication is hard to remove. Any suggestions?
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-76003479 LGTM, thanks!
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75799652 Hi @davies, should I use `::note: experimental` or `.. note:: Experimental`? It seems the MLlib code uses the latter (#3951).
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75805094 @jerryshao please follow the examples in MLlib, thanks.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75874294 [Test build #27919 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27919/consoleFull) for PR 4723 at commit [`90ed034`](https://github.com/apache/spark/commit/90ed034453914eff60bde04d9fad36f905288aa5).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r25306398
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -81,3 +79,128 @@ def getClassByName(name):
         ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
         stream = DStream(jstream, ssc, ser)
         return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
+                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+        """
+        ..note:: experimental
+
+        Create an input stream that directly pulls messages from a Kafka Broker.
+
+        This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
+        in each batch duration and processed without storing.
+
+        This does not use Zookeeper to store offsets. The consumed offsets are tracked
+        by the stream itself. For interoperability with Kafka monitoring tools that depend on
+        Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+        You can access the offsets used in each batch from the generated RDDs (see
+
+        To recover from driver failures, you have to enable checkpointing in the StreamingContext.
+        The information on consumed offset can be recovered from the checkpoint.
+        See the programming guide for details (constraints, etc.).
+
+        :param ssc: StreamingContext object
+        :param brokerList: A String representing a list of seed Kafka brokers (hostname:port,...)
+        :param topics: list of topic_name to consume.
+        :param kafkaParams: Additional params for Kafka
+        :param keyDecoder: A function used to decode key (default is utf8_decoder)
+        :param valueDecoder: A function used to decode value (default is utf8_decoder)
+        :return: A DStream object
+        """
+        java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
+
+        kafkaParams.update({"metadata.broker.list": brokerList})
+
+        if not isinstance(topics, list):
+            raise TypeError("topics should be list")
+        jtopics = SetConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
+        jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
+
+        try:
+            array = KafkaUtils._getClassByName(ssc._jvm, "[B")
+            decoder = KafkaUtils._getClassByName(ssc._jvm, "kafka.serializer.DefaultDecoder")
+            jstream = ssc._jvm.KafkaUtils.createDirectStream(ssc._jssc, array, array, decoder,
+                                                             decoder, jparam, jtopics)
+        except Py4JError, e:
+            # TODO: use --jar once it also work on driver
+            if not e.message or 'call a package' in e.message:
+                print "No kafka package, please put the assembly jar into classpath:"
+                print "$ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
+                      "scala-*/spark-streaming-kafka-assembly-*.jar"
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createRDD(sc, brokerList, offsetRanges, kafkaParams={},
+                  keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+        """
+        ..note:: experimental
+
+        Create a RDD from Kafka using offset ranges for each topic and partition.
+
+        :param sc: SparkContext object
+        :param brokerList: A String representing a list of seed Kafka brokers (hostname:port,...)
+        :param offsetRanges: list of offsetRange to specify topic:partition [start, end) to consume.
+        :param kafkaParams: Additional params for Kafka
+        :param keyDecoder: A function used to decode key (default is utf8_decoder)
+        :param valueDecoder: A function used to decode value (default is utf8_decoder)
+        :return: A RDD object
+        """
+        java_import(sc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
+        java_import(sc._jvm, "org.apache.spark.streaming.kafka.OffsetRange")
+
+        kafkaParams.update({"metadata.broker.list": brokerList})
+
+        if not isinstance(offsetRanges, list):
+            raise TypeError("offsetRanges should be list")
+        jparam = MapConverter().convert(kafkaParams, sc._gateway._gateway_client)
+
+        try:
+            array =
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75911521 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27941/ Test PASSed.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75911518 [Test build #27941 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27941/consoleFull) for PR 4723 at commit [`d68aad2`](https://github.com/apache/spark/commit/d68aad239c2eb4917daa755efd60c9fdcd87a53e).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r25306191
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -81,3 +79,128 @@ def getClassByName(name):
         ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
         stream = DStream(jstream, ssc, ser)
         return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
+                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+        """
+        ..note:: experimental
+
+        Create an input stream that directly pulls messages from a Kafka Broker.
+
+        This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
+        in each batch duration and processed without storing.
+
+        This does not use Zookeeper to store offsets. The consumed offsets are tracked
+        by the stream itself. For interoperability with Kafka monitoring tools that depend on
+        Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+        You can access the offsets used in each batch from the generated RDDs (see
+
+        To recover from driver failures, you have to enable checkpointing in the StreamingContext.
+        The information on consumed offset can be recovered from the checkpoint.
+        See the programming guide for details (constraints, etc.).
+
+        :param ssc: StreamingContext object
+        :param brokerList: A String representing a list of seed Kafka brokers (hostname:port,...)
+        :param topics: list of topic_name to consume.
+        :param kafkaParams: Additional params for Kafka
+        :param keyDecoder: A function used to decode key (default is utf8_decoder)
+        :param valueDecoder: A function used to decode value (default is utf8_decoder)
+        :return: A DStream object
+        """
+        java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
+
+        kafkaParams.update({"metadata.broker.list": brokerList})
+
+        if not isinstance(topics, list):
+            raise TypeError("topics should be list")
+        jtopics = SetConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
+        jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
+
+        try:
+            array = KafkaUtils._getClassByName(ssc._jvm, "[B")
+            decoder = KafkaUtils._getClassByName(ssc._jvm, "kafka.serializer.DefaultDecoder")
+            jstream = ssc._jvm.KafkaUtils.createDirectStream(ssc._jssc, array, array, decoder,
+                                                             decoder, jparam, jtopics)
+        except Py4JError, e:
+            # TODO: use --jar once it also work on driver
+            if not e.message or 'call a package' in e.message:
+                print "No kafka package, please put the assembly jar into classpath:"
+                print "$ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
+                      "scala-*/spark-streaming-kafka-assembly-*.jar"
+            raise e
+        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+        stream = DStream(jstream, ssc, ser)
+        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createRDD(sc, brokerList, offsetRanges, kafkaParams={},
+                  keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+        """
+        ..note:: experimental
+
+        Create a RDD from Kafka using offset ranges for each topic and partition.
+
+        :param sc: SparkContext object
+        :param brokerList: A String representing a list of seed Kafka brokers (hostname:port,...)
+        :param offsetRanges: list of offsetRange to specify topic:partition [start, end) to consume.
+        :param kafkaParams: Additional params for Kafka
+        :param keyDecoder: A function used to decode key (default is utf8_decoder)
+        :param valueDecoder: A function used to decode value (default is utf8_decoder)
+        :return: A RDD object
+        """
+        java_import(sc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
+        java_import(sc._jvm, "org.apache.spark.streaming.kafka.OffsetRange")
+
+        kafkaParams.update({"metadata.broker.list": brokerList})
+
+        if not isinstance(offsetRanges, list):
+            raise TypeError("offsetRanges should be list")
+        jparam = MapConverter().convert(kafkaParams, sc._gateway._gateway_client)
+
+        try:
+            array =
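For context, a usage sketch of the `createRDD` signature quoted above, given a `SparkContext` `sc`. The `OffsetRange` constructor follows the `class OffsetRange(object):` wrapper this patch adds, so the exact argument order is an assumption:

```python
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

# Consume offsets [0, 100) of partition 0 of topic "test" as a batch RDD.
offset_ranges = [OffsetRange("test", 0, 0, 100)]
rdd = KafkaUtils.createRDD(sc, "localhost:9092", offset_ranges)
```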
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75871599 [Test build #27918 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27918/consoleFull) for PR 4723 at commit [`cd56575`](https://github.com/apache/spark/commit/cd56575b00fd43df273b49068dfd62e47dc5370b).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75871321 Added a new `createRDD` API. @davies @tdas, please help review. Thanks a lot.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r25306507
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -81,3 +79,128 @@ def getClassByName(name):
         ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
         stream = DStream(jstream, ssc, ser)
         return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+    @staticmethod
+    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
+                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+        """
+        ..note:: experimental
--- End diff --
Sorry, it should be
```
.. note:: Experimental
```
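For example, the directive would sit at the top of the method docstring like this (a minimal illustration of the Sphinx directive placement, not the PR's final text):

```python
def createDirectStream(ssc, topics, kafkaParams):
    """
    .. note:: Experimental

    Create an input stream that directly pulls messages from a Kafka broker.
    """
```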
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75881810 [Test build #27919 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27919/consoleFull) for PR 4723 at commit [`90ed034`](https://github.com/apache/spark/commit/90ed034453914eff60bde04d9fad36f905288aa5).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75881819 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27919/ Test FAILed.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75871757 [Test build #27918 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27918/consoleFull) for PR 4723 at commit [`cd56575`](https://github.com/apache/spark/commit/cd56575b00fd43df273b49068dfd62e47dc5370b).
* This patch **fails Python style tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75871761 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27918/ Test FAILed.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75906035 [Test build #27939 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27939/consoleFull) for PR 4723 at commit [`377b73f`](https://github.com/apache/spark/commit/377b73f7c8a6d840253d5da01c8a219d4905fdcf).
* This patch **fails Python style tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OffsetRange(object):`
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75906037 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27939/ Test FAILed.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75906298 [Test build #27941 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27941/consoleFull) for PR 4723 at commit [`d68aad2`](https://github.com/apache/spark/commit/d68aad239c2eb4917daa755efd60c9fdcd87a53e).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75905949 [Test build #27939 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27939/consoleFull) for PR 4723 at commit [`377b73f`](https://github.com/apache/spark/commit/377b73f7c8a6d840253d5da01c8a219d4905fdcf).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75884024 Thanks @davies for your review; I will address these comments :).
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75581979 Hi @tdas, do we need to add a Python version of `createRDD` for the direct Kafka stream? It seems this API requires a Python wrapper for Java objects like `OffsetRange`.
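A minimal sketch of what such a wrapper could look like: it holds the four fields on the Python side and materializes the Java `OffsetRange` through py4j only when needed. The `helper` object and its `createOffsetRange` method are assumptions, not this PR's final API:

```python
class OffsetRange(object):
    """Represents a range of offsets from a single Kafka topic and partition."""

    def __init__(self, topic, partition, fromOffset, untilOffset):
        self.topic = topic
        self.partition = partition
        self.fromOffset = fromOffset
        self.untilOffset = untilOffset

    def _jOffsetRange(self, helper):
        # Ask a JVM-side helper to build the corresponding Java OffsetRange.
        return helper.createOffsetRange(self.topic, self.partition,
                                        self.fromOffset, self.untilOffset)
```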
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r25178444
--- Diff: examples/src/main/python/streaming/direct_kafka_wordcount.py ---
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka in every 2 seconds.
+ Usage: direct_kafka_wordcount.py <broker_list> <topic>
+
+ To run this on your local machine, you need to setup Kafka and create a producer first, see
+ http://kafka.apache.org/documentation.html#quickstart
+
+ and then run the example
+    `$ bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/\
+      spark-streaming-kafka-assembly-*.jar \
+      examples/src/main/python/streaming/direct_kafka_wordcount.py \
+      localhost:9092 test`
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kafka import KafkaUtils
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: direct_kafka_wordcount.py <broker_list> <topic>"
+        exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
+    ssc = StreamingContext(sc, 2)
+
+    brokers, topic = sys.argv[1:]
+    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
--- End diff --
Hi @davies, thanks for your comment; I will add this as an argument.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75594343 [Test build #27855 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27855/consoleFull) for PR 4723 at commit [`5381db1`](https://github.com/apache/spark/commit/5381db1ad833ab72a2eb15b0f30d745c1bfbe764).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75610987 [Test build #27855 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27855/consoleFull) for PR 4723 at commit [`5381db1`](https://github.com/apache/spark/commit/5381db1ad833ab72a2eb15b0f30d745c1bfbe764).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75610996 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27855/ Test PASSed.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75698289 Yes, a Python version of createRDD would be great. BTW, is it possible to mark these as experimental in Python, @davies? The Scala and Java APIs are experimental as of now. TD
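For a sense of what the requested Python `createRDD` might look like, here is a hedged sketch mirroring the Scala API's offset-range-based batch read; the exact signature and the `OffsetRange` helper are assumptions until such an API actually lands in the PR:

```python
# Assumed imports: OffsetRange is hypothetical here, mirroring the Scala class.
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

# Read a fixed slice of a topic partition as a batch RDD:
# messages with offsets in [0, 100) from partition 0 of topic "test".
offsetRanges = [OffsetRange("test", 0, 0, 100)]
rdd = KafkaUtils.createRDD(
    sc, {"metadata.broker.list": "localhost:9092"}, offsetRanges)
```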
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75706796 We can mark it as experimental by ``` .. note:: Experimental ```
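For illustration, a minimal sketch of how that Sphinx-style note could sit in the docstring of the new Python API; the signature is simplified and the placeholder body is not the real implementation:

```python
class KafkaUtils(object):

    @staticmethod
    def createDirectStream(ssc, topics, kafkaParams):
        """
        .. note:: Experimental

        Create an input stream that pulls messages directly from
        Kafka brokers, without using any receiver.
        """
        raise NotImplementedError  # placeholder; real body delegates to the JVM helper
```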
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75495853 @davies It is the new experimental API for Kafka, which does not use Receivers at all, and rather treats Kafka as a file system and topics as files. It finds new data in a topic pretty much like your patch to support appends in the file stream.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75496536 @tdas Cool, thanks. Does it mean that it does NOT need the WAL to have high durability? This patch looks good to me, it just needs more docs (somewhere) to explain the difference between these two APIs.
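To make that difference concrete, a minimal sketch of the two Python entry points side by side; the host names and topic are placeholders, and `ssc` is assumed to be an already-created StreamingContext:

```python
from pyspark.streaming.kafka import KafkaUtils

# Receiver-based API: a long-running receiver consumes via ZooKeeper, and
# durability comes from enabling the write-ahead log (WAL).
stream = KafkaUtils.createStream(
    ssc, "zk-host:2181", "consumer-group", {"topic": 1})

# Direct (receiverless) API: the driver computes an offset range for each
# batch and executors read those offsets straight from the brokers, so no
# receiver or WAL is needed.
directStream = KafkaUtils.createDirectStream(
    ssc, ["topic"], {"metadata.broker.list": "broker-host:9092"})
```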
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75491038 [Test build #27848 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27848/consoleFull) for PR 4723 at commit [`f80a6be`](https://github.com/apache/spark/commit/f80a6be693366c042f460d31ec90228e5e0a71a1). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5946][Streaming] Add Python API for dir...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4723#issuecomment-75491095 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27848/