[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests
mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422567587

## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
## @@ -1008,31 +1007,30 @@ def _reset_count(self):
     self._cells = 0

   def process(self, element):
-    mg_info = element.info
+    for elem in element:

Review comment:
Done!

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests
mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422567566

## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
## @@ -499,6 +499,7 @@ def test_batch_byte_size(
     # and each bach should contains 25 mutations.
     res = (
         p | beam.Create(mutation_group)
+        | 'combine to list' >> beam.combiners.ToList()

Review comment:
Done!
[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests
mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422567474

## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
## @@ -1008,31 +1007,30 @@ def _reset_count(self):
     self._cells = 0

   def process(self, element):
-    mg_info = element.info
+    for elem in element:
+      mg_info = elem.info
+      if mg_info['byte_size'] + self._size_in_bytes > \

Review comment:
Since I've reverted the changes to the connector, there is no need to create new tickets. Resolving this conversation. Thanks!
[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests
mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422280614

## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
## @@ -499,6 +499,7 @@ def test_batch_byte_size(
     # and each bach should contains 25 mutations.
     res = (
         p | beam.Create(mutation_group)
+        | 'combine to list' >> beam.combiners.ToList()

Review comment:
Users do not have to add a ToList transform to their production pipelines; I only added it here to test the batching. The previous implementation of batching (without the ToList transform) followed the Java implementation, except that it does not sort the transactions by table and primary key (this is also documented as a feature to be added later).

## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
## @@ -1008,31 +1007,30 @@ def _reset_count(self):
     self._cells = 0

   def process(self, element):
-    mg_info = element.info
+    for elem in element:

Review comment:
Makes sense. In that case we don't need to alter the connector code anymore; it was working as expected. Thanks, @chamikaramj, for the feedback; it is always helpful. I'll remove the changes from the SpannerIO connector and update the integration test code for the assertion.
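For context on the sorting step the comment says is still missing, here is a minimal sketch of ordering mutation groups by table and then primary key before they are batched. It is an illustration under stated assumptions only: `MutationGroupInfo` and `sort_for_batching` are hypothetical names, not part of the SpannerIO connector, and a real implementation would compare Spanner keys rather than plain Python tuples.

```python
from typing import List, NamedTuple, Tuple


class MutationGroupInfo(NamedTuple):
    # Hypothetical stand-in for a mutation group's metadata; the real
    # connector keeps size information in `element.info`.
    table: str
    key: Tuple
    byte_size: int


def sort_for_batching(groups: List[MutationGroupInfo]) -> List[MutationGroupInfo]:
    """Orders groups by table, then by primary key, so that groups written
    to neighbouring rows of the same table end up next to each other before
    a size-based batcher walks over them."""
    return sorted(groups, key=lambda g: (g.table, g.key))


groups = [
    MutationGroupInfo("users", (2,), 10),
    MutationGroupInfo("orders", (5,), 10),
    MutationGroupInfo("users", (1,), 10),
]
for g in sort_for_batching(groups):
    print(g.table, g.key)
```

With the three groups above, the output order is `orders (5,)`, `users (1,)`, `users (2,)`.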
[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests
mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422104136

## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
## @@ -499,6 +499,7 @@ def test_batch_byte_size(
     # and each bach should contains 25 mutations.
     res = (
         p | beam.Create(mutation_group)
+        | 'combine to list' >> beam.combiners.ToList()

Review comment:
Yes, `_BatchFn` requires a single iterable collection and loops through it to build the batches. This just replicates the batching pipeline used in the `_WriteGroup` transform.

## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
## @@ -1008,31 +1007,30 @@ def _reset_count(self):
     self._cells = 0

   def process(self, element):
-    mg_info = element.info
+    for elem in element:

Review comment:
There was no issue in processing mutation groups; the issue was with the batch size. According to the Beam execution model, ‘**The division of the collection into bundles is arbitrary and selected by the runner.**’ As a result, `finish_bundle` is called once per bundle rather than once for the complete collection, which produces an incorrect number of batches on the Dataflow runner. That's why I added the ToList transform: to gather everything into a single collection and generate the batches properly.

## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
## @@ -1008,31 +1007,30 @@ def _reset_count(self):
     self._cells = 0

   def process(self, element):
-    mg_info = element.info
+    for elem in element:
+      mg_info = elem.info
+      if mg_info['byte_size'] + self._size_in_bytes > \

Review comment:
Sure. Should I create a new Jira ticket and either (1) add the ticket number to this PR for reference, or (2) create a new PR for this change and, once it is merged, rebase this PR and request review? I think the first approach would take less time to close the tickets. What do you suggest?
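To make the bundle effect concrete, here is a framework-free simulation, assuming a greedy size-based batcher that flushes its partial batch at the end of every bundle, the way a DoFn would in `finish_bundle`. The function names `batch_by_size` and `batch_per_bundle` and the 10-byte / 250-byte numbers are hypothetical; the point is only that the same input can yield a different number of batches depending on how the runner splits it.

```python
from typing import Iterable, List


def batch_by_size(sizes: Iterable[int], max_batch_bytes: int) -> List[List[int]]:
    """Greedily groups item sizes into batches of at most max_batch_bytes.

    A new batch starts when adding the next item would exceed the limit; the
    last partial batch is flushed at the end, the way a DoFn would flush in
    finish_bundle.
    """
    batches: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for size in sizes:
        if current and current_bytes + size > max_batch_bytes:
            batches.append(current)
            current, current_bytes = [], 0
        current.append(size)
        current_bytes += size
    if current:
        batches.append(current)  # end-of-bundle flush
    return batches


def batch_per_bundle(bundles: List[List[int]], max_batch_bytes: int) -> List[List[int]]:
    """Runs the batcher independently on each bundle, as happens when the
    runner splits a PCollection into several bundles."""
    result: List[List[int]] = []
    for bundle in bundles:
        result.extend(batch_by_size(bundle, max_batch_bytes))
    return result


# Hypothetical input: 100 mutation groups of 10 bytes each, 250-byte limit.
one_bundle = batch_per_bundle([[10] * 100], 250)
three_bundles = batch_per_bundle([[10] * 40, [10] * 30, [10] * 30], 250)
print(len(one_bundle), len(three_bundles))  # 4 6
```

With a single bundle, the batcher emits 4 batches of 25 groups. If the same input is split into bundles of 40, 30 and 30, it emits 6 batches, because each bundle flushes its own partial batch.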
[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests
mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r414016644

## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
## @@ -1008,31 +1007,30 @@ def _reset_count(self):
     self._cells = 0

   def process(self, element):
-    mg_info = element.info
+    for elem in element:

Review comment:
I've updated the batching to support the Dataflow runner. The process is almost the same as in the previous commit, but I've now added the `ToList()` combine transform before passing the mutation groups to `_BatchFn` to create the batches.