This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f063b15  Logging relies on StateSampler for context
f063b15 is described below

commit f063b157eea480d079c4e966e528eef050a0c192
Author: Pablo <pabl...@google.com>
AuthorDate: Mon May 14 11:29:21 2018 -0700

    Logging relies on StateSampler for context
---
 sdks/python/apache_beam/runners/common.pxd         | 12 -------
 sdks/python/apache_beam/runners/common.py          | 29 ++--------------
 .../apache_beam/runners/worker/bundle_processor.py | 11 +++---
 sdks/python/apache_beam/runners/worker/logger.pxd  | 25 --------------
 sdks/python/apache_beam/runners/worker/logger.py   | 17 ++++++----
 .../apache_beam/runners/worker/logger_test.py      | 39 ++++++++++++++--------
 .../apache_beam/runners/worker/operation_specs.py  |  2 +-
 .../apache_beam/runners/worker/operations.py       | 18 +++-------
 .../apache_beam/runners/worker/statesampler.py     | 31 +++++++++++++++--
 .../runners/worker/statesampler_fast.pxd           |  4 ++-
 .../runners/worker/statesampler_fast.pyx           | 21 +++++++++---
 .../runners/worker/statesampler_slow.py            | 20 +++++++----
 12 files changed, 109 insertions(+), 120 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 5c5eba2..4bb2264 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -81,9 +81,7 @@ cdef class PerWindowInvoker(DoFnInvoker):
 
 cdef class DoFnRunner(Receiver):
   cdef DoFnContext context
-  cdef LoggingContext logging_context
   cdef object step_name
-  cdef ScopedMetricsContainer scoped_metrics_container
   cdef list side_inputs
   cdef DoFnInvoker do_fn_invoker
 
@@ -112,15 +110,5 @@ cdef class DoFnContext(object):
   cpdef set_element(self, WindowedValue windowed_value)
 
 
-cdef class LoggingContext(object):
-  # TODO(robertwb): Optimize "with [cdef class]"
-  cpdef enter(self)
-  cpdef exit(self)
-
-
-cdef class _LoggingContextAdapter(LoggingContext):
-  cdef object underlying
-
-
 cdef class _ReceiverAdapter(Receiver):
   cdef object underlying
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index d5f35de..88745c7 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -119,16 +119,6 @@ class DataflowNameContext(NameContext):
     return self.user_name
 
 
-class LoggingContext(object):
-  """For internal use only; no backwards-compatibility guarantees."""
-
-  def enter(self):
-    pass
-
-  def exit(self):
-    pass
-
-
 class Receiver(object):
   """For internal use only; no backwards-compatibility guarantees.
 
@@ -551,20 +541,15 @@ class DoFnRunner(Receiver):
       windowing: windowing properties of the output PCollection(s)
       tagged_receivers: a dict of tag name to Receiver objects
       step_name: the name of this step
-      logging_context: a LoggingContext object
+      logging_context: DEPRECATED [BEAM-4728]
       state: handle for accessing DoFn state
-      scoped_metrics_container: Context switcher for metrics container
+      scoped_metrics_container: DEPRECATED
       operation_name: The system name assigned by the runner for this operation.
     """
     # Need to support multiple iterations.
     side_inputs = list(side_inputs)
 
-    from apache_beam.metrics.execution import ScopedMetricsContainer
-
-    self.scoped_metrics_container = (
-        scoped_metrics_container or ScopedMetricsContainer())
     self.step_name = step_name
-    self.logging_context = logging_context or LoggingContext()
     self.context = DoFnContext(step_name, state=state)
 
     do_fn_signature = DoFnSignature(fn)
@@ -595,26 +580,16 @@ class DoFnRunner(Receiver):
 
   def process(self, windowed_value):
     try:
-      self.logging_context.enter()
-      self.scoped_metrics_container.enter()
       self.do_fn_invoker.invoke_process(windowed_value)
     except BaseException as exn:
       self._reraise_augmented(exn)
-    finally:
-      self.scoped_metrics_container.exit()
-      self.logging_context.exit()
 
   def _invoke_bundle_method(self, bundle_method):
     try:
