[jira] [Commented] (BEAM-3042) Add tracking of bytes read / time spent when reading side inputs

2017-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288159#comment-16288159
 ] 

ASF GitHub Bot commented on BEAM-3042:
--

pabloem opened a new pull request #4248: [BEAM-3042] Tracking of time spent 
reading side inputs, and bytes read in Dataflow.
URL: https://github.com/apache/beam/pull/4248
 
 
   This pull request changes the PrefetchingSourceReader used to fetch 
side input data. 
   
   Specifically, it tracks how long the worker spends blocked waiting to fetch 
side inputs; and for the Dataflow runner (which uses NativeAvroSource), it 
also tracks how many bytes were read.
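
   As a rough sketch of the idea (not the PR's actual code; every name below 
is hypothetical), the reader wraps each blocking fetch so that the elapsed 
time and the size of the returned block are added to counters:

    import time

    class SimpleCounter(object):
      """Minimal stand-in for a worker counter that sums updates."""
      def __init__(self):
        self.value = 0
      def update(self, n):
        self.value += n

    class InstrumentedReader(object):
      """Wraps a blocking fetch, recording time spent blocked and bytes read."""
      def __init__(self, fetch_fn, msecs_counter, bytes_counter):
        self._fetch_fn = fetch_fn    # callable returning one block of bytes
        self._msecs = msecs_counter
        self._bytes = bytes_counter

      def read_next(self):
        start = time.time()
        block = self._fetch_fn()     # blocking fetch of side input data
        self._msecs.update(int((time.time() - start) * 1000))
        self._bytes.update(len(block))
        return block

    # The wrapper attributes blocked time (ms) and bytes read to the counters.
    msecs, nbytes = SimpleCounter(), SimpleCounter()
    reader = InstrumentedReader(lambda: b'fake side input block', msecs, nbytes)
    reader.read_next()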


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add tracking of bytes read / time spent when reading side inputs
> 
>
> Key: BEAM-3042
> URL: https://issues.apache.org/jira/browse/BEAM-3042
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
>
> It is difficult for Dataflow users to understand how modifying a pipeline or 
> data set can affect how much inter-transform IO is used in their job. The 
> intent of this feature request is to help users understand how side inputs 
> behave when they are consumed.
> This will allow users to understand how much time and how much data their 
> pipeline uses to read/write to inter-transform IO. Users will also be able to 
> modify their pipelines and understand how their changes affect these IO 
> metrics.
> For further information, please review the internal Google doc 
> go/insights-transform-io-design-doc.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3042) Add tracking of bytes read / time spent when reading side inputs

2017-12-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286814#comment-16286814
 ] 

ASF GitHub Bot commented on BEAM-3042:
--

aaltay closed pull request #4241: [BEAM-3042] Renaming properties from IO 
target counter name.
URL: https://github.com/apache/beam/pull/4241
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index ae974344259..e2e0a1a730b 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -29,19 +29,18 @@
 from apache_beam.transforms import cy_combiners
 
 # Information identifying the IO being measured by a counter.
-IOTargetName = namedtuple('IOTargetName', ['side_input_step_name',
-                                           'side_input_index',
-                                           'original_shuffle_step_name'])
+IOTargetName = namedtuple('IOTargetName', ['requesting_step_name',
+                                           'input_index'])
 
 
 def side_input_id(step_name, input_index):
   """Create an IOTargetName that identifies the reading of a side input."""
-  return IOTargetName(step_name, input_index, None)
+  return IOTargetName(step_name, input_index)
 
 
 def shuffle_id(step_name):
   """Create an IOTargetName that identifies a GBK step."""
-  return IOTargetName(None, None, step_name)
+  return IOTargetName(step_name, None)
 
 
 _CounterName = namedtuple('_CounterName', ['name',
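
For reference, after this rename the helpers in the diff above behave as 
follows (a small usage sketch; the step name 's3' is illustrative):

    from apache_beam.utils import counters

    # Second side input consumed by step 's3':
    counters.side_input_id('s3', 2)
    # -> IOTargetName(requesting_step_name='s3', input_index=2)

    # Shuffle read requested by step 's3' (e.g. a ParDo reading a GBK output):
    counters.shuffle_id('s3')
    # -> IOTargetName(requesting_step_name='s3', input_index=None)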


 




[jira] [Commented] (BEAM-3042) Add tracking of bytes read / time spent when reading side inputs

2017-12-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286601#comment-16286601
 ] 

ASF GitHub Bot commented on BEAM-3042:
--

pabloem closed pull request #3943: [BEAM-3042] Add tracking of bytes read / 
time spent when reading side inputs
URL: https://github.com/apache/beam/pull/3943
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py
index f4ba6b9a9a8..c997d23a39d 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -25,6 +25,7 @@
 import random
 
 from apache_beam.utils.counters import Counter
+from apache_beam.utils.counters import CounterName
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -42,6 +43,58 @@ def value(self):
     return self._value
 
 
+class TransformIoCounter(object):
+
+  def add_bytes_read(self, n):
+    pass
+
+  def __enter__(self):
+    self.enter()
+
+  def __exit__(self, unused_exc_type, unused_exc_value, unused_traceback):
+    self.exit()
+
+  def enter(self):
+    pass
+
+  def exit(self):
+    pass
+
+  def check_step(self):
+    pass
+
+
+class SideInputReadCounter(TransformIoCounter):
+
+  def __init__(self, counter_factory, state_sampler, io_target):
+    self._counter_factory = counter_factory
+    self._state_sampler = state_sampler
+    self._bytes_read_cache = 0
+    self.io_target = io_target
+    self.check_step()
+
+  def check_step(self):
+    current_state = self._state_sampler.current_state()
+    operation_name = current_state.name.step_name
+    self.scoped_state = self._state_sampler.scoped_state(
+        operation_name, 'read-sideinput', io_target=self.io_target)
+    self.bytes_read_counter = self._counter_factory.get_counter(
+        CounterName('bytes-read',
+                    step_name=operation_name,
+                    io_target=self.io_target),
+        Counter.SUM)
+
+  def add_bytes_read(self, n):
+    if n > 0:
+      self.bytes_read_counter.update(n)
+
+  def enter(self):
+    self.scoped_state.__enter__()
+
+  def exit(self):
+    self.scoped_state.__exit__(None, None, None)
+
+
 class OperationCounters(object):
   """The set of basic counters to attach to an Operation."""
 
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index ed3f3b8f466..132a61fb131 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -42,6 +42,7 @@
 from apache_beam.transforms.combiners import PhasedCombineFnExecutor
 from apache_beam.transforms.combiners import curry_combine_fn
 from apache_beam.transforms.window import GlobalWindows
+from apache_beam.utils import counters
 from apache_beam.utils.windowed_value import WindowedValue
 
 # Allow some "pure mode" declarations.
@@ -281,7 +282,7 @@ def _read_side_inputs(self, tags_and_types):
     # Note that for each tag there could be several read operations in the
     # specification. This can happen for instance if the source has been
     # sharded into several files.
-    for side_tag, view_class, view_options in tags_and_types:
+    for i, (side_tag, view_class, view_options) in enumerate(tags_and_types):
       sources = []
       # Using the side_tag in the lambda below will trigger a pylint warning.
       # However in this case it is fine because the lambda is used right away
@@ -293,7 +294,13 @@ def _read_side_inputs(self, tags_and_types):
         if not isinstance(si, operation_specs.WorkerSideInputSource):
           raise NotImplementedError('Unknown side input type: %r' % si)
         sources.append(si.source)
-      iterator_fn = sideinputs.get_iterator_fn_for_sources(sources)
+
+      si_counter = opcounters.SideInputReadCounter(
+          self.counter_factory, self.state_sampler,
+          # Inputs are 1-indexed, so we add 1 to i in the side input id
+          counters.side_input_id(self.operation_name, i+1))
+      iterator_fn = sideinputs.get_iterator_fn_for_sources(
+          sources, read_counter=si_counter)
 
       # Backwards compatibility for pre BEAM-733 SDKs.
       if isinstance(view_options, tuple):
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index bdf9f4e71f5..b11ab3cfba6 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -24,6 +24,7 @@
 import traceback
 
 from apache_beam.io import iobase
+from apache_beam.runners.worker 
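
A usage sketch for the SideInputReadCounter added above (the real wiring is 
in operations.py; counter_factory, state_sampler and fetch_side_input_block 
are stand-ins supplied by the worker harness):

    from apache_beam.runners.worker import opcounters
    from apache_beam.utils import counters

    si_counter = opcounters.SideInputReadCounter(
        counter_factory, state_sampler,              # provided by the worker
        counters.side_input_id('ParDo(MyDoFn)', 1))  # first side input

    with si_counter:                     # time in this block is attributed to
      block = fetch_side_input_block()   # the 'read-sideinput' scoped state
      si_counter.add_bytes_read(len(block))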

[jira] [Commented] (BEAM-3042) Add tracking of bytes read / time spent when reading side inputs

2017-12-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285595#comment-16285595
 ] 

ASF GitHub Bot commented on BEAM-3042:
--

pabloem opened a new pull request #4241: [BEAM-3042] Renaming properties from 
IO target counter name.
URL: https://github.com/apache/beam/pull/4241
 
 
   These metrics will be used internally to track bytes read and time spent 
reading both side inputs and shuffle. This rename matches the internal 
Dataflow design for these metrics.
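
   For context, the renamed IO target ends up embedded in counter names 
roughly as follows (a sketch based on the diff merged above; the step name 
's3' is illustrative):

    from apache_beam.utils import counters

    io_target = counters.side_input_id('s3', 1)
    counters.CounterName('bytes-read', step_name='s3', io_target=io_target)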




[jira] [Commented] (BEAM-3042) Add tracking of bytes read / time spent when reading side inputs

2017-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16281044#comment-16281044
 ] 

ASF GitHub Bot commented on BEAM-3042:
--

chamikaramj closed pull request #4222: [BEAM-3042] Updating Dataflow API protos
URL: https://github.com/apache/beam/pull/4222
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
index b0d4e44816c..6c55d4e830f 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
@@ -16,7 +16,6 @@
 #
 
 """Generated message classes for dataflow version v1b3.
-
 Develops and executes data processing patterns like ETL, batch computation,
 and continuous computation.
 """
@@ -347,11 +346,19 @@ class CounterStructuredName(_messages.Message):
       workers.
     executionStepName: Name of the stage. An execution step contains multiple
       component steps.
+    inputIndex: Index of an input collection that's being read from/written to
+      as a side input. The index identifies a step's side inputs starting by 1
+      (e.g. the first side input has input_index 1, the third has input_index
+      3). Side inputs are identified by a pair of (original_step_name,
+      input_index). This field helps uniquely identify them.
     name: Counter name. Not necessarily globally-unique, but unique within the
       context of the other fields. Required.
     origin: One of the standard Origins defined above.
     originNamespace: A string containing a more specific namespace of the
      counter's origin.
+    originalRequestingStepName: The step name requesting an operation, such as
+      GBK. I.e. the ParDo causing a read/write from shuffle to occur, or a
+      read from side inputs.
     originalStepName: System generated name of the original step in the user's
       graph, before optimization.
     portion: Portion of this counter, either key or value.
@@ -382,12 +389,14 @@ class PortionValueValuesEnum(_messages.Enum):
 
   componentStepName = _messages.StringField(1)
   executionStepName = _messages.StringField(2)
-  name = _messages.StringField(3)
-  origin = _messages.EnumField('OriginValueValuesEnum', 4)
-  originNamespace = _messages.StringField(5)
-  originalStepName = _messages.StringField(6)
-  portion = _messages.EnumField('PortionValueValuesEnum', 7)
-  workerId = _messages.StringField(8)
+  inputIndex = _messages.IntegerField(3, variant=_messages.Variant.INT32)
+  name = _messages.StringField(4)
+  origin = _messages.EnumField('OriginValueValuesEnum', 5)
+  originNamespace = _messages.StringField(6)
+  originalRequestingStepName = _messages.StringField(7)
+  originalStepName = _messages.StringField(8)
+  portion = _messages.EnumField('PortionValueValuesEnum', 9)
+  workerId = _messages.StringField(10)
 
 
 class CounterStructuredNameAndMetadata(_messages.Message):
@@ -1401,8 +1410,7 @@ class DistributionUpdate(_messages.Message):
 
   Fields:
     count: The count of the number of elements present in the distribution.
-    logBuckets: (Optional) Logarithmic histogram of values. Each log may be in
-      no more than one bucket. Order does not matter.
+    histogram: (Optional) Histogram of value counts for the distribution.
     max: The maximum value present in the distribution.
     min: The minimum value present in the distribution.
     sum: Use an int64 since we'd prefer the added precision. If overflow is a
@@ -1412,7 +1420,7 @@ class DistributionUpdate(_messages.Message):
   """
 
   count = _messages.MessageField('SplitInt64', 1)
-  logBuckets = _messages.MessageField('LogBucket', 2, repeated=True)
+  histogram = _messages.MessageField('Histogram', 2)
   max = _messages.MessageField('SplitInt64', 3)
   min = _messages.MessageField('SplitInt64', 4)
   sum = _messages.MessageField('SplitInt64', 5)
@@ -1808,6 +1816,27 @@ class GetTemplateResponse(_messages.Message):
   status = _messages.MessageField('Status', 2)
 
 
+class Histogram(_messages.Message):
+  """Histogram of value counts for a distribution.  Buckets have an inclusive
+  lower bound and exclusive upper bound and use "1,2,5 bucketing": The first
+  bucket range is from [0,1) and all subsequent bucket boundaries are powers
+  of ten multiplied by 1, 2, or 5. Thus, bucket boundaries are 0, 1, 2, 5, 10,
+  20, 50, 100, 200, 500, 1000, ... Negative values are not supported.
+
+  Fields:
+bucketCounts: Counts of values in each buc
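
A sketch of how the new fields could be populated via the generated client 
classes above (the step name and index are illustrative):

    from apache_beam.runners.dataflow.internal.clients.dataflow import (
        dataflow_v1b3_messages as dataflow)

    dataflow.CounterStructuredName(
        name='bytes-read',
        originalRequestingStepName='s3',  # step performing the side input read
        inputIndex=1)                     # its first side input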

[jira] [Commented] (BEAM-3042) Add tracking of bytes read / time spent when reading side inputs

2017-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16280916#comment-16280916
 ] 

ASF GitHub Bot commented on BEAM-3042:
--

pabloem opened a new pull request #4222: [BEAM-3042] Updating Dataflow API 
protos
URL: https://github.com/apache/beam/pull/4222
 
 
   This is necessary to be able to report the new Dataflow metrics.
   r: @chamikaramj 

