This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d3d35ab3e64 [SPARK-46080][PYTHON] Upgrade Cloudpickle to 3.0.0
d3d35ab3e64 is described below

commit d3d35ab3e647b3f3eac84447ef9e5c9a9d7c20d6
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Thu Nov 23 17:53:41 2023 -0800

    [SPARK-46080][PYTHON] Upgrade Cloudpickle to 3.0.0
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to upgrade Cloudpickle from 2.2.1 to 3.0.0.
    
    ### Why are the changes needed?
    
    It includes official support of Python 3.12 (https://github.com/cloudpipe/cloudpickle/pull/517)
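
    As an illustrative sketch (not part of this patch), the vendored module
    continues to round-trip interactively defined functions, the core use
    case that the Python 3.12 support unblocks:

        from pyspark import cloudpickle

        square = lambda x: x * x  # interactively defined function
        payload = cloudpickle.dumps(square)
        restored = cloudpickle.loads(payload)
        assert restored(4) == 16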
    
    ### Does this PR introduce _any_ user-facing change?
    
    It includes official support of Python 3.12 (https://github.com/cloudpipe/cloudpickle/pull/517)
    
    ### How was this patch tested?
    
    Relies on cloudpickle's own unit tests. Existing test cases should pass too.
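
    A quick manual sanity check (hypothetical, not part of the patch) is to
    confirm the vendored version string after the upgrade:

        from pyspark import cloudpickle

        assert cloudpickle.__version__ == "3.0.0"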
    
    Closes #43989 from HyukjinKwon/SPARK-46080.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 python/pyspark/cloudpickle/__init__.py         |   20 +-
 python/pyspark/cloudpickle/cloudpickle.py      | 1391 ++++++++++++++++--------
 python/pyspark/cloudpickle/cloudpickle_fast.py |  847 +--------------
 3 files changed, 988 insertions(+), 1270 deletions(-)
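
As a usage sketch of the __init__.py change below (assuming, as in upstream
cloudpickle 3.0.0, that cloudpickle.py itself now provides the CloudPickler
backward-compatibility alias), the existing top-level names keep working:

    from pyspark.cloudpickle import CloudPickler, Pickler, dumps, loads

    assert CloudPickler is Pickler  # backward-compatibility alias
    assert loads(dumps({"k": 1})) == {"k": 1}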

diff --git a/python/pyspark/cloudpickle/__init__.py b/python/pyspark/cloudpickle/__init__.py
index af35a0a194b..a3348e8b3da 100644
--- a/python/pyspark/cloudpickle/__init__.py
+++ b/python/pyspark/cloudpickle/__init__.py
@@ -1,8 +1,18 @@
+from pyspark.cloudpickle import cloudpickle  # noqa
 from pyspark.cloudpickle.cloudpickle import *  # noqa
-from pyspark.cloudpickle.cloudpickle_fast import CloudPickler, dumps, dump  # noqa
 
-# Conform to the convention used by python serialization libraries, which
-# expose their Pickler subclass at top-level under the  "Pickler" name.
-Pickler = CloudPickler
+__doc__ = cloudpickle.__doc__
 
-__version__ = '2.2.1'
+__version__ = "3.0.0"
+
+__all__ = [  # noqa
+    "__version__",
+    "Pickler",
+    "CloudPickler",
+    "dumps",
+    "loads",
+    "dump",
+    "load",
+    "register_pickle_by_value",
+    "unregister_pickle_by_value",
+]
diff --git a/python/pyspark/cloudpickle/cloudpickle.py b/python/pyspark/cloudpickle/cloudpickle.py
index 317be69151a..eb43a9676bb 100644
--- a/python/pyspark/cloudpickle/cloudpickle.py
+++ b/python/pyspark/cloudpickle/cloudpickle.py
@@ -1,16 +1,25 @@
-"""
-This class is defined to override standard pickle functionality
+"""Pickler class to extend the standard pickle.Pickler functionality
+
+The main objective is to make it natural to perform distributed computing on
+clusters (such as PySpark, Dask, Ray...) with interactively defined code
+(functions, classes, ...) written in notebooks or console.
 
-The goals of it follow:
--Serialize lambdas and nested functions to compiled byte code
--Deal with main module correctly
--Deal with other non-serializable objects
+In particular this pickler adds the following features:
+- serialize interactively-defined or locally-defined functions, classes,
+  enums, typevars, lambdas and nested functions to compiled byte code;
+- deal with some other non-serializable objects in an ad-hoc manner where
+  applicable.
 
-It does not include an unpickler, as standard python unpickling suffices.
+This pickler is therefore meant to be used for the communication between short
+lived Python processes running the same version of Python and libraries. In
+particular, it is not meant to be used for long term storage of Python objects.
+
+It does not include an unpickler, as standard Python unpickling suffices.
 
 This module was extracted from the `cloud` package, developed by `PiCloud, Inc.
 <https://web.archive.org/web/20140626004012/http://www.picloud.com/>`_.
 
+Copyright (c) 2012-now, CloudPickle developers and contributors.
 Copyright (c) 2012, Regents of the University of California.
 Copyright (c) 2009 `PiCloud, Inc. <https://web.archive.org/web/20140626004012/http://www.picloud.com/>`_.
 All rights reserved.
@@ -41,40 +50,34 @@ NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
 
+import _collections_abc
+from collections import ChainMap, OrderedDict
+import abc
 import builtins
+import copyreg
+import dataclasses
 import dis
+from enum import Enum
+import io
+import itertools
+import logging
 import opcode
+import pickle
+from pickle import _getattribute
 import platform
+import struct
 import sys
-import types
-import weakref
-import uuid
 import threading
+import types
 import typing
+import uuid
 import warnings
+import weakref
 
-from .compat import pickle
-from collections import OrderedDict
-from typing import ClassVar, Generic, Union, Tuple, Callable
-from pickle import _getattribute
-from importlib._bootstrap import _find_spec
-
-try:  # pragma: no branch
-    import typing_extensions as _typing_extensions
-    from typing_extensions import Literal, Final
-except ImportError:
-    _typing_extensions = Literal = Final = None
-
-if sys.version_info >= (3, 8):
-    from types import CellType
-else:
-    def f():
-        a = 1
-
-        def g():
-            return a
-        return g
-    CellType = type(f().__closure__[0])
+# The following import is required to be imported in the cloudpickle
+# namespace to be able to load pickle files generated with older versions of
+# cloudpickle. See: tests/test_backward_compat.py
+from types import CellType  # noqa: F401
 
 
 # cloudpickle is meant for inter process communication: we expect all
@@ -116,7 +119,8 @@ def _lookup_class_or_track(class_tracker_id, class_def):
     if class_tracker_id is not None:
         with _DYNAMIC_CLASS_TRACKER_LOCK:
             class_def = _DYNAMIC_CLASS_TRACKER_BY_ID.setdefault(
-                class_tracker_id, class_def)
+                class_tracker_id, class_def
+            )
             _DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id
     return class_def
 
@@ -141,9 +145,7 @@ def register_pickle_by_value(module):
     README.md file for more details and limitations.
     """
     if not isinstance(module, types.ModuleType):
-        raise ValueError(
-            f"Input should be a module object, got {str(module)} instead"
-        )
+        raise ValueError(f"Input should be a module object, got {str(module)} instead")
     # In the future, cloudpickle may need a way to access any module registered
     # for pickling by value in order to introspect relative imports inside
     # functions pickled by value. (see
@@ -157,7 +159,7 @@ def register_pickle_by_value(module):
     if module.__name__ not in sys.modules:
         raise ValueError(
             f"{module} was not imported correctly, have you used an "
-            f"`import` statement to access it?"
+            "`import` statement to access it?"
         )
     _PICKLE_BY_VALUE_MODULES.add(module.__name__)
 
@@ -165,9 +167,7 @@ def register_pickle_by_value(module):
 def unregister_pickle_by_value(module):
     """Unregister that the input module should be pickled by value."""
     if not isinstance(module, types.ModuleType):
-        raise ValueError(
-            f"Input should be a module object, got {str(module)} instead"
-        )
+        raise ValueError(f"Input should be a module object, got {str(module)} instead")
     if module.__name__ not in _PICKLE_BY_VALUE_MODULES:
         raise ValueError(f"{module} is not registered for pickle by value")
     else:
@@ -201,20 +201,7 @@ def _whichmodule(obj, name):
     - Errors arising during module introspection are ignored, as those errors
       are considered unwanted side effects.
     """
-    if sys.version_info[:2] < (3, 7) and isinstance(obj, typing.TypeVar):  # pragma: no branch  # noqa
-        # Workaround bug in old Python versions: prior to Python 3.7,
-        # T.__module__ would always be set to "typing" even when the TypeVar T
-        # would be defined in a different module.
-        if name is not None and getattr(typing, name, None) is obj:
-            # Built-in TypeVar defined in typing such as AnyStr
-            return 'typing'
-        else:
-            # User defined or third-party TypeVar: __module__ attribute is
-            # irrelevant, thus trigger a exhaustive search for obj in all
-            # modules.
-            module_name = None
-    else:
-        module_name = getattr(obj, '__module__', None)
+    module_name = getattr(obj, "__module__", None)
 
     if module_name is not None:
         return module_name
@@ -225,9 +212,9 @@ def _whichmodule(obj, name):
         # Some modules such as coverage can inject non-module objects inside
         # sys.modules
         if (
-                module_name == '__main__' or
-                module is None or
-                not isinstance(module, types.ModuleType)
+            module_name == "__main__"
+            or module is None
+            or not isinstance(module, types.ModuleType)
         ):
             continue
         try:
@@ -241,17 +228,17 @@ def _whichmodule(obj, name):
 def _should_pickle_by_reference(obj, name=None):
     """Test whether an function or a class should be pickled by reference
 
-     Pickling by reference means by that the object (typically a function or a
-     class) is an attribute of a module that is assumed to be importable in the
-     target Python environment. Loading will therefore rely on importing the
-     module and then calling `getattr` on it to access the function or class.
-
-     Pickling by reference is the only option to pickle functions and classes
-     in the standard library. In cloudpickle the alternative option is to
-     pickle by value (for instance for interactively or locally defined
-     functions and classes or for attributes of modules that have been
-     explicitly registered to be pickled by value.
-     """
+    Pickling by reference means that the object (typically a function or a
+    class) is an attribute of a module that is assumed to be importable in the
+    target Python environment. Loading will therefore rely on importing the
+    module and then calling `getattr` on it to access the function or class.
+
+    Pickling by reference is the only option to pickle functions and classes
+    in the standard library. In cloudpickle the alternative option is to
+    pickle by value (for instance for interactively or locally defined
+    functions and classes or for attributes of modules that have been
+    explicitly registered to be pickled by value).
+    """
     if isinstance(obj, types.FunctionType) or issubclass(type(obj), type):
         module_and_name = _lookup_module_and_qualname(obj, name=name)
         if module_and_name is None:
@@ -270,19 +257,18 @@ def _should_pickle_by_reference(obj, name=None):
         return obj.__name__ in sys.modules
     else:
         raise TypeError(
-            "cannot check importability of {} instances".format(
-                type(obj).__name__)
+            "cannot check importability of {} 
instances".format(type(obj).__name__)
         )
 
 
 def _lookup_module_and_qualname(obj, name=None):
     if name is None:
-        name = getattr(obj, '__qualname__', None)
+        name = getattr(obj, "__qualname__", None)
     if name is None:  # pragma: no cover
         # This used to be needed for Python 2.7 support but is probably not
         # needed anymore. However we keep the __name__ introspection in case
         # users of cloudpickle rely on this old behavior for unknown reasons.
-        name = getattr(obj, '__name__', None)
+        name = getattr(obj, "__name__", None)
 
     module_name = _whichmodule(obj, name)
 
@@ -316,22 +302,20 @@ def _lookup_module_and_qualname(obj, name=None):
 
 
 def _extract_code_globals(co):
-    """
-    Find all globals names read or written to by codeblock co
-    """
+    """Find all globals names read or written to by codeblock co."""
     out_names = _extract_code_globals_cache.get(co)
     if out_names is None:
         # We use a dict with None values instead of a set to get a
-        # deterministic order (assuming Python 3.6+) and avoid introducing
-        # non-deterministic pickle bytes as a results.
+        # deterministic order and avoid introducing non-deterministic pickle
+        # bytes as a result.
         out_names = {name: None for name in _walk_global_ops(co)}
 
-        # Declaring a function inside another one using the "def ..."
-        # syntax generates a constant code object corresponding to the one
-        # of the nested function's As the nested function may itself need
-        # global variables, we need to introspect its code, extract its
-        # globals, (look for code object in it's co_consts attribute..) and
-        # add the result to code_globals
+        # Declaring a function inside another one using the "def ..." syntax
+        # generates a constant code object corresponding to the one of the
+        # nested function's As the nested function may itself need global
+        # variables, we need to introspect its code, extract its globals, (look
+        # for code object in it's co_consts attribute..) and add the result to
+        # code_globals
         if co.co_consts:
             for const in co.co_consts:
                 if isinstance(const, types.CodeType):
@@ -343,8 +327,7 @@ def _extract_code_globals(co):
 
 
 def _find_imported_submodules(code, top_level_dependencies):
-    """
-    Find currently imported submodules used by a function.
+    """Find currently imported submodules used by a function.
 
     Submodules used by a function need to be detected and referenced for the
     function to work correctly at depickling time. Because submodules can be
@@ -372,10 +355,13 @@ def _find_imported_submodules(code, top_level_dependencies):
     subimports = []
     # check if any known dependency is an imported package
     for x in top_level_dependencies:
-        if (isinstance(x, types.ModuleType) and
-                hasattr(x, '__package__') and x.__package__):
+        if (
+            isinstance(x, types.ModuleType)
+            and hasattr(x, "__package__")
+            and x.__package__
+        ):
             # check if the package has any currently loaded sub-imports
-            prefix = x.__name__ + '.'
+            prefix = x.__name__ + "."
             # A concurrent thread could mutate sys.modules,
             # make sure we iterate over a copy to avoid exceptions
             for name in list(sys.modules):
@@ -383,111 +369,16 @@ def _find_imported_submodules(code, top_level_dependencies):
                 # sys.modules.
                 if name is not None and name.startswith(prefix):
                     # check whether the function can address the sub-module
-                    tokens = set(name[len(prefix):].split('.'))
+                    tokens = set(name[len(prefix) :].split("."))
                     if not tokens - set(code.co_names):
                         subimports.append(sys.modules[name])
     return subimports
 
 
-def cell_set(cell, value):
-    """Set the value of a closure cell.
-
-    The point of this function is to set the cell_contents attribute of a cell
-    after its creation. This operation is necessary in case the cell contains a
-    reference to the function the cell belongs to, as when calling the
-    function's constructor
-    ``f = types.FunctionType(code, globals, name, argdefs, closure)``,
-    closure will not be able to contain the yet-to-be-created f.
-
-    In Python3.7, cell_contents is writeable, so setting the contents of a cell
-    can be done simply using
-    >>> cell.cell_contents = value
-
-    In earlier Python3 versions, the cell_contents attribute of a cell is read
-    only, but this limitation can be worked around by leveraging the Python 3
-    ``nonlocal`` keyword.
-
-    In Python2 however, this attribute is read only, and there is no
-    ``nonlocal`` keyword. For this reason, we need to come up with more
-    complicated hacks to set this attribute.
-
-    The chosen approach is to create a function with a STORE_DEREF opcode,
-    which sets the content of a closure variable. Typically:
-
-    >>> def inner(value):
-    ...     lambda: cell  # the lambda makes cell a closure
-    ...     cell = value  # cell is a closure, so this triggers a STORE_DEREF
-
-    (Note that in Python2, A STORE_DEREF can never be triggered from an inner
-    function. The function g for example here
-    >>> def f(var):
-    ...     def g():
-    ...         var += 1
-    ...     return g
-
-    will not modify the closure variable ``var```inplace, but instead try to
-    load a local variable var and increment it. As g does not assign the local
-    variable ``var`` any initial value, calling f(1)() will fail at runtime.)
-
-    Our objective is to set the value of a given cell ``cell``. So we need to
-    somewhat reference our ``cell`` object into the ``inner`` function so that
-    this object (and not the smoke cell of the lambda function) gets affected
-    by the STORE_DEREF operation.
-
-    In inner, ``cell`` is referenced as a cell variable (an enclosing variable
-    that is referenced by the inner function). If we create a new function
-    cell_set with the exact same code as ``inner``, but with ``cell`` marked as
-    a free variable instead, the STORE_DEREF will be applied on its closure -
-    ``cell``, which we can specify explicitly during construction! The new
-    cell_set variable thus actually sets the contents of a specified cell!
-
-    Note: we do not make use of the ``nonlocal`` keyword to set the contents of
-    a cell in early python3 versions to limit possible syntax errors in case
-    test and checker libraries decide to parse the whole file.
-    """
-
-    if sys.version_info[:2] >= (3, 7):  # pragma: no branch
-        cell.cell_contents = value
-    else:
-        _cell_set = types.FunctionType(
-            _cell_set_template_code, {}, '_cell_set', (), (cell,),)
-        _cell_set(value)
-
-
-def _make_cell_set_template_code():
-    def _cell_set_factory(value):
-        lambda: cell
-        cell = value
-
-    co = _cell_set_factory.__code__
-
-    _cell_set_template_code = types.CodeType(
-        co.co_argcount,
-        co.co_kwonlyargcount,   # Python 3 only argument
-        co.co_nlocals,
-        co.co_stacksize,
-        co.co_flags,
-        co.co_code,
-        co.co_consts,
-        co.co_names,
-        co.co_varnames,
-        co.co_filename,
-        co.co_name,
-        co.co_firstlineno,
-        co.co_lnotab,
-        co.co_cellvars,  # co_freevars is initialized with co_cellvars
-        (),  # co_cellvars is made empty
-    )
-    return _cell_set_template_code
-
-
-if sys.version_info[:2] < (3, 7):
-    _cell_set_template_code = _make_cell_set_template_code()
-
 # relevant opcodes
-STORE_GLOBAL = opcode.opmap['STORE_GLOBAL']
-DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL']
-LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
+STORE_GLOBAL = opcode.opmap["STORE_GLOBAL"]
+DELETE_GLOBAL = opcode.opmap["DELETE_GLOBAL"]
+LOAD_GLOBAL = opcode.opmap["LOAD_GLOBAL"]
 GLOBAL_OPS = (STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL)
 HAVE_ARGUMENT = dis.HAVE_ARGUMENT
 EXTENDED_ARG = dis.EXTENDED_ARG
@@ -509,9 +400,7 @@ def _builtin_type(name):
 
 
 def _walk_global_ops(code):
-    """
-    Yield referenced name for all global-referencing instructions in *code*.
-    """
+    """Yield referenced name for global-referencing instructions in code."""
     for instr in dis.get_instructions(code):
         op = instr.opcode
         if op in GLOBAL_OPS:
@@ -519,7 +408,7 @@ def _walk_global_ops(code):
 
 
 def _extract_class_dict(cls):
-    """Retrieve a copy of the dict of a class without the inherited methods"""
+    """Retrieve a copy of the dict of a class without the inherited method."""
     clsdict = dict(cls.__dict__)  # copy dict proxy to a dict
     if len(cls.__bases__) == 1:
         inherited_dict = cls.__bases__[0].__dict__
@@ -540,107 +429,26 @@ def _extract_class_dict(cls):
     return clsdict
 
 
-if sys.version_info[:2] < (3, 7):  # pragma: no branch
-    def _is_parametrized_type_hint(obj):
-        # This is very cheap but might generate false positives. So try to
-        # narrow it down is good as possible.
-        type_module = getattr(type(obj), '__module__', None)
-        from_typing_extensions = type_module == 'typing_extensions'
-        from_typing = type_module == 'typing'
-
-        # general typing Constructs
-        is_typing = getattr(obj, '__origin__', None) is not None
-
-        # typing_extensions.Literal
-        is_literal = (
-            (getattr(obj, '__values__', None) is not None)
-            and from_typing_extensions
-        )
-
-        # typing_extensions.Final
-        is_final = (
-            (getattr(obj, '__type__', None) is not None)
-            and from_typing_extensions
-        )
-
-        # typing.ClassVar
-        is_classvar = (
-            (getattr(obj, '__type__', None) is not None) and from_typing
-        )
-
-        # typing.Union/Tuple for old Python 3.5
-        is_union = getattr(obj, '__union_params__', None) is not None
-        is_tuple = getattr(obj, '__tuple_params__', None) is not None
-        is_callable = (
-            getattr(obj, '__result__', None) is not None and
-            getattr(obj, '__args__', None) is not None
-        )
-        return any((is_typing, is_literal, is_final, is_classvar, is_union,
-                    is_tuple, is_callable))
-
-    def _create_parametrized_type_hint(origin, args):
-        return origin[args]
-else:
-    _is_parametrized_type_hint = None
-    _create_parametrized_type_hint = None
-
-
-def parametrized_type_hint_getinitargs(obj):
-    # The distorted type check sematic for typing construct becomes:
-    # ``type(obj) is type(TypeHint)``, which means "obj is a
-    # parametrized TypeHint"
-    if type(obj) is type(Literal):  # pragma: no branch
-        initargs = (Literal, obj.__values__)
-    elif type(obj) is type(Final):  # pragma: no branch
-        initargs = (Final, obj.__type__)
-    elif type(obj) is type(ClassVar):
-        initargs = (ClassVar, obj.__type__)
-    elif type(obj) is type(Generic):
-        initargs = (obj.__origin__, obj.__args__)
-    elif type(obj) is type(Union):
-        initargs = (Union, obj.__args__)
-    elif type(obj) is type(Tuple):
-        initargs = (Tuple, obj.__args__)
-    elif type(obj) is type(Callable):
-        (*args, result) = obj.__args__
-        if len(args) == 1 and args[0] is Ellipsis:
-            args = Ellipsis
-        else:
-            args = list(args)
-        initargs = (Callable, (args, result))
-    else:  # pragma: no cover
-        raise pickle.PicklingError(
-            f"Cloudpickle Error: Unknown type {type(obj)}"
-        )
-    return initargs
-
-
-# Tornado support
-
 def is_tornado_coroutine(func):
-    """
-    Return whether *func* is a Tornado coroutine function.
+    """Return whether `func` is a Tornado coroutine function.
+
     Running coroutines are not supported.
     """
-    if 'tornado.gen' not in sys.modules:
+    warnings.warn(
+        "is_tornado_coroutine is deprecated in cloudpickle 3.0 and will be "
+        "removed in cloudpickle 4.0. Use tornado.gen.is_coroutine_function "
+        "directly instead.",
+        category=DeprecationWarning,
+    )
+    if "tornado.gen" not in sys.modules:
         return False
-    gen = sys.modules['tornado.gen']
+    gen = sys.modules["tornado.gen"]
     if not hasattr(gen, "is_coroutine_function"):
         # Tornado version is too old
         return False
     return gen.is_coroutine_function(func)
 
 
-def _rebuild_tornado_coroutine(func):
-    from tornado import gen
-    return gen.coroutine(func)
-
-
-# including pickles unloading functions in this namespace
-load = pickle.load
-loads = pickle.loads
-
-
 def subimport(name):
     # We cannot do simply: `return __import__(name)`: Indeed, if ``name`` is
     # the name of a submodule, __import__ will return the top-level root module
@@ -653,23 +461,15 @@ def subimport(name):
 def dynamic_subimport(name, vars):
     mod = types.ModuleType(name)
     mod.__dict__.update(vars)
-    mod.__dict__['__builtins__'] = builtins.__dict__
+    mod.__dict__["__builtins__"] = builtins.__dict__
     return mod
 
 
-def _gen_ellipsis():
-    return Ellipsis
-
-
-def _gen_not_implemented():
-    return NotImplemented
-
-
 def _get_cell_contents(cell):
     try:
         return cell.cell_contents
     except ValueError:
-        # sentinel used by ``_fill_function`` which will leave the cell empty
+        # Handle empty cells explicitly with a sentinel value.
         return _empty_cell_value
 
 
@@ -691,78 +491,13 @@ def instance(cls):
 
 @instance
 class _empty_cell_value:
-    """sentinel for empty closures
-    """
+    """Sentinel for empty closures."""
+
     @classmethod
     def __reduce__(cls):
         return cls.__name__
 
 
-def _fill_function(*args):
-    """Fills in the rest of function data into the skeleton function object
-
-    The skeleton itself is create by _make_skel_func().
-    """
-    if len(args) == 2:
-        func = args[0]
-        state = args[1]
-    elif len(args) == 5:
-        # Backwards compat for cloudpickle v0.4.0, after which the `module`
-        # argument was introduced
-        func = args[0]
-        keys = ['globals', 'defaults', 'dict', 'closure_values']
-        state = dict(zip(keys, args[1:]))
-    elif len(args) == 6:
-        # Backwards compat for cloudpickle v0.4.1, after which the function
-        # state was passed as a dict to the _fill_function it-self.
-        func = args[0]
-        keys = ['globals', 'defaults', 'dict', 'module', 'closure_values']
-        state = dict(zip(keys, args[1:]))
-    else:
-        raise ValueError(f'Unexpected _fill_value arguments: {args!r}')
-
-    # - At pickling time, any dynamic global variable used by func is
-    #   serialized by value (in state['globals']).
-    # - At unpickling time, func's __globals__ attribute is initialized by
-    #   first retrieving an empty isolated namespace that will be shared
-    #   with other functions pickled from the same original module
-    #   by the same CloudPickler instance and then updated with the
-    #   content of state['globals'] to populate the shared isolated
-    #   namespace with all the global variables that are specifically
-    #   referenced for this function.
-    func.__globals__.update(state['globals'])
-
-    func.__defaults__ = state['defaults']
-    func.__dict__ = state['dict']
-    if 'annotations' in state:
-        func.__annotations__ = state['annotations']
-    if 'doc' in state:
-        func.__doc__ = state['doc']
-    if 'name' in state:
-        func.__name__ = state['name']
-    if 'module' in state:
-        func.__module__ = state['module']
-    if 'qualname' in state:
-        func.__qualname__ = state['qualname']
-    if 'kwdefaults' in state:
-        func.__kwdefaults__ = state['kwdefaults']
-    # _cloudpickle_subimports is a set of submodules that must be loaded for
-    # the pickled function to work correctly at unpickling time. Now that these
-    # submodules are depickled (hence imported), they can be removed from the
-    # object's state (the object state only served as a reference holder to
-    # these submodules)
-    if '_cloudpickle_submodules' in state:
-        state.pop('_cloudpickle_submodules')
-
-    cells = func.__closure__
-    if cells is not None:
-        for cell, value in zip(cells, state['closure_values']):
-            if value is not _empty_cell_value:
-                cell_set(cell, value)
-
-    return func
-
-
 def _make_function(code, globals, name, argdefs, closure):
     # Setting __builtins__ in globals is needed for nogil CPython.
     globals["__builtins__"] = __builtins__
@@ -773,7 +508,7 @@ def _make_empty_cell():
     if False:
         # trick the compiler into creating an empty cell in our lambda
         cell = None
-        raise AssertionError('this route should not be executed')
+        raise AssertionError("this route should not be executed")
 
     return (lambda: cell).__closure__[0]
 
@@ -781,39 +516,13 @@ def _make_empty_cell():
 def _make_cell(value=_empty_cell_value):
     cell = _make_empty_cell()
     if value is not _empty_cell_value:
-        cell_set(cell, value)
+        cell.cell_contents = value
     return cell
 
 
-def _make_skel_func(code, cell_count, base_globals=None):
-    """ Creates a skeleton function object that contains just the provided
-        code and the correct number of cells in func_closure.  All other
-        func attributes (e.g. func_globals) are empty.
-    """
-    # This function is deprecated and should be removed in cloudpickle 1.7
-    warnings.warn(
-        "A pickle file created using an old (<=1.4.1) version of cloudpickle "
-        "is currently being loaded. This is not supported by cloudpickle and "
-        "will break in cloudpickle 1.7", category=UserWarning
-    )
-    # This is backward-compatibility code: for cloudpickle versions between
-    # 0.5.4 and 0.7, base_globals could be a string or None. base_globals
-    # should now always be a dictionary.
-    if base_globals is None or isinstance(base_globals, str):
-        base_globals = {}
-
-    base_globals['__builtins__'] = __builtins__
-
-    closure = (
-        tuple(_make_empty_cell() for _ in range(cell_count))
-        if cell_count >= 0 else
-        None
-    )
-    return types.FunctionType(code, base_globals, None, None, closure)
-
-
-def _make_skeleton_class(type_constructor, name, bases, type_kwargs,
-                         class_tracker_id, extra):
+def _make_skeleton_class(
+    type_constructor, name, bases, type_kwargs, class_tracker_id, extra
+):
     """Build dynamic class with an empty __dict__ to be filled once memoized
 
     If class_tracker_id is not None, try to lookup an existing class definition
@@ -825,32 +534,14 @@ def _make_skeleton_class(type_constructor, name, bases, type_kwargs,
     forward compatibility shall the need arise.
     """
     skeleton_class = types.new_class(
-        name, bases, {'metaclass': type_constructor},
-        lambda ns: ns.update(type_kwargs)
+        name, bases, {"metaclass": type_constructor}, lambda ns: 
ns.update(type_kwargs)
     )
     return _lookup_class_or_track(class_tracker_id, skeleton_class)
 
 
-def _rehydrate_skeleton_class(skeleton_class, class_dict):
-    """Put attributes from `class_dict` back on `skeleton_class`.
-
-    See CloudPickler.save_dynamic_class for more info.
-    """
-    registry = None
-    for attrname, attr in class_dict.items():
-        if attrname == "_abc_impl":
-            registry = attr
-        else:
-            setattr(skeleton_class, attrname, attr)
-    if registry is not None:
-        for subclass in registry:
-            skeleton_class.register(subclass)
-
-    return skeleton_class
-
-
-def _make_skeleton_enum(bases, name, qualname, members, module,
-                        class_tracker_id, extra):
+def _make_skeleton_enum(
+    bases, name, qualname, members, module, class_tracker_id, extra
+):
     """Build dynamic enum with an empty __dict__ to be filled once memoized
 
     The creation of the enum class is inspired by the code of
@@ -879,23 +570,24 @@ def _make_skeleton_enum(bases, name, qualname, members, module,
     return _lookup_class_or_track(class_tracker_id, enum_class)
 
 
-def _make_typevar(name, bound, constraints, covariant, contravariant,
-                  class_tracker_id):
+def _make_typevar(name, bound, constraints, covariant, contravariant, class_tracker_id):
     tv = typing.TypeVar(
-        name, *constraints, bound=bound,
-        covariant=covariant, contravariant=contravariant
+        name,
+        *constraints,
+        bound=bound,
+        covariant=covariant,
+        contravariant=contravariant,
     )
-    if class_tracker_id is not None:
-        return _lookup_class_or_track(class_tracker_id, tv)
-    else:  # pragma: nocover
-        # Only for Python 3.5.3 compat.
-        return tv
+    return _lookup_class_or_track(class_tracker_id, tv)
 
 
 def _decompose_typevar(obj):
     return (
-        obj.__name__, obj.__bound__, obj.__constraints__,
-        obj.__covariant__, obj.__contravariant__,
+        obj.__name__,
+        obj.__bound__,
+        obj.__constraints__,
+        obj.__covariant__,
+        obj.__contravariant__,
         _get_or_create_tracker_id(obj),
     )
 
@@ -914,16 +606,16 @@ def _typevar_reduce(obj):
 
 
 def _get_bases(typ):
-    if '__orig_bases__' in getattr(typ, '__dict__', {}):
+    if "__orig_bases__" in getattr(typ, "__dict__", {}):
         # For generic types (see PEP 560)
         # Note that simply checking `hasattr(typ, '__orig_bases__')` is not
         # correct.  Subclasses of a fully-parameterized generic class does not
         # have `__orig_bases__` defined, but `hasattr(typ, '__orig_bases__')`
         # will return True because it's defined in the base class.
-        bases_attr = '__orig_bases__'
+        bases_attr = "__orig_bases__"
     else:
         # For regular class objects
-        bases_attr = '__bases__'
+        bases_attr = "__bases__"
     return getattr(typ, bases_attr)
 
 
@@ -946,3 +638,850 @@ def _make_dict_items(obj, is_ordered=False):
         return OrderedDict(obj).items()
     else:
         return obj.items()
+
+
+# COLLECTION OF OBJECTS __getnewargs__-LIKE METHODS
+# -------------------------------------------------
+
+
+def _class_getnewargs(obj):
+    type_kwargs = {}
+    if "__module__" in obj.__dict__:
+        type_kwargs["__module__"] = obj.__module__
+
+    __dict__ = obj.__dict__.get("__dict__", None)
+    if isinstance(__dict__, property):
+        type_kwargs["__dict__"] = __dict__
+
+    return (
+        type(obj),
+        obj.__name__,
+        _get_bases(obj),
+        type_kwargs,
+        _get_or_create_tracker_id(obj),
+        None,
+    )
+
+
+def _enum_getnewargs(obj):
+    members = {e.name: e.value for e in obj}
+    return (
+        obj.__bases__,
+        obj.__name__,
+        obj.__qualname__,
+        members,
+        obj.__module__,
+        _get_or_create_tracker_id(obj),
+        None,
+    )
+
+
+# COLLECTION OF OBJECTS RECONSTRUCTORS
+# ------------------------------------
+def _file_reconstructor(retval):
+    return retval
+
+
+# COLLECTION OF OBJECTS STATE GETTERS
+# -----------------------------------
+
+
+def _function_getstate(func):
+    # - Put func's dynamic attributes (stored in func.__dict__) in state. These
+    #   attributes will be restored at unpickling time using
+    #   f.__dict__.update(state)
+    # - Put func's members into slotstate. Such attributes will be restored at
+    #   unpickling time by iterating over slotstate and calling setattr(func,
+    #   slotname, slotvalue)
+    slotstate = {
+        "__name__": func.__name__,
+        "__qualname__": func.__qualname__,
+        "__annotations__": func.__annotations__,
+        "__kwdefaults__": func.__kwdefaults__,
+        "__defaults__": func.__defaults__,
+        "__module__": func.__module__,
+        "__doc__": func.__doc__,
+        "__closure__": func.__closure__,
+    }
+
+    f_globals_ref = _extract_code_globals(func.__code__)
+    f_globals = {k: func.__globals__[k] for k in f_globals_ref if k in func.__globals__}
+
+    if func.__closure__ is not None:
+        closure_values = list(map(_get_cell_contents, func.__closure__))
+    else:
+        closure_values = ()
+
+    # Extract currently-imported submodules used by func. Storing these modules
+    # in a smoke _cloudpickle_submodules attribute of the object's state will
+    # trigger the side effect of importing these modules at unpickling time
+    # (which is necessary for func to work correctly once depickled)
+    slotstate["_cloudpickle_submodules"] = _find_imported_submodules(
+        func.__code__, itertools.chain(f_globals.values(), closure_values)
+    )
+    slotstate["__globals__"] = f_globals
+
+    state = func.__dict__
+    return state, slotstate
+
+
+def _class_getstate(obj):
+    clsdict = _extract_class_dict(obj)
+    clsdict.pop("__weakref__", None)
+
+    if issubclass(type(obj), abc.ABCMeta):
+        # If obj is an instance of an ABCMeta subclass, don't pickle the
+        # cache/negative caches populated during isinstance/issubclass
+        # checks, but pickle the list of registered subclasses of obj.
+        clsdict.pop("_abc_cache", None)
+        clsdict.pop("_abc_negative_cache", None)
+        clsdict.pop("_abc_negative_cache_version", None)
+        registry = clsdict.pop("_abc_registry", None)
+        if registry is None:
+            # The abc caches and registered subclasses of a
+            # class are bundled into the single _abc_impl attribute
+            clsdict.pop("_abc_impl", None)
+            (registry, _, _, _) = abc._get_dump(obj)
+
+            clsdict["_abc_impl"] = [subclass_weakref() for subclass_weakref in 
registry]
+        else:
+            # In the above if clause, registry is a set of weakrefs -- in
+            # this case, registry is a WeakSet
+            clsdict["_abc_impl"] = [type_ for type_ in registry]
+
+    if "__slots__" in clsdict:
+        # pickle string length optimization: member descriptors of obj are
+        # created automatically from obj's __slots__ attribute, no need to
+        # save them in obj's state
+        if isinstance(obj.__slots__, str):
+            clsdict.pop(obj.__slots__)
+        else:
+            for k in obj.__slots__:
+                clsdict.pop(k, None)
+
+    clsdict.pop("__dict__", None)  # unpicklable property object
+
+    return (clsdict, {})
+
+
+def _enum_getstate(obj):
+    clsdict, slotstate = _class_getstate(obj)
+
+    members = {e.name: e.value for e in obj}
+    # Cleanup the clsdict that will be passed to _make_skeleton_enum:
+    # Those attributes are already handled by the metaclass.
+    for attrname in [
+        "_generate_next_value_",
+        "_member_names_",
+        "_member_map_",
+        "_member_type_",
+        "_value2member_map_",
+    ]:
+        clsdict.pop(attrname, None)
+    for member in members:
+        clsdict.pop(member)
+        # Special handling of Enum subclasses
+    return clsdict, slotstate
+
+
+# COLLECTIONS OF OBJECTS REDUCERS
+# -------------------------------
+# A reducer is a function taking a single argument (obj), and that returns a
+# tuple with all the necessary data to re-construct obj. Apart from a few
+# exceptions (list, dict, bytes, int, etc.), a reducer is necessary to
+# correctly pickle an object.
+# While many built-in objects (Exception objects, instances of the "object"
+# class, etc.) are shipped with their own built-in reducer (invoked using
+# obj.__reduce__), some are not. The following methods were created to "fill
+# these holes".
+
+
+def _code_reduce(obj):
+    """code object reducer."""
+    # If you are not sure about the order of arguments, take a look at help
+    # of the specific type from types, for example:
+    # >>> from types import CodeType
+    # >>> help(CodeType)
+    if hasattr(obj, "co_exceptiontable"):
+        # Python 3.11 and later: there are some new attributes
+        # related to the enhanced exceptions.
+        args = (
+            obj.co_argcount,
+            obj.co_posonlyargcount,
+            obj.co_kwonlyargcount,
+            obj.co_nlocals,
+            obj.co_stacksize,
+            obj.co_flags,
+            obj.co_code,
+            obj.co_consts,
+            obj.co_names,
+            obj.co_varnames,
+            obj.co_filename,
+            obj.co_name,
+            obj.co_qualname,
+            obj.co_firstlineno,
+            obj.co_linetable,
+            obj.co_exceptiontable,
+            obj.co_freevars,
+            obj.co_cellvars,
+        )
+    elif hasattr(obj, "co_linetable"):
+        # Python 3.10 and later: obj.co_lnotab is deprecated and constructor
+        # expects obj.co_linetable instead.
+        args = (
+            obj.co_argcount,
+            obj.co_posonlyargcount,
+            obj.co_kwonlyargcount,
+            obj.co_nlocals,
+            obj.co_stacksize,
+            obj.co_flags,
+            obj.co_code,
+            obj.co_consts,
+            obj.co_names,
+            obj.co_varnames,
+            obj.co_filename,
+            obj.co_name,
+            obj.co_firstlineno,
+            obj.co_linetable,
+            obj.co_freevars,
+            obj.co_cellvars,
+        )
+    elif hasattr(obj, "co_nmeta"):  # pragma: no cover
+        # "nogil" Python: modified attributes from 3.9
+        args = (
+            obj.co_argcount,
+            obj.co_posonlyargcount,
+            obj.co_kwonlyargcount,
+            obj.co_nlocals,
+            obj.co_framesize,
+            obj.co_ndefaultargs,
+            obj.co_nmeta,
+            obj.co_flags,
+            obj.co_code,
+            obj.co_consts,
+            obj.co_varnames,
+            obj.co_filename,
+            obj.co_name,
+            obj.co_firstlineno,
+            obj.co_lnotab,
+            obj.co_exc_handlers,
+            obj.co_jump_table,
+            obj.co_freevars,
+            obj.co_cellvars,
+            obj.co_free2reg,
+            obj.co_cell2reg,
+        )
+    else:
+        # Backward compat for 3.8 and 3.9
+        args = (
+            obj.co_argcount,
+            obj.co_posonlyargcount,
+            obj.co_kwonlyargcount,
+            obj.co_nlocals,
+            obj.co_stacksize,
+            obj.co_flags,
+            obj.co_code,
+            obj.co_consts,
+            obj.co_names,
+            obj.co_varnames,
+            obj.co_filename,
+            obj.co_name,
+            obj.co_firstlineno,
+            obj.co_lnotab,
+            obj.co_freevars,
+            obj.co_cellvars,
+        )
+    return types.CodeType, args
+
+
+def _cell_reduce(obj):
+    """Cell (containing values of a function's free variables) reducer."""
+    try:
+        obj.cell_contents
+    except ValueError:  # cell is empty
+        return _make_empty_cell, ()
+    else:
+        return _make_cell, (obj.cell_contents,)
+
+
+def _classmethod_reduce(obj):
+    orig_func = obj.__func__
+    return type(obj), (orig_func,)
+
+
+def _file_reduce(obj):
+    """Save a file."""
+    import io
+
+    if not hasattr(obj, "name") or not hasattr(obj, "mode"):
+        raise pickle.PicklingError(
+            "Cannot pickle files that do not map to an actual file"
+        )
+    if obj is sys.stdout:
+        return getattr, (sys, "stdout")
+    if obj is sys.stderr:
+        return getattr, (sys, "stderr")
+    if obj is sys.stdin:
+        raise pickle.PicklingError("Cannot pickle standard input")
+    if obj.closed:
+        raise pickle.PicklingError("Cannot pickle closed files")
+    if hasattr(obj, "isatty") and obj.isatty():
+        raise pickle.PicklingError("Cannot pickle files that map to tty 
objects")
+    if "r" not in obj.mode and "+" not in obj.mode:
+        raise pickle.PicklingError(
+            "Cannot pickle files that are not opened for reading: %s" % 
obj.mode
+        )
+
+    name = obj.name
+
+    retval = io.StringIO()
+
+    try:
+        # Read the whole file
+        curloc = obj.tell()
+        obj.seek(0)
+        contents = obj.read()
+        obj.seek(curloc)
+    except OSError as e:
+        raise pickle.PicklingError(
+            "Cannot pickle file %s as it cannot be read" % name
+        ) from e
+    retval.write(contents)
+    retval.seek(curloc)
+
+    retval.name = name
+    return _file_reconstructor, (retval,)
+
+
+def _getset_descriptor_reduce(obj):
+    return getattr, (obj.__objclass__, obj.__name__)
+
+
+def _mappingproxy_reduce(obj):
+    return types.MappingProxyType, (dict(obj),)
+
+
+def _memoryview_reduce(obj):
+    return bytes, (obj.tobytes(),)
+
+
+def _module_reduce(obj):
+    if _should_pickle_by_reference(obj):
+        return subimport, (obj.__name__,)
+    else:
+        # Some external libraries can populate the "__builtins__" entry of a
+        # module's `__dict__` with unpicklable objects (see #316). For that
+        # reason, we do not attempt to pickle the "__builtins__" entry, and
+        # restore a default value for it at unpickling time.
+        state = obj.__dict__.copy()
+        state.pop("__builtins__", None)
+        return dynamic_subimport, (obj.__name__, state)
+
+
+def _method_reduce(obj):
+    return (types.MethodType, (obj.__func__, obj.__self__))
+
+
+def _logger_reduce(obj):
+    return logging.getLogger, (obj.name,)
+
+
+def _root_logger_reduce(obj):
+    return logging.getLogger, ()
+
+
+def _property_reduce(obj):
+    return property, (obj.fget, obj.fset, obj.fdel, obj.__doc__)
+
+
+def _weakset_reduce(obj):
+    return weakref.WeakSet, (list(obj),)
+
+
+def _dynamic_class_reduce(obj):
+    """Save a class that can't be referenced as a module attribute.
+
+    This method is used to serialize classes that are defined inside
+    functions, or that otherwise can't be serialized as attribute lookups
+    from importable modules.
+    """
+    if Enum is not None and issubclass(obj, Enum):
+        return (
+            _make_skeleton_enum,
+            _enum_getnewargs(obj),
+            _enum_getstate(obj),
+            None,
+            None,
+            _class_setstate,
+        )
+    else:
+        return (
+            _make_skeleton_class,
+            _class_getnewargs(obj),
+            _class_getstate(obj),
+            None,
+            None,
+            _class_setstate,
+        )
+
+
+def _class_reduce(obj):
+    """Select the reducer depending on the dynamic nature of the class obj."""
+    if obj is type(None):  # noqa
+        return type, (None,)
+    elif obj is type(Ellipsis):
+        return type, (Ellipsis,)
+    elif obj is type(NotImplemented):
+        return type, (NotImplemented,)
+    elif obj in _BUILTIN_TYPE_NAMES:
+        return _builtin_type, (_BUILTIN_TYPE_NAMES[obj],)
+    elif not _should_pickle_by_reference(obj):
+        return _dynamic_class_reduce(obj)
+    return NotImplemented
+
+
+def _dict_keys_reduce(obj):
+    # Safer not to ship the full dict as sending the rest might
+    # be unintended and could potentially cause leaking of
+    # sensitive information
+    return _make_dict_keys, (list(obj),)
+
+
+def _dict_values_reduce(obj):
+    # Safer not to ship the full dict as sending the rest might
+    # be unintended and could potentially cause leaking of
+    # sensitive information
+    return _make_dict_values, (list(obj),)
+
+
+def _dict_items_reduce(obj):
+    return _make_dict_items, (dict(obj),)
+
+
+def _odict_keys_reduce(obj):
+    # Safer not to ship the full dict as sending the rest might
+    # be unintended and could potentially cause leaking of
+    # sensitive information
+    return _make_dict_keys, (list(obj), True)
+
+
+def _odict_values_reduce(obj):
+    # Safer not to ship the full dict as sending the rest might
+    # be unintended and could potentially cause leaking of
+    # sensitive information
+    return _make_dict_values, (list(obj), True)
+
+
+def _odict_items_reduce(obj):
+    return _make_dict_items, (dict(obj), True)
+
+
+def _dataclass_field_base_reduce(obj):
+    return _get_dataclass_field_type_sentinel, (obj.name,)
+
+
+# COLLECTIONS OF OBJECTS STATE SETTERS
+# ------------------------------------
+# state setters are called at unpickling time, once the object is created and
+# it has to be updated to how it was at pickling time.
+
+
+def _function_setstate(obj, state):
+    """Update the state of a dynamic function.
+
+    As __closure__ and __globals__ are readonly attributes of a function, we
+    cannot rely on the native setstate routine of pickle.load_build, which calls
+    setattr on items of the slotstate. Instead, we have to modify them in place.
+    """
+    state, slotstate = state
+    obj.__dict__.update(state)
+
+    obj_globals = slotstate.pop("__globals__")
+    obj_closure = slotstate.pop("__closure__")
+    # _cloudpickle_subimports is a set of submodules that must be loaded for
+    # the pickled function to work correctly at unpickling time. Now that these
+    # submodules are depickled (hence imported), they can be removed from the
+    # object's state (the object state only served as a reference holder to
+    # these submodules)
+    slotstate.pop("_cloudpickle_submodules")
+
+    obj.__globals__.update(obj_globals)
+    obj.__globals__["__builtins__"] = __builtins__
+
+    if obj_closure is not None:
+        for i, cell in enumerate(obj_closure):
+            try:
+                value = cell.cell_contents
+            except ValueError:  # cell is empty
+                continue
+            obj.__closure__[i].cell_contents = value
+
+    for k, v in slotstate.items():
+        setattr(obj, k, v)
+
+
+def _class_setstate(obj, state):
+    state, slotstate = state
+    registry = None
+    for attrname, attr in state.items():
+        if attrname == "_abc_impl":
+            registry = attr
+        else:
+            setattr(obj, attrname, attr)
+    if registry is not None:
+        for subclass in registry:
+            obj.register(subclass)
+
+    return obj
+
+
+# COLLECTION OF DATACLASS UTILITIES
+# ---------------------------------
+# There are some internal sentinel values whose identity must be preserved when
+# unpickling dataclass fields. Each sentinel value has a unique name that we can
+# use to retrieve its identity at unpickling time.
+
+
+_DATACLASSE_FIELD_TYPE_SENTINELS = {
+    dataclasses._FIELD.name: dataclasses._FIELD,
+    dataclasses._FIELD_CLASSVAR.name: dataclasses._FIELD_CLASSVAR,
+    dataclasses._FIELD_INITVAR.name: dataclasses._FIELD_INITVAR,
+}
+
+
+def _get_dataclass_field_type_sentinel(name):
+    return _DATACLASSE_FIELD_TYPE_SENTINELS[name]
+
+
+class Pickler(pickle.Pickler):
+    # set of reducers defined and used by cloudpickle (private)
+    _dispatch_table = {}
+    _dispatch_table[classmethod] = _classmethod_reduce
+    _dispatch_table[io.TextIOWrapper] = _file_reduce
+    _dispatch_table[logging.Logger] = _logger_reduce
+    _dispatch_table[logging.RootLogger] = _root_logger_reduce
+    _dispatch_table[memoryview] = _memoryview_reduce
+    _dispatch_table[property] = _property_reduce
+    _dispatch_table[staticmethod] = _classmethod_reduce
+    _dispatch_table[CellType] = _cell_reduce
+    _dispatch_table[types.CodeType] = _code_reduce
+    _dispatch_table[types.GetSetDescriptorType] = _getset_descriptor_reduce
+    _dispatch_table[types.ModuleType] = _module_reduce
+    _dispatch_table[types.MethodType] = _method_reduce
+    _dispatch_table[types.MappingProxyType] = _mappingproxy_reduce
+    _dispatch_table[weakref.WeakSet] = _weakset_reduce
+    _dispatch_table[typing.TypeVar] = _typevar_reduce
+    _dispatch_table[_collections_abc.dict_keys] = _dict_keys_reduce
+    _dispatch_table[_collections_abc.dict_values] = _dict_values_reduce
+    _dispatch_table[_collections_abc.dict_items] = _dict_items_reduce
+    _dispatch_table[type(OrderedDict().keys())] = _odict_keys_reduce
+    _dispatch_table[type(OrderedDict().values())] = _odict_values_reduce
+    _dispatch_table[type(OrderedDict().items())] = _odict_items_reduce
+    _dispatch_table[abc.abstractmethod] = _classmethod_reduce
+    _dispatch_table[abc.abstractclassmethod] = _classmethod_reduce
+    _dispatch_table[abc.abstractstaticmethod] = _classmethod_reduce
+    _dispatch_table[abc.abstractproperty] = _property_reduce
+    _dispatch_table[dataclasses._FIELD_BASE] = _dataclass_field_base_reduce
+
+    dispatch_table = ChainMap(_dispatch_table, copyreg.dispatch_table)
+
+    # function reducers are defined as instance methods of cloudpickle.Pickler
+    # objects, as they rely on a cloudpickle.Pickler attribute (globals_ref)
+    def _dynamic_function_reduce(self, func):
+        """Reduce a function that is not pickleable via attribute lookup."""
+        newargs = self._function_getnewargs(func)
+        state = _function_getstate(func)
+        return (_make_function, newargs, state, None, None, _function_setstate)
+
+    def _function_reduce(self, obj):
+        """Reducer for function objects.
+
+        If obj is a top-level attribute of a file-backed module, this reducer
+        returns NotImplemented, making the cloudpickle.Pickler fall back to
+        traditional pickle.Pickler routines to save obj. Otherwise, it reduces
+        obj using a custom cloudpickle reducer designed specifically to handle
+        dynamic functions.
+        """
+        if _should_pickle_by_reference(obj):
+            return NotImplemented
+        else:
+            return self._dynamic_function_reduce(obj)
+
+    def _function_getnewargs(self, func):
+        code = func.__code__
+
+        # base_globals represents the future global namespace of func at
+        # unpickling time. Looking it up and storing it in
+        # cloudpickle.Pickler.globals_ref allows functions sharing the same
+        # globals at pickling time to also share them once unpickled, on one
+        # condition: since globals_ref is an attribute of a cloudpickle.Pickler
+        # instance, and a new cloudpickle.Pickler is created each time
+        # cloudpickle.dump or cloudpickle.dumps is called, functions also need
+        # to be saved within the same invocation of
+        # cloudpickle.dump/cloudpickle.dumps (for example:
+        # cloudpickle.dumps([f1, f2])). There is no such limitation when using
+        # cloudpickle.Pickler.dump, as long as the multiple invocations are
+        # bound to the same cloudpickle.Pickler instance.
+        base_globals = self.globals_ref.setdefault(id(func.__globals__), {})
+
+        if base_globals == {}:
+            # Add module attributes used to resolve relative imports
+            # instructions inside func.
+            for k in ["__package__", "__name__", "__path__", "__file__"]:
+                if k in func.__globals__:
+                    base_globals[k] = func.__globals__[k]
+
+        # Do not bind the free variables before the function is created to
+        # avoid infinite recursion.
+        if func.__closure__ is None:
+            closure = None
+        else:
+            closure = tuple(_make_empty_cell() for _ in range(len(code.co_freevars)))
+
+        return code, base_globals, None, None, closure
+
+    def dump(self, obj):
+        try:
+            return super().dump(obj)
+        except RuntimeError as e:
+            if len(e.args) > 0 and "recursion" in e.args[0]:
+                msg = "Could not pickle object as excessively deep recursion 
required."
+                raise pickle.PicklingError(msg) from e
+            else:
+                raise
+
+    def __init__(self, file, protocol=None, buffer_callback=None):
+        if protocol is None:
+            protocol = DEFAULT_PROTOCOL
+        super().__init__(file, protocol=protocol, buffer_callback=buffer_callback)
+        # map functions __globals__ attribute ids, to ensure that functions
+        # sharing the same global namespace at pickling time also share
+        # their global namespace at unpickling time.
+        self.globals_ref = {}
+        self.proto = int(protocol)
+
+    if not PYPY:
+        # pickle.Pickler is the C implementation of the CPython pickler and
+        # therefore we rely on the reducer_override method to customize the pickler
+        # behavior.
+
+        # `cloudpickle.Pickler.dispatch` is only left for backward
+        # compatibility - note that when using protocol 5,
+        # `cloudpickle.Pickler.dispatch` is not an extension of
+        # `pickle._Pickler.dispatch` dictionary, because `cloudpickle.Pickler`
+        # subclasses the C-implemented `pickle.Pickler`, which does not expose
+        # a `dispatch` attribute.  Earlier versions of `cloudpickle.Pickler`
+        # used `cloudpickle.Pickler.dispatch` as a class-level attribute
+        # storing all reducers implemented by cloudpickle, but the attribute
+        # name was not a great choice because it would collide with a
+        # similarly named attribute in the pure-Python `pickle._Pickler`
+        # implementation in the standard library.
+        dispatch = dispatch_table
+
+        # Implementation of the reducer_override callback, in order to
+        # efficiently serialize dynamic functions and classes by subclassing
+        # the C-implemented `pickle.Pickler`.
+        # TODO: decorrelate reducer_override (which is tied to CPython's
+        # implementation - would it make sense to backport it to pypy?) from
+        # pickle's protocol 5, which is implementation-agnostic. Currently, the
+        # availability of both notions coincides on CPython's pickle, but that
+        # may no longer be the case once pypy implements protocol 5.
+
+        def reducer_override(self, obj):
+            """Type-agnostic reducing callback for function and classes.
+
+            For performance reasons, subclasses of the C `pickle.Pickler` class
+            cannot register custom reducers for functions and classes in the
+            dispatch_table attribute. Reducers for such types must instead be
+            implemented via the special `reducer_override` method.
+
+            Note that this method will be called for any object except a few
+            builtin-types (int, lists, dicts etc.), which differs from reducers
+            in the Pickler's dispatch_table, each of them being invoked for
+            objects of a specific type only.
+
+            This property comes in handy for classes: although most classes are
+            instances of the ``type`` metaclass, some of them can be instances
+            of other custom metaclasses (such as enum.EnumMeta for example). In
+            particular, the metaclass will likely not be known in advance, and
+            thus cannot be special-cased using an entry in the dispatch_table.
+            reducer_override, among other things, allows us to register a
+            reducer that will be called for any class, independently of its
+            type.
+
+            Notes:
+
+            * reducer_override has priority over dispatch_table-registered
+              reducers.
+            * reducer_override can be used to fix other limitations of
+              cloudpickle for other types that suffered from type-specific
+              reducers, such as Exceptions. See
+              https://github.com/cloudpipe/cloudpickle/issues/248
+            """
+            t = type(obj)
+            try:
+                is_anyclass = issubclass(t, type)
+            except TypeError:  # t is not a class (old Boost; see SF #502085)
+                is_anyclass = False
+
+            if is_anyclass:
+                return _class_reduce(obj)
+            elif isinstance(obj, types.FunctionType):
+                return self._function_reduce(obj)
+            else:
+                # fallback to save_global, including the Pickler's
+                # dispatch_table
+                return NotImplemented
+
+    else:
+        # When reducer_override is not available, hack the pure-Python
+        # Pickler's types.FunctionType and type savers. Note: the type saver
+        # must override Pickler.save_global, because pickle.py contains a
+        # hard-coded call to save_global when pickling meta-classes.
+        dispatch = pickle.Pickler.dispatch.copy()
+
+        def _save_reduce_pickle5(
+            self,
+            func,
+            args,
+            state=None,
+            listitems=None,
+            dictitems=None,
+            state_setter=None,
+            obj=None,
+        ):
+            save = self.save
+            write = self.write
+            self.save_reduce(
+                func,
+                args,
+                state=None,
+                listitems=listitems,
+                dictitems=dictitems,
+                obj=obj,
+            )
+            # backport of the Python 3.8 state_setter pickle operations
+            save(state_setter)
+            save(obj)  # simple BINGET opcode as obj is already memoized.
+            save(state)
+            write(pickle.TUPLE2)
+            # Trigger a state_setter(obj, state) function call.
+            write(pickle.REDUCE)
+            # The purpose of state_setter is to carry out an
+            # in-place modification of obj. We do not care about what the
+            # method might return, so its output is eventually removed from
+            # the stack.
+            write(pickle.POP)
+
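The opcode sequence emitted above mirrors what the Python 3.8+ pickler produces natively for a six-element reduce tuple. A minimal sketch of that protocol using only the standard pickle module (Box and set_state are illustrative names, not part of cloudpickle):

    import pickle

    def set_state(obj, state):
        # Invoked as state_setter(obj, state) at unpickling time; its return
        # value is discarded, which is what the trailing POP accounts for.
        obj.__dict__.update(state)

    class Box:
        def __init__(self, value=None):
            self.value = value

        def __reduce__(self):
            # Six-element form: (callable, args, state, listitems, dictitems,
            # state_setter), supported natively since Python 3.8.
            return (Box, (), {"value": self.value}, None, None, set_state)

    assert pickle.loads(pickle.dumps(Box(42))).value == 42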
+        def save_global(self, obj, name=None, pack=struct.pack):
+            """Main dispatch method.
+
+            The name of this method is somewhat misleading: all types get
+            dispatched here.
+            """
+            if obj is type(None):  # noqa
+                return self.save_reduce(type, (None,), obj=obj)
+            elif obj is type(Ellipsis):
+                return self.save_reduce(type, (Ellipsis,), obj=obj)
+            elif obj is type(NotImplemented):
+                return self.save_reduce(type, (NotImplemented,), obj=obj)
+            elif obj in _BUILTIN_TYPE_NAMES:
+                return self.save_reduce(
+                    _builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj
+                )
+
+            if name is not None:
+                super().save_global(obj, name=name)
+            elif not _should_pickle_by_reference(obj, name=name):
+                self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj)
+            else:
+                super().save_global(obj, name=name)
+
+        dispatch[type] = save_global
+
+        def save_function(self, obj, name=None):
+            """Registered with the dispatch to handle all function types.
+
+            Determines what kind of function obj is (e.g. lambda, defined at
+            interactive prompt, etc) and handles the pickling appropriately.
+            """
+            if _should_pickle_by_reference(obj, name=name):
+                return super().save_global(obj, name=name)
+            elif PYPY and isinstance(obj.__code__, builtin_code_type):
+                return self.save_pypy_builtin_func(obj)
+            else:
+                return self._save_reduce_pickle5(
+                    *self._dynamic_function_reduce(obj), obj=obj
+                )
+
+        def save_pypy_builtin_func(self, obj):
+            """Save pypy equivalent of builtin functions.
+
+            PyPy does not have the concept of builtin-functions. Instead,
+            builtin-functions are simple function instances, but with a
+            builtin-code attribute.
+            Most of the time, builtin functions should be pickled by attribute.
+            But PyPy has flaky support for __qualname__, so some builtin
+            functions such as float.__new__ will be classified as dynamic. For
+            this reason only, we created this special routine. Because
+            builtin-functions are not expected to have closure or globals,
+            there is no additional hack (compared to the one already
+            implemented in pickle) to protect ourselves from reference cycles.
+            A simple (reconstructor, newargs, obj.__dict__) tuple is passed to
+            save_reduce. Note also that PyPy improved its support for
+            __qualname__ in v3.6, so this routine should be removed when
+            cloudpickle supports only PyPy 3.6 and later.
+            """
+            rv = (
+                types.FunctionType,
+                (obj.__code__, {}, obj.__name__, obj.__defaults__, obj.__closure__),
+                obj.__dict__,
+            )
+            self.save_reduce(*rv, obj=obj)
+
+        dispatch[types.FunctionType] = save_function
+
+
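In practice, the reducer_override/save_global machinery above is what lets cloudpickle serialize classes that cannot be pickled by reference, including instances of custom metaclasses such as enum.EnumMeta. A minimal sketch:

    import enum
    import cloudpickle

    def make_color():
        class Color(enum.Enum):  # an instance of enum.EnumMeta, defined locally
            RED = 1
        return Color

    Color = make_color()  # not importable by name, so it is pickled by value
    restored = cloudpickle.loads(cloudpickle.dumps(Color))
    assert restored.RED.value == 1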
+# Shorthands similar to pickle.dump/pickle.dumps
+
+
+def dump(obj, file, protocol=None, buffer_callback=None):
+    """Serialize obj as bytes streamed into file
+
+    protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
+    pickle.HIGHEST_PROTOCOL. This setting favors maximum communication
+    speed between processes running the same Python version.
+
+    Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure
+    compatibility with older versions of Python (although this is not always
+    guaranteed to work because cloudpickle relies on some internal
+    implementation details that can change from one Python version to the
+    next).
+    """
+    Pickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj)
+
+
+def dumps(obj, protocol=None, buffer_callback=None):
+    """Serialize obj as a string of bytes allocated in memory
+
+    protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
+    pickle.HIGHEST_PROTOCOL. This setting favors maximum communication
+    speed between processes running the same Python version.
+
+    Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure
+    compatibility with older versions of Python (although this is not always
+    guaranteed to work because cloudpickle relies on some internal
+    implementation details that can change from one Python version to the
+    next).
+    """
+    with io.BytesIO() as file:
+        cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
+        cp.dump(obj)
+        return file.getvalue()
+
+
+# Include pickle unloading functions in this namespace for convenience.
+load, loads = pickle.load, pickle.loads
+
+# Backward compat alias.
+CloudPickler = Pickler
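For reference, a quick round-trip with the helpers above; the second call shows the compatibility-oriented protocol choice mentioned in the docstrings:

    import pickle
    import cloudpickle

    payload = cloudpickle.dumps(lambda x: x + 1)  # pickle.HIGHEST_PROTOCOL by default
    assert cloudpickle.loads(payload)(41) == 42

    # More conservative choice when older Python versions may read the bytes:
    compat = cloudpickle.dumps(lambda x: x + 1, protocol=pickle.DEFAULT_PROTOCOL)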
diff --git a/python/pyspark/cloudpickle/cloudpickle_fast.py b/python/pyspark/cloudpickle/cloudpickle_fast.py
index ee1f4b8ee96..52d6732e44e 100644
--- a/python/pyspark/cloudpickle/cloudpickle_fast.py
+++ b/python/pyspark/cloudpickle/cloudpickle_fast.py
@@ -1,844 +1,13 @@
-"""
-New, fast version of the CloudPickler.
+"""Compatibility module.
 
-This new CloudPickler class can now extend the fast C Pickler instead of the
-previous Python implementation of the Pickler class. Because this functionality
-is only available for Python versions 3.8+, a lot of backward-compatibility
-code is also removed.
+It can be necessary to load files generated by previous versions of cloudpickle
+that rely on symbols being defined under the `cloudpickle.cloudpickle_fast`
+namespace.
 
-Note that the C Pickler subclassing API is CPython-specific. Therefore, some
-guards present in cloudpickle.py that were written to handle PyPy specificities
-are not present in cloudpickle_fast.py
+See: tests/test_backward_compat.py
 """
-import _collections_abc
-import abc
-import copyreg
-import io
-import itertools
-import logging
-import sys
-import struct
-import types
-import weakref
-import typing
-
-from enum import Enum
-from collections import ChainMap, OrderedDict
-
-from .compat import pickle, Pickler
-from .cloudpickle import (
-    _extract_code_globals, _BUILTIN_TYPE_NAMES, DEFAULT_PROTOCOL,
-    _find_imported_submodules, _get_cell_contents, _should_pickle_by_reference,
-    _builtin_type, _get_or_create_tracker_id,  _make_skeleton_class,
-    _make_skeleton_enum, _extract_class_dict, dynamic_subimport, subimport,
-    _typevar_reduce, _get_bases, _make_cell, _make_empty_cell, CellType,
-    _is_parametrized_type_hint, PYPY, cell_set,
-    parametrized_type_hint_getinitargs, _create_parametrized_type_hint,
-    builtin_code_type,
-    _make_dict_keys, _make_dict_values, _make_dict_items, _make_function,
-)
-
-
-if pickle.HIGHEST_PROTOCOL >= 5:
-    # Shorthands similar to pickle.dump/pickle.dumps
-
-    def dump(obj, file, protocol=None, buffer_callback=None):
-        """Serialize obj as bytes streamed into file
-
-        protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
-        pickle.HIGHEST_PROTOCOL. This setting favors maximum communication
-        speed between processes running the same Python version.
-
-        Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure
-        compatibility with older versions of Python.
-        """
-        CloudPickler(
-            file, protocol=protocol, buffer_callback=buffer_callback
-        ).dump(obj)
-
-    def dumps(obj, protocol=None, buffer_callback=None):
-        """Serialize obj as a string of bytes allocated in memory
-
-        protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
-        pickle.HIGHEST_PROTOCOL. This setting favors maximum communication
-        speed between processes running the same Python version.
-
-        Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure
-        compatibility with older versions of Python.
-        """
-        with io.BytesIO() as file:
-            cp = CloudPickler(
-                file, protocol=protocol, buffer_callback=buffer_callback
-            )
-            cp.dump(obj)
-            return file.getvalue()
-
-else:
-    # Shorthands similar to pickle.dump/pickle.dumps
-    def dump(obj, file, protocol=None):
-        """Serialize obj as bytes streamed into file
-
-        protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
-        pickle.HIGHEST_PROTOCOL. This setting favors maximum communication
-        speed between processes running the same Python version.
-
-        Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure
-        compatibility with older versions of Python.
-        """
-        CloudPickler(file, protocol=protocol).dump(obj)
-
-    def dumps(obj, protocol=None):
-        """Serialize obj as a string of bytes allocated in memory
-
-        protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
-        pickle.HIGHEST_PROTOCOL. This setting favors maximum communication
-        speed between processes running the same Python version.
-
-        Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure
-        compatibility with older versions of Python.
-        """
-        with io.BytesIO() as file:
-            cp = CloudPickler(file, protocol=protocol)
-            cp.dump(obj)
-            return file.getvalue()
-
-
-load, loads = pickle.load, pickle.loads
-
-
-# COLLECTION OF OBJECTS __getnewargs__-LIKE METHODS
-# -------------------------------------------------
-
-def _class_getnewargs(obj):
-    type_kwargs = {}
-    if "__module__" in obj.__dict__:
-        type_kwargs["__module__"] = obj.__module__
-
-    __dict__ = obj.__dict__.get('__dict__', None)
-    if isinstance(__dict__, property):
-        type_kwargs['__dict__'] = __dict__
-
-    return (type(obj), obj.__name__, _get_bases(obj), type_kwargs,
-            _get_or_create_tracker_id(obj), None)
-
-
-def _enum_getnewargs(obj):
-    members = {e.name: e.value for e in obj}
-    return (obj.__bases__, obj.__name__, obj.__qualname__, members,
-            obj.__module__, _get_or_create_tracker_id(obj), None)
-
-
-# COLLECTION OF OBJECTS RECONSTRUCTORS
-# ------------------------------------
-def _file_reconstructor(retval):
-    return retval
-
-
-# COLLECTION OF OBJECTS STATE GETTERS
-# -----------------------------------
-def _function_getstate(func):
-    # - Put func's dynamic attributes (stored in func.__dict__) in state. These
-    #   attributes will be restored at unpickling time using
-    #   f.__dict__.update(state)
-    # - Put func's members into slotstate. Such attributes will be restored at
-    #   unpickling time by iterating over slotstate and calling setattr(func,
-    #   slotname, slotvalue)
-    slotstate = {
-        "__name__": func.__name__,
-        "__qualname__": func.__qualname__,
-        "__annotations__": func.__annotations__,
-        "__kwdefaults__": func.__kwdefaults__,
-        "__defaults__": func.__defaults__,
-        "__module__": func.__module__,
-        "__doc__": func.__doc__,
-        "__closure__": func.__closure__,
-    }
-
-    f_globals_ref = _extract_code_globals(func.__code__)
-    f_globals = {k: func.__globals__[k] for k in f_globals_ref if k in
-                 func.__globals__}
-
-    closure_values = (
-        list(map(_get_cell_contents, func.__closure__))
-        if func.__closure__ is not None else ()
-    )
-
-    # Extract currently-imported submodules used by func. Storing these modules
-    # in a smoke _cloudpickle_subimports attribute of the object's state will
-    # trigger the side effect of importing these modules at unpickling time
-    # (which is necessary for func to work correctly once depickled)
-    slotstate["_cloudpickle_submodules"] = _find_imported_submodules(
-        func.__code__, itertools.chain(f_globals.values(), closure_values))
-    slotstate["__globals__"] = f_globals
-
-    state = func.__dict__
-    return state, slotstate
-
-
-def _class_getstate(obj):
-    clsdict = _extract_class_dict(obj)
-    clsdict.pop('__weakref__', None)
-
-    if issubclass(type(obj), abc.ABCMeta):
-        # If obj is an instance of an ABCMeta subclass, don't pickle the
-        # cache/negative caches populated during isinstance/issubclass
-        # checks, but pickle the list of registered subclasses of obj.
-        clsdict.pop('_abc_cache', None)
-        clsdict.pop('_abc_negative_cache', None)
-        clsdict.pop('_abc_negative_cache_version', None)
-        registry = clsdict.pop('_abc_registry', None)
-        if registry is None:
-            # in Python3.7+, the abc caches and registered subclasses of a
-            # class are bundled into the single _abc_impl attribute
-            clsdict.pop('_abc_impl', None)
-            (registry, _, _, _) = abc._get_dump(obj)
-
-            clsdict["_abc_impl"] = [subclass_weakref()
-                                    for subclass_weakref in registry]
-        else:
-            # In the above if clause, registry is a set of weakrefs -- in
-            # this case, registry is a WeakSet
-            clsdict["_abc_impl"] = [type_ for type_ in registry]
-
-    if "__slots__" in clsdict:
-        # pickle string length optimization: member descriptors of obj are
-        # created automatically from obj's __slots__ attribute, no need to
-        # save them in obj's state
-        if isinstance(obj.__slots__, str):
-            clsdict.pop(obj.__slots__)
-        else:
-            for k in obj.__slots__:
-                clsdict.pop(k, None)
-
-    clsdict.pop('__dict__', None)  # unpicklable property object
-
-    return (clsdict, {})
-
-
-def _enum_getstate(obj):
-    clsdict, slotstate = _class_getstate(obj)
-
-    members = {e.name: e.value for e in obj}
-    # Cleanup the clsdict that will be passed to _rehydrate_skeleton_class:
-    # Those attributes are already handled by the metaclass.
-    for attrname in ["_generate_next_value_", "_member_names_",
-                     "_member_map_", "_member_type_",
-                     "_value2member_map_"]:
-        clsdict.pop(attrname, None)
-    for member in members:
-        clsdict.pop(member)
-        # Special handling of Enum subclasses
-    return clsdict, slotstate
-
-
-# COLLECTIONS OF OBJECTS REDUCERS
-# -------------------------------
-# A reducer is a function taking a single argument (obj), and that returns a
-# tuple with all the necessary data to re-construct obj. Apart from a few
-# exceptions (list, dict, bytes, int, etc.), a reducer is necessary to
-# correctly pickle an object.
-# While many built-in objects (Exceptions objects, instances of the "object"
-# class, etc), are shipped with their own built-in reducer (invoked using
-# obj.__reduce__), some do not. The following methods were created to "fill
-# these holes".
-
-def _code_reduce(obj):
-    """codeobject reducer"""
-    # If you are not sure about the order of arguments, take a look at help
-    # of the specific type from types, for example:
-    # >>> from types import CodeType
-    # >>> help(CodeType)
-    if hasattr(obj, "co_exceptiontable"):  # pragma: no branch
-        # Python 3.11 and later: there are some new attributes
-        # related to the enhanced exceptions.
-        args = (
-            obj.co_argcount, obj.co_posonlyargcount,
-            obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
-            obj.co_flags, obj.co_code, obj.co_consts, obj.co_names,
-            obj.co_varnames, obj.co_filename, obj.co_name, obj.co_qualname,
-            obj.co_firstlineno, obj.co_linetable, obj.co_exceptiontable,
-            obj.co_freevars, obj.co_cellvars,
-        )
-    elif hasattr(obj, "co_linetable"):  # pragma: no branch
-        # Python 3.10 and later: obj.co_lnotab is deprecated and constructor
-        # expects obj.co_linetable instead.
-        args = (
-            obj.co_argcount, obj.co_posonlyargcount,
-            obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
-            obj.co_flags, obj.co_code, obj.co_consts, obj.co_names,
-            obj.co_varnames, obj.co_filename, obj.co_name,
-            obj.co_firstlineno, obj.co_linetable, obj.co_freevars,
-            obj.co_cellvars
-        )
-    elif hasattr(obj, "co_nmeta"):  # pragma: no cover
-        # "nogil" Python: modified attributes from 3.9
-        args = (
-            obj.co_argcount, obj.co_posonlyargcount,
-            obj.co_kwonlyargcount, obj.co_nlocals, obj.co_framesize,
-            obj.co_ndefaultargs, obj.co_nmeta,
-            obj.co_flags, obj.co_code, obj.co_consts,
-            obj.co_varnames, obj.co_filename, obj.co_name,
-            obj.co_firstlineno, obj.co_lnotab, obj.co_exc_handlers,
-            obj.co_jump_table, obj.co_freevars, obj.co_cellvars,
-            obj.co_free2reg, obj.co_cell2reg
-        )
-    elif hasattr(obj, "co_posonlyargcount"):
-        # Backward compat for 3.9 and older
-        args = (
-            obj.co_argcount, obj.co_posonlyargcount,
-            obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
-            obj.co_flags, obj.co_code, obj.co_consts, obj.co_names,
-            obj.co_varnames, obj.co_filename, obj.co_name,
-            obj.co_firstlineno, obj.co_lnotab, obj.co_freevars,
-            obj.co_cellvars
-        )
-    else:
-        # Backward compat for even older versions of Python
-        args = (
-            obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals,
-            obj.co_stacksize, obj.co_flags, obj.co_code, obj.co_consts,
-            obj.co_names, obj.co_varnames, obj.co_filename,
-            obj.co_name, obj.co_firstlineno, obj.co_lnotab,
-            obj.co_freevars, obj.co_cellvars
-        )
-    return types.CodeType, args
-
-
-def _cell_reduce(obj):
-    """Cell (containing values of a function's free variables) reducer"""
-    try:
-        obj.cell_contents
-    except ValueError:  # cell is empty
-        return _make_empty_cell, ()
-    else:
-        return _make_cell, (obj.cell_contents, )
-
-
-def _classmethod_reduce(obj):
-    orig_func = obj.__func__
-    return type(obj), (orig_func,)
-
-
-def _file_reduce(obj):
-    """Save a file"""
-    import io
-
-    if not hasattr(obj, "name") or not hasattr(obj, "mode"):
-        raise pickle.PicklingError(
-            "Cannot pickle files that do not map to an actual file"
-        )
-    if obj is sys.stdout:
-        return getattr, (sys, "stdout")
-    if obj is sys.stderr:
-        return getattr, (sys, "stderr")
-    if obj is sys.stdin:
-        raise pickle.PicklingError("Cannot pickle standard input")
-    if obj.closed:
-        raise pickle.PicklingError("Cannot pickle closed files")
-    if hasattr(obj, "isatty") and obj.isatty():
-        raise pickle.PicklingError(
-            "Cannot pickle files that map to tty objects"
-        )
-    if "r" not in obj.mode and "+" not in obj.mode:
-        raise pickle.PicklingError(
-            "Cannot pickle files that are not opened for reading: %s"
-            % obj.mode
-        )
-
-    name = obj.name
-
-    retval = io.StringIO()
-
-    try:
-        # Read the whole file
-        curloc = obj.tell()
-        obj.seek(0)
-        contents = obj.read()
-        obj.seek(curloc)
-    except IOError as e:
-        raise pickle.PicklingError(
-            "Cannot pickle file %s as it cannot be read" % name
-        ) from e
-    retval.write(contents)
-    retval.seek(curloc)
-
-    retval.name = name
-    return _file_reconstructor, (retval,)
-
-
-def _getset_descriptor_reduce(obj):
-    return getattr, (obj.__objclass__, obj.__name__)
-
-
-def _mappingproxy_reduce(obj):
-    return types.MappingProxyType, (dict(obj),)
-
-
-def _memoryview_reduce(obj):
-    return bytes, (obj.tobytes(),)
-
-
-def _module_reduce(obj):
-    if _should_pickle_by_reference(obj):
-        return subimport, (obj.__name__,)
-    else:
-        # Some external libraries can populate the "__builtins__" entry of a
-        # module's `__dict__` with unpicklable objects (see #316). For that
-        # reason, we do not attempt to pickle the "__builtins__" entry, and
-        # restore a default value for it at unpickling time.
-        state = obj.__dict__.copy()
-        state.pop('__builtins__', None)
-        return dynamic_subimport, (obj.__name__, state)
-
-
-def _method_reduce(obj):
-    return (types.MethodType, (obj.__func__, obj.__self__))
-
-
-def _logger_reduce(obj):
-    return logging.getLogger, (obj.name,)
-
-
-def _root_logger_reduce(obj):
-    return logging.getLogger, ()
-
-
-def _property_reduce(obj):
-    return property, (obj.fget, obj.fset, obj.fdel, obj.__doc__)
-
-
-def _weakset_reduce(obj):
-    return weakref.WeakSet, (list(obj),)
-
-
-def _dynamic_class_reduce(obj):
-    """
-    Save a class that can't be stored as module global.
-
-    This method is used to serialize classes that are defined inside
-    functions, or that otherwise can't be serialized as attribute lookups
-    from global modules.
-    """
-    if Enum is not None and issubclass(obj, Enum):
-        return (
-            _make_skeleton_enum, _enum_getnewargs(obj), _enum_getstate(obj),
-            None, None, _class_setstate
-        )
-    else:
-        return (
-            _make_skeleton_class, _class_getnewargs(obj), _class_getstate(obj),
-            None, None, _class_setstate
-        )
-
-
-def _class_reduce(obj):
-    """Select the reducer depending on the dynamic nature of the class obj"""
-    if obj is type(None):  # noqa
-        return type, (None,)
-    elif obj is type(Ellipsis):
-        return type, (Ellipsis,)
-    elif obj is type(NotImplemented):
-        return type, (NotImplemented,)
-    elif obj in _BUILTIN_TYPE_NAMES:
-        return _builtin_type, (_BUILTIN_TYPE_NAMES[obj],)
-    elif not _should_pickle_by_reference(obj):
-        return _dynamic_class_reduce(obj)
-    return NotImplemented
-
-
-def _dict_keys_reduce(obj):
-    # Safer not to ship the full dict as sending the rest might
-    # be unintended and could potentially cause leaking of
-    # sensitive information
-    return _make_dict_keys, (list(obj), )
-
-
-def _dict_values_reduce(obj):
-    # Safer not to ship the full dict as sending the rest might
-    # be unintended and could potentially cause leaking of
-    # sensitive information
-    return _make_dict_values, (list(obj), )
-
-
-def _dict_items_reduce(obj):
-    return _make_dict_items, (dict(obj), )
-
-
-def _odict_keys_reduce(obj):
-    # Safer not to ship the full dict as sending the rest might
-    # be unintended and could potentially cause leaking of
-    # sensitive information
-    return _make_dict_keys, (list(obj), True)
-
-
-def _odict_values_reduce(obj):
-    # Safer not to ship the full dict as sending the rest might
-    # be unintended and could potentially cause leaking of
-    # sensitive information
-    return _make_dict_values, (list(obj), True)
-
-
-def _odict_items_reduce(obj):
-    return _make_dict_items, (dict(obj), True)
-
-
-# COLLECTIONS OF OBJECTS STATE SETTERS
-# ------------------------------------
-# state setters are called at unpickling time, once the object is created and
-# it has to be updated to how it was at unpickling time.
-
-
-def _function_setstate(obj, state):
-    """Update the state of a dynamic function.
-
-    As __closure__ and __globals__ are readonly attributes of a function, we
-    cannot rely on the native setstate routine of pickle.load_build, that calls
-    setattr on items of the slotstate. Instead, we have to modify them inplace.
-    """
-    state, slotstate = state
-    obj.__dict__.update(state)
-
-    obj_globals = slotstate.pop("__globals__")
-    obj_closure = slotstate.pop("__closure__")
-    # _cloudpickle_subimports is a set of submodules that must be loaded for
-    # the pickled function to work correctly at unpickling time. Now that these
-    # submodules are depickled (hence imported), they can be removed from the
-    # object's state (the object state only served as a reference holder to
-    # these submodules)
-    slotstate.pop("_cloudpickle_submodules")
-
-    obj.__globals__.update(obj_globals)
-    obj.__globals__["__builtins__"] = __builtins__
-
-    if obj_closure is not None:
-        for i, cell in enumerate(obj_closure):
-            try:
-                value = cell.cell_contents
-            except ValueError:  # cell is empty
-                continue
-            cell_set(obj.__closure__[i], value)
-
-    for k, v in slotstate.items():
-        setattr(obj, k, v)
-
-
-def _class_setstate(obj, state):
-    state, slotstate = state
-    registry = None
-    for attrname, attr in state.items():
-        if attrname == "_abc_impl":
-            registry = attr
-        else:
-            setattr(obj, attrname, attr)
-    if registry is not None:
-        for subclass in registry:
-            obj.register(subclass)
-
-    return obj
-
-
-class CloudPickler(Pickler):
-    # set of reducers defined and used by cloudpickle (private)
-    _dispatch_table = {}
-    _dispatch_table[classmethod] = _classmethod_reduce
-    _dispatch_table[io.TextIOWrapper] = _file_reduce
-    _dispatch_table[logging.Logger] = _logger_reduce
-    _dispatch_table[logging.RootLogger] = _root_logger_reduce
-    _dispatch_table[memoryview] = _memoryview_reduce
-    _dispatch_table[property] = _property_reduce
-    _dispatch_table[staticmethod] = _classmethod_reduce
-    _dispatch_table[CellType] = _cell_reduce
-    _dispatch_table[types.CodeType] = _code_reduce
-    _dispatch_table[types.GetSetDescriptorType] = _getset_descriptor_reduce
-    _dispatch_table[types.ModuleType] = _module_reduce
-    _dispatch_table[types.MethodType] = _method_reduce
-    _dispatch_table[types.MappingProxyType] = _mappingproxy_reduce
-    _dispatch_table[weakref.WeakSet] = _weakset_reduce
-    _dispatch_table[typing.TypeVar] = _typevar_reduce
-    _dispatch_table[_collections_abc.dict_keys] = _dict_keys_reduce
-    _dispatch_table[_collections_abc.dict_values] = _dict_values_reduce
-    _dispatch_table[_collections_abc.dict_items] = _dict_items_reduce
-    _dispatch_table[type(OrderedDict().keys())] = _odict_keys_reduce
-    _dispatch_table[type(OrderedDict().values())] = _odict_values_reduce
-    _dispatch_table[type(OrderedDict().items())] = _odict_items_reduce
-    _dispatch_table[abc.abstractmethod] = _classmethod_reduce
-    _dispatch_table[abc.abstractclassmethod] = _classmethod_reduce
-    _dispatch_table[abc.abstractstaticmethod] = _classmethod_reduce
-    _dispatch_table[abc.abstractproperty] = _property_reduce
-
-    dispatch_table = ChainMap(_dispatch_table, copyreg.dispatch_table)
-
-    # function reducers are defined as instance methods of CloudPickler
-    # objects, as they rely on a CloudPickler attribute (globals_ref)
-    def _dynamic_function_reduce(self, func):
-        """Reduce a function that is not pickleable via attribute lookup."""
-        newargs = self._function_getnewargs(func)
-        state = _function_getstate(func)
-        return (_make_function, newargs, state, None, None,
-                _function_setstate)
-
-    def _function_reduce(self, obj):
-        """Reducer for function objects.
-
-        If obj is a top-level attribute of a file-backed module, this
-        reducer returns NotImplemented, making the CloudPickler fallback to
-        traditional _pickle.Pickler routines to save obj. Otherwise, it reduces
-        obj using a custom cloudpickle reducer designed specifically to handle
-        dynamic functions.
-
-        As opposed to cloudpickle.py, There no special handling for builtin
-        pypy functions because cloudpickle_fast is CPython-specific.
-        """
-        if _should_pickle_by_reference(obj):
-            return NotImplemented
-        else:
-            return self._dynamic_function_reduce(obj)
-
-    def _function_getnewargs(self, func):
-        code = func.__code__
-
-        # base_globals represents the future global namespace of func at
-        # unpickling time. Looking it up and storing it in
-        # CloudpiPickler.globals_ref allow functions sharing the same globals
-        # at pickling time to also share them once unpickled, at one condition:
-        # since globals_ref is an attribute of a CloudPickler instance, and
-        # that a new CloudPickler is created each time pickle.dump or
-        # pickle.dumps is called, functions also need to be saved within the
-        # same invocation of cloudpickle.dump/cloudpickle.dumps (for example:
-        # cloudpickle.dumps([f1, f2])). There is no such limitation when using
-        # CloudPickler.dump, as long as the multiple invocations are bound to
-        # the same CloudPickler.
-        base_globals = self.globals_ref.setdefault(id(func.__globals__), {})
-
-        if base_globals == {}:
-            # Add module attributes used to resolve relative imports
-            # instructions inside func.
-            for k in ["__package__", "__name__", "__path__", "__file__"]:
-                if k in func.__globals__:
-                    base_globals[k] = func.__globals__[k]
-
-        # Do not bind the free variables before the function is created to
-        # avoid infinite recursion.
-        if func.__closure__ is None:
-            closure = None
-        else:
-            closure = tuple(
-                _make_empty_cell() for _ in range(len(code.co_freevars)))
-
-        return code, base_globals, None, None, closure
-
-    def dump(self, obj):
-        try:
-            return Pickler.dump(self, obj)
-        except RuntimeError as e:
-            if len(e.args) > 0 and "recursion" in e.args[0]:
-                msg = (
-                    "Could not pickle object as excessively deep recursion "
-                    "required."
-                )
-                raise pickle.PicklingError(msg) from e
-            else:
-                raise
-
-    if pickle.HIGHEST_PROTOCOL >= 5:
-        def __init__(self, file, protocol=None, buffer_callback=None):
-            if protocol is None:
-                protocol = DEFAULT_PROTOCOL
-            Pickler.__init__(
-                self, file, protocol=protocol, buffer_callback=buffer_callback
-            )
-            # map functions __globals__ attribute ids, to ensure that functions
-            # sharing the same global namespace at pickling time also share
-            # their global namespace at unpickling time.
-            self.globals_ref = {}
-            self.proto = int(protocol)
-    else:
-        def __init__(self, file, protocol=None):
-            if protocol is None:
-                protocol = DEFAULT_PROTOCOL
-            Pickler.__init__(self, file, protocol=protocol)
-            # map functions __globals__ attribute ids, to ensure that functions
-            # sharing the same global namespace at pickling time also share
-            # their global namespace at unpickling time.
-            self.globals_ref = {}
-            assert hasattr(self, 'proto')
-
-    if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY:
-        # Pickler is the C implementation of the CPython pickler and therefore
-        # we rely on reduce_override method to customize the pickler behavior.
-
-        # `CloudPickler.dispatch` is only left for backward compatibility - note
-        # that when using protocol 5, `CloudPickler.dispatch` is not an
-        # extension of `Pickler.dispatch` dictionary, because CloudPickler
-        # subclasses the C-implemented Pickler, which does not expose a
-        # `dispatch` attribute.  Earlier versions of the protocol 5 CloudPickler
-        # used `CloudPickler.dispatch` as a class-level attribute storing all
-        # reducers implemented by cloudpickle, but the attribute name was not a
-        # great choice given the meaning of `CloudPickler.dispatch` when
-        # `CloudPickler` extends the pure-python pickler.
-        dispatch = dispatch_table
-
-        # Implementation of the reducer_override callback, in order to
-        # efficiently serialize dynamic functions and classes by subclassing
-        # the C-implemented Pickler.
-        # TODO: decorrelate reducer_override (which is tied to CPython's
-        # implementation - would it make sense to backport it to pypy? - and
-        # pickle's protocol 5 which is implementation agnostic. Currently, the
-        # availability of both notions coincide on CPython's pickle and the
-        # pickle5 backport, but it may not be the case anymore when pypy
-        # implements protocol 5
-
-        def reducer_override(self, obj):
-            """Type-agnostic reducing callback for function and classes.
-
-            For performance reasons, subclasses of the C _pickle.Pickler class
-            cannot register custom reducers for functions and classes in the
-            dispatch_table. Reducer for such types must instead implemented in
-            the special reducer_override method.
-
-            Note that method will be called for any object except a few
-            builtin-types (int, lists, dicts etc.), which differs from reducers
-            in the Pickler's dispatch_table, each of them being invoked for
-            objects of a specific type only.
-
-            This property comes in handy for classes: although most classes are
-            instances of the ``type`` metaclass, some of them can be instances
-            of other custom metaclasses (such as enum.EnumMeta for example). In
-            particular, the metaclass will likely not be known in advance, and
-            thus cannot be special-cased using an entry in the dispatch_table.
-            reducer_override, among other things, allows us to register a
-            reducer that will be called for any class, independently of its
-            type.
-
-
-            Notes:
-
-            * reducer_override has the priority over dispatch_table-registered
-            reducers.
-            * reducer_override can be used to fix other limitations of
-              cloudpickle for other types that suffered from type-specific
-              reducers, such as Exceptions. See
-              https://github.com/cloudpipe/cloudpickle/issues/248
-            """
-            if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch
-                return (
-                    _create_parametrized_type_hint,
-                    parametrized_type_hint_getinitargs(obj)
-                )
-            t = type(obj)
-            try:
-                is_anyclass = issubclass(t, type)
-            except TypeError:  # t is not a class (old Boost; see SF #502085)
-                is_anyclass = False
-
-            if is_anyclass:
-                return _class_reduce(obj)
-            elif isinstance(obj, types.FunctionType):
-                return self._function_reduce(obj)
-            else:
-                # fallback to save_global, including the Pickler's
-                # dispatch_table
-                return NotImplemented
-
-    else:
-        # When reducer_override is not available, hack the pure-Python
-        # Pickler's types.FunctionType and type savers. Note: the type saver
-        # must override Pickler.save_global, because pickle.py contains a
-        # hard-coded call to save_global when pickling meta-classes.
-        dispatch = Pickler.dispatch.copy()
-
-        def _save_reduce_pickle5(self, func, args, state=None, listitems=None,
-                                 dictitems=None, state_setter=None, obj=None):
-            save = self.save
-            write = self.write
-            self.save_reduce(
-                func, args, state=None, listitems=listitems,
-                dictitems=dictitems, obj=obj
-            )
-            # backport of the Python 3.8 state_setter pickle operations
-            save(state_setter)
-            save(obj)  # simple BINGET opcode as obj is already memoized.
-            save(state)
-            write(pickle.TUPLE2)
-            # Trigger a state_setter(obj, state) function call.
-            write(pickle.REDUCE)
-            # The purpose of state_setter is to carry-out an
-            # inplace modification of obj. We do not care about what the
-            # method might return, so its output is eventually removed from
-            # the stack.
-            write(pickle.POP)
-
-        def save_global(self, obj, name=None, pack=struct.pack):
-            """
-            Save a "global".
-
-            The name of this method is somewhat misleading: all types get
-            dispatched here.
-            """
-            if obj is type(None):  # noqa
-                return self.save_reduce(type, (None,), obj=obj)
-            elif obj is type(Ellipsis):
-                return self.save_reduce(type, (Ellipsis,), obj=obj)
-            elif obj is type(NotImplemented):
-                return self.save_reduce(type, (NotImplemented,), obj=obj)
-            elif obj in _BUILTIN_TYPE_NAMES:
-                return self.save_reduce(
-                    _builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj)
-
-            if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch
-                # Parametrized typing constructs in Python < 3.7 are not
-                # compatible with type checks and ``isinstance`` semantics. For
-                # this reason, it is easier to detect them using a
-                # duck-typing-based check (``_is_parametrized_type_hint``) than
-                # to populate the Pickler's dispatch with type-specific savers.
-                self.save_reduce(
-                    _create_parametrized_type_hint,
-                    parametrized_type_hint_getinitargs(obj),
-                    obj=obj
-                )
-            elif name is not None:
-                Pickler.save_global(self, obj, name=name)
-            elif not _should_pickle_by_reference(obj, name=name):
-                self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj)
-            else:
-                Pickler.save_global(self, obj, name=name)
-        dispatch[type] = save_global
-
-        def save_function(self, obj, name=None):
-            """ Registered with the dispatch to handle all function types.
-
-            Determines what kind of function obj is (e.g. lambda, defined at
-            interactive prompt, etc) and handles the pickling appropriately.
-            """
-            if _should_pickle_by_reference(obj, name=name):
-                return Pickler.save_global(self, obj, name=name)
-            elif PYPY and isinstance(obj.__code__, builtin_code_type):
-                return self.save_pypy_builtin_func(obj)
-            else:
-                return self._save_reduce_pickle5(
-                    *self._dynamic_function_reduce(obj), obj=obj
-                )
+from . import cloudpickle
 
-        def save_pypy_builtin_func(self, obj):
-            """Save pypy equivalent of builtin functions.
-            PyPy does not have the concept of builtin-functions. Instead,
-            builtin-functions are simple function instances, but with a
-            builtin-code attribute.
-            Most of the time, builtin functions should be pickled by attribute.
-            But PyPy has flaky support for __qualname__, so some builtin
-            functions such as float.__new__ will be classified as dynamic. For
-            this reason only, we created this special routine. Because
-            builtin-functions are not expected to have closure or globals,
-            there is no additional hack (compared the one already implemented
-            in pickle) to protect ourselves from reference cycles. A simple
-            (reconstructor, newargs, obj.__dict__) tuple is save_reduced.  Note
-            also that PyPy improved their support for __qualname__ in v3.6, so
-            this routing should be removed when cloudpickle supports only PyPy
-            3.6 and later.
-            """
-            rv = (types.FunctionType, (obj.__code__, {}, obj.__name__,
-                                       obj.__defaults__, obj.__closure__),
-                  obj.__dict__)
-            self.save_reduce(*rv, obj=obj)
 
-        dispatch[types.FunctionType] = save_function
+def __getattr__(name):
+    return getattr(cloudpickle, name)
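This module-level __getattr__ (PEP 562) forwards any legacy `cloudpickle.cloudpickle_fast.<name>` lookup to the main module, so pickles produced by older cloudpickle releases keep resolving. A small sketch of the delegation:

    from pyspark.cloudpickle import cloudpickle_fast

    # dumps/loads are no longer defined here; the lookups are delegated to
    # pyspark.cloudpickle.cloudpickle by the __getattr__ above.
    data = cloudpickle_fast.dumps([1, 2, 3])
    assert cloudpickle_fast.loads(data) == [1, 2, 3]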


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