-      self.logging_context.enter()
-      self.scoped_metrics_container.enter()
       self.context.set_element(None)
       bundle_method()
     except BaseException as exn:
       self._reraise_augmented(exn)
-    finally:
-      self.scoped_metrics_container.exit()
-      self.logging_context.exit()
 
   def start(self):
     self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 4193ea2..958731d 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -63,13 +63,12 @@ OLD_DATAFLOW_RUNNER_HARNESS_READ_URN = 'urn:org.apache.beam:source:java:0.1'
 class RunnerIOOperation(operations.Operation):
   """Common baseclass for runner harness IO operations."""
 
-  def __init__(self, operation_name, step_name, consumers, counter_factory,
+  def __init__(self, name_context, step_name, consumers, counter_factory,
                state_sampler, windowed_coder, target, data_channel):
     super(RunnerIOOperation, self).__init__(
-        operation_name, None, counter_factory, state_sampler)
+        name_context, None, counter_factory, state_sampler)
     self.windowed_coder = windowed_coder
     self.windowed_coder_impl = windowed_coder.get_impl()
-    self.step_name = step_name
     # target represents the consumer for the bytes in the data plane for a
     # DataInputOperation or a producer of these bytes for a DataOutputOperation.
     self.target = target
@@ -106,9 +105,9 @@ class DataInputOperation(RunnerIOOperation):
         windowed_coder, target=input_target, data_channel=data_channel)
     # We must do this manually as we don't have a spec or spec.output_coders.
     self.receivers = [
-        operations.ConsumerSet(self.counter_factory, self.step_name, 0,
-                               next(itervalues(consumers)),
-                               self.windowed_coder)]
+        operations.ConsumerSet(
+            self.counter_factory, self.name_context.step_name, 0,
+            next(itervalues(consumers)), self.windowed_coder)]
 
   def process(self, windowed_value):
     self.output(windowed_value)
diff --git a/sdks/python/apache_beam/runners/worker/logger.pxd b/sdks/python/apache_beam/runners/worker/logger.pxd
deleted file mode 100644
index 201daf4..0000000
--- a/sdks/python/apache_beam/runners/worker/logger.pxd
+++ /dev/null
@@ -1,25 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-cimport cython
-
-from apache_beam.runners.common cimport LoggingContext
-
-
-cdef class PerThreadLoggingContext(LoggingContext):
-  cdef kwargs
-  cdef list stack
diff --git a/sdks/python/apache_beam/runners/worker/logger.py b/sdks/python/apache_beam/runners/worker/logger.py
index 07cd320..ae9cdd3 100644
--- a/sdks/python/apache_beam/runners/worker/logger.py
+++ b/sdks/python/apache_beam/runners/worker/logger.py
@@ -26,7 +26,7 @@ import logging
 import threading
 import traceback
 
-from apache_beam.runners.common import LoggingContext
+from apache_beam.runners.worker import statesampler
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -38,7 +38,6 @@ class _PerThreadWorkerData(threading.local):
 
   def __init__(self):
     super(_PerThreadWorkerData, self).__init__()
-    # TODO(robertwb): Consider starting with an initial (ignored) ~20 elements
     # in the list, as going up and down all the way to zero incurs several
     # reallocations.
     self.stack = []
@@ -53,7 +52,7 @@ class _PerThreadWorkerData(threading.local):
 per_thread_worker_data = _PerThreadWorkerData()
 
 
-class PerThreadLoggingContext(LoggingContext):
+class PerThreadLoggingContext(object):
   """A context manager to add per thread attributes."""
 
   def __init__(self, **kwargs):
@@ -150,10 +149,14 @@ class JsonLogFormatter(logging.Formatter):
     data = per_thread_worker_data.get_data()
     if 'work_item_id' in data:
       output['work'] = data['work_item_id']
-    if 'stage_name' in data:
-      output['stage'] = data['stage_name']
-    if 'step_name' in data:
-      output['step'] = data['step_name']
+
+    tracker = statesampler.get_current_tracker()
+    if tracker:
+      output['stage'] = tracker.stage_name
+
+      if tracker.current_state() and tracker.current_state().name_context:
+        output['step'] = tracker.current_state().name_context.logging_name()
+
     # All logging happens using the root logger. We will add the basename of the
     # file and the function name where the logging happened to make it easier
     # to identify who generated the record.
diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py b/sdks/python/apache_beam/runners/worker/logger_test.py
index 73ec1aa..c131775 100644
--- a/sdks/python/apache_beam/runners/worker/logger_test.py
+++ b/sdks/python/apache_beam/runners/worker/logger_test.py
@@ -18,6 +18,7 @@
 """Tests for worker logging utilities."""
 
 from __future__ import absolute_import
+from __future__ import unicode_literals
 
 import json
 import logging
@@ -27,6 +28,8 @@ import unittest
 from builtins import object
 
 from apache_beam.runners.worker import logger
+from apache_beam.runners.worker import statesampler
+from apache_beam.utils.counters import CounterFactory
 
 
 class PerThreadLoggingContextTest(unittest.TestCase):
@@ -129,30 +132,38 @@ class JsonLogFormatterTest(unittest.TestCase):
     self.execute_multiple_cases(test_cases)
 
   def test_record_with_per_thread_info(self):
-    with logger.PerThreadLoggingContext(
-        work_item_id='workitem', stage_name='stage', step_name='step'):
-      formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
-      record = self.create_log_record(**self.SAMPLE_RECORD)
-      log_output = json.loads(formatter.format(record))
+    self.maxDiff = None
+    tracker = statesampler.StateSampler('stage', CounterFactory())
+    statesampler.set_current_tracker(tracker)
+    formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
+    with logger.PerThreadLoggingContext(work_item_id='workitem'):
+      with tracker.scoped_state('step', 'process'):
+        record = self.create_log_record(**self.SAMPLE_RECORD)
+        log_output = json.loads(formatter.format(record))
     expected_output = dict(self.SAMPLE_OUTPUT)
     expected_output.update(
         {'work': 'workitem', 'stage': 'stage', 'step': 'step'})
     self.assertEqual(log_output, expected_output)
+    statesampler.set_current_tracker(None)
 
   def test_nested_with_per_thread_info(self):
+    self.maxDiff = None
+    tracker = statesampler.StateSampler('stage', CounterFactory())
+    statesampler.set_current_tracker(tracker)
     formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
-    with logger.PerThreadLoggingContext(
-        work_item_id='workitem', stage_name='stage', step_name='step1'):
-      record = self.create_log_record(**self.SAMPLE_RECORD)
-      log_output1 = json.loads(formatter.format(record))
-
-      with logger.PerThreadLoggingContext(step_name='step2'):
+    with logger.PerThreadLoggingContext(work_item_id='workitem'):
+      with tracker.scoped_state('step1', 'process'):
         record = self.create_log_record(**self.SAMPLE_RECORD)
-        log_output2 = json.loads(formatter.format(record))
+        log_output1 = json.loads(formatter.format(record))
 
-      record = self.create_log_record(**self.SAMPLE_RECORD)
-      log_output3 = json.loads(formatter.format(record))
+        with tracker.scoped_state('step2', 'process'):
+          record = self.create_log_record(**self.SAMPLE_RECORD)
+          log_output2 = json.loads(formatter.format(record))
+
+        record = self.create_log_record(**self.SAMPLE_RECORD)
+        log_output3 = json.loads(formatter.format(record))
 
+    statesampler.set_current_tracker(None)
     record = self.create_log_record(**self.SAMPLE_RECORD)
     log_output4 = json.loads(formatter.format(record))
 
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py
index 58ba571..d64920f 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -378,9 +378,9 @@ class MapTask(object):
                step_names=None,
                original_names=None,
                name_contexts=None):
+    # TODO(BEAM-4028): Remove arguments other than name_contexts.
     self.operations = operations
     self.stage_name = stage_name
-    # TODO(BEAM-4028): Remove arguments other than name_contexts.
     self.name_contexts = name_contexts or self._make_name_contexts(
         original_names, step_names, system_names)
 
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 143974e..78a67bc 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -38,7 +38,6 @@ from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.runners import common
 from apache_beam.runners.common import Receiver
 from apache_beam.runners.dataflow.internal.names import PropertyNames
-from apache_beam.runners.worker import logger
 from apache_beam.runners.worker import opcounters
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import sideinputs
@@ -127,10 +126,6 @@ class Operation(object):
     else:
       self.name_context = common.NameContext(name_context)
 
