This is an automated email from the ASF dual-hosted git repository. mwalenia pushed a commit to branch revert-10885-synthetic-source-perf-fix in repository https://gitbox.apache.org/repos/asf/beam.git
commit 683d03f384789b141445c5c8ca172853e320a26a
Author: Michal Walenia <32354134+mwale...@users.noreply.github.com>
AuthorDate: Fri Feb 21 14:14:34 2020 +0100

    Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"

    This reverts commit b4668a1a1e234c071c3a7b182a76f8f4cf6bfe64.
---
 .../apache_beam/testing/synthetic_pipeline.py | 25 +++++++---------------
 1 file changed, 8 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py b/sdks/python/apache_beam/testing/synthetic_pipeline.py
index af21e6e..6dd41c8 100644
--- a/sdks/python/apache_beam/testing/synthetic_pipeline.py
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py
@@ -41,10 +41,7 @@ import argparse
 import json
 import logging
 import math
-import random
-import struct
 import time
-from builtins import range

 import apache_beam as beam
 from apache_beam.io import WriteToText
@@ -418,31 +415,25 @@ class SyntheticSource(iobase.BoundedSource):
       tracker = range_trackers.UnsplittableRangeTracker(tracker)
     return tracker

-  @staticmethod
-  def random_bytes(length, generator):
-    """Return random bytes."""
-    return b''.join(
-        (struct.pack('B', generator.getrandbits(8)) for _ in range(length)))
-
-  def _gen_kv_pair(self, generator, index):
-    generator.seed(index)
-    rand = generator.random()
+  def _gen_kv_pair(self, index):
+    r = np.random.RandomState(index)
+    rand = r.random_sample()

     # Determines whether to generate hot key or not.
     if rand < self._hot_key_fraction:
       # Generate hot key.
       # An integer is randomly selected from the range [0, numHotKeys-1]
       # with equal probability.
-      generator.seed(index % self._num_hot_keys)
-    return self.random_bytes(self._key_size, generator), self.random_bytes(
-        self._value_size, generator)
+      r_hot = np.random.RandomState(index % self._num_hot_keys)
+      return r_hot.bytes(self._key_size), r.bytes(self._value_size)
+    else:
+      return r.bytes(self._key_size), r.bytes(self._value_size)

   def read(self, range_tracker):
     index = range_tracker.start_position()
-    generator = random.Random()
     while range_tracker.try_claim(index):
       time.sleep(self._sleep_per_input_record_sec)
-      yield self._gen_kv_pair(generator, index)
+      yield self._gen_kv_pair(index)
       index += 1

   def default_output_coder(self):