Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/2538#discussion_r18373408
--- Diff: python/pyspark/streaming/context.py ---
@@ -0,0 +1,319 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os
+import sys
+
+from py4j.java_collections import ListConverter
+from py4j.java_gateway import java_import, JavaObject
+
+from pyspark import RDD, SparkConf
+from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer
+from pyspark.context import SparkContext
+from pyspark.storagelevel import StorageLevel
+from pyspark.streaming.dstream import DStream
+from pyspark.streaming.util import TransformFunction,
TransformFunctionSerializer
+
+__all__ = ["StreamingContext"]
+
+
+def _daemonize_callback_server():
+ """
+ Hack Py4J to daemonize callback server
+
+ The thread of callback server has daemon=False, it will block the
driver
+ from exiting if it's not shutdown. The following code replace `start()`
+ of CallbackServer with a new version, which set daemon=True for this
+ thread.
+
+ Also, it will update the port number (0) with real port
+ """
+ # TODO: create a patch for Py4J
+ import socket
+ import py4j.java_gateway
+ logger = py4j.java_gateway.logger
+ from py4j.java_gateway import Py4JNetworkError
+ from threading import Thread
+
+ def start(self):
+ """Starts the CallbackServer. This method should be called by the
+ client instead of run()."""
+ self.server_socket = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
+ self.server_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR,
+ 1)
+ try:
+ self.server_socket.bind((self.address, self.port))
+ if not self.port:
+ # update port with real port
+ self.port = self.server_socket.getsockname()[1]
+ except Exception as e:
+ msg = 'An error occurred while trying to start the callback
server: %s' % e
+ logger.exception(msg)
+ raise Py4JNetworkError(msg)
+
+ # Maybe thread needs to be cleanup up?
+ self.thread = Thread(target=self.run)
+ self.thread.daemon = True
+ self.thread.start()
+
+ py4j.java_gateway.CallbackServer.start = start
+
+
+class StreamingContext(object):
+ """
+ Main entry point for Spark Streaming functionality. A StreamingContext
+ represents the connection to a Spark cluster, and can be used to create
+ L{DStream} various input sources. It can be from an existing
L{SparkContext}.
+ After creating and transforming DStreams, the streaming computation can
+ be started and stopped using `context.start()` and `context.stop()`,
+ respectively. `context.awaitTransformation()` allows the current thread
+ to wait for the termination of the context by `stop()` or by an
exception.
+ """
+ _transformerSerializer = None
+
+ def __init__(self, sparkContext, duration=None, jssc=None):
+ """
+ Create a new StreamingContext.
+
+ @param sparkContext: L{SparkContext} object.
+ @param duration: number of seconds.
+ """
+
+ self._sc = sparkContext
+ self._jvm = self._sc._jvm
+ self._jssc = jssc or self._initialize_context(self._sc, duration)
+
+ def _initialize_context(self, sc, duration):
+ self._ensure_initialized()
+ return self._jvm.JavaStreamingContext(sc._jsc,
self._jduration(duration))
+
+ def _jduration(self, seconds):
+ """
+ Create Duration object given number of seconds
+ """
+ return self._jvm.Duration(int(seconds * 1000))
+
+ @classmethod
+ def _ensure_initialized(cls):
+ SparkContext._ensure_initialized()
+ gw = SparkContext._gateway
+
+ java_import(gw.jvm, "org.apache.spark.streaming.*")
+ java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
+ java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")
+
+ # start callback server
+ # getattr will fallback to JVM, so we cannot test by hasattr()
+ if "_callback_server" not in gw.__dict__:
+ _daemonize_callback_server()
+ # use random port
+ gw._start_callback_server(0)
+ # gateway with real port
+ gw._python_proxy_port = gw._callback_server.port
+ # get the GatewayServer object in JVM by ID
+ jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
+ # update the port of CallbackClient with real port
+ gw.jvm.PythonDStream.updatePythonGatewayPort(jgws,
gw._python_proxy_port)
+
+ # register serializer for TransformFunction
+ # it happens before creating SparkContext when loading from
checkpointing
+ cls._transformerSerializer = TransformFunctionSerializer(
+ SparkContext._active_spark_context, CloudPickleSerializer(),
gw)
+
+ @classmethod
+ def getOrCreate(cls, path, setupFunc):
+ """
+ Get the StreamingContext from checkpoint file at `path`, or setup
+ it by `setupFunc`.
+
+ :param path: directory of checkpoint
+ :param setupFunc: a function used to create StreamingContext and
+ setup DStreams.
+ :return: a StreamingContext
+ """
+ if not os.path.exists(path) or not os.path.isdir(path) or not
os.listdir(path):
+ ssc = setupFunc()
+ ssc.checkpoint(path)
+ return ssc
+
+ cls._ensure_initialized()
+ gw = SparkContext._gateway
+
+ try:
+ jssc = gw.jvm.JavaStreamingContext(path)
+ except Exception:
+ print >>sys.stderr, "failed to load StreamingContext from
checkpoint"
+ raise
+
+ jsc = jssc.sparkContext()
+ conf = SparkConf(_jconf=jsc.getConf())
+ sc = SparkContext(conf=conf, gateway=gw, jsc=jsc)
+ # update ctx in serializer
+ SparkContext._active_spark_context = sc
+ cls._transformerSerializer.ctx = sc
+ return StreamingContext(sc, None, jssc)
+
+ @property
+ def sparkContext(self):
+ """
+ Return SparkContext which is associated with this StreamingContext.
+ """
+ return self._sc
+
+ def start(self):
+ """
+ Start the execution of the streams.
+ """
+ self._jssc.start()
+
+ def awaitTermination(self, timeout=None):
+ """
+ Wait for the execution to stop.
+ @param timeout: time to wait in seconds
+ """
+ if timeout is None:
+ self._jssc.awaitTermination()
+ else:
+ self._jssc.awaitTermination(int(timeout * 1000))
+
+ def stop(self, stopSparkContext=True, stopGraceFully=False):
+ """
+ Stop the execution of the streams, with option of ensuring all
+ received data has been processed.
+
+ @param stopSparkContext: Stop the associated SparkContext or not
+ @param stopGracefully: Stop gracefully by waiting for the
processing
+ of all received data to be completed
+ """
+ self._jssc.stop(stopSparkContext, stopGraceFully)
+ if stopSparkContext:
+ self._sc.stop()
+
+ def remember(self, duration):
+ """
+ Set each DStreams in this context to remember RDDs it generated
+ in the last given duration. DStreams remember RDDs only for a
+ limited duration of time and releases them for garbage collection.
+ This method allows the developer to specify how to long to remember
+ the RDDs (if the developer wishes to query old data outside the
+ DStream computation).
+
+ @param duration: Minimum duration (in seconds) that each DStream
+ should remember its RDDs
+ """
+ self._jssc.remember(self._jduration(duration))
+
+ def checkpoint(self, directory):
+ """
+ Sets the context to periodically checkpoint the DStream operations
for master
+ fault-tolerance. The graph will be checkpointed every batch
interval.
+
+ @param directory: HDFS-compatible directory where the checkpoint
data
+ will be reliably stored
+ """
+ self._jssc.checkpoint(directory)
+
+ def socketTextStream(self, hostname, port,
storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+ """
+ Create an input from TCP source hostname:port. Data is received
using
+ a TCP socket and receive byte is interpreted as UTF8 encoded
``\\n`` delimited
+ lines.
+
+ @param hostname: Hostname to connect to for receiving data
+ @param port: Port to connect to for receiving data
+ @param storageLevel: Storage level to use for storing the
received objects
+ """
+ jlevel = self._sc._getJavaStorageLevel(storageLevel)
+ return DStream(self._jssc.socketTextStream(hostname, port,
jlevel), self,
+ UTF8Deserializer())
+
+ def textFileStream(self, directory):
+ """
+ Create an input stream that monitors a Hadoop-compatible file
system
+ for new files and reads them as text files. Files must be wrriten
to the
+ monitored directory by "moving" them from another location within
the same
+ file system. File names starting with . are ignored.
+ """
+ return DStream(self._jssc.textFileStream(directory), self,
UTF8Deserializer())
+
+ def _check_serialzers(self, rdds):
--- End diff --
Minor typo: `serialzers` -> `serializers` (i.e. `_check_serialzers` should be `_check_serializers`).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]