Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2538#discussion_r18373236
  
    --- Diff: python/pyspark/streaming/context.py ---
    @@ -0,0 +1,305 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +import os
    +import sys
    +
    +from py4j.java_collections import ListConverter
    +from py4j.java_gateway import java_import
    +
    +from pyspark import RDD, SparkConf
    +from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer
    +from pyspark.context import SparkContext
    +from pyspark.storagelevel import StorageLevel
    +from pyspark.streaming.dstream import DStream
    +from pyspark.streaming.util import TransformFunction, 
TransformFunctionSerializer
    +
    +__all__ = ["StreamingContext"]
    +
    +
    +def _daemonize_callback_server():
    +    """
    +    Hack Py4J to daemonize callback server
    +
    +    The thread of callback server has daemon=False, it will block the 
driver
    +    from exiting if it's not shutdown. The following code replace `start()`
    +    of CallbackServer with a new version, which set daemon=True for this
    +    thread.
    +    """
    +    # TODO: create a patch for Py4J
    +    import socket
    +    import py4j.java_gateway
    +    logger = py4j.java_gateway.logger
    +    from py4j.java_gateway import Py4JNetworkError
    +    from threading import Thread
    +
    +    def start(self):
    +        """Starts the CallbackServer. This method should be called by the
    +        client instead of run()."""
    +        self.server_socket = socket.socket(socket.AF_INET, 
socket.SOCK_STREAM)
    +        self.server_socket.setsockopt(socket.SOL_SOCKET, 
socket.SO_REUSEADDR,
    +                                      1)
    +        try:
    +            self.server_socket.bind((self.address, self.port))
    +        except Exception:
    +            msg = 'An error occurred while trying to start the callback 
server'
    +            logger.exception(msg)
    +            raise Py4JNetworkError(msg)
    +
    +        # Maybe thread needs to be cleanup up?
    +        self.thread = Thread(target=self.run)
    +        self.thread.daemon = True
    +        self.thread.start()
    +
    +    py4j.java_gateway.CallbackServer.start = start
    +
    +
    +class StreamingContext(object):
    +    """
    +    Main entry point for Spark Streaming functionality. A StreamingContext
    +    represents the connection to a Spark cluster, and can be used to create
    +    L{DStream} various input sources. It can be from an existing 
L{SparkContext}.
    +    After creating and transforming DStreams, the streaming computation can
    +    be started and stopped using `context.start()` and `context.stop()`,
    +    respectively. `context.awaitTransformation()` allows the current thread
    +    to wait for the termination of the context by `stop()` or by an 
exception.
    +    """
    +    _transformerSerializer = None
    +
    +    def __init__(self, sparkContext, duration=None, jssc=None):
    +        """
    +        Create a new StreamingContext.
    +
    +        @param sparkContext: L{SparkContext} object.
    +        @param duration: number of seconds.
    +        """
    +
    +        self._sc = sparkContext
    +        self._jvm = self._sc._jvm
    +        self._jssc = jssc or self._initialize_context(self._sc, duration)
    +
    +    def _initialize_context(self, sc, duration):
    +        self._ensure_initialized()
    +        return self._jvm.JavaStreamingContext(sc._jsc, 
self._jduration(duration))
    +
    +    def _jduration(self, seconds):
    +        """
    +        Create Duration object given number of seconds
    +        """
    +        return self._jvm.Duration(int(seconds * 1000))
    +
    +    @classmethod
    +    def _ensure_initialized(cls):
    +        SparkContext._ensure_initialized()
    +        gw = SparkContext._gateway
    +        # start callback server
    +        # getattr will fallback to JVM
    +        if "_callback_server" not in gw.__dict__:
    +            _daemonize_callback_server()
    +            gw._start_callback_server(gw._python_proxy_port)
    +
    +        java_import(gw.jvm, "org.apache.spark.streaming.*")
    +        java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
    +        java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")
    +        # register serializer for TransformFunction
    +        # it happens before creating SparkContext when loading from 
checkpointing
    +        cls._transformerSerializer = TransformFunctionSerializer(
    +            SparkContext._active_spark_context, CloudPickleSerializer(), 
gw)
    +
    +    @classmethod
    +    def getOrCreate(cls, path, setupFunc):
    +        """
    +        Get the StreamingContext from checkpoint file at `path`, or setup
    --- End diff --
    
    Here, too, we might want to copy the Scala docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to