-    # TODO(BEAM-4028): Remove following two lines. Rely on name context.
-    self.operation_name = self.name_context.step_name
-    self.step_name = self.name_context.logging_name()
-
     self.spec = spec
     self.counter_factory = counter_factory
     self.consumers = collections.defaultdict(list)
@@ -143,14 +138,11 @@ class Operation(object):
 
     self.state_sampler = state_sampler
     self.scoped_start_state = self.state_sampler.scoped_state(
-        self.name_context.metrics_name(), 'start',
-        metrics_container=self.metrics_container)
+        self.name_context, 'start', metrics_container=self.metrics_container)
     self.scoped_process_state = self.state_sampler.scoped_state(
-        self.name_context.metrics_name(), 'process',
-        metrics_container=self.metrics_container)
+        self.name_context, 'process', metrics_container=self.metrics_container)
     self.scoped_finish_state = self.state_sampler.scoped_state(
-        self.name_context.metrics_name(), 'finish',
-        metrics_container=self.metrics_container)
+        self.name_context, 'finish', metrics_container=self.metrics_container)
     # TODO(ccy): the '-abort' state can be added when the abort is supported in
     # Operations.
     self.receivers = []
@@ -390,11 +382,9 @@ class DoOperation(Operation):
           fn, args, kwargs, self.side_input_maps, window_fn,
           tagged_receivers=self.tagged_receivers,
           step_name=self.name_context.logging_name(),
-          logging_context=logger.PerThreadLoggingContext(
-              step_name=self.name_context.logging_name()),
           state=state,
-          scoped_metrics_container=None,
           operation_name=self.name_context.metrics_name())
+
       self.dofn_receiver = (self.dofn_runner
                             if isinstance(self.dofn_runner, Receiver)
                             else DoFnRunnerReceiver(self.dofn_runner))
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py
index b0c2b67..b73029c 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -22,6 +22,7 @@ from __future__ import absolute_import
 import threading
 from collections import namedtuple
 
+from apache_beam.runners import common
 from apache_beam.utils.counters import Counter
 from apache_beam.utils.counters import CounterName
 
@@ -69,8 +70,18 @@ class StateSampler(statesampler_impl.StateSampler):
     self._states_by_name = {}
     self.sampling_period_ms = sampling_period_ms
     self.tracked_thread = None
+    self.finished = False
+    self.started = False
     super(StateSampler, self).__init__(sampling_period_ms)
 
+  @property
+  def stage_name(self):
+    return self._prefix
+
+  def stop(self):
+    set_current_tracker(None)
+    super(StateSampler, self).stop()
+
   def stop_if_still_running(self):
     if self.started and not self.finished:
       self.stop()
@@ -90,13 +101,28 @@ class StateSampler(statesampler_impl.StateSampler):
         self.tracked_thread)
 
   def scoped_state(self,
-                   step_name,
+                   name_context,
                    state_name,
                    io_target=None,
                    metrics_container=None):
+    """Returns a ScopedState object associated to a Step and a State.
+
+    Args:
+      name_context: common.NameContext. It is the step name information.
+      state_name: str. It is the state name (e.g. process / start / finish).
+      io_target:
+      metrics_container: MetricsContainer. The step's metrics container.
+
+    Returns:
+      A ScopedState that keeps the execution context and is able to switch it
+      for the execution thread.
+    """
+    if not isinstance(name_context, common.NameContext):
+      name_context = common.NameContext(name_context)
+
     counter_name = CounterName(state_name + '-msecs',
                                stage_name=self._prefix,
-                               step_name=step_name,
+                               step_name=name_context.metrics_name(),
                                io_target=io_target)
     if counter_name in self._states_by_name:
       return self._states_by_name[counter_name]
@@ -105,6 +131,7 @@ class StateSampler(statesampler_impl.StateSampler):
                                                          Counter.SUM)
       self._states_by_name[counter_name] = super(
           StateSampler, self)._scoped_state(counter_name,
+                                            name_context,
                                             output_counter,
                                             metrics_container)
       return self._states_by_name[counter_name]
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd b/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd
index 76b379b..799bd0d 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd
@@ -43,7 +43,8 @@ cdef class StateSampler(object):
 
   cdef int32_t current_state_index
 
