Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/2538#discussion_r18193173
--- Diff: python/pyspark/streaming/context.py ---
@@ -0,0 +1,243 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from py4j.java_collections import ListConverter
+from py4j.java_gateway import java_import
+
+from pyspark import RDD
+from pyspark.serializers import UTF8Deserializer
+from pyspark.context import SparkContext
+from pyspark.storagelevel import StorageLevel
+from pyspark.streaming.dstream import DStream
+from pyspark.streaming.util import RDDFunction
+
+__all__ = ["StreamingContext"]
+
+
+def _daemonize_callback_server():
+ """
+ Hack Py4J to daemonize callback server
+ """
+ # TODO: create a patch for Py4J
+ import socket
+ import py4j.java_gateway
+ logger = py4j.java_gateway.logger
+ from py4j.java_gateway import Py4JNetworkError
+ from threading import Thread
+
+ def start(self):
+ """Starts the CallbackServer. This method should be called by the
+ client instead of run()."""
+ self.server_socket = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
+ self.server_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR,
+ 1)
+ try:
+ self.server_socket.bind((self.address, self.port))
+ # self.port = self.server_socket.getsockname()[1]
+ except Exception:
+ msg = 'An error occurred while trying to start the callback
server'
+ logger.exception(msg)
+ raise Py4JNetworkError(msg)
+
+ # Maybe thread needs to be cleanup up?
+ self.thread = Thread(target=self.run)
+ self.thread.daemon = True
+ self.thread.start()
+
+ py4j.java_gateway.CallbackServer.start = start
+
+
+class StreamingContext(object):
+ """
+ Main entry point for Spark Streaming functionality. A StreamingContext
represents the
+ connection to a Spark cluster, and can be used to create L{DStream}s
and
+ broadcast variables on that cluster.
+ """
+
+ def __init__(self, sparkContext, duration):
+ """
+ Create a new StreamingContext. At least the master and app name
and duration
+ should be set, either through the named parameters here or through
C{conf}.
+
+ @param sparkContext: L{SparkContext} object.
+ @param duration: seconds for SparkStreaming.
+
+ """
+ self._sc = sparkContext
+ self._jvm = self._sc._jvm
+ self._start_callback_server()
+ self._jssc = self._initialize_context(self._sc, duration)
+
+ def _start_callback_server(self):
+ gw = self._sc._gateway
+ # getattr will fallback to JVM
+ if "_callback_server" not in gw.__dict__:
+ _daemonize_callback_server()
+ gw._start_callback_server(gw._python_proxy_port)
+ gw._python_proxy_port = gw._callback_server.port # update
port with real port
+
+ def _initialize_context(self, sc, duration):
+ java_import(self._jvm, "org.apache.spark.streaming.*")
+ java_import(self._jvm, "org.apache.spark.streaming.api.java.*")
+ java_import(self._jvm, "org.apache.spark.streaming.api.python.*")
+ return self._jvm.JavaStreamingContext(sc._jsc,
self._jduration(duration))
+
+ def _jduration(self, seconds):
+ """
+ Create Duration object given number of seconds
+ """
+ return self._jvm.Duration(int(seconds * 1000))
+
+ @property
+ def sparkContext(self):
+ """
+ Return SparkContext which is associated with this StreamingContext.
+ """
+ return self._sc
+
+ def start(self):
+ """
+ Start the execution of the streams.
+ """
+ self._jssc.start()
+
+ def awaitTermination(self, timeout=None):
+ """
+ Wait for the execution to stop.
+ @param timeout: time to wait in seconds
+ """
+ if timeout is None:
+ self._jssc.awaitTermination()
+ else:
+ self._jssc.awaitTermination(int(timeout * 1000))
+
+ def stop(self, stopSparkContext=True, stopGraceFully=False):
+ """
+ Stop the execution of the streams immediately (does not wait for
all received data
--- End diff --
Wrong doc string. The Scala doc string is
/**
* Stop the execution of the streams, with option of ensuring all
received data
* has been processed.
* @param stopSparkContext Stop the associated SparkContext or not
* @param stopGracefully Stop gracefully by waiting for the processing of
all
* received data to be completed
*/
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]