[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests

2020-05-09 Thread GitBox


mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422567587



##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##
@@ -1008,31 +1007,30 @@ def _reset_count(self):
 self._cells = 0
 
   def process(self, element):
-mg_info = element.info
+for elem in element:

Review comment:
   Done!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests

2020-05-09 Thread GitBox


mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422567566



##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
##
@@ -499,6 +499,7 @@ def test_batch_byte_size(
   # and each bach should contains 25 mutations.
   res = (
   p | beam.Create(mutation_group)
+  | 'combine to list' >> beam.combiners.ToList()

Review comment:
   Done!









[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests

2020-05-09 Thread GitBox


mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422567474



##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##
@@ -1008,31 +1007,30 @@ def _reset_count(self):
 self._cells = 0
 
   def process(self, element):
-mg_info = element.info
+for elem in element:
+  mg_info = elem.info
+  if mg_info['byte_size'] + self._size_in_bytes > \

Review comment:
   Since I've reverted the changes to the connector, there is no need to 
create new tickets. Resolving this conversation. Thanks!









[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests

2020-05-08 Thread GitBox


mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422280614



##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
##
@@ -499,6 +499,7 @@ def test_batch_byte_size(
   # and each bach should contains 25 mutations.
   res = (
   p | beam.Create(mutation_group)
+  | 'combine to list' >> beam.combiners.ToList()

Review comment:
   Users do not have to add the ToList transform to a production 
pipeline; I only added it to test the batching process.
   The previous batching implementation (without the ToList transform) followed 
the Java implementation, except that it did not sort the transactions by 
table and primary key (this is documented as a feature to be added later).
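   For illustration only, a minimal sketch of what the deferred sorting step could look like, assuming a simplified mutation that records its table name, primary key, and size. `Mutation` and `sort_for_batching` are hypothetical names; this is not the Java SpannerIO implementation, just an ordering by table and then primary key applied before batching.

```python
from typing import List, NamedTuple, Tuple


class Mutation(NamedTuple):
    """Hypothetical simplified mutation: target table, primary key, and size."""
    table: str
    key: Tuple[int, ...]
    byte_size: int


def sort_for_batching(mutations: List[Mutation]) -> List[Mutation]:
    """Return mutations ordered by table name, then by primary key."""
    return sorted(mutations, key=lambda m: (m.table, m.key))


mutations = [
    Mutation("users", (3,), 40),
    Mutation("orders", (7,), 25),
    Mutation("users", (1,), 30),
    Mutation("orders", (2,), 50),
]
ordered = sort_for_batching(mutations)
print([(m.table, m.key[0]) for m in ordered])
# → [('orders', 2), ('orders', 7), ('users', 1), ('users', 3)]
```

   Batching the sorted list would then group mutations for the same table and neighbouring keys together.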

##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##
@@ -1008,31 +1007,30 @@ def _reset_count(self):
 self._cells = 0
 
   def process(self, element):
-mg_info = element.info
+for elem in element:

Review comment:
   Makes sense. In that case, we don't need to alter the connector code 
anymore; it was working as expected. Thanks, @chamikaramj, for the feedback; 
it is always helpful.
   I'll remove the changes from the SpannerIO connector and update the IT test 
code for the assertion.









[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests

2020-05-08 Thread GitBox


mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422104136



##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
##
@@ -499,6 +499,7 @@ def test_batch_byte_size(
   # and each bach should contains 25 mutations.
   res = (
   p | beam.Create(mutation_group)
+  | 'combine to list' >> beam.combiners.ToList()

Review comment:
   Yes, `_BatchFn` requires a single iterable collection and loops 
through it to build the batches. The test just replicates the batching 
pipeline used in the `_WriteGroup` transform.
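   A minimal plain-Python sketch of that loop, assuming each element is a single list of mutation groups whose `info` dict carries a `byte_size` entry, as in the diff. `MutationGroup`, `batch_groups`, and `max_batch_size_bytes` are hypothetical names, not the SpannerIO API, and the sketch ignores the other counters (such as `_cells`) that the real `_BatchFn` keeps.

```python
from typing import Dict, Iterable, List, NamedTuple


class MutationGroup(NamedTuple):
    """Hypothetical stand-in for a mutation group with size metadata."""
    name: str
    info: Dict[str, int]


def batch_groups(groups: Iterable[MutationGroup],
                 max_batch_size_bytes: int) -> List[List[MutationGroup]]:
    """Split one complete iterable of mutation groups into byte-bounded batches."""
    batches: List[List[MutationGroup]] = []
    current: List[MutationGroup] = []
    size_in_bytes = 0
    for elem in groups:
        mg_info = elem.info
        # Flush the open batch if adding this group would exceed the limit.
        # A group larger than the limit on its own still gets its own batch.
        if current and mg_info['byte_size'] + size_in_bytes > max_batch_size_bytes:
            batches.append(current)
            current, size_in_bytes = [], 0
        current.append(elem)
        size_in_bytes += mg_info['byte_size']
    if current:
        batches.append(current)
    return batches


groups = [MutationGroup(f"mg{i}", {"byte_size": 10}) for i in range(100)]
batches = batch_groups(groups, max_batch_size_bytes=250)
print([len(b) for b in batches])  # → [25, 25, 25, 25]
```

   Because the whole list arrives as one element, the batch boundaries depend only on the data and the size limit, not on how the runner bundles the input.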

##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##
@@ -1008,31 +1007,30 @@ def _reset_count(self):
 self._cells = 0
 
   def process(self, element):
-mg_info = element.info
+for elem in element:

Review comment:
   There was no issue with processing mutation groups; the issue was with the 
batch size. According to the Beam execution model, ‘**The division of the 
collection into bundles is arbitrary and selected by the runner.**’ As a result, 
finish_bundle is called once per bundle rather than once for the complete 
collection, which produced the wrong number of batches on the Dataflow 
runner. That's why I added the ToList transform: it gathers the elements into a 
single collection so that the batches are generated properly.
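   To illustrate the effect, the following plain-Python simulation batches the same 100 elements twice: once as a whole collection, and once bundle by bundle, as a DoFn that flushes its buffer in `finish_bundle` would. The element sizes, the 250-byte limit, and the bundle split are invented for the example; a real runner chooses its own bundle boundaries, which is exactly why the batch count varies.

```python
from typing import List


def count_batches(sizes: List[int], max_bytes: int) -> int:
    """Count the byte-bounded batches formed from one sequence of element sizes."""
    batches, current = 0, 0
    for size in sizes:
        # Close the open batch when this element would push it over the limit.
        if current and current + size > max_bytes:
            batches += 1
            current = 0
        current += size
    # A partially filled batch is still emitted at the end.
    return batches + (1 if current else 0)


def count_batches_per_bundle(bundles: List[List[int]], max_bytes: int) -> int:
    """Batch every bundle independently, as a DoFn flushing in finish_bundle would."""
    return sum(count_batches(bundle, max_bytes) for bundle in bundles)


sizes = [10] * 100  # 100 elements of 10 bytes each
whole = count_batches(sizes, 250)

# Hypothetical bundle split chosen by the runner (lengths sum to 100).
lengths = [15, 25, 10, 20, 15, 10, 5]
bundles, start = [], 0
for n in lengths:
    bundles.append(sizes[start:start + n])
    start += n
split = count_batches_per_bundle(bundles, 250)

print(whole, split)  # → 4 7
```

   Every bundle ends with its own partially filled batch, so the per-bundle count depends on the runner's split rather than on the data.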

##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##
@@ -1008,31 +1007,30 @@ def _reset_count(self):
 self._cells = 0
 
   def process(self, element):
-mg_info = element.info
+for elem in element:
+  mg_info = elem.info
+  if mg_info['byte_size'] + self._size_in_bytes > \

Review comment:
   Sure. Should I create a new Jira ticket and then either (1) add the ticket 
number to this PR for reference, or (2) create a new PR for this change and, once 
it is merged, rebase this PR and request a review?
   
   I think the first approach would take less time to close the tickets. What do 
you suggest?









[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests

2020-04-23 Thread GitBox


mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r414016644



##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##
@@ -1008,31 +1007,30 @@ def _reset_count(self):
 self._cells = 0
 
   def process(self, element):
-mg_info = element.info
+for elem in element:

Review comment:
   I've updated the batching process to support the Dataflow runner. The 
process is almost the same as in the previous commit, but now I've added the 
`ToList()` combine transform before passing the mutation groups to 
`_BatchFn` to create batches.