-  cpdef _scoped_state(self, counter_name, output_counter, metrics_container)
+  cpdef _scoped_state(
+      self, counter_name, name_context, output_counter, metrics_container)
 
 cdef class ScopedState(object):
   """Context manager class managing transitions for a given sampler state."""
@@ -52,6 +53,7 @@ cdef class ScopedState(object):
   cdef readonly int32_t state_index
   cdef readonly object counter
   cdef readonly object name
+  cdef readonly object name_context
   cdef readonly int64_t _nsecs
   cdef int32_t old_state_index
   cdef readonly MetricsContainer _metrics_container
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
index fdf4969..8aa5217 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx
@@ -90,8 +90,12 @@ cdef class StateSampler(object):
     self.current_state_index = 0
     self.time_since_transition = 0
     self.state_transition_count = 0
-    unknown_state = ScopedState(
-        self, CounterName('unknown'), self.current_state_index)
+    unknown_state = ScopedState(self,
+                                CounterName('unknown'),
+                                None,
+                                self.current_state_index,
+                                None,
+                                None)
     pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
     self.scoped_states_by_index = [unknown_state]
     pythread.PyThread_release_lock(self.lock)
@@ -153,7 +157,7 @@ cdef class StateSampler(object):
   def current_state(self):
     return self.scoped_states_by_index[self.current_state_index]
 
-  cpdef _scoped_state(self, counter_name, output_counter,
+  cpdef _scoped_state(self, counter_name, name_context, output_counter,
                       metrics_container):
     """Returns a context manager managing transitions for a given state.
     Args:
@@ -168,6 +172,7 @@ cdef class StateSampler(object):
     new_state_index = len(self.scoped_states_by_index)
     scoped_state = ScopedState(self,
                                counter_name,
+                               name_context,
                                new_state_index,
                                output_counter,
                                metrics_container)
@@ -183,10 +188,16 @@ cdef class StateSampler(object):
 cdef class ScopedState(object):
   """Context manager class managing transitions for a given sampler state."""
 
-  def __init__(
-      self, sampler, name, state_index, counter=None, metrics_container=None):
+  def __init__(self,
+               sampler,
+               name,
+               step_name_context,
+               state_index,
+               counter,
+               metrics_container):
     self.sampler = sampler
     self.name = name
+    self.name_context = step_name_context
     self.state_index = state_index
     self.counter = counter
     self._metrics_container = metrics_container
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
index 2f09d0e..4b1bf83 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_slow.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py
@@ -21,15 +21,18 @@ from __future__ import absolute_import
 
 from builtins import object
 
+from apache_beam.runners import common
+from apache_beam.utils import counters
+
 
 class StateSampler(object):
 
   def __init__(self, sampling_period_ms):
-    self._state_stack = [ScopedState(None, self, None)]
+    self._state_stack = [ScopedState(self,
+                                     counters.CounterName('unknown'),
+                                     None)]
     self.state_transition_count = 0
     self.time_since_transition = 0
-    self.started = False
-    self.finished = False
 
   def current_state(self):
     """Returns the current execution state.
@@ -40,9 +43,12 @@ class StateSampler(object):
 
   def _scoped_state(self,
                     counter_name,
+                    name_context,
                     output_counter,
                     metrics_container=None):
-    return ScopedState(self, counter_name, output_counter, metrics_container)
+    assert isinstance(name_context, common.NameContext)
+    return ScopedState(
+        self, counter_name, name_context, output_counter, metrics_container)
 
   def _enter_state(self, state):
     self.state_transition_count += 1
@@ -57,14 +63,16 @@ class StateSampler(object):
     pass
 
   def stop(self):
-    self.finished = True
+    pass
 
 
 class ScopedState(object):
 
-  def __init__(self, sampler, name, counter=None, metrics_container=None):
+  def __init__(self, sampler, name, step_name_context,
+               counter=None, metrics_container=None):
     self.state_sampler = sampler
     self.name = name
+    self.name_context = step_name_context
     self.counter = counter
     self.nsecs = 0
     self.metrics_container = metrics_container

Reply via email